This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9bd53cacf Fix CloudFirestoreExportDatabaseOperator system test 
(#32967)
f9bd53cacf is described below

commit f9bd53cacfc380928dd19f6b12fa7bfd90c6199d
Author: Beata Kossakowska <[email protected]>
AuthorDate: Wed Aug 2 14:43:24 2023 +0200

    Fix CloudFirestoreExportDatabaseOperator system test (#32967)
    
    Co-authored-by: Beata Kossakowska <[email protected]>
---
 .../google/cloud/gcs/example_firestore.py          | 106 ++++++++++++++-------
 1 file changed, 69 insertions(+), 37 deletions(-)

diff --git a/tests/system/providers/google/cloud/gcs/example_firestore.py 
b/tests/system/providers/google/cloud/gcs/example_firestore.py
index 12953e168d..8c3b95d7d1 100644
--- a/tests/system/providers/google/cloud/gcs/example_firestore.py
+++ b/tests/system/providers/google/cloud/gcs/example_firestore.py
@@ -16,30 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that shows interactions with Google Cloud Firestore.
+Example Airflow DAG showing export of database from Google Cloud Firestore to 
Cloud Storage.
 
-Prerequisites
-=============
-
-This example uses two Google Cloud projects:
-
-* ``GCP_PROJECT_ID`` - It contains a bucket and a firestore database.
-* ``G_FIRESTORE_PROJECT_ID`` - it contains the Data Warehouse based on the 
BigQuery service.
-
-Saving in a bucket should be possible from the ``G_FIRESTORE_PROJECT_ID`` 
project.
-Reading from a bucket should be possible from the ``GCP_PROJECT_ID`` project.
-
-The bucket and dataset should be located in the same region.
-
-If you want to run this example, you must do the following:
-
-1. Create Google Cloud project and enable the BigQuery API
-2. Create the Firebase project
-3. Create a bucket in the same location as the Firebase project
-4. Grant Firebase admin account permissions to manage BigQuery. This is 
required to create a dataset.
-5. Create a bucket in Firebase project and
-6. Give read/write access for Firebase admin to bucket to step no. 5.
-7. Create collection in the Firestore database.
+This example creates collections in the default namespaces in Firestore, adds 
some data to the collection
+and exports this database from Cloud Firestore to Cloud Storage.
+It then creates a table from the exported data in Bigquery and checks that the 
data is in it.
 """
 from __future__ import annotations
 
@@ -53,27 +34,35 @@ from airflow.providers.google.cloud.operators.bigquery 
import (
     BigQueryDeleteDatasetOperator,
     BigQueryInsertJobOperator,
 )
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowTemplatedJobStartOperator
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreBeginTransactionOperator,
+    CloudDatastoreCommitOperator,
+)
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.firebase.operators.firestore import 
CloudFirestoreExportDatabaseOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_gcp_firestore"
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
-FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID", 
"example-firebase-project")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_firestore_to_gcp"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
-EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL", 
f"gs://{BUCKET_NAME}/namespace/")
-EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID", 
"firestore_collection_id")
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
+
+EXPORT_DESTINATION_URL = f"gs://{BUCKET_NAME}/namespace"
+EXPORT_COLLECTION_ID = f"collection_{DAG_ID}_{ENV_ID}".replace("-", "_")
 EXTERNAL_TABLE_SOURCE_URI = (
     f"{EXPORT_DESTINATION_URL}/all_namespaces/kind_{EXPORT_COLLECTION_ID}"
     f"/all_namespaces_kind_{EXPORT_COLLECTION_ID}.export_metadata"
 )
-DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")
+DATASET_LOCATION = "EU"
+KEYS = {
+    "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+    "path": {"kind": f"{EXPORT_COLLECTION_ID}"},
+}
 
-if BUCKET_NAME is None:
-    raise ValueError("Bucket name is required. Please set 
GCP_FIRESTORE_ARCHIVE_URL env variable.")
 
 with models.DAG(
     DAG_ID,
@@ -90,13 +79,36 @@ with models.DAG(
         task_id="create_dataset",
         dataset_id=DATASET_NAME,
         location=DATASET_LOCATION,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
+    )
+
+    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_commit",
+        transaction_options={"readWrite": {}},
+        project_id=PROJECT_ID,
+    )
+
+    commit_task = CloudDatastoreCommitOperator(
+        task_id="commit_task",
+        body={
+            "mode": "TRANSACTIONAL",
+            "mutations": [
+                {
+                    "insert": {
+                        "key": KEYS,
+                        "properties": {"string": {"stringValue": "test!"}},
+                    }
+                }
+            ],
+            "transaction": begin_transaction_commit.output,
+        },
+        project_id=PROJECT_ID,
     )
 
     # [START howto_operator_export_database_to_gcs]
     export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
         task_id="export_database_to_gcs",
-        project_id=FIRESTORE_PROJECT_ID,
+        project_id=PROJECT_ID,
         body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": 
[EXPORT_COLLECTION_ID]},
     )
     # [END howto_operator_export_database_to_gcs]
@@ -107,7 +119,7 @@ with models.DAG(
         bucket=BUCKET_NAME,
         table_resource={
             "tableReference": {
-                "projectId": GCP_PROJECT_ID,
+                "projectId": PROJECT_ID,
                 "datasetId": DATASET_NAME,
                 "tableId": "firestore_data",
             },
@@ -124,16 +136,33 @@ with models.DAG(
         task_id="execute_query",
         configuration={
             "query": {
-                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.firestore_data`",
+                "query": f"SELECT COUNT(*) FROM 
`{PROJECT_ID}.{DATASET_NAME}.firestore_data`",
                 "useLegacySql": False,
             }
         },
     )
 
+    delete_entity = DataflowTemplatedJobStartOperator(
+        task_id="delete-entity-firestore",
+        project_id=PROJECT_ID,
+        
template="gs://dataflow-templates/latest/Firestore_to_Firestore_Delete",
+        parameters={
+            "firestoreReadGqlQuery": f"SELECT * FROM {EXPORT_COLLECTION_ID}",
+            "firestoreReadProjectId": PROJECT_ID,
+            "firestoreDeleteProjectId": PROJECT_ID,
+        },
+        environment={
+            "tempLocation": f"gs://{BUCKET_NAME}/tmp",
+        },
+        location="us-central1",
+        append_job_name=False,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
         dataset_id=DATASET_NAME,
-        project_id=GCP_PROJECT_ID,
+        project_id=PROJECT_ID,
         delete_contents=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
@@ -145,11 +174,14 @@ with models.DAG(
     (
         # TEST SETUP
         [create_bucket, create_dataset]
+        >> begin_transaction_commit
+        >> commit_task
         # TEST BODY
         >> export_database_to_gcs
         >> create_external_table_multiple_types
         >> read_data_from_gcs_multiple_types
         # TEST TEARDOWN
+        >> delete_entity
         >> [delete_dataset, delete_bucket]
     )
 

Reply via email to