This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76dc7375b2 Migrate Datastore system tests to new design (AIP-47)
76dc7375b2 is described below

commit 76dc7375b27976968d37143f7e6dfab1049665f6
Author: Bartlomiej Hirsz <[email protected]>
AuthorDate: Wed Apr 20 06:29:25 2022 +0000

    Migrate Datastore system tests to new design (AIP-47)
    
    Change-Id: Ibc6f0a03a0c6fb374de85d74a7ac62cf6fa55bec
---
 .../google/cloud/example_dags/example_datastore.py | 185 ---------------------
 .../operators/cloud/datastore.rst                  |  26 +--
 .../google/datastore/example_datastore_commit.py   |  99 +++++++++++
 .../datastore/example_datastore_export_import.py   | 114 +++++++++++++
 .../google/datastore/example_datastore_query.py    |  84 ++++++++++
 .../google/datastore/example_datastore_rollback.py |  67 ++++++++
 6 files changed, 377 insertions(+), 198 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
deleted file mode 100644
index d55d7d3c08..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ /dev/null
@@ -1,185 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that shows how to use Datastore operators.
-
-This example requires that your project contains Datastore instance.
-"""
-
-import os
-from datetime import datetime
-from typing import Any, Dict
-
-from airflow import models
-from airflow.providers.google.cloud.operators.datastore import (
-    CloudDatastoreAllocateIdsOperator,
-    CloudDatastoreBeginTransactionOperator,
-    CloudDatastoreCommitOperator,
-    CloudDatastoreDeleteOperationOperator,
-    CloudDatastoreExportEntitiesOperator,
-    CloudDatastoreGetOperationOperator,
-    CloudDatastoreImportEntitiesOperator,
-    CloudDatastoreRollbackOperator,
-    CloudDatastoreRunQueryOperator,
-)
-
-START_DATE = datetime(2021, 1, 1)
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
-
-with models.DAG(
-    "example_gcp_datastore",
-    schedule_interval='@once',  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=["example"],
-) as dag:
-    # [START how_to_export_task]
-    export_task = CloudDatastoreExportEntitiesOperator(
-        task_id="export_task",
-        bucket=BUCKET,
-        project_id=GCP_PROJECT_ID,
-        overwrite_existing=True,
-    )
-    # [END how_to_export_task]
-
-    # [START how_to_import_task]
-    import_task = CloudDatastoreImportEntitiesOperator(
-        task_id="import_task",
-        bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
-        file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
-        project_id=GCP_PROJECT_ID,
-    )
-    # [END how_to_import_task]
-
-    export_task >> import_task
-
-# [START how_to_keys_def]
-KEYS = [
-    {
-        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
-        "path": {"kind": "airflow"},
-    }
-]
-# [END how_to_keys_def]
-
-# [START how_to_transaction_def]
-TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
-# [END how_to_transaction_def]
-
-
-with models.DAG(
-    "example_gcp_datastore_operations",
-    schedule_interval='@once',  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=["example"],
-) as dag2:
-    # [START how_to_allocate_ids]
-    allocate_ids = CloudDatastoreAllocateIdsOperator(
-        task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
-    )
-    # [END how_to_allocate_ids]
-
-    # [START how_to_begin_transaction]
-    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
-        task_id="begin_transaction_commit",
-        transaction_options=TRANSACTION_OPTIONS,
-        project_id=GCP_PROJECT_ID,
-    )
-    # [END how_to_begin_transaction]
-
-    # [START how_to_commit_def]
-    COMMIT_BODY = {
-        "mode": "TRANSACTIONAL",
-        "mutations": [
-            {
-                "insert": {
-                    "key": KEYS[0],
-                    "properties": {"string": {"stringValue": "airflow is awesome!"}},
-                }
-            }
-        ],
-        "transaction": begin_transaction_commit.output,
-    }
-    # [END how_to_commit_def]
-
-    # [START how_to_commit_task]
-    commit_task = CloudDatastoreCommitOperator(
-        task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
-    )
-    # [END how_to_commit_task]
-
-    allocate_ids >> begin_transaction_commit
-
-    begin_transaction_query = CloudDatastoreBeginTransactionOperator(
-        task_id="begin_transaction_query",
-        transaction_options=TRANSACTION_OPTIONS,
-        project_id=GCP_PROJECT_ID,
-    )
-
-    # [START how_to_query_def]
-    QUERY = {
-        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": "query"},
-        "readOptions": {"transaction": begin_transaction_query.output},
-        "query": {},
-    }
-    # [END how_to_query_def]
-
-    # [START how_to_run_query]
-    run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID)
-    # [END how_to_run_query]
-
-    allocate_ids >> begin_transaction_query
-
-    begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
-        task_id="begin_transaction_to_rollback",
-        transaction_options=TRANSACTION_OPTIONS,
-        project_id=GCP_PROJECT_ID,
-    )
-
-    # [START how_to_rollback_transaction]
-    rollback_transaction = CloudDatastoreRollbackOperator(
-        task_id="rollback_transaction",
-        transaction=begin_transaction_to_rollback.output,
-    )
-    # [END how_to_rollback_transaction]
-
-    # Task dependencies created via `XComArgs`:
-    #   begin_transaction_commit >> commit_task
-    #   begin_transaction_to_rollback >> rollback_transaction
-    #   begin_transaction_query >> run_query
-
-    OPERATION_NAME = 'operations/example-operation-unique-id'
-    # [START get_operation_state]
-    get_operation = CloudDatastoreGetOperationOperator(
-        task_id='get_operation',
-        name=OPERATION_NAME,
-        gcp_conn_id='google_cloud_default',
-    )
-    # [END get_operation_state]
-
-    # [START delete_operation]
-    delete_operation = CloudDatastoreDeleteOperationOperator(
-        task_id='delete_operation',
-        name=OPERATION_NAME,
-        gcp_conn_id='google_cloud_default',
-    )
-    # [END delete_operation]
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
index 7068d1c7a3..4a8e623d6e 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
@@ -38,7 +38,7 @@ Export Entities
 To export entities from Google Cloud Datastore to Cloud Storage use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_export_task]
@@ -52,7 +52,7 @@ Import Entities
 To import entities from Cloud Storage to Google Cloud Datastore use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_import_task]
@@ -66,7 +66,7 @@ Allocate Ids
 To allocate IDs for incomplete keys use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreAllocateIdsOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_allocate_ids]
@@ -74,7 +74,7 @@ To allocate IDs for incomplete keys use
 
 An example of a partial keys required by the operator:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_keys_def]
@@ -88,7 +88,7 @@ Begin transaction
 To begin a new transaction use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_begin_transaction]
@@ -96,7 +96,7 @@ To begin a new transaction use
 
 An example of a transaction options required by the operator:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_transaction_def]
@@ -110,7 +110,7 @@ Commit transaction
 To commit a transaction, optionally creating, deleting or modifying some entities
 use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCommitOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_commit_task]
@@ -118,7 +118,7 @@ use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCo
 
 An example of a commit information required by the operator:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_commit_def]
@@ -132,7 +132,7 @@ Run query
 To run a query for entities use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_run_query]
@@ -140,7 +140,7 @@ To run a query for entities use
 
 An example of a query required by the operator:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_query_def]
@@ -154,7 +154,7 @@ Roll back transaction
 To roll back a transaction
 use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_rollback.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_rollback_transaction]
@@ -168,7 +168,7 @@ Get operation state
 To get the current state of a long-running operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
     :language: python
     :dedent: 4
     :start-after: [START get_operation_state]
@@ -182,7 +182,7 @@ Delete operation
 To delete an operation use
 :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
     :language: python
     :dedent: 4
     :start-after: [START delete_operation]
diff --git a/tests/system/providers/google/datastore/example_datastore_commit.py b/tests/system/providers/google/datastore/example_datastore_commit.py
new file mode 100644
index 0000000000..c00847fa2c
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_commit.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore commit operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreAllocateIdsOperator,
+    CloudDatastoreBeginTransactionOperator,
+    CloudDatastoreCommitOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_commit"
+
+# [START how_to_keys_def]
+KEYS = [
+    {
+        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+        "path": {"kind": "airflow"},
+    }
+]
+# [END how_to_keys_def]
+
+# [START how_to_transaction_def]
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+# [END how_to_transaction_def]
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["datastore", "example"],
+) as dag:
+    # [START how_to_allocate_ids]
+    allocate_ids = CloudDatastoreAllocateIdsOperator(
+        task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
+    )
+    # [END how_to_allocate_ids]
+
+    # [START how_to_begin_transaction]
+    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_commit",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_begin_transaction]
+
+    # [START how_to_commit_def]
+    COMMIT_BODY = {
+        "mode": "TRANSACTIONAL",
+        "mutations": [
+            {
+                "insert": {
+                    "key": KEYS[0],
+                    "properties": {"string": {"stringValue": "airflow is awesome!"}},
+                }
+            }
+        ],
+        "transaction": begin_transaction_commit.output,
+    }
+    # [END how_to_commit_def]
+
+    # [START how_to_commit_task]
+    commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
+    # [END how_to_commit_task]
+
+    allocate_ids >> begin_transaction_commit >> commit_task
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datastore/example_datastore_export_import.py b/tests/system/providers/google/datastore/example_datastore_export_import.py
new file mode 100644
index 0000000000..ab791b8c3b
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_export_import.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore export and import operators.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreDeleteOperationOperator,
+    CloudDatastoreExportEntitiesOperator,
+    CloudDatastoreGetOperationOperator,
+    CloudDatastoreImportEntitiesOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_export_import"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["datastore", "example"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
+    )
+
+    # [START how_to_export_task]
+    export_task = CloudDatastoreExportEntitiesOperator(
+        task_id="export_task",
+        bucket=BUCKET_NAME,
+        project_id=PROJECT_ID,
+        overwrite_existing=True,
+    )
+    # [END how_to_export_task]
+
+    # [START how_to_import_task]
+    import_task = CloudDatastoreImportEntitiesOperator(
+        task_id="import_task",
+        bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
+        file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
+        project_id=PROJECT_ID,
+    )
+    # [END how_to_import_task]
+
+    # [START get_operation_state]
+    get_operation = CloudDatastoreGetOperationOperator(
+        task_id='get_operation', name="{{ task_instance.xcom_pull('export_task')['name'] }}"
+    )
+    # [END get_operation_state]
+
+    # [START delete_operation]
+    delete_export_operation = CloudDatastoreDeleteOperationOperator(
+        task_id='delete_export_operation',
+        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+    )
+    # [END delete_operation]
+    delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_import_operation = CloudDatastoreDeleteOperationOperator(
+        task_id='delete_import_operation',
+        name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    chain(
+        create_bucket,
+        export_task,
+        import_task,
+        get_operation,
+        [delete_bucket, delete_export_operation, delete_import_operation],
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datastore/example_datastore_query.py b/tests/system/providers/google/datastore/example_datastore_query.py
new file mode 100644
index 0000000000..d42255034f
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_query.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore query operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreAllocateIdsOperator,
+    CloudDatastoreBeginTransactionOperator,
+    CloudDatastoreRunQueryOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_query"
+
+KEYS = [
+    {
+        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+        "path": {"kind": "airflow"},
+    }
+]
+
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["datastore", "example"],
+) as dag:
+    allocate_ids = CloudDatastoreAllocateIdsOperator(
+        task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
+    )
+
+    begin_transaction_query = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_query",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_query_def]
+    QUERY = {
+        "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
+        "readOptions": {"transaction": begin_transaction_query.output},
+        "query": {},
+    }
+    # [END how_to_query_def]
+
+    # [START how_to_run_query]
+    run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
+    # [END how_to_run_query]
+
+    allocate_ids >> begin_transaction_query >> run_query
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datastore/example_datastore_rollback.py b/tests/system/providers/google/datastore/example_datastore_rollback.py
new file mode 100644
index 0000000000..2fc102f6bb
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_rollback.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore rollback operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+    CloudDatastoreBeginTransactionOperator,
+    CloudDatastoreRollbackOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_rollback"
+
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["datastore", "example"],
+) as dag:
+    begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_to_rollback",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=PROJECT_ID,
+    )
+
+    # [START how_to_rollback_transaction]
+    rollback_transaction = CloudDatastoreRollbackOperator(
+        task_id="rollback_transaction",
+        transaction=begin_transaction_to_rollback.output,
+    )
+    # [END how_to_rollback_transaction]
+
+    begin_transaction_to_rollback >> rollback_transaction
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to