This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3e4ed12907 Fix dataform and datastore system tests (#40295)
3e4ed12907 is described below
commit 3e4ed12907e5b5f9676fabb12cde7e12afbce7e8
Author: VladaZakharova <[email protected]>
AuthorDate: Tue Jun 18 14:25:46 2024 +0200
Fix dataform and datastore system tests (#40295)
---
.../google/cloud/dataform/example_dataform.py | 22 ++++++++++++++--------
.../cloud/datastore/example_datastore_commit.py | 17 +++++++++++++++--
.../cloud/datastore/example_datastore_query.py | 2 +-
.../cloud/datastore/example_datastore_rollback.py | 2 +-
4 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/tests/system/providers/google/cloud/dataform/example_dataform.py
b/tests/system/providers/google/cloud/dataform/example_dataform.py
index fb2f74ab2a..78ea252f5c 100644
--- a/tests/system/providers/google/cloud/dataform/example_dataform.py
+++ b/tests/system/providers/google/cloud/dataform/example_dataform.py
@@ -48,11 +48,12 @@ from airflow.providers.google.cloud.operators.dataform
import (
from airflow.providers.google.cloud.sensors.dataform import
DataformWorkflowInvocationStateSensor
from airflow.providers.google.cloud.utils.dataform import
make_initialization_workspace_flow
from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-DAG_ID = "example_dataform"
+DAG_ID = "dataform"
REPOSITORY_ID = f"example_dataform_repository_{ENV_ID}"
REGION = "us-central1"
@@ -281,23 +282,27 @@ with DAG(
region=REGION,
repository_id=REPOSITORY_ID,
workspace_id=WORKSPACE_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_delete_workspace]
- delete_workspace.trigger_rule = TriggerRule.ALL_DONE
-
# [START howto_operator_delete_repository]
delete_repository = DataformDeleteRepositoryOperator(
task_id="delete-repository",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_delete_repository]
- delete_repository.trigger_rule = TriggerRule.ALL_DONE
-
- (make_repository >> make_workspace >> first_initialization_step)
+ (
+ # TEST SETUP
+ make_repository
+ >> make_workspace
+ # TEST BODY
+ >> first_initialization_step
+ )
(
last_initialization_step
>> install_npm_packages
@@ -312,6 +317,7 @@ with DAG(
>> cancel_workflow_invocation
>> make_test_directory
>> write_test_file
+ # TEST TEARDOWN
>> remove_test_file
>> remove_test_directory
>> delete_dataset
diff --git
a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
index 95162fbc0e..4f540c6aec 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_commit.py
@@ -64,16 +64,18 @@ with DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["datastore", "example"],
+ tags=["example", "datastore"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME,
project_id=PROJECT_ID, location="EU"
)
+
# [START how_to_allocate_ids]
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
# [END how_to_allocate_ids]
+
# [START how_to_begin_transaction]
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
@@ -81,6 +83,7 @@ with DAG(
project_id=PROJECT_ID,
)
# [END how_to_begin_transaction]
+
# [START how_to_commit_def]
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
@@ -95,9 +98,11 @@ with DAG(
"singleUseTransaction": {"readWrite": {}},
}
# [END how_to_commit_def]
+
# [START how_to_commit_task]
commit_task = CloudDatastoreCommitOperator(task_id="commit_task",
body=COMMIT_BODY, project_id=PROJECT_ID)
# [END how_to_commit_task]
+
# [START how_to_export_task]
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
@@ -106,6 +111,7 @@ with DAG(
overwrite_existing=True,
)
# [END how_to_export_task]
+
# [START how_to_import_task]
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
@@ -114,35 +120,42 @@ with DAG(
project_id=PROJECT_ID,
)
# [END how_to_import_task]
+
# [START get_operation_state]
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{
task_instance.xcom_pull('export_task')['name'] }}"
)
# [END get_operation_state]
+
# [START delete_operation]
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END delete_operation]
- delete_export_operation.trigger_rule = TriggerRule.ALL_DONE
+
delete_import_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_import_operation",
name="{{ task_instance.xcom_pull('import_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
+
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
chain(
+ # TEST SETUP
create_bucket,
+ # TEST BODY
allocate_ids,
begin_transaction_commit,
commit_task,
export_task,
import_task,
get_operation,
+ # TEST TEARDOWN
[delete_bucket, delete_export_operation, delete_import_operation],
)
diff --git
a/tests/system/providers/google/cloud/datastore/example_datastore_query.py
b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
index 0a5a812baf..24bb1b85dd 100644
--- a/tests/system/providers/google/cloud/datastore/example_datastore_query.py
+++ b/tests/system/providers/google/cloud/datastore/example_datastore_query.py
@@ -33,7 +33,7 @@ from airflow.providers.google.cloud.operators.datastore
import (
)
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
DAG_ID = "datastore_query"
diff --git
a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
index 5435de1b11..09661b0ae0 100644
---
a/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
+++
b/tests/system/providers/google/cloud/datastore/example_datastore_rollback.py
@@ -32,7 +32,7 @@ from airflow.providers.google.cloud.operators.datastore
import (
)
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
DAG_ID = "datastore_rollback"