This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cad983dac9 Fix datastore system tests. (#34215)
cad983dac9 is described below
commit cad983dac9f240a2d0658531235acfd003b64760
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Sep 11 12:27:50 2023 +0200
Fix datastore system tests. (#34215)
Co-authored-by: Beata Kossakowska <[email protected]>
---
.../operators/cloud/datastore.rst | 8 +-
.../cloud/datastore/example_datastore_commit.py | 70 +++++++++++--
.../datastore/example_datastore_export_import.py | 114 ---------------------
.../cloud/datastore/example_datastore_query.py | 6 ++
.../cloud/datastore/example_datastore_rollback.py | 6 ++
.../google/cloud/gcs/example_firestore.py | 18 +---
6 files changed, 81 insertions(+), 141 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
index 3b3997bb86..8218e526a1 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datastore.rst
@@ -38,7 +38,7 @@ Export Entities
To export entities from Google Cloud Datastore to Cloud Storage use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreExportEntitiesOperator`
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START how_to_export_task]
@@ -52,7 +52,7 @@ Import Entities
To import entities from Cloud Storage to Google Cloud Datastore use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreImportEntitiesOperator`
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START how_to_import_task]
@@ -168,7 +168,7 @@ Get operation state
To get the current state of a long-running operation use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator`
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START get_operation_state]
@@ -182,7 +182,7 @@ Delete operation
To delete an operation use
:class:`~airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator`
-.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/datastore/example_datastore_commit.py
:language: python
:dedent: 4
:start-after: [START delete_operation]
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
index 2fd122a1c1..0c5cf5cda3 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
@@ -25,17 +25,24 @@ from datetime import datetime
from typing import Any
from airflow import models
+from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.datastore import (
CloudDatastoreAllocateIdsOperator,
CloudDatastoreBeginTransactionOperator,
CloudDatastoreCommitOperator,
+ CloudDatastoreDeleteOperationOperator,
+ CloudDatastoreExportEntitiesOperator,
+ CloudDatastoreGetOperationOperator,
+ CloudDatastoreImportEntitiesOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "datastore_commit"
-
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
# [START how_to_keys_def]
KEYS = [
{
@@ -57,12 +64,14 @@ with models.DAG(
catchup=False,
tags=["datastore", "example"],
) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
+ )
# [START how_to_allocate_ids]
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
# [END how_to_allocate_ids]
-
# [START how_to_begin_transaction]
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
@@ -70,7 +79,6 @@ with models.DAG(
project_id=PROJECT_ID,
)
# [END how_to_begin_transaction]
-
# [START how_to_commit_def]
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
@@ -82,15 +90,65 @@ with models.DAG(
}
}
],
- "transaction": begin_transaction_commit.output,
+ "singleUseTransaction": {"readWrite": {}},
}
# [END how_to_commit_def]
-
# [START how_to_commit_task]
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
# [END how_to_commit_task]
+ # [START how_to_export_task]
+ export_task = CloudDatastoreExportEntitiesOperator(
+ task_id="export_task",
+ bucket=BUCKET_NAME,
+ project_id=PROJECT_ID,
+ overwrite_existing=True,
+ )
+ # [END how_to_export_task]
+ # [START how_to_import_task]
+ import_task = CloudDatastoreImportEntitiesOperator(
+ task_id="import_task",
+ bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
+ file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
+ project_id=PROJECT_ID,
+ )
+ # [END how_to_import_task]
+ # [START get_operation_state]
+ get_operation = CloudDatastoreGetOperationOperator(
+ task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
+ )
+ # [END get_operation_state]
+ # [START delete_operation]
+ delete_export_operation = CloudDatastoreDeleteOperationOperator(
+ task_id="delete_export_operation",
+ name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+ )
+ # [END delete_operation]
+ delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+ delete_import_operation = CloudDatastoreDeleteOperationOperator(
+ task_id="delete_import_operation",
+ name="{{ task_instance.xcom_pull('import_task')['name'] }}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ chain(
+ create_bucket,
+ allocate_ids,
+ begin_transaction_commit,
+ commit_task,
+ export_task,
+ import_task,
+ get_operation,
+ [delete_bucket, delete_export_operation, delete_import_operation],
+ )
+
+ from tests.system.utils.watcher import watcher
- allocate_ids >> begin_transaction_commit >> commit_task
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py b/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
deleted file mode 100644
index f134ee14fb..0000000000
--- a/tests/system/providers/google/cloud/datastore/example_datastore_export_import.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Airflow System Test DAG that verifies Datastore export and import operators.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.models.baseoperator import chain
-from airflow.providers.google.cloud.operators.datastore import (
- CloudDatastoreDeleteOperationOperator,
- CloudDatastoreExportEntitiesOperator,
- CloudDatastoreGetOperationOperator,
- CloudDatastoreImportEntitiesOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-
-DAG_ID = "datastore_export_import"
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-
-
-with models.DAG(
- DAG_ID,
- schedule="@once",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["datastore", "example"],
-) as dag:
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, location="EU"
- )
-
- # [START how_to_export_task]
- export_task = CloudDatastoreExportEntitiesOperator(
- task_id="export_task",
- bucket=BUCKET_NAME,
- project_id=PROJECT_ID,
- overwrite_existing=True,
- )
- # [END how_to_export_task]
-
- # [START how_to_import_task]
- import_task = CloudDatastoreImportEntitiesOperator(
- task_id="import_task",
- bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
- file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
- project_id=PROJECT_ID,
- )
- # [END how_to_import_task]
-
- # [START get_operation_state]
- get_operation = CloudDatastoreGetOperationOperator(
- task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
- )
- # [END get_operation_state]
-
- # [START delete_operation]
- delete_export_operation = CloudDatastoreDeleteOperationOperator(
- task_id="delete_export_operation",
- name="{{ task_instance.xcom_pull('export_task')['name'] }}",
- )
- # [END delete_operation]
- delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
-
- delete_import_operation = CloudDatastoreDeleteOperationOperator(
- task_id="delete_import_operation",
- name="{{ task_instance.xcom_pull('export_task')['name'] }}",
- )
-
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
- )
-
- chain(
- create_bucket,
- export_task,
- import_task,
- get_operation,
- [delete_bucket, delete_export_operation, delete_import_operation],
- )
-
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_query.py b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
index a0713e8088..2ac98ac85d 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_query.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
@@ -77,6 +77,12 @@ with models.DAG(
allocate_ids >> begin_transaction_query >> run_query
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
index e9ffd00885..c8d06cab18 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
@@ -60,6 +60,12 @@ with models.DAG(
begin_transaction_to_rollback >> rollback_transaction
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
from tests.system.utils import get_test_run # noqa: E402
diff --git a/tests/system/providers/google/cloud/gcs/example_firestore.py b/tests/system/providers/google/cloud/gcs/example_firestore.py
index 8c3b95d7d1..ae904cf8dc 100644
--- a/tests/system/providers/google/cloud/gcs/example_firestore.py
+++ b/tests/system/providers/google/cloud/gcs/example_firestore.py
@@ -36,7 +36,6 @@ from airflow.providers.google.cloud.operators.bigquery import (
)
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.providers.google.cloud.operators.datastore import (
- CloudDatastoreBeginTransactionOperator,
CloudDatastoreCommitOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
@@ -74,20 +73,12 @@ with models.DAG(
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, location=DATASET_LOCATION
)
-
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset",
dataset_id=DATASET_NAME,
location=DATASET_LOCATION,
project_id=PROJECT_ID,
)
-
- begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
- task_id="begin_transaction_commit",
- transaction_options={"readWrite": {}},
- project_id=PROJECT_ID,
- )
-
commit_task = CloudDatastoreCommitOperator(
task_id="commit_task",
body={
@@ -100,11 +91,10 @@ with models.DAG(
}
}
],
- "transaction": begin_transaction_commit.output,
+ "singleUseTransaction": {"readWrite": {}},
},
project_id=PROJECT_ID,
)
-
# [START howto_operator_export_database_to_gcs]
export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
task_id="export_database_to_gcs",
@@ -112,7 +102,6 @@ with models.DAG(
body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
)
# [END howto_operator_export_database_to_gcs]
-
# [START howto_operator_create_external_table_multiple_types]
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
@@ -131,7 +120,6 @@ with models.DAG(
},
)
# [END howto_operator_create_external_table_multiple_types]
-
read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
task_id="execute_query",
configuration={
@@ -141,7 +129,6 @@ with models.DAG(
}
},
)
-
delete_entity = DataflowTemplatedJobStartOperator(
task_id="delete-entity-firestore",
project_id=PROJECT_ID,
@@ -158,7 +145,6 @@ with models.DAG(
append_job_name=False,
trigger_rule=TriggerRule.ALL_DONE,
)
-
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=DATASET_NAME,
@@ -166,7 +152,6 @@ with models.DAG(
delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE,
)
-
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)
@@ -174,7 +159,6 @@ with models.DAG(
(
# TEST SETUP
[create_bucket, create_dataset]
- >> begin_transaction_commit
>> commit_task
# TEST BODY
>> export_database_to_gcs