This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 373d8a57b2 Deprecate `CloudComposerEnvironmentSensor` in favor of
`CloudComposerCreateEnvironmentOperator` with defer mode (#35775)
373d8a57b2 is described below
commit 373d8a57b225a1a5e79b92c3d84b618b3522bfa5
Author: VladaZakharova <[email protected]>
AuthorDate: Sat Nov 25 09:22:46 2023 +0100
Deprecate `CloudComposerEnvironmentSensor` in favor of
`CloudComposerCreateEnvironmentOperator` with defer mode (#35775)
Co-authored-by: Ulada Zakharava <[email protected]>
---
.../google/cloud/operators/cloud_composer.py | 2 +-
.../google/cloud/sensors/cloud_composer.py | 15 ++-
.../operators/cloud/cloud_composer.rst | 6 +-
tests/always/test_project_structure.py | 1 +
.../cloud/composer/example_cloud_composer.py | 53 ++++++++-
.../composer/example_cloud_composer_deferrable.py | 126 ---------------------
6 files changed, 66 insertions(+), 137 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py
b/airflow/providers/google/cloud/operators/cloud_composer.py
index b28b0fd476..de6d49d656 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -112,7 +112,7 @@ class
CloudComposerCreateEnvironmentOperator(GoogleCloudBaseOperator):
:param metadata: Strings which should be sent along with the request as
metadata.
:param deferrable: Run operator in the deferrable mode
:param pooling_period_seconds: Optional: Control the rate of the poll for
the result of deferrable run.
- By default the trigger will poll every 30 seconds.
+ By default, the trigger will poll every 30 seconds.
"""
template_fields = (
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py
b/airflow/providers/google/cloud/sensors/cloud_composer.py
index 1873b51d68..9cc742b002 100644
--- a/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -19,9 +19,10 @@
from __future__ import annotations
+import warnings
from typing import TYPE_CHECKING, Any, Sequence
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.google.cloud.triggers.cloud_composer import
CloudComposerExecutionTrigger
from airflow.sensors.base import BaseSensorOperator
@@ -33,6 +34,11 @@ class CloudComposerEnvironmentSensor(BaseSensorOperator):
"""
Check the status of the Cloud Composer Environment task.
+ This Sensor is deprecated. You can achieve the same functionality by using
Cloud Composer Operators
+ CloudComposerCreateEnvironmentOperator,
CloudComposerDeleteEnvironmentOperator and
+ CloudComposerUpdateEnvironmentOperator in deferrable or non-deferrable
mode, since every operator
+ gives the user the ability to wait (asynchronously or synchronously) until
the operation is finished.
+
:param project_id: Required. The ID of the Google Cloud project that the
service belongs to.
:param region: Required. The ID of the Google Cloud region that the
service belongs to.
:param operation_name: The name of the operation resource
@@ -59,6 +65,13 @@ class CloudComposerEnvironmentSensor(BaseSensorOperator):
pooling_period_seconds: int = 30,
**kwargs,
):
+ warnings.warn(
+ f"The `{self.__class__.__name__}` operator is deprecated. You can
achieve the same functionality "
+ f"by using operators in deferrable or non-deferrable mode, since
every operator for Cloud "
+ f"Composer will wait for the operation to complete.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
diff --git
a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
index ada347def7..b55063c8f6 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
@@ -57,7 +57,7 @@ With this configuration we can create the environment:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
-.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START
howto_operator_create_composer_environment_deferrable_mode]
@@ -116,7 +116,7 @@ To update a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
-.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START
howto_operator_update_composer_environment_deferrable_mode]
@@ -138,7 +138,7 @@ To delete a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
-.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude::
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START
howto_operator_delete_composer_environment_deferrable_mode]
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index 967c3ecbb5..bd5098c04f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -395,6 +395,7 @@ class
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator",
"airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator",
"airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
+
"airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerEnvironmentSensor",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360CreateQueryOperator",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360RunQueryOperator",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360DownloadReportV2Operator",
diff --git
a/tests/system/providers/google/cloud/composer/example_cloud_composer.py
b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
index ab9c6a0e1b..fb8958412e 100644
--- a/tests/system/providers/google/cloud/composer/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
@@ -32,20 +32,20 @@ from
airflow.providers.google.cloud.operators.cloud_composer import (
)
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_composer"
-
REGION = "us-central1"
# [START howto_operator_composer_simple_environment]
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
+ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT = {
"config": {
- "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
+ "software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
}
}
# [END howto_operator_composer_simple_environment]
@@ -53,10 +53,10 @@ ENVIRONMENT = {
# [START howto_operator_composer_update_environment]
UPDATED_ENVIRONMENT = {
"labels": {
- "label1": "testing",
+ "label": "testing",
}
}
-UPDATE_MASK = {"paths": ["labels.label1"]}
+UPDATE_MASK = {"paths": ["labels.label"]}
# [END howto_operator_composer_update_environment]
@@ -85,6 +85,17 @@ with DAG(
)
# [END howto_operator_create_composer_environment]
+ # [START howto_operator_create_composer_environment_deferrable_mode]
+ defer_create_env = CloudComposerCreateEnvironmentOperator(
+ task_id="defer_create_env",
+ project_id=PROJECT_ID,
+ region=REGION,
+ environment_id=ENVIRONMENT_ID_ASYNC,
+ environment=ENVIRONMENT,
+ deferrable=True,
+ )
+ # [END howto_operator_create_composer_environment_deferrable_mode]
+
# [START howto_operator_list_composer_environments]
list_envs = CloudComposerListEnvironmentsOperator(
task_id="list_envs", project_id=PROJECT_ID, region=REGION
@@ -111,6 +122,18 @@ with DAG(
)
# [END howto_operator_update_composer_environment]
+ # [START howto_operator_update_composer_environment_deferrable_mode]
+ defer_update_env = CloudComposerUpdateEnvironmentOperator(
+ task_id="defer_update_env",
+ project_id=PROJECT_ID,
+ region=REGION,
+ environment_id=ENVIRONMENT_ID_ASYNC,
+ update_mask=UPDATE_MASK,
+ environment=UPDATED_ENVIRONMENT,
+ deferrable=True,
+ )
+ # [END howto_operator_update_composer_environment_deferrable_mode]
+
# [START howto_operator_delete_composer_environment]
delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="delete_env",
@@ -121,7 +144,25 @@ with DAG(
# [END howto_operator_delete_composer_environment]
delete_env.trigger_rule = TriggerRule.ALL_DONE
- chain(image_versions, create_env, list_envs, get_env, update_env,
delete_env)
+ # [START howto_operator_delete_composer_environment_deferrable_mode]
+ defer_delete_env = CloudComposerDeleteEnvironmentOperator(
+ task_id="defer_delete_env",
+ project_id=PROJECT_ID,
+ region=REGION,
+ environment_id=ENVIRONMENT_ID_ASYNC,
+ deferrable=True,
+ )
+ # [END howto_operator_delete_composer_environment_deferrable_mode]
+ defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
+
+ chain(
+ image_versions,
+ [create_env, defer_create_env],
+ list_envs,
+ get_env,
+ [update_env, defer_update_env],
+ [delete_env, defer_delete_env],
+ )
from tests.system.utils.watcher import watcher
diff --git
a/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
deleted file mode 100644
index 1a69e12d7b..0000000000
---
a/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow.models.baseoperator import chain
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.cloud_composer import (
- CloudComposerCreateEnvironmentOperator,
- CloudComposerDeleteEnvironmentOperator,
- CloudComposerUpdateEnvironmentOperator,
-)
-from airflow.providers.google.cloud.sensors.cloud_composer import
CloudComposerEnvironmentSensor
-from airflow.utils.trigger_rule import TriggerRule
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-
-DAG_ID = "example_composer_deferrable"
-
-REGION = "us-central1"
-
-ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
-# [START howto_operator_composer_simple_environment]
-ENVIRONMENT = {
- "config": {
- "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
- }
-}
-# [END howto_operator_composer_simple_environment]
-
-# [START howto_operator_composer_update_environment]
-UPDATED_ENVIRONMENT = {
- "labels": {
- "label2": "testing",
- }
-}
-UPDATE_MASK = {"paths": ["labels.label2"]}
-# [END howto_operator_composer_update_environment]
-
-
-with DAG(
- DAG_ID,
- schedule="@once",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example", "composer"],
-) as dag:
- # [START howto_operator_create_composer_environment_deferrable_mode]
- defer_create_env = CloudComposerCreateEnvironmentOperator(
- task_id="defer_create_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- environment=ENVIRONMENT,
- deferrable=True,
- )
- # [END howto_operator_create_composer_environment_deferrable_mode]
-
- operation_name = defer_create_env.output["operation_id"]
-
- wait_for_execution = CloudComposerEnvironmentSensor(
- task_id="wait_for_execution",
- operation_name=operation_name,
- region=REGION,
- project_id=PROJECT_ID,
- )
-
- # [START howto_operator_update_composer_environment_deferrable_mode]
- defer_update_env = CloudComposerUpdateEnvironmentOperator(
- task_id="defer_update_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- update_mask=UPDATE_MASK,
- environment=UPDATED_ENVIRONMENT,
- deferrable=True,
- )
- # [END howto_operator_update_composer_environment_deferrable_mode]
-
- # [START howto_operator_delete_composer_environment_deferrable_mode]
- defer_delete_env = CloudComposerDeleteEnvironmentOperator(
- task_id="defer_delete_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- deferrable=True,
- )
- # [END howto_operator_delete_composer_environment_deferrable_mode]
- defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
-
- chain(
- defer_create_env,
- wait_for_execution,
- defer_update_env,
- defer_delete_env,
- )
-
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "teardown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)