This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb51e04cfd Migrate Google firestore example to new design AIP-47 (#24830)
fb51e04cfd is described below
commit fb51e04cfdbe1a5f80eb51d9ffc3db4bb57a7c34
Author: Chenglong Yan <[email protected]>
AuthorDate: Tue Jul 12 18:54:35 2022 +0800
Migrate Google firestore example to new design AIP-47 (#24830)
related: #22447, #22430
---
.../google/firebase/example_dags/__init__.py | 16 -----
.../example-dags.rst | 2 +-
.../operators/firebase/firestore.rst | 2 +-
.../firebase/operators/test_firestore_system.py | 48 --------------
.../google/cloud/gcs}/example_firestore.py | 73 +++++++++++++++-------
5 files changed, 52 insertions(+), 89 deletions(-)
diff --git a/airflow/providers/google/firebase/example_dags/__init__.py b/airflow/providers/google/firebase/example_dags/__init__.py
deleted file mode 100644
index 13a83393a9..0000000000
--- a/airflow/providers/google/firebase/example_dags/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-google/example-dags.rst b/docs/apache-airflow-providers-google/example-dags.rst
index 6c11c5faca..970e6b12b4 100644
--- a/docs/apache-airflow-providers-google/example-dags.rst
+++ b/docs/apache-airflow-providers-google/example-dags.rst
@@ -23,7 +23,7 @@ You can learn how to use Google integrations by analyzing the source code of the
* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/ads/example_dags>`__
* `Google Cloud (legacy) <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/cloud/example_dags>`__
* `Google Cloud <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/cloud>`__
-* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/firebase/example_dags>`__
+* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/firebase>`__
* `Google Marketing Platform <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/marketing_platform/example_dags>`__
* `Google Workplace <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/suite/example_dags>`__ (formerly Google Suite)
* `Google LevelDB <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/leveldb>`__
diff --git a/docs/apache-airflow-providers-google/operators/firebase/firestore.rst b/docs/apache-airflow-providers-google/operators/firebase/firestore.rst
index 4147e0134f..bbfc67f652 100644
--- a/docs/apache-airflow-providers-google/operators/firebase/firestore.rst
+++ b/docs/apache-airflow-providers-google/operators/firebase/firestore.rst
@@ -41,7 +41,7 @@ Export database
Exporting a copy of all or a subset of documents from Google Cloud Firestore to
Google Cloud Storage is performed with the
:class:`~airflow.providers.google.firebase.operators.firestore.CloudFirestoreExportDatabaseOperator`
operator.
-.. exampleinclude:: /../../airflow/providers/google/firebase/example_dags/example_firestore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_firestore.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_export_database_to_gcs]
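For readers following the doc change: the snippet this exampleinclude directive pulls in sits between the [START howto_operator_export_database_to_gcs] and [END howto_operator_export_database_to_gcs] markers of the relocated file, and (per the hunk further down in this commit) amounts to a single operator call, sketched here for convenience:

    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
        task_id="export_database_to_gcs",
        project_id=FIRESTORE_PROJECT_ID,
        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
    )

Here project_id names the Firebase project being exported and outputUriPrefix points at the GCS destination.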
diff --git a/tests/providers/google/firebase/operators/test_firestore_system.py b/tests/providers/google/firebase/operators/test_firestore_system.py
deleted file mode 100644
index 9a42a1d130..0000000000
--- a/tests/providers/google/firebase/operators/test_firestore_system.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import pytest
-
-from airflow.providers.google.firebase.example_dags.example_firestore import (
-    DATASET_NAME,
-    EXPORT_DESTINATION_URL,
-)
-from tests.providers.google.cloud.utils.gcp_authenticator import G_FIREBASE_KEY
-from tests.test_utils.gcp_system_helpers import FIREBASE_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
[email protected]("google.firebase")
[email protected]("google.cloud")
[email protected]_file(G_FIREBASE_KEY)
-class CloudFirestoreSystemTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-        self.clean_up()
-
-    def tearDown(self):
-        self.clean_up()
-        super().tearDown()
-
-    def clean_up(self):
-        self.execute_with_ctx(["gsutil", "rm", "-r", f"{EXPORT_DESTINATION_URL}"], G_FIREBASE_KEY)
-        self.execute_with_ctx(
-            ["bq", "rm", '--force', '--recursive', f"{self._project_id()}:{DATASET_NAME}"], G_FIREBASE_KEY
-        )
-
-    @provide_gcp_context(G_FIREBASE_KEY)
-    def test_run_example_dag(self):
-        self.run_dag('example_google_firestore', FIREBASE_DAG_FOLDER)
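Worth noting for reviewers: the gsutil and bq cleanup this deleted test performed in setUp()/tearDown() is not lost. Under AIP-47, teardown moves into the DAG itself as ordinary tasks with trigger_rule=TriggerRule.ALL_DONE, so cleanup runs even when a test-body task fails. A minimal sketch of that pattern (delete_bucket mirrors the task added in the next hunk; BUCKET_NAME is assumed to be defined as in the migrated file):

    from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
    from airflow.utils.trigger_rule import TriggerRule

    # Runs regardless of upstream success/failure, replacing tearDown() cleanup
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )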
diff --git a/airflow/providers/google/firebase/example_dags/example_firestore.py b/tests/system/providers/google/cloud/gcs/example_firestore.py
similarity index 76%
rename from airflow/providers/google/firebase/example_dags/example_firestore.py
rename to tests/system/providers/google/cloud/gcs/example_firestore.py
index 19e0071fef..088bf5fc87 100644
--- a/airflow/providers/google/firebase/example_dags/example_firestore.py
+++ b/tests/system/providers/google/cloud/gcs/example_firestore.py
@@ -48,43 +48,39 @@ from datetime import datetime
from urllib.parse import urlparse
from airflow import models
-from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateExternalTableOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryInsertJobOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
+from airflow.utils.trigger_rule import TriggerRule
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_google_firestore"
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID", "example-firebase-project")
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL", "gs://INVALID BUCKET NAME/namespace/")
-BUCKET_NAME = urlparse(EXPORT_DESTINATION_URL).hostname
EXPORT_PREFIX = urlparse(EXPORT_DESTINATION_URL).path
-
EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID", "firestore_collection_id")
-DATASET_NAME = os.environ.get("GCP_FIRESTORE_DATASET_NAME", "test_firestore_export")
DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")
if BUCKET_NAME is None:
    raise ValueError("Bucket name is required. Please set GCP_FIRESTORE_ARCHIVE_URL env variable.")
with models.DAG(
- "example_google_firestore",
+ DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval='@once',
catchup=False,
- tags=["example"],
+ tags=["example", "firestore"],
) as dag:
-    # [START howto_operator_export_database_to_gcs]
-    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
-        task_id="export_database_to_gcs",
-        project_id=FIRESTORE_PROJECT_ID,
-        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
-    )
-    # [END howto_operator_export_database_to_gcs]
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
@@ -93,9 +89,13 @@ with models.DAG(
        project_id=GCP_PROJECT_ID,
    )
-    delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, delete_contents=True
+    # [START howto_operator_export_database_to_gcs]
+    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
+        task_id="export_database_to_gcs",
+        project_id=FIRESTORE_PROJECT_ID,
+        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
    )
+    # [END howto_operator_export_database_to_gcs]
    # [START howto_operator_create_external_table_multiple_types]
    create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
@@ -132,12 +132,39 @@ with models.DAG(
        },
    )
-    chain(
-        # Firestore
-        export_database_to_gcs,
-        # BigQuery
-        create_dataset,
-        create_external_table_multiple_types,
-        read_data_from_gcs_multiple_types,
-        delete_dataset,
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        project_id=GCP_PROJECT_ID,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_dataset
+        # TEST BODY
+        >> export_database_to_gcs
+        >> create_external_table_multiple_types
+        >> read_data_from_gcs_multiple_types
+        # TEST TEARDOWN
+        >> delete_dataset
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
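For anyone trying the migrated example: the get_test_run(dag) hook at the end of the file is what lets pytest collect the DAG as a system test, so the module is run directly with pytest rather than through the deleted GoogleSystemTest harness. A rough sketch of an invocation, assuming placeholder values for the environment variables the file reads (SYSTEM_TESTS_ENV_ID, GCP_PROJECT_ID, G_FIRESTORE_PROJECT_ID; see tests/system/README.md#run_via_pytest):

    import os
    import subprocess

    # Placeholder values; real runs need a valid GCP project and Firebase project
    os.environ["SYSTEM_TESTS_ENV_ID"] = "my-env-id"
    os.environ["GCP_PROJECT_ID"] = "my-gcp-project"
    os.environ["G_FIRESTORE_PROJECT_ID"] = "my-firebase-project"

    # Run the example DAG module as a pytest test
    subprocess.run(
        ["pytest", "tests/system/providers/google/cloud/gcs/example_firestore.py"],
        check=True,
    )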