This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new de780e0d08 Merge system tests for CloudSQLExportInstanceOperator into
one test (#33315)
de780e0d08 is described below
commit de780e0d08f9516d368930547a5844520617ffe1
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Aug 14 01:21:29 2023 +0200
Merge system tests for CloudSQLExportInstanceOperator into one test (#33315)
---
.../operators/cloud/cloud_sql.rst | 2 +-
.../google/cloud/cloud_sql/example_cloud_sql.py | 49 ++++--
.../cloud_sql/example_cloud_sql_deferrable.py | 184 ---------------------
3 files changed, 35 insertions(+), 200 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
b/docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
index afdedcac74..9b859c63db 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
@@ -243,7 +243,7 @@ it will be retrieved from the Google Cloud connection used.
Both variants are sh
Also for all this action you can use operator in the deferrable mode:
-.. exampleinclude::
/../../tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_deferrable.py
+.. exampleinclude::
/../../tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_export_async]
diff --git a/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
b/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
index c0d4e277d4..408d11a334 100644
--- a/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
+++ b/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
@@ -51,12 +51,14 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "cloudsql"
-INSTANCE_NAME = f"{DAG_ID}-{ENV_ID}-instance"
-DB_NAME = f"{DAG_ID}-{ENV_ID}-db"
+INSTANCE_NAME = f"{DAG_ID}-{ENV_ID}-instance".replace("_", "-")
+DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("_", "-")
-BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket"
-FILE_NAME = f"{DAG_ID}_{ENV_ID}_exportImportTestFile"
+BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket".replace("-", "_")
+FILE_NAME = f"{DAG_ID}_{ENV_ID}_exportImportTestFile".replace("-", "_")
+FILE_NAME_DEFERRABLE =
f"{DAG_ID}_{ENV_ID}_def_exportImportTestFile".replace("-", "_")
FILE_URI = f"gs://{BUCKET_NAME}/{FILE_NAME}"
+FILE_URI_DEFERRABLE = f"gs://{BUCKET_NAME}/{FILE_NAME_DEFERRABLE}"
FAILOVER_REPLICA_NAME = f"{INSTANCE_NAME}-failover-replica"
READ_REPLICA_NAME = f"{INSTANCE_NAME}-read-replica"
@@ -125,6 +127,14 @@ export_body = {
"offload": True,
}
}
+export_body_deferrable = {
+ "exportContext": {
+ "fileType": "sql",
+ "uri": FILE_URI_DEFERRABLE,
+ "sqlExportOptions": {"schemaOnly": False},
+ "offload": True,
+ }
+}
# [END howto_operator_cloudsql_export_body]
# [START howto_operator_cloudsql_import_body]
import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}
@@ -210,6 +220,15 @@ with models.DAG(
)
# [END howto_operator_cloudsql_export]
+ # [START howto_operator_cloudsql_export_async]
+ sql_export_def_task = CloudSQLExportInstanceOperator(
+ body=export_body_deferrable,
+ instance=INSTANCE_NAME,
+ task_id="sql_export_def_task",
+ deferrable=True,
+ )
+ # [END howto_operator_cloudsql_export_async]
+
# For import to work we need to add the Cloud SQL instance's Service
Account
# read access to the target GCS object.
# [START howto_operator_cloudsql_import_gcs_permissions]
@@ -243,10 +262,12 @@ with models.DAG(
# [START howto_operator_cloudsql_db_delete]
sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
- instance=INSTANCE_NAME, database=DB_NAME, task_id="sql_db_delete_task"
+ instance=INSTANCE_NAME,
+ database=DB_NAME,
+ task_id="sql_db_delete_task",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_cloudsql_db_delete]
- sql_db_delete_task.trigger_rule = TriggerRule.ALL_DONE
# ############################################## #
# ### INSTANCES TEAR DOWN ###################### #
@@ -256,26 +277,27 @@ with models.DAG(
sql_instance_failover_replica_delete_task = CloudSQLDeleteInstanceOperator(
instance=FAILOVER_REPLICA_NAME,
task_id="sql_instance_failover_replica_delete_task",
+ trigger_rule=TriggerRule.ALL_DONE,
)
sql_instance_read_replica_delete_task = CloudSQLDeleteInstanceOperator(
- instance=READ_REPLICA_NAME,
task_id="sql_instance_read_replica_delete_task"
+ instance=READ_REPLICA_NAME,
+ task_id="sql_instance_read_replica_delete_task",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_cloudsql_replicas_delete]
- sql_instance_failover_replica_delete_task.trigger_rule =
TriggerRule.ALL_DONE
- sql_instance_read_replica_delete_task.trigger_rule = TriggerRule.ALL_DONE
sql_instance_clone_delete_task = CloudSQLDeleteInstanceOperator(
instance=CLONED_INSTANCE_NAME,
task_id="sql_instance_clone_delete_task",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [START howto_operator_cloudsql_delete]
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
- instance=INSTANCE_NAME, task_id="sql_instance_delete_task"
+ instance=INSTANCE_NAME, task_id="sql_instance_delete_task",
trigger_rule=TriggerRule.ALL_DONE
)
# [END howto_operator_cloudsql_delete]
- sql_instance_delete_task.trigger_rule = TriggerRule.ALL_DONE
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
@@ -292,6 +314,7 @@ with models.DAG(
>> sql_db_patch_task
>> sql_gcp_add_bucket_permission_task
>> sql_export_task
+ >> sql_export_def_task
>> sql_gcp_add_object_permission_task
>> sql_import_task
>> sql_instance_clone
@@ -304,10 +327,6 @@ with models.DAG(
>> delete_bucket
)
- # Task dependencies created via `XComArgs`:
- # sql_instance_create_task >> sql_gcp_add_bucket_permission_task
- # sql_instance_create_task >> sql_gcp_add_object_permission_task
-
# ### Everything below this line is not part of example ###
# ### Just for system tests purpose ###
from tests.system.utils.watcher import watcher
diff --git
a/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_deferrable.py
b/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_deferrable.py
deleted file mode 100644
index 7859d93870..0000000000
---
a/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_deferrable.py
+++ /dev/null
@@ -1,184 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example Airflow DAG that creates, patches and deletes a Cloud SQL instance,
and also
-creates, patches and deletes a database inside the instance, in Google Cloud.
-
-This DAG relies on the following OS environment variables
-https://airflow.apache.org/concepts.html#variables
-* GCP_PROJECT_ID - Google Cloud project for the Cloud SQL instance.
-* INSTANCE_NAME - Name of the Cloud SQL instance.
-* DB_NAME - Name of the database inside a Cloud SQL instance.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-from urllib.parse import urlsplit
-
-from airflow import models
-from airflow.models.xcom_arg import XComArg
-from airflow.providers.google.cloud.operators.cloud_sql import (
- CloudSQLCreateInstanceDatabaseOperator,
- CloudSQLCreateInstanceOperator,
- CloudSQLDeleteInstanceDatabaseOperator,
- CloudSQLDeleteInstanceOperator,
- CloudSQLExportInstanceOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import (
- GCSBucketCreateAclEntryOperator,
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
-)
-from airflow.utils.trigger_rule import TriggerRule
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-DAG_ID = "cloudsql-def"
-
-INSTANCE_NAME = f"{DAG_ID}-{ENV_ID}-instance"
-DB_NAME = f"{DAG_ID}-{ENV_ID}-db"
-
-BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket"
-FILE_NAME = f"{DAG_ID}_{ENV_ID}_exportImportTestFile"
-FILE_URI = f"gs://{BUCKET_NAME}/{FILE_NAME}"
-
-# Bodies below represent Cloud SQL instance resources:
-# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
-
-body = {
- "name": INSTANCE_NAME,
- "settings": {
- "tier": "db-n1-standard-1",
- "backupConfiguration": {"binaryLogEnabled": True, "enabled": True,
"startTime": "05:00"},
- "activationPolicy": "ALWAYS",
- "dataDiskSizeGb": 30,
- "dataDiskType": "PD_SSD",
- "databaseFlags": [],
- "ipConfiguration": {
- "ipv4Enabled": True,
- "requireSsl": True,
- },
- "locationPreference": {"zone": "europe-west4-a"},
- "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
- "pricingPlan": "PER_USE",
- "replicationType": "ASYNCHRONOUS",
- "storageAutoResize": True,
- "storageAutoResizeLimit": 0,
- "userLabels": {"my-key": "my-value"},
- },
- "databaseVersion": "MYSQL_5_7",
- "region": "europe-west4",
-}
-
-export_body = {
- "exportContext": {
- "fileType": "sql",
- "uri": FILE_URI,
- "sqlExportOptions": {"schemaOnly": False},
- "offload": True,
- }
-}
-
-db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project":
PROJECT_ID}
-
-
-with models.DAG(
- DAG_ID,
- schedule="@once",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example", "cloud_sql"],
-) as dag:
- create_bucket = GCSCreateBucketOperator(task_id="create_bucket",
bucket_name=BUCKET_NAME)
-
- sql_instance_create_task = CloudSQLCreateInstanceOperator(
- body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
- )
-
- sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
- body=db_create_body, instance=INSTANCE_NAME,
task_id="sql_db_create_task"
- )
-
- file_url_split = urlsplit(FILE_URI)
-
- # For export & import to work we need to add the Cloud SQL instance's
Service Account
- # write access to the destination GCS bucket.
- service_account_email = XComArg(sql_instance_create_task,
key="service_account_email")
-
- sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
- entity=f"user-{service_account_email}",
- role="WRITER",
- bucket=file_url_split[1], # netloc (bucket)
- task_id="sql_gcp_add_bucket_permission_task",
- )
-
- # [START howto_operator_cloudsql_export_async]
- sql_export_task = CloudSQLExportInstanceOperator(
- body=export_body,
- instance=INSTANCE_NAME,
- task_id="sql_export_task",
- deferrable=True,
- )
- # [END howto_operator_cloudsql_export_async]
-
- sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
- instance=INSTANCE_NAME, database=DB_NAME, task_id="sql_db_delete_task"
- )
- sql_db_delete_task.trigger_rule = TriggerRule.ALL_DONE
-
- sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
- instance=INSTANCE_NAME, task_id="sql_instance_delete_task"
- )
- sql_instance_delete_task.trigger_rule = TriggerRule.ALL_DONE
-
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
- )
-
- (
- # TEST SETUP
- create_bucket
- # TEST BODY
- >> sql_instance_create_task
- >> sql_db_create_task
- >> sql_gcp_add_bucket_permission_task
- >> sql_export_task
- >> sql_db_delete_task
- >> sql_instance_delete_task
- # TEST TEARDOWN
- >> delete_bucket
- )
-
- # Task dependencies created via `XComArgs`:
- # sql_instance_create_task >> sql_gcp_add_bucket_permission_task
- # sql_instance_create_task >> sql_gcp_add_object_permission_task
-
- # ### Everything below this line is not part of example ###
- # ### Just for system tests purpose ###
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)