This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 373d8a57b2 Deprecate `CloudComposerEnvironmentSensor` in favor of 
`CloudComposerCreateEnvironmentOperator` with defer mode (#35775)
373d8a57b2 is described below

commit 373d8a57b225a1a5e79b92c3d84b618b3522bfa5
Author: VladaZakharova <[email protected]>
AuthorDate: Sat Nov 25 09:22:46 2023 +0100

    Deprecate `CloudComposerEnvironmentSensor` in favor of 
`CloudComposerCreateEnvironmentOperator` with defer mode (#35775)
    
    Co-authored-by: Ulada Zakharava <[email protected]>
---
 .../google/cloud/operators/cloud_composer.py       |   2 +-
 .../google/cloud/sensors/cloud_composer.py         |  15 ++-
 .../operators/cloud/cloud_composer.rst             |   6 +-
 tests/always/test_project_structure.py             |   1 +
 .../cloud/composer/example_cloud_composer.py       |  53 ++++++++-
 .../composer/example_cloud_composer_deferrable.py  | 126 ---------------------
 6 files changed, 66 insertions(+), 137 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py 
b/airflow/providers/google/cloud/operators/cloud_composer.py
index b28b0fd476..de6d49d656 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -112,7 +112,7 @@ class 
CloudComposerCreateEnvironmentOperator(GoogleCloudBaseOperator):
     :param metadata: Strings which should be sent along with the request as 
metadata.
     :param deferrable: Run operator in the deferrable mode
     :param pooling_period_seconds: Optional: Control the rate of the poll for 
the result of deferrable run.
-        By default the trigger will poll every 30 seconds.
+        By default, the trigger will poll every 30 seconds.
     """
 
     template_fields = (
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py 
b/airflow/providers/google/cloud/sensors/cloud_composer.py
index 1873b51d68..9cc742b002 100644
--- a/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -19,9 +19,10 @@
 
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING, Any, Sequence
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.providers.google.cloud.triggers.cloud_composer import 
CloudComposerExecutionTrigger
 from airflow.sensors.base import BaseSensorOperator
 
@@ -33,6 +34,11 @@ class CloudComposerEnvironmentSensor(BaseSensorOperator):
     """
     Check the status of the Cloud Composer Environment task.
 
+    This Sensor is deprecated. You can achieve the same functionality by using 
Cloud Composer Operators
+    CloudComposerCreateEnvironmentOperator, 
CloudComposerDeleteEnvironmentOperator and
+    CloudComposerUpdateEnvironmentOperator in  deferrable or non-deferrable 
mode, since every operator
+    gives user a possibility to wait (asynchronously or synchronously) until 
Operation will be finished.
+
     :param project_id: Required. The ID of the Google Cloud project that the 
service belongs to.
     :param region: Required. The ID of the Google Cloud region that the 
service belongs to.
     :param operation_name: The name of the operation resource
@@ -59,6 +65,13 @@ class CloudComposerEnvironmentSensor(BaseSensorOperator):
         pooling_period_seconds: int = 30,
         **kwargs,
     ):
+        warnings.warn(
+            f"The `{self.__class__.__name__}` operator is deprecated. You can 
achieve the same functionality "
+            f"by using operators in deferrable or non-deferrable mode, since 
every operator for Cloud "
+            f"Composer will wait for the operation to complete.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
         super().__init__(**kwargs)
         self.project_id = project_id
         self.region = region
diff --git 
a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst 
b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
index ada347def7..b55063c8f6 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
@@ -57,7 +57,7 @@ With this configuration we can create the environment:
 or you can define the same operator in the deferrable mode:
 
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
 
-.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START 
howto_operator_create_composer_environment_deferrable_mode]
@@ -116,7 +116,7 @@ To update a service you can use:
 or you can define the same operator in the deferrable mode:
 
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
 
-.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START 
howto_operator_update_composer_environment_deferrable_mode]
@@ -138,7 +138,7 @@ To delete a service you can use:
 or you can define the same operator in the deferrable mode:
 
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
 
-.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START 
howto_operator_delete_composer_environment_deferrable_mode]
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index 967c3ecbb5..bd5098c04f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -395,6 +395,7 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator",
         
"airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator",
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
+        
"airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerEnvironmentSensor",
         
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360CreateQueryOperator",
         
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360RunQueryOperator",
         
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360DownloadReportV2Operator",
diff --git 
a/tests/system/providers/google/cloud/composer/example_cloud_composer.py 
b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
index ab9c6a0e1b..fb8958412e 100644
--- a/tests/system/providers/google/cloud/composer/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
@@ -32,20 +32,20 @@ from 
airflow.providers.google.cloud.operators.cloud_composer import (
 )
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 DAG_ID = "example_composer"
-
 REGION = "us-central1"
 
 # [START howto_operator_composer_simple_environment]
 
 ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
+ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
 
 ENVIRONMENT = {
     "config": {
-        "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
+        "software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
     }
 }
 # [END howto_operator_composer_simple_environment]
@@ -53,10 +53,10 @@ ENVIRONMENT = {
 # [START howto_operator_composer_update_environment]
 UPDATED_ENVIRONMENT = {
     "labels": {
-        "label1": "testing",
+        "label": "testing",
     }
 }
-UPDATE_MASK = {"paths": ["labels.label1"]}
+UPDATE_MASK = {"paths": ["labels.label"]}
 # [END howto_operator_composer_update_environment]
 
 
@@ -85,6 +85,17 @@ with DAG(
     )
     # [END howto_operator_create_composer_environment]
 
+    # [START howto_operator_create_composer_environment_deferrable_mode]
+    defer_create_env = CloudComposerCreateEnvironmentOperator(
+        task_id="defer_create_env",
+        project_id=PROJECT_ID,
+        region=REGION,
+        environment_id=ENVIRONMENT_ID_ASYNC,
+        environment=ENVIRONMENT,
+        deferrable=True,
+    )
+    # [END howto_operator_create_composer_environment_deferrable_mode]
+
     # [START howto_operator_list_composer_environments]
     list_envs = CloudComposerListEnvironmentsOperator(
         task_id="list_envs", project_id=PROJECT_ID, region=REGION
@@ -111,6 +122,18 @@ with DAG(
     )
     # [END howto_operator_update_composer_environment]
 
+    # [START howto_operator_update_composer_environment_deferrable_mode]
+    defer_update_env = CloudComposerUpdateEnvironmentOperator(
+        task_id="defer_update_env",
+        project_id=PROJECT_ID,
+        region=REGION,
+        environment_id=ENVIRONMENT_ID_ASYNC,
+        update_mask=UPDATE_MASK,
+        environment=UPDATED_ENVIRONMENT,
+        deferrable=True,
+    )
+    # [END howto_operator_update_composer_environment_deferrable_mode]
+
     # [START howto_operator_delete_composer_environment]
     delete_env = CloudComposerDeleteEnvironmentOperator(
         task_id="delete_env",
@@ -121,7 +144,25 @@ with DAG(
     # [END howto_operator_delete_composer_environment]
     delete_env.trigger_rule = TriggerRule.ALL_DONE
 
-    chain(image_versions, create_env, list_envs, get_env, update_env, 
delete_env)
+    # [START howto_operator_delete_composer_environment_deferrable_mode]
+    defer_delete_env = CloudComposerDeleteEnvironmentOperator(
+        task_id="defer_delete_env",
+        project_id=PROJECT_ID,
+        region=REGION,
+        environment_id=ENVIRONMENT_ID_ASYNC,
+        deferrable=True,
+    )
+    # [END howto_operator_delete_composer_environment_deferrable_mode]
+    defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
+
+    chain(
+        image_versions,
+        [create_env, defer_create_env],
+        list_envs,
+        get_env,
+        [update_env, defer_update_env],
+        [delete_env, defer_delete_env],
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git 
a/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
 
b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
deleted file mode 100644
index 1a69e12d7b..0000000000
--- 
a/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow.models.baseoperator import chain
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.cloud_composer import (
-    CloudComposerCreateEnvironmentOperator,
-    CloudComposerDeleteEnvironmentOperator,
-    CloudComposerUpdateEnvironmentOperator,
-)
-from airflow.providers.google.cloud.sensors.cloud_composer import 
CloudComposerEnvironmentSensor
-from airflow.utils.trigger_rule import TriggerRule
-
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-
-DAG_ID = "example_composer_deferrable"
-
-REGION = "us-central1"
-
-ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
-# [START howto_operator_composer_simple_environment]
-ENVIRONMENT = {
-    "config": {
-        "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
-    }
-}
-# [END howto_operator_composer_simple_environment]
-
-# [START howto_operator_composer_update_environment]
-UPDATED_ENVIRONMENT = {
-    "labels": {
-        "label2": "testing",
-    }
-}
-UPDATE_MASK = {"paths": ["labels.label2"]}
-# [END howto_operator_composer_update_environment]
-
-
-with DAG(
-    DAG_ID,
-    schedule="@once",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example", "composer"],
-) as dag:
-    # [START howto_operator_create_composer_environment_deferrable_mode]
-    defer_create_env = CloudComposerCreateEnvironmentOperator(
-        task_id="defer_create_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        environment=ENVIRONMENT,
-        deferrable=True,
-    )
-    # [END howto_operator_create_composer_environment_deferrable_mode]
-
-    operation_name = defer_create_env.output["operation_id"]
-
-    wait_for_execution = CloudComposerEnvironmentSensor(
-        task_id="wait_for_execution",
-        operation_name=operation_name,
-        region=REGION,
-        project_id=PROJECT_ID,
-    )
-
-    # [START howto_operator_update_composer_environment_deferrable_mode]
-    defer_update_env = CloudComposerUpdateEnvironmentOperator(
-        task_id="defer_update_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        update_mask=UPDATE_MASK,
-        environment=UPDATED_ENVIRONMENT,
-        deferrable=True,
-    )
-    # [END howto_operator_update_composer_environment_deferrable_mode]
-
-    # [START howto_operator_delete_composer_environment_deferrable_mode]
-    defer_delete_env = CloudComposerDeleteEnvironmentOperator(
-        task_id="defer_delete_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        deferrable=True,
-    )
-    # [END howto_operator_delete_composer_environment_deferrable_mode]
-    defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
-
-    chain(
-        defer_create_env,
-        wait_for_execution,
-        defer_update_env,
-        defer_delete_env,
-    )
-
-    from tests.system.utils.watcher import watcher
-
-    # This test needs watcher in order to properly mark success/failure
-    # when "teardown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)

Reply via email to