This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d95acef1d Add deferrable mode to DataprocInstantiateInlineWorkflowTemplateOperator (#30878)
0d95acef1d is described below

commit 0d95acef1d0d47fd95545645a75e64fe7c4bb6a6
Author: Beata Kossakowska <[email protected]>
AuthorDate: Fri Apr 28 00:48:31 2023 +0200

    Add deferrable mode to DataprocInstantiateInlineWorkflowTemplateOperator (#30878)
    
    Co-authored-by: Beata Kossakowska <[email protected]>
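
    A minimal usage sketch of the new mode (WORKFLOW_TEMPLATE, PROJECT_ID and
    REGION are illustrative placeholders, mirroring the system test added in
    this commit):

        from airflow.providers.google.cloud.operators.dataproc import (
            DataprocInstantiateInlineWorkflowTemplateOperator,
        )

        # With deferrable=True the operator submits the workflow, frees its
        # worker slot, and resumes in execute_complete() once the
        # DataprocWorkflowTrigger reports a terminal state.
        instantiate_inline = DataprocInstantiateInlineWorkflowTemplateOperator(
            task_id="instantiate_inline_workflow_template_async",
            template=WORKFLOW_TEMPLATE,
            project_id=PROJECT_ID,
            region=REGION,
            deferrable=True,
            polling_interval_seconds=10,  # must be > 0 when deferrable=True
        )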
---
 .../providers/google/cloud/operators/dataproc.py   | 42 +++++++++++++++++++---
 .../providers/google/cloud/triggers/dataproc.py    |  4 +--
 .../operators/cloud/dataproc.rst                   | 11 +++++-
 .../google/cloud/operators/test_dataproc.py        | 27 ++++++++++++++
 .../google/cloud/triggers/test_dataproc.py         |  3 --
 .../cloud/dataproc/example_dataproc_workflow.py    | 19 ++--------
 ....py => example_dataproc_workflow_deferrable.py} | 28 +++++----------
 7 files changed, 86 insertions(+), 48 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 3ea5325740..534a73b7f4 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1780,7 +1780,6 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
         else:
             self.defer(
                 trigger=DataprocWorkflowTrigger(
-                    template_name=self.template_id,
                     name=operation.operation.name,
                     project_id=self.project_id,
                     region=self.region,
@@ -1843,6 +1842,8 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+    :param deferrable: Run operator in the deferrable mode.
+    :param polling_interval_seconds: Time (seconds) to wait between calls to check the run status.
     """
 
     template_fields: Sequence[str] = ("template", "impersonation_chain")
@@ -1861,9 +1862,13 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        deferrable: bool = False,
+        polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        if deferrable and polling_interval_seconds <= 0:
+            raise ValueError("Invalid value for polling_interval_seconds. Expected value greater than 0")
         self.template = template
         self.project_id = project_id
         self.region = region
@@ -1874,13 +1879,15 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
+        self.deferrable = deferrable
+        self.polling_interval_seconds = polling_interval_seconds
 
     def execute(self, context: Context):
         self.log.info("Instantiating Inline Template")
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
         operation = hook.instantiate_inline_workflow_template(
             template=self.template,
-            project_id=self.project_id,
+            project_id=self.project_id or hook.project_id,
             region=self.region,
             request_id=self.request_id,
             retry=self.retry,
@@ -1891,9 +1898,34 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
         DataprocLink.persist(
             context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=self.workflow_id
         )
-        self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
-        operation.result()
-        self.log.info("Workflow %s completed successfully", self.workflow_id)
+        if not self.deferrable:
+            self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
+            operation.result()
+            self.log.info("Workflow %s completed successfully", self.workflow_id)
+        else:
+            self.defer(
+                trigger=DataprocWorkflowTrigger(
+                    name=operation.operation.name,
+                    project_id=self.project_id or hook.project_id,
+                    region=self.region,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    polling_interval_seconds=self.polling_interval_seconds,
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] == "failed" or event["status"] == "error":
+            self.log.exception("Unexpected error in the operation.")
+            raise AirflowException(event["message"])
+
+        self.log.info("Workflow %s completed successfully", event["operation_name"])
 
 
 class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
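
Condensing the resume path above into a sketch: the event dict keys
("status", "message", "operation_name") are the ones execute_complete()
reads, and the trigger is assumed to emit them on completion.

    from airflow.exceptions import AirflowException

    def handle_workflow_event(event: dict, log) -> None:
        # Mirrors execute_complete() above: a "failed"/"error" status from
        # the trigger surfaces as a task failure; anything else is success.
        if event["status"] in ("failed", "error"):
            raise AirflowException(event["message"])
        log.info("Workflow %s completed successfully", event["operation_name"])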
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index c340b44efe..d896f1190d 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -290,16 +290,14 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
     Implementation leverages asynchronous transport.
     """
 
-    def __init__(self, template_name: str, name: str, **kwargs: Any):
+    def __init__(self, name: str, **kwargs: Any):
         super().__init__(**kwargs)
-        self.template_name = template_name
         self.name = name
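
For reference, the trimmed trigger now round-trips through serialize()
(shown just below) without a template_name entry. A rough sketch, assuming
the remaining constructor arguments are accepted by the DataprocBaseTrigger
base class and using illustrative values:

    from airflow.providers.google.cloud.triggers.dataproc import DataprocWorkflowTrigger

    trigger = DataprocWorkflowTrigger(
        name="projects/my-project/regions/europe-west1/operations/op-1",
        project_id="my-project",
        region="europe-west1",
        gcp_conn_id="google_cloud_default",
        polling_interval_seconds=10,
    )
    classpath, kwargs = trigger.serialize()
    # classpath is the dotted path to DataprocWorkflowTrigger; kwargs carries
    # name/project_id/region/... and no "template_name" key.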
 
     def serialize(self):
         return (
             "airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger",
             {
-                "template_name": self.template_name,
                 "name": self.name,
                 "project_id": self.project_id,
                 "region": self.region,
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index a3c1fc4198..f7775c6bfe 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -264,7 +264,7 @@ Once a workflow is created users can trigger it using
 
 You can also run this operation in deferrable mode:
 
-.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_trigger_workflow_template_async]
@@ -279,6 +279,15 @@ The inline operator is an alternative. It creates a workflow, run it, and delete
    :start-after: [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
    :end-before: [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
 
+You can also run this operation in deferrable mode:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+    :end-before: [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+
+
 Create a Batch
 --------------
 
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 6bf7e97489..a2d3f957d4 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -1464,6 +1464,33 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
             metadata=METADATA,
         )
 
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    @mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
+    def test_execute_call_defer_method(self, mock_trigger_hook, mock_hook):
+        operator = DataprocInstantiateInlineWorkflowTemplateOperator(
+            task_id=TASK_ID,
+            template={},
+            region=GCP_REGION,
+            project_id=GCP_PROJECT,
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+
+        mock_hook.return_value.instantiate_inline_workflow_template.assert_called_once()
+
+        assert isinstance(exc.value.trigger, DataprocWorkflowTrigger)
+        assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
+
 
 @pytest.mark.need_serialized_dag
 @mock.patch(DATAPROC_PATH.format("DataprocHook"))
diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py b/tests/providers/google/cloud/triggers/test_dataproc.py
index 73da7c077c..ea75aae5cc 100644
--- a/tests/providers/google/cloud/triggers/test_dataproc.py
+++ b/tests/providers/google/cloud/triggers/test_dataproc.py
@@ -44,7 +44,6 @@ BATCH_CONFIG = {
 TEST_CLUSTER_NAME = "cluster_name"
 TEST_POLL_INTERVAL = 5
 TEST_GCP_CONN_ID = "google_cloud_default"
-TEST_TEMPLATE_NAME = "template_name"
 TEST_OPERATION_NAME = "name"
 
 
@@ -76,7 +75,6 @@ def batch_trigger():
 @pytest.fixture
 def workflow_trigger():
     return DataprocWorkflowTrigger(
-        template_name=TEST_TEMPLATE_NAME,
         name=TEST_OPERATION_NAME,
         project_id=TEST_PROJECT_ID,
         region=TEST_REGION,
@@ -291,7 +289,6 @@ class TestDataprocWorkflowTrigger:
         classpath, kwargs = workflow_trigger.serialize()
         assert classpath == "airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger"
         assert kwargs == {
-            "template_name": TEST_TEMPLATE_NAME,
             "name": TEST_OPERATION_NAME,
             "project_id": TEST_PROJECT_ID,
             "region": TEST_REGION,
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
index 778b24342e..52a9f3094f 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
@@ -31,7 +31,7 @@ from airflow.providers.google.cloud.operators.dataproc import (
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_workflow"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
 CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
@@ -83,28 +83,13 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_trigger_workflow_template]
 
-    # [START how_to_cloud_dataproc_trigger_workflow_template_async]
-    trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
-        task_id="trigger_workflow_async",
-        region=REGION,
-        project_id=PROJECT_ID,
-        template_id=WORKFLOW_NAME,
-        deferrable=True,
-    )
-    # [END how_to_cloud_dataproc_trigger_workflow_template_async]
-
     # [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
     instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
         task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
 
-    (
-        create_workflow_template
-        >> trigger_workflow
-        >> instantiate_inline_workflow_template
-        >> trigger_workflow_async
-    )
+    (create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template)
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
similarity index 79%
copy from tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
copy to tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
index 778b24342e..d8dfab2267 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
@@ -31,7 +31,7 @@ from airflow.providers.google.cloud.operators.dataproc import (
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_workflow"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
 CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
@@ -68,20 +68,12 @@ with models.DAG(
     catchup=False,
     tags=["example", "dataproc"],
 ) as dag:
-    # [START how_to_cloud_dataproc_create_workflow_template]
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
         task_id="create_workflow_template",
         template=WORKFLOW_TEMPLATE,
         project_id=PROJECT_ID,
         region=REGION,
     )
-    # [END how_to_cloud_dataproc_create_workflow_template]
-
-    # [START how_to_cloud_dataproc_trigger_workflow_template]
-    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
-        task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
-    )
-    # [END how_to_cloud_dataproc_trigger_workflow_template]
 
     # [START how_to_cloud_dataproc_trigger_workflow_template_async]
     trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
@@ -93,18 +85,16 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_trigger_workflow_template_async]
 
-    # [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
-    instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
-        task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
+    # [START how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+    instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
+        task_id="instantiate_inline_workflow_template_async",
+        template=WORKFLOW_TEMPLATE,
+        region=REGION,
+        deferrable=True,
     )
-    # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
+    # [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
 
-    (
-        create_workflow_template
-        >> trigger_workflow
-        >> instantiate_inline_workflow_template
-        >> trigger_workflow_async
-    )
+    (create_workflow_template >> trigger_workflow_async >> instantiate_inline_workflow_template_async)
 
     from tests.system.utils.watcher import watcher
 
