This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9bd53cacf Fix CloudFirestoreExportDatabaseOperator system test
(#32967)
f9bd53cacf is described below
commit f9bd53cacfc380928dd19f6b12fa7bfd90c6199d
Author: Beata Kossakowska <[email protected]>
AuthorDate: Wed Aug 2 14:43:24 2023 +0200
Fix CloudFirestoreExportDatabaseOperator system test (#32967)
Co-authored-by: Beata Kossakowska <[email protected]>
---
.../google/cloud/gcs/example_firestore.py | 106 ++++++++++++++-------
1 file changed, 69 insertions(+), 37 deletions(-)
diff --git a/tests/system/providers/google/cloud/gcs/example_firestore.py
b/tests/system/providers/google/cloud/gcs/example_firestore.py
index 12953e168d..8c3b95d7d1 100644
--- a/tests/system/providers/google/cloud/gcs/example_firestore.py
+++ b/tests/system/providers/google/cloud/gcs/example_firestore.py
@@ -16,30 +16,11 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG that shows interactions with Google Cloud Firestore.
+Example Airflow DAG showing the export of a database from Google Cloud Firestore to
Cloud Storage.
-Prerequisites
-=============
-
-This example uses two Google Cloud projects:
-
-* ``GCP_PROJECT_ID`` - It contains a bucket and a firestore database.
-* ``G_FIRESTORE_PROJECT_ID`` - it contains the Data Warehouse based on the
BigQuery service.
-
-Saving in a bucket should be possible from the ``G_FIRESTORE_PROJECT_ID``
project.
-Reading from a bucket should be possible from the ``GCP_PROJECT_ID`` project.
-
-The bucket and dataset should be located in the same region.
-
-If you want to run this example, you must do the following:
-
-1. Create Google Cloud project and enable the BigQuery API
-2. Create the Firebase project
-3. Create a bucket in the same location as the Firebase project
-4. Grant Firebase admin account permissions to manage BigQuery. This is
required to create a dataset.
-5. Create a bucket in Firebase project and
-6. Give read/write access for Firebase admin to bucket to step no. 5.
-7. Create collection in the Firestore database.
+This example creates a collection in the default namespace in Firestore, adds
some data to the collection
+and exports this database from Cloud Firestore to Cloud Storage.
+It then creates a table from the exported data in BigQuery and checks that the
data is in it.
"""
from __future__ import annotations
@@ -53,27 +34,35 @@ from airflow.providers.google.cloud.operators.bigquery
import (
BigQueryDeleteDatasetOperator,
BigQueryInsertJobOperator,
)
+from airflow.providers.google.cloud.operators.dataflow import
DataflowTemplatedJobStartOperator
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreBeginTransactionOperator,
+ CloudDatastoreCommitOperator,
+)
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.firebase.operators.firestore import
CloudFirestoreExportDatabaseOperator
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_gcp_firestore"
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
-FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID",
"example-firebase-project")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_firestore_to_gcp"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
-EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL",
f"gs://{BUCKET_NAME}/namespace/")
-EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID",
"firestore_collection_id")
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
+
+EXPORT_DESTINATION_URL = f"gs://{BUCKET_NAME}/namespace"
+EXPORT_COLLECTION_ID = f"collection_{DAG_ID}_{ENV_ID}".replace("-", "_")
EXTERNAL_TABLE_SOURCE_URI = (
f"{EXPORT_DESTINATION_URL}/all_namespaces/kind_{EXPORT_COLLECTION_ID}"
f"/all_namespaces_kind_{EXPORT_COLLECTION_ID}.export_metadata"
)
-DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")
+DATASET_LOCATION = "EU"
+KEYS = {
+ "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+ "path": {"kind": f"{EXPORT_COLLECTION_ID}"},
+}
-if BUCKET_NAME is None:
- raise ValueError("Bucket name is required. Please set
GCP_FIRESTORE_ARCHIVE_URL env variable.")
with models.DAG(
DAG_ID,
@@ -90,13 +79,36 @@ with models.DAG(
task_id="create_dataset",
dataset_id=DATASET_NAME,
location=DATASET_LOCATION,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
+ )
+
+ begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+ task_id="begin_transaction_commit",
+ transaction_options={"readWrite": {}},
+ project_id=PROJECT_ID,
+ )
+
+ commit_task = CloudDatastoreCommitOperator(
+ task_id="commit_task",
+ body={
+ "mode": "TRANSACTIONAL",
+ "mutations": [
+ {
+ "insert": {
+ "key": KEYS,
+ "properties": {"string": {"stringValue": "test!"}},
+ }
+ }
+ ],
+ "transaction": begin_transaction_commit.output,
+ },
+ project_id=PROJECT_ID,
)
# [START howto_operator_export_database_to_gcs]
export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
task_id="export_database_to_gcs",
- project_id=FIRESTORE_PROJECT_ID,
+ project_id=PROJECT_ID,
body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds":
[EXPORT_COLLECTION_ID]},
)
# [END howto_operator_export_database_to_gcs]
@@ -107,7 +119,7 @@ with models.DAG(
bucket=BUCKET_NAME,
table_resource={
"tableReference": {
- "projectId": GCP_PROJECT_ID,
+ "projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": "firestore_data",
},
@@ -124,16 +136,33 @@ with models.DAG(
task_id="execute_query",
configuration={
"query": {
- "query": f"SELECT COUNT(*) FROM
`{GCP_PROJECT_ID}.{DATASET_NAME}.firestore_data`",
+ "query": f"SELECT COUNT(*) FROM
`{PROJECT_ID}.{DATASET_NAME}.firestore_data`",
"useLegacySql": False,
}
},
)
+ delete_entity = DataflowTemplatedJobStartOperator(
+ task_id="delete-entity-firestore",
+ project_id=PROJECT_ID,
+
template="gs://dataflow-templates/latest/Firestore_to_Firestore_Delete",
+ parameters={
+ "firestoreReadGqlQuery": f"SELECT * FROM {EXPORT_COLLECTION_ID}",
+ "firestoreReadProjectId": PROJECT_ID,
+ "firestoreDeleteProjectId": PROJECT_ID,
+ },
+ environment={
+ "tempLocation": f"gs://{BUCKET_NAME}/tmp",
+ },
+ location="us-central1",
+ append_job_name=False,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=DATASET_NAME,
- project_id=GCP_PROJECT_ID,
+ project_id=PROJECT_ID,
delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE,
)
@@ -145,11 +174,14 @@ with models.DAG(
(
# TEST SETUP
[create_bucket, create_dataset]
+ >> begin_transaction_commit
+ >> commit_task
# TEST BODY
>> export_database_to_gcs
>> create_external_table_multiple_types
>> read_data_from_gcs_multiple_types
# TEST TEARDOWN
+ >> delete_entity
>> [delete_dataset, delete_bucket]
)