This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cad983dac9 Fix datastore system tests. (#34215)
cad983dac9 is described below

commit cad983dac9f240a2d0658531235acfd003b64760
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Sep 11 12:27:50 2023 +0200

    Fix datastore system tests. (#34215)
    
    Co-authored-by: Beata Kossakowska <[email protected]>
---
 .../operators/cloud/datastore.rst                  |   8 +-
 .../cloud/datastore/example_datastore_commit.py    |  70 +++++++++++--
 .../datastore/example_datastore_export_import.py   | 114 ---------------------
 .../cloud/datastore/example_datastore_query.py     |   6 ++
 .../cloud/datastore/example_datastore_rollback.py  |   6 ++
 .../google/cloud/gcs/example_firestore.py          |  18 +---
 6 files changed, 81 insertions(+), 141 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
index 3b3997bb86..8218e526a1 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
@@ -38,7 +38,7 @@ Export Entities
 To export entities from Google Cloud Datastore to Cloud Storage use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_export_task]
@@ -52,7 +52,7 @@ Import Entities
 To import entities from Cloud Storage to Google Cloud Datastore use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_import_task]
@@ -168,7 +168,7 @@ Get operation state
 To get the current state of a long-running operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START get_operation_state]
@@ -182,7 +182,7 @@ Delete operation
 To delete an operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator`
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START delete_operation]
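
(Note: the exampleinclude directives above need no further edits because the [START ...]/[END ...] snippet markers they reference now live in example_datastore_commit.py, as the next diff shows.)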
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
index 2fd122a1c1..0c5cf5cda3 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
@@ -25,17 +25,24 @@ from datetime import datetime
 from typing import Any
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.datastore import (
     CloudDatastoreAllocateIdsOperator,
     CloudDatastoreBeginTransactionOperator,
     CloudDatastoreCommitOperator,
+    CloudDatastoreDeleteOperationOperator,
+    CloudDatastoreExportEntitiesOperator,
+    CloudDatastoreGetOperationOperator,
+    CloudDatastoreImportEntitiesOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 DAG_ID = "datastore_commit"
-
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 # [START how_to_keys_def]
 KEYS = [
     {
@@ -57,12 +64,14 @@ with models.DAG(
     catchup=False,
     tags=["datastore", "example"],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
+    )
     # [START how_to_allocate_ids]
     allocate_ids = CloudDatastoreAllocateIdsOperator(
         task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
     )
     # [END how_to_allocate_ids]
-
     # [START how_to_begin_transaction]
     begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
         task_id="begin_transaction_commit",
@@ -70,7 +79,6 @@ with models.DAG(
         project_id=PROJECT_ID,
     )
     # [END how_to_begin_transaction]
-
     # [START how_to_commit_def]
     COMMIT_BODY = {
         "mode": "TRANSACTIONAL",
@@ -82,15 +90,65 @@ with models.DAG(
                 }
             }
         ],
-        "transaction": begin_transaction_commit.output,
+        "singleUseTransaction": {"readWrite": {}},
     }
     # [END how_to_commit_def]
-
     # [START how_to_commit_task]
+    commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
     # [END how_to_commit_task]
+    # [START how_to_export_task]
+    export_task = CloudDatastoreExportEntitiesOperator(
+        task_id="export_task",
+        bucket=BUCKET_NAME,
+        project_id=PROJECT_ID,
+        overwrite_existing=True,
+    )
+    # [END how_to_export_task]
+    # [START how_to_import_task]
+    import_task = CloudDatastoreImportEntitiesOperator(
+        task_id="import_task",
+        bucket="{{ 
task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] 
}}",
+        file="{{ 
'/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:])
 }}",
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_import_task]
+    # [START get_operation_state]
+    get_operation = CloudDatastoreGetOperationOperator(
+        task_id="get_operation", name="{{ 
task_instance.xcom_pull('export_task')['name'] }}"
+    )
+    # [END get_operation_state]
+    # [START delete_operation]
+    delete_export_operation = CloudDatastoreDeleteOperationOperator(
+        task_id="delete_export_operation",
+        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+    )
+    # [END delete_operation]
+    delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+    delete_import_operation = CloudDatastoreDeleteOperationOperator(
+        task_id="delete_import_operation",
+        name="{{ task_instance.xcom_pull('import_task')['name'] }}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    chain(
+        create_bucket,
+        allocate_ids,
+        begin_transaction_commit,
+        commit_task,
+        export_task,
+        import_task,
+        get_operation,
+        [delete_bucket, delete_export_operation, delete_import_operation],
+    )
+
+    from tests.system.utils.watcher import watcher
 
-    allocate_ids >> begin_transaction_commit >> commit_task
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 
 from tests.system.utils import get_test_run  # noqa: E402
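
A note on the templated fields in import_task above: the export operator's XCom carries the finished Datastore operation, whose response.outputUrl is a gs:// URL, so the import task has to split that URL back into a bucket name and an object path. A minimal sketch of what the two Jinja expressions compute, using a hypothetical outputUrl value:

    # Hypothetical outputUrl, shaped like the gs://<bucket>/<path> URL a
    # Datastore export returns (the exact value varies per run).
    output_url = "gs://bucket_datastore_commit_test/2023-09-11/export.overall_export_metadata"

    # "{{ ...['outputUrl'].split('/')[2] }}" -> the bucket name
    bucket = output_url.split("/")[2]           # "bucket_datastore_commit_test"

    # "{{ '/'.join(... .split('/')[3:]) }}" -> the object path inside the bucket
    file = "/".join(output_url.split("/")[3:])  # "2023-09-11/export.overall_export_metadata"

The chain(...) call at the end of the DAG is the airflow.models.baseoperator.chain helper: consecutive arguments run in sequence, and the trailing list fans get_operation out to the three cleanup tasks in parallel.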
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py b/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
deleted file mode 100644
index f134ee14fb..0000000000
--- a/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Airflow System Test DAG that verifies Datastore export and import operators.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.models.baseoperator import chain
-from airflow.providers.google.cloud.operators.datastore import (
-    CloudDatastoreDeleteOperationOperator,
-    CloudDatastoreExportEntitiesOperator,
-    CloudDatastoreGetOperationOperator,
-    CloudDatastoreImportEntitiesOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-
-DAG_ID = "datastore_export_import"
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-
-
-with models.DAG(
-    DAG_ID,
-    schedule="@once",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["datastore", "example"],
-) as dag:
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket", bucket_name=BUCKET_NAME, 
project_id=PROJECT_ID, location="EU"
-    )
-
-    # [START how_to_export_task]
-    export_task = CloudDatastoreExportEntitiesOperator(
-        task_id="export_task",
-        bucket=BUCKET_NAME,
-        project_id=PROJECT_ID,
-        overwrite_existing=True,
-    )
-    # [END how_to_export_task]
-
-    # [START how_to_import_task]
-    import_task = CloudDatastoreImportEntitiesOperator(
-        task_id="import_task",
-        bucket="{{ 
task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] 
}}",
-        file="{{ 
'/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:])
 }}",
-        project_id=PROJECT_ID,
-    )
-    # [END how_to_import_task]
-
-    # [START get_operation_state]
-    get_operation = CloudDatastoreGetOperationOperator(
-        task_id="get_operation", name="{{ 
task_instance.xcom_pull('export_task')['name'] }}"
-    )
-    # [END get_operation_state]
-
-    # [START delete_operation]
-    delete_export_operation = CloudDatastoreDeleteOperationOperator(
-        task_id="delete_export_operation",
-        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
-    )
-    # [END delete_operation]
-    delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
-
-    delete_import_operation = CloudDatastoreDeleteOperationOperator(
-        task_id="delete_import_operation",
-        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
-    )
-
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
-    )
-
-    chain(
-        create_bucket,
-        export_task,
-        import_task,
-        get_operation,
-        [delete_bucket, delete_export_operation, delete_import_operation],
-    )
-
-    from tests.system.utils.watcher import watcher
-
-    # This test needs watcher in order to properly mark success/failure
-    # when "tearDown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_query.py b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
index a0713e8088..2ac98ac85d 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_query.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
@@ -77,6 +77,12 @@ with models.DAG(
 
     allocate_ids >> begin_transaction_query >> run_query
 
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
index e9ffd00885..c8d06cab18 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
@@ -60,6 +60,12 @@ with models.DAG(
 
     begin_transaction_to_rollback >> rollback_transaction
 
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 
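A note on the watcher snippet this commit adds to the query and rollback DAGs above (the commit DAG gains it too): their teardown tasks use trigger_rule=ALL_DONE, so the DAG's leaf tasks can succeed even when a test-body task failed, and Airflow would mark the run green. Wiring every task into the repo's tests.system.utils.watcher task propagates the failure. A minimal sketch of the idiom, with illustrative DAG and task names:

    from datetime import datetime

    from airflow import models
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    from tests.system.utils.watcher import watcher

    with models.DAG(
        "watcher_idiom_example",  # illustrative DAG id, not from this commit
        schedule="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        test_body = EmptyOperator(task_id="test_body")
        # A teardown with ALL_DONE runs (and can succeed) even when test_body
        # fails, which on its own would leave the DAG run marked successful.
        teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)
        test_body >> teardown

        # watcher() fails whenever any upstream task failed, so feeding every
        # task into it makes the DAG's final state reflect the real outcome.
        list(dag.tasks) >> watcher()
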
diff --git a/tests/system/providers/google/cloud/gcs/example_firestore.py b/tests/system/providers/google/cloud/gcs/example_firestore.py
index 8c3b95d7d1..ae904cf8dc 100644
--- a/tests/system/providers/google/cloud/gcs/example_firestore.py
+++ b/tests/system/providers/google/cloud/gcs/example_firestore.py
@@ -36,7 +36,6 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
 from airflow.providers.google.cloud.operators.datastore import (
-    CloudDatastoreBeginTransactionOperator,
     CloudDatastoreCommitOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
@@ -74,20 +73,12 @@ with models.DAG(
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, 
location=DATASET_LOCATION
     )
-
     create_dataset = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset",
         dataset_id=DATASET_NAME,
         location=DATASET_LOCATION,
         project_id=PROJECT_ID,
     )
-
-    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
-        task_id="begin_transaction_commit",
-        transaction_options={"readWrite": {}},
-        project_id=PROJECT_ID,
-    )
-
     commit_task = CloudDatastoreCommitOperator(
         task_id="commit_task",
         body={
@@ -100,11 +91,10 @@ with models.DAG(
                     }
                 }
             ],
-            "transaction": begin_transaction_commit.output,
+            "singleUseTransaction": {"readWrite": {}},
         },
         project_id=PROJECT_ID,
     )
-
     # [START howto_operator_export_database_to_gcs]
     export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
         task_id="export_database_to_gcs",
@@ -112,7 +102,6 @@ with models.DAG(
         body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": 
[EXPORT_COLLECTION_ID]},
     )
     # [END howto_operator_export_database_to_gcs]
-
     # [START howto_operator_create_external_table_multiple_types]
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
         task_id="create_external_table",
@@ -131,7 +120,6 @@ with models.DAG(
         },
     )
     # [END howto_operator_create_external_table_multiple_types]
-
     read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
         task_id="execute_query",
         configuration={
@@ -141,7 +129,6 @@ with models.DAG(
             }
         },
     )
-
     delete_entity = DataflowTemplatedJobStartOperator(
         task_id="delete-entity-firestore",
         project_id=PROJECT_ID,
@@ -158,7 +145,6 @@ with models.DAG(
         append_job_name=False,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
         dataset_id=DATASET_NAME,
@@ -166,7 +152,6 @@ with models.DAG(
         delete_contents=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
@@ -174,7 +159,6 @@ with models.DAG(
     (
         # TEST SETUP
         [create_bucket, create_dataset]
-        >> begin_transaction_commit
         >> commit_task
         # TEST BODY
         >> export_database_to_gcs
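
The substantive change in both commit bodies above: instead of a CloudDatastoreBeginTransactionOperator task feeding its XCom into the body's "transaction" field, the commit request now opens and commits a transaction in a single call via "singleUseTransaction". A sketch of the resulting standalone commit task; PROJECT_ID and the mutation payload are illustrative, since the hunks above elide them:

    from airflow.providers.google.cloud.operators.datastore import CloudDatastoreCommitOperator

    PROJECT_ID = "my-gcp-project"  # illustrative; the system tests read this from the environment

    commit_task = CloudDatastoreCommitOperator(
        task_id="commit_task",
        body={
            "mode": "TRANSACTIONAL",
            "mutations": [
                {
                    "insert": {
                        "key": {
                            "partitionId": {"projectId": PROJECT_ID},
                            "path": [{"kind": "task"}],  # illustrative key
                        },
                        "properties": {"note": {"stringValue": "hello"}},
                    }
                }
            ],
            # Datastore opens a read-write transaction for this commit and
            # closes it in the same request: one task fewer, no XCom coupling.
            "singleUseTransaction": {"readWrite": {}},
        },
        project_id=PROJECT_ID,
    )

This also removes the ordering edge that begin_transaction_commit >> commit_task previously enforced.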
