This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76dc7375b2 Migrate Datastore system tests to new design (AIP-47)
76dc7375b2 is described below
commit 76dc7375b27976968d37143f7e6dfab1049665f6
Author: Bartlomiej Hirsz <[email protected]>
AuthorDate: Wed Apr 20 06:29:25 2022 +0000
Migrate Datastore system tests to new design (AIP-47)
Change-Id: Ibc6f0a03a0c6fb374de85d74a7ac62cf6fa55bec
---
.../google/cloud/example_dags/example_datastore.py | 185 ---------------------
.../operators/cloud/datastore.rst | 26 +--
.../google/datastore/example_datastore_commit.py | 99 +++++++++++
.../datastore/example_datastore_export_import.py | 114 +++++++++++++
.../google/datastore/example_datastore_query.py | 84 ++++++++++
.../google/datastore/example_datastore_rollback.py | 67 ++++++++
6 files changed, 377 insertions(+), 198 deletions(-)
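Under the AIP-47 design, each example DAG becomes a self-contained system test: project and
environment identifiers come from SYSTEM_TESTS_* environment variables, and every file ends by
exposing a test_run object so pytest can execute the DAG directly. A minimal sketch of the
skeleton shared by the four new files (operator bodies elided):

    import os
    from datetime import datetime

    from airflow import models

    ENV_ID = os.environ.get("SYSTEMS_TESTS_ENV_ID".replace("SYSTEMS", "SYSTEM"))  # SYSTEM_TESTS_ENV_ID
    PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
    DAG_ID = "datastore_commit"  # one DAG id per test file

    with models.DAG(
        DAG_ID,
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=["datastore", "example"],
    ) as dag:
        ...  # the Datastore operators under test go here

    from tests.system.utils import get_test_run  # noqa: E402

    # Exposes the DAG to pytest (see tests/system/README.md#run_via_pytest)
    test_run = get_test_run(dag)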
diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
deleted file mode 100644
index d55d7d3c08..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ /dev/null
@@ -1,185 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that shows how to use Datastore operators.
-
-This example requires that your project contains Datastore instance.
-"""
-
-import os
-from datetime import datetime
-from typing import Any, Dict
-
-from airflow import models
-from airflow.providers.google.cloud.operators.datastore import (
- CloudDatastoreAllocateIdsOperator,
- CloudDatastoreBeginTransactionOperator,
- CloudDatastoreCommitOperator,
- CloudDatastoreDeleteOperationOperator,
- CloudDatastoreExportEntitiesOperator,
- CloudDatastoreGetOperationOperator,
- CloudDatastoreImportEntitiesOperator,
- CloudDatastoreRollbackOperator,
- CloudDatastoreRunQueryOperator,
-)
-
-START_DATE = datetime(2021, 1, 1)
-
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
-
-with models.DAG(
- "example_gcp_datastore",
- schedule_interval='@once', # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=["example"],
-) as dag:
- # [START how_to_export_task]
- export_task = CloudDatastoreExportEntitiesOperator(
- task_id="export_task",
- bucket=BUCKET,
- project_id=GCP_PROJECT_ID,
- overwrite_existing=True,
- )
- # [END how_to_export_task]
-
- # [START how_to_import_task]
- import_task = CloudDatastoreImportEntitiesOperator(
- task_id="import_task",
- bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
- file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
- project_id=GCP_PROJECT_ID,
- )
- # [END how_to_import_task]
-
- export_task >> import_task
-
-# [START how_to_keys_def]
-KEYS = [
- {
- "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
- "path": {"kind": "airflow"},
- }
-]
-# [END how_to_keys_def]
-
-# [START how_to_transaction_def]
-TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
-# [END how_to_transaction_def]
-
-
-with models.DAG(
- "example_gcp_datastore_operations",
- schedule_interval='@once', # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=["example"],
-) as dag2:
- # [START how_to_allocate_ids]
- allocate_ids = CloudDatastoreAllocateIdsOperator(
- task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
- )
- # [END how_to_allocate_ids]
-
- # [START how_to_begin_transaction]
- begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
- task_id="begin_transaction_commit",
- transaction_options=TRANSACTION_OPTIONS,
- project_id=GCP_PROJECT_ID,
- )
- # [END how_to_begin_transaction]
-
- # [START how_to_commit_def]
- COMMIT_BODY = {
- "mode": "TRANSACTIONAL",
- "mutations": [
- {
- "insert": {
- "key": KEYS[0],
- "properties": {"string": {"stringValue": "airflow is awesome!"}},
- }
- }
- ],
- "transaction": begin_transaction_commit.output,
- }
- # [END how_to_commit_def]
-
- # [START how_to_commit_task]
- commit_task = CloudDatastoreCommitOperator(
- task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
- )
- # [END how_to_commit_task]
-
- allocate_ids >> begin_transaction_commit
-
- begin_transaction_query = CloudDatastoreBeginTransactionOperator(
- task_id="begin_transaction_query",
- transaction_options=TRANSACTION_OPTIONS,
- project_id=GCP_PROJECT_ID,
- )
-
- # [START how_to_query_def]
- QUERY = {
- "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": "query"},
- "readOptions": {"transaction": begin_transaction_query.output},
- "query": {},
- }
- # [END how_to_query_def]
-
- # [START how_to_run_query]
- run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID)
- # [END how_to_run_query]
-
- allocate_ids >> begin_transaction_query
-
- begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
- task_id="begin_transaction_to_rollback",
- transaction_options=TRANSACTION_OPTIONS,
- project_id=GCP_PROJECT_ID,
- )
-
- # [START how_to_rollback_transaction]
- rollback_transaction = CloudDatastoreRollbackOperator(
- task_id="rollback_transaction",
- transaction=begin_transaction_to_rollback.output,
- )
- # [END how_to_rollback_transaction]
-
- # Task dependencies created via `XComArgs`:
- # begin_transaction_commit >> commit_task
- # begin_transaction_to_rollback >> rollback_transaction
- # begin_transaction_query >> run_query
-
- OPERATION_NAME = 'operations/example-operation-unique-id'
- # [START get_operation_state]
- get_operation = CloudDatastoreGetOperationOperator(
- task_id='get_operation',
- name=OPERATION_NAME,
- gcp_conn_id='google_cloud_default',
- )
- # [END get_operation_state]
-
- # [START delete_operation]
- delete_operation = CloudDatastoreDeleteOperationOperator(
- task_id='delete_operation',
- name=OPERATION_NAME,
- gcp_conn_id='google_cloud_default',
- )
- # [END delete_operation]
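The .output references in the file above are XComArgs: using an operator's .output inside another
operator's arguments passes the pushed value at runtime and implicitly creates the upstream
dependency, which is why the dependencies listed in the comments need no explicit >>. A minimal
sketch of the mechanism (task names hypothetical; PROJECT_ID assumed defined as above):

    from airflow.providers.google.cloud.operators.datastore import (
        CloudDatastoreBeginTransactionOperator,
        CloudDatastoreRollbackOperator,
    )

    begin_txn = CloudDatastoreBeginTransactionOperator(
        task_id="begin_txn",
        transaction_options={"readWrite": {}},
        project_id=PROJECT_ID,
    )

    # Referencing begin_txn.output both templates in the transaction handle
    # and wires begin_txn upstream of rollback automatically.
    rollback = CloudDatastoreRollbackOperator(
        task_id="rollback",
        transaction=begin_txn.output,
    )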
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
index 7068d1c7a3..4a8e623d6e 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
@@ -38,7 +38,7 @@ Export Entities
To export entities from Google Cloud Datastore to Cloud Storage use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
:language: python
:dedent: 4
:start-after: [START how_to_export_task]
@@ -52,7 +52,7 @@ Import Entities
To import entities from Cloud Storage to Google Cloud Datastore use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
:language: python
:dedent: 4
:start-after: [START how_to_import_task]
@@ -66,7 +66,7 @@ Allocate Ids
To allocate IDs for incomplete keys use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreAllocateIdsOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START how_to_allocate_ids]
@@ -74,7 +74,7 @@ To allocate IDs for incomplete keys use
An example of the partial keys required by the operator:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 0
:start-after: [START how_to_keys_def]
@@ -88,7 +88,7 @@ Begin transaction
To begin a new transaction use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START how_to_begin_transaction]
@@ -96,7 +96,7 @@ To begin a new transaction use
An example of the transaction options required by the operator:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 0
:start-after: [START how_to_transaction_def]
@@ -110,7 +110,7 @@ Commit transaction
To commit a transaction, optionally creating, deleting or modifying some entities
use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCommitOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START how_to_commit_task]
@@ -118,7 +118,7 @@ use :class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreCo
An example of the commit information required by the operator:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_commit.py
:language: python
:dedent: 0
:start-after: [START how_to_commit_def]
@@ -132,7 +132,7 @@ Run query
To run a query for entities use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py
:language: python
:dedent: 4
:start-after: [START how_to_run_query]
@@ -140,7 +140,7 @@ To run a query for entities use
An example of a query required by the operator:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_query.py
:language: python
:dedent: 0
:start-after: [START how_to_query_def]
@@ -154,7 +154,7 @@ Roll back transaction
To roll back a transaction use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_rollback.py
:language: python
:dedent: 4
:start-after: [START how_to_rollback_transaction]
@@ -168,7 +168,7 @@ Get operation state
To get the current state of a long-running operation use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
:language: python
:dedent: 4
:start-after: [START get_operation_state]
@@ -182,7 +182,7 @@ Delete operation
To delete an operation use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datastore.py
+.. exampleinclude:: /../../tests/system/providers/google/datastore/example_datastore_export_import.py
:language: python
:dedent: 4
:start-after: [START delete_operation]
diff --git a/tests/system/providers/google/datastore/example_datastore_commit.py b/tests/system/providers/google/datastore/example_datastore_commit.py
new file mode 100644
index 0000000000..c00847fa2c
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_commit.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore commit operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreAllocateIdsOperator,
+ CloudDatastoreBeginTransactionOperator,
+ CloudDatastoreCommitOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_commit"
+
+# [START how_to_keys_def]
+KEYS = [
+ {
+ "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+ "path": {"kind": "airflow"},
+ }
+]
+# [END how_to_keys_def]
+
+# [START how_to_transaction_def]
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+# [END how_to_transaction_def]
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["datastore", "example"],
+) as dag:
+ # [START how_to_allocate_ids]
+ allocate_ids = CloudDatastoreAllocateIdsOperator(
+ task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
+ )
+ # [END how_to_allocate_ids]
+
+ # [START how_to_begin_transaction]
+ begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+ task_id="begin_transaction_commit",
+ transaction_options=TRANSACTION_OPTIONS,
+ project_id=PROJECT_ID,
+ )
+ # [END how_to_begin_transaction]
+
+ # [START how_to_commit_def]
+ COMMIT_BODY = {
+ "mode": "TRANSACTIONAL",
+ "mutations": [
+ {
+ "insert": {
+ "key": KEYS[0],
+ "properties": {"string": {"stringValue": "airflow is awesome!"}},
+ }
+ }
+ ],
+ "transaction": begin_transaction_commit.output,
+ }
+ # [END how_to_commit_def]
+
+ # [START how_to_commit_task]
+ commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
+ # [END how_to_commit_task]
+
+ allocate_ids >> begin_transaction_commit >> commit_task
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
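With test_run in place, the file doubles as an ordinary pytest test: assuming GCP credentials and
the SYSTEM_TESTS_ENV_ID / SYSTEM_TESTS_GCP_PROJECT variables are exported, an invocation along the
lines of `pytest tests/system/providers/google/datastore/example_datastore_commit.py` runs the
whole DAG as a single test (see tests/system/README.md#run_via_pytest for the documented flow).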
diff --git a/tests/system/providers/google/datastore/example_datastore_export_import.py b/tests/system/providers/google/datastore/example_datastore_export_import.py
new file mode 100644
index 0000000000..ab791b8c3b
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_export_import.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore export and import operators.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreDeleteOperationOperator,
+ CloudDatastoreExportEntitiesOperator,
+ CloudDatastoreGetOperationOperator,
+ CloudDatastoreImportEntitiesOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_export_import"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["datastore", "example"],
+) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
+ )
+
+ # [START how_to_export_task]
+ export_task = CloudDatastoreExportEntitiesOperator(
+ task_id="export_task",
+ bucket=BUCKET_NAME,
+ project_id=PROJECT_ID,
+ overwrite_existing=True,
+ )
+ # [END how_to_export_task]
+
+ # [START how_to_import_task]
+ import_task = CloudDatastoreImportEntitiesOperator(
+ task_id="import_task",
+ bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
+ file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
+ project_id=PROJECT_ID,
+ )
+ # [END how_to_import_task]
+
+ # [START get_operation_state]
+ get_operation = CloudDatastoreGetOperationOperator(
+ task_id='get_operation', name="{{ task_instance.xcom_pull('export_task')['name'] }}"
+ )
+ # [END get_operation_state]
+
+ # [START delete_operation]
+ delete_export_operation = CloudDatastoreDeleteOperationOperator(
+ task_id='delete_export_operation',
+ name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+ )
+ # [END delete_operation]
+ delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_import_operation = CloudDatastoreDeleteOperationOperator(
+ task_id='delete_import_operation',
+ name="{{ task_instance.xcom_pull('import_task')['name'] }}",
+ )
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ chain(
+ create_bucket,
+ export_task,
+ import_task,
+ get_operation,
+ [delete_bucket, delete_export_operation, delete_import_operation],
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
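The watcher wiring above matters because the teardown tasks run with TriggerRule.ALL_DONE: they
succeed even when the tasks under test fail, so without a watcher the DAG run itself would still be
marked successful. Conceptually, the watcher is just a task that fails whenever anything upstream
failed — a sketch of the idea, not necessarily the exact utility in tests/system/utils/watcher.py:

    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        # Runs (and fails) as soon as any task wired into it fails,
        # propagating the failure to the DAG run status.
        raise AirflowException("Failing task because one or more upstream tasks failed.")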
diff --git a/tests/system/providers/google/datastore/example_datastore_query.py b/tests/system/providers/google/datastore/example_datastore_query.py
new file mode 100644
index 0000000000..d42255034f
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_query.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore query operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreAllocateIdsOperator,
+ CloudDatastoreBeginTransactionOperator,
+ CloudDatastoreRunQueryOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_query"
+
+KEYS = [
+ {
+ "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
+ "path": {"kind": "airflow"},
+ }
+]
+
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["datastore", "example"],
+) as dag:
+ allocate_ids = CloudDatastoreAllocateIdsOperator(
+ task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
+ )
+
+ begin_transaction_query = CloudDatastoreBeginTransactionOperator(
+ task_id="begin_transaction_query",
+ transaction_options=TRANSACTION_OPTIONS,
+ project_id=PROJECT_ID,
+ )
+
+ # [START how_to_query_def]
+ QUERY = {
+ "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
+ "readOptions": {"transaction": begin_transaction_query.output},
+ "query": {},
+ }
+ # [END how_to_query_def]
+
+ # [START how_to_run_query]
+ run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
+ # [END how_to_run_query]
+
+ allocate_ids >> begin_transaction_query >> run_query
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datastore/example_datastore_rollback.py b/tests/system/providers/google/datastore/example_datastore_rollback.py
new file mode 100644
index 0000000000..2fc102f6bb
--- /dev/null
+++ b/tests/system/providers/google/datastore/example_datastore_rollback.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Airflow System Test DAG that verifies Datastore rollback operators.
+"""
+
+import os
+from datetime import datetime
+from typing import Any, Dict
+
+from airflow import models
+from airflow.providers.google.cloud.operators.datastore import (
+ CloudDatastoreBeginTransactionOperator,
+ CloudDatastoreRollbackOperator,
+)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datastore_rollback"
+
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["datastore", "example"],
+) as dag:
+ begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
+ task_id="begin_transaction_to_rollback",
+ transaction_options=TRANSACTION_OPTIONS,
+ project_id=PROJECT_ID,
+ )
+
+ # [START how_to_rollback_transaction]
+ rollback_transaction = CloudDatastoreRollbackOperator(
+ task_id="rollback_transaction",
+ transaction=begin_transaction_to_rollback.output,
+ )
+ # [END how_to_rollback_transaction]
+
+ begin_transaction_to_rollback >> rollback_transaction
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)