This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04f674c13b Migrate Bigtable operators system tests according to AIP-47 (#26911)
04f674c13b is described below
commit 04f674c13bca9af78b65a643240da53aa556e2e1
Author: Bartłomiej Hirsz <[email protected]>
AuthorDate: Thu Oct 27 10:17:57 2022 +0200
Migrate Bigtable operators system tests according to AIP-47 (#26911)
Change-Id: Id9069e5e6453ae4c520dfc485c64c8b944dc9279
Co-authored-by: Bartlomiej Hirsz <[email protected]>
---
.../operators/cloud/bigtable.rst | 14 +-
.../google/cloud/operators/test_bigtable_system.py | 53 -----
.../providers/google/cloud/bigtable/__init__.py | 16 ++
.../google/cloud/bigtable}/example_bigtable.py | 213 +++++++++++----------
4 files changed, 137 insertions(+), 159 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/bigtable.rst b/docs/apache-airflow-providers-google/operators/cloud/bigtable.rst
index 5e8653e23c..9425d265de 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/bigtable.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/bigtable.rst
@@ -42,7 +42,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_create]
@@ -65,7 +65,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_update]
@@ -85,7 +85,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_delete]
@@ -105,7 +105,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_cluster_update]
@@ -129,7 +129,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_create]
@@ -157,7 +157,7 @@ Using the operator
You can create the operator with or without project id. If project id is missing
it will be retrieved from the Google Cloud connection used. Both variants are shown:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_delete]
@@ -182,7 +182,7 @@ timeout hits and does not raise any exception.
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigtable.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigtable/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_wait_for_replication]
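
For quick reference, the pattern these doc hunks describe looks roughly like the sketch below: the same operator declared once with an explicit project_id and once without, in which case the id is resolved from the Google Cloud connection. All ids here are hypothetical placeholders, not values from the commit.

from airflow.providers.google.cloud.operators.bigtable import BigtableCreateInstanceOperator

# Variant 1: explicit project id (hypothetical values).
create_instance_explicit = BigtableCreateInstanceOperator(
    project_id="my-project",
    instance_id="my-instance",
    main_cluster_id="my-cluster",
    main_cluster_zone="europe-west1-b",
    task_id="create_instance_explicit",
)

# Variant 2: no project id; it is retrieved from the Google Cloud connection used.
create_instance_from_connection = BigtableCreateInstanceOperator(
    instance_id="my-instance",
    main_cluster_id="my-cluster",
    main_cluster_zone="europe-west1-b",
    task_id="create_instance_from_connection",
)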
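
Likewise, a minimal sketch of the replication sensor the last hunk documents; per the doc text above, when replication has not completed the sensor simply runs until the timeout hits without raising an exception. Ids are again placeholders.

from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor

wait_for_table_replication = BigtableTableReplicationCompletedSensor(
    instance_id="my-instance",  # project_id omitted: taken from the connection
    table_id="my-table",
    poke_interval=60,           # seconds between replication checks
    timeout=180,                # stop checking after 3 minutes
    task_id="wait_for_table_replication",
)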
diff --git a/tests/providers/google/cloud/operators/test_bigtable_system.py b/tests/providers/google/cloud/operators/test_bigtable_system.py
deleted file mode 100644
index e1e4844657..0000000000
--- a/tests/providers/google/cloud/operators/test_bigtable_system.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_bigtable import CBT_INSTANCE_ID, GCP_PROJECT_ID
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGTABLE_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
[email protected]("mysql", "postgres")
[email protected]_file(GCP_BIGTABLE_KEY)
-class BigTableExampleDagsSystemTest(GoogleSystemTest):
- def setUp(self):
- super().setUp()
-
- @provide_gcp_context(GCP_BIGTABLE_KEY)
- def test_run_example_dag_gcs_bigtable(self):
- self.run_dag("example_gcp_bigtable_operators", CLOUD_DAG_FOLDER)
-
- @provide_gcp_context(GCP_BIGTABLE_KEY)
- def tearDown(self):
- self.execute_with_ctx(
- [
- "gcloud",
- "bigtable",
- "--project",
- GCP_PROJECT_ID,
- "--quiet",
- "--verbosity=none",
- "instances",
- "delete",
- CBT_INSTANCE_ID,
- ],
- key=GCP_BIGTABLE_KEY,
- )
- super().tearDown()
diff --git a/tests/system/providers/google/cloud/bigtable/__init__.py b/tests/system/providers/google/cloud/bigtable/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/bigtable/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_bigtable.py b/tests/system/providers/google/cloud/bigtable/example_bigtable.py
similarity index 52%
rename from airflow/providers/google/cloud/example_dags/example_bigtable.py
rename to tests/system/providers/google/cloud/bigtable/example_bigtable.py
index 81c3639d51..96a3c1c450 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigtable.py
+++ b/tests/system/providers/google/cloud/bigtable/example_bigtable.py
@@ -43,11 +43,11 @@ This DAG relies on the following environment variables:
"""
from __future__ import annotations
-import json
+import os
from datetime import datetime
-from os import getenv
from airflow import models
+from airflow.decorators import task_group
from airflow.providers.google.cloud.operators.bigtable import (
BigtableCreateInstanceOperator,
BigtableCreateTableOperator,
@@ -57,43 +57,47 @@ from airflow.providers.google.cloud.operators.bigtable import (
BigtableUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = getenv("GCP_PROJECT_ID", "example-project")
-CBT_INSTANCE_ID = getenv("GCP_BIG_TABLE_INSTANCE_ID", "some-instance-id")
-CBT_INSTANCE_DISPLAY_NAME = getenv("GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME", "Human-readable name")
-CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
- "GCP_BIG_TABLE_INSTANCE_DISPLAY_NAME_UPDATED",
f"{CBT_INSTANCE_DISPLAY_NAME} - updated"
-)
-CBT_INSTANCE_TYPE = getenv("GCP_BIG_TABLE_INSTANCE_TYPE", "2")
-CBT_INSTANCE_TYPE_PROD = getenv("GCP_BIG_TABLE_INSTANCE_TYPE_PROD", "1")
-CBT_INSTANCE_LABELS = getenv("GCP_BIG_TABLE_INSTANCE_LABELS", "{}")
-CBT_INSTANCE_LABELS_UPDATED = getenv("GCP_BIG_TABLE_INSTANCE_LABELS_UPDATED", '{"env": "prod"}')
-CBT_CLUSTER_ID = getenv("GCP_BIG_TABLE_CLUSTER_ID", "some-cluster-id")
-CBT_CLUSTER_ZONE = getenv("GCP_BIG_TABLE_CLUSTER_ZONE", "europe-west1-b")
-CBT_CLUSTER_NODES = getenv("GCP_BIG_TABLE_CLUSTER_NODES", "3")
-CBT_CLUSTER_NODES_UPDATED = getenv("GCP_BIG_TABLE_CLUSTER_NODES_UPDATED", "5")
-CBT_CLUSTER_STORAGE_TYPE = getenv("GCP_BIG_TABLE_CLUSTER_STORAGE_TYPE", "2")
-CBT_TABLE_ID = getenv("GCP_BIG_TABLE_TABLE_ID", "some-table-id")
-CBT_POKE_INTERVAL = getenv("GCP_BIG_TABLE_POKE_INTERVAL", "60")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "bigtable"
+
+
+CBT_INSTANCE_ID = f"bigtable-instance-id-{ENV_ID}"
+CBT_INSTANCE_DISPLAY_NAME = "Instance-name"
+CBT_INSTANCE_DISPLAY_NAME_UPDATED = f"{CBT_INSTANCE_DISPLAY_NAME} - updated"
+CBT_INSTANCE_TYPE = 2
+CBT_INSTANCE_TYPE_PROD = 1
+CBT_INSTANCE_LABELS = {}
+CBT_INSTANCE_LABELS_UPDATED = {"env": "prod"}
+CBT_CLUSTER_ID = f"bigtable-cluster-id-{ENV_ID}"
+CBT_CLUSTER_ZONE = "europe-west1-b"
+CBT_CLUSTER_NODES = 3
+CBT_CLUSTER_NODES_UPDATED = 5
+CBT_CLUSTER_STORAGE_TYPE = 2
+CBT_TABLE_ID = f"bigtable-table-id-{ENV_ID}"
+CBT_POKE_INTERVAL = 60
with models.DAG(
- "example_gcp_bigtable_operators",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["bigtable", "example"],
) as dag:
# [START howto_operator_gcp_bigtable_instance_create]
create_instance_task = BigtableCreateInstanceOperator(
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
- instance_type=int(CBT_INSTANCE_TYPE),
- instance_labels=json.loads(CBT_INSTANCE_LABELS),
+ instance_type=CBT_INSTANCE_TYPE,
+ instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
- cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
+ cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE,
task_id="create_instance_task",
)
create_instance_task2 = BigtableCreateInstanceOperator(
@@ -101,82 +105,66 @@ with models.DAG(
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
- instance_type=int(CBT_INSTANCE_TYPE),
- instance_labels=json.loads(CBT_INSTANCE_LABELS),
- cluster_nodes=int(CBT_CLUSTER_NODES),
- cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
+ instance_type=CBT_INSTANCE_TYPE,
+ instance_labels=CBT_INSTANCE_LABELS,
+ cluster_nodes=CBT_CLUSTER_NODES,
+ cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE,
task_id="create_instance_task2",
)
- create_instance_task >> create_instance_task2
# [END howto_operator_gcp_bigtable_instance_create]
- # [START howto_operator_gcp_bigtable_instance_update]
- update_instance_task = BigtableUpdateInstanceOperator(
- instance_id=CBT_INSTANCE_ID,
- instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
- instance_type=int(CBT_INSTANCE_TYPE_PROD),
- instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
- task_id="update_instance_task",
- )
- # [END howto_operator_gcp_bigtable_instance_update]
+ @task_group()
+ def create_tables():
+ # [START howto_operator_gcp_bigtable_table_create]
+ create_table_task = BigtableCreateTableOperator(
+ project_id=PROJECT_ID,
+ instance_id=CBT_INSTANCE_ID,
+ table_id=CBT_TABLE_ID,
+ task_id="create_table",
+ )
+ create_table_task2 = BigtableCreateTableOperator(
+ instance_id=CBT_INSTANCE_ID,
+ table_id=CBT_TABLE_ID,
+ task_id="create_table_task2",
+ )
+ # [END howto_operator_gcp_bigtable_table_create]
+ create_table_task >> create_table_task2
- # [START howto_operator_gcp_bigtable_cluster_update]
- cluster_update_task = BigtableUpdateClusterOperator(
- project_id=GCP_PROJECT_ID,
- instance_id=CBT_INSTANCE_ID,
- cluster_id=CBT_CLUSTER_ID,
- nodes=int(CBT_CLUSTER_NODES_UPDATED),
- task_id="update_cluster_task",
- )
- cluster_update_task2 = BigtableUpdateClusterOperator(
- instance_id=CBT_INSTANCE_ID,
- cluster_id=CBT_CLUSTER_ID,
- nodes=int(CBT_CLUSTER_NODES_UPDATED),
- task_id="update_cluster_task2",
- )
- cluster_update_task >> cluster_update_task2
- # [END howto_operator_gcp_bigtable_cluster_update]
+ @task_group()
+ def update_clusters_and_instance():
+ # [START howto_operator_gcp_bigtable_cluster_update]
+ cluster_update_task = BigtableUpdateClusterOperator(
+ project_id=PROJECT_ID,
+ instance_id=CBT_INSTANCE_ID,
+ cluster_id=CBT_CLUSTER_ID,
+ nodes=CBT_CLUSTER_NODES_UPDATED,
+ task_id="update_cluster_task",
+ )
+ cluster_update_task2 = BigtableUpdateClusterOperator(
+ instance_id=CBT_INSTANCE_ID,
+ cluster_id=CBT_CLUSTER_ID,
+ nodes=CBT_CLUSTER_NODES_UPDATED,
+ task_id="update_cluster_task2",
+ )
+ # [END howto_operator_gcp_bigtable_cluster_update]
- # [START howto_operator_gcp_bigtable_instance_delete]
- delete_instance_task = BigtableDeleteInstanceOperator(
- project_id=GCP_PROJECT_ID,
- instance_id=CBT_INSTANCE_ID,
- task_id="delete_instance_task",
- )
- delete_instance_task2 = BigtableDeleteInstanceOperator(
- instance_id=CBT_INSTANCE_ID,
- task_id="delete_instance_task2",
- )
- # [END howto_operator_gcp_bigtable_instance_delete]
+ # [START howto_operator_gcp_bigtable_instance_update]
+ update_instance_task = BigtableUpdateInstanceOperator(
+ instance_id=CBT_INSTANCE_ID,
+ instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
+ instance_type=CBT_INSTANCE_TYPE_PROD,
+ instance_labels=CBT_INSTANCE_LABELS_UPDATED,
+ task_id="update_instance_task",
+ )
+ # [END howto_operator_gcp_bigtable_instance_update]
- # [START howto_operator_gcp_bigtable_table_create]
- create_table_task = BigtableCreateTableOperator(
- project_id=GCP_PROJECT_ID,
- instance_id=CBT_INSTANCE_ID,
- table_id=CBT_TABLE_ID,
- task_id="create_table",
- )
- create_table_task2 = BigtableCreateTableOperator(
- instance_id=CBT_INSTANCE_ID,
- table_id=CBT_TABLE_ID,
- task_id="create_table_task2",
- )
- create_table_task >> create_table_task2
- # [END howto_operator_gcp_bigtable_table_create]
+ [cluster_update_task, cluster_update_task2] >> update_instance_task
# [START howto_operator_gcp_bigtable_table_wait_for_replication]
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
- project_id=GCP_PROJECT_ID,
- instance_id=CBT_INSTANCE_ID,
- table_id=CBT_TABLE_ID,
- poke_interval=int(CBT_POKE_INTERVAL),
- timeout=180,
- task_id="wait_for_table_replication_task",
- )
- wait_for_table_replication_task2 = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
- poke_interval=int(CBT_POKE_INTERVAL),
+ poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)
@@ -184,7 +172,7 @@ with models.DAG(
# [START howto_operator_gcp_bigtable_table_delete]
delete_table_task = BigtableDeleteTableOperator(
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
@@ -195,14 +183,41 @@ with models.DAG(
task_id="delete_table_task2",
)
# [END howto_operator_gcp_bigtable_table_delete]
+ delete_table_task.trigger_rule = TriggerRule.ALL_DONE
+ delete_table_task2.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_bigtable_instance_delete]
+ delete_instance_task = BigtableDeleteInstanceOperator(
+ project_id=PROJECT_ID,
+ instance_id=CBT_INSTANCE_ID,
+ task_id="delete_instance_task",
+ )
+ delete_instance_task2 = BigtableDeleteInstanceOperator(
+ instance_id=CBT_INSTANCE_ID,
+ task_id="delete_instance_task2",
+ )
+ # [END howto_operator_gcp_bigtable_instance_delete]
+ delete_instance_task.trigger_rule = TriggerRule.ALL_DONE
+ delete_instance_task2.trigger_rule = TriggerRule.ALL_DONE
+
+ (
+ [create_instance_task, create_instance_task2]
+ >> create_tables()
+ >> wait_for_table_replication_task
+ >> update_clusters_and_instance()
+ >> delete_table_task
+ >> delete_table_task2
+ >> [delete_instance_task, delete_instance_task2]
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
- wait_for_table_replication_task >> delete_table_task
- wait_for_table_replication_task2 >> delete_table_task
- wait_for_table_replication_task >> delete_table_task2
- wait_for_table_replication_task2 >> delete_table_task2
- create_instance_task >> create_table_task >> cluster_update_task
- cluster_update_task >> update_instance_task >> delete_table_task
- create_instance_task2 >> create_table_task2 >> cluster_update_task2 >> delete_table_task2
+from tests.system.utils import get_test_run # noqa: E402
- # Only delete instances after all tables are deleted
- [delete_table_task, delete_table_task2] >> delete_instance_task >> delete_instance_task2
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
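
For readers new to AIP-47, the skeleton the migrated file follows is roughly the sketch below, assuming the helper modules at the paths the diff introduces; the DAG body is elided.

import os
from datetime import datetime

from airflow import models

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")           # unique per-run suffix for resource ids
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")  # target GCP project

with models.DAG(
    "my_system_test",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    ...  # operators under test, with ALL_DONE trigger rules on teardown tasks

    from tests.system.utils.watcher import watcher

    # Route every task outcome through the watcher so that a failed teardown
    # (which otherwise runs on ALL_DONE) still fails the whole test.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Exposes the DAG as a plain pytest test, runnable with e.g.:
#   pytest tests/system/providers/google/cloud/bigtable/example_bigtable.py
test_run = get_test_run(dag)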