This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 0d95acef1d  Add deferrable mode to DataprocInstantiateInlineWorkflowTemplateOperator (#30878)
0d95acef1d is described below
commit 0d95acef1d0d47fd95545645a75e64fe7c4bb6a6
Author: Beata Kossakowska <[email protected]>
AuthorDate: Fri Apr 28 00:48:31 2023 +0200
    Add deferrable mode to DataprocInstantiateInlineWorkflowTemplateOperator (#30878)
Co-authored-by: Beata Kossakowska <[email protected]>
---
.../providers/google/cloud/operators/dataproc.py | 42 +++++++++++++++++++---
.../providers/google/cloud/triggers/dataproc.py | 4 +--
.../operators/cloud/dataproc.rst | 11 +++++-
.../google/cloud/operators/test_dataproc.py | 27 ++++++++++++++
.../google/cloud/triggers/test_dataproc.py | 3 --
.../cloud/dataproc/example_dataproc_workflow.py | 19 ++--------
....py => example_dataproc_workflow_deferrable.py} | 28 +++++----------
7 files changed, 86 insertions(+), 48 deletions(-)
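
For orientation, here is a minimal sketch of how the new flag is meant to be used from a DAG. The template body, project, region, and DAG id below are placeholders, not part of this commit:

    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateInlineWorkflowTemplateOperator,
    )

    with models.DAG(dag_id="example_inline_workflow_deferrable", schedule=None) as dag:
        instantiate_inline_async = DataprocInstantiateInlineWorkflowTemplateOperator(
            task_id="instantiate_inline_workflow_template_async",
            template={},                  # placeholder: a real workflow template dict goes here
            region="europe-west1",        # placeholder
            project_id="my-project",      # placeholder
            deferrable=True,              # new in this commit: hand the wait to the triggerer
            polling_interval_seconds=10,  # new in this commit: trigger poll interval
        )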
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 3ea5325740..534a73b7f4 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1780,7 +1780,6 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
else:
self.defer(
trigger=DataprocWorkflowTrigger(
- template_name=self.template_id,
name=operation.operation.name,
project_id=self.project_id,
region=self.region,
@@ -1843,6 +1842,8 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+ :param deferrable: Run operator in the deferrable mode.
+    :param polling_interval_seconds: Time (seconds) to wait between calls to check the run status.
"""
template_fields: Sequence[str] = ("template", "impersonation_chain")
@@ -1861,9 +1862,13 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ deferrable: bool = False,
+ polling_interval_seconds: int = 10,
**kwargs,
) -> None:
super().__init__(**kwargs)
+ if deferrable and polling_interval_seconds <= 0:
+            raise ValueError("Invalid value for polling_interval_seconds. Expected value greater than 0")
self.template = template
self.project_id = project_id
self.region = region
@@ -1874,13 +1879,15 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.deferrable = deferrable
+ self.polling_interval_seconds = polling_interval_seconds
def execute(self, context: Context):
self.log.info("Instantiating Inline Template")
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
operation = hook.instantiate_inline_workflow_template(
template=self.template,
- project_id=self.project_id,
+ project_id=self.project_id or hook.project_id,
region=self.region,
request_id=self.request_id,
retry=self.retry,
@@ -1891,9 +1898,34 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
DataprocLink.persist(
             context=context, task_instance=self, url=DATAPROC_WORKFLOW_LINK, resource=self.workflow_id
)
-        self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
- operation.result()
- self.log.info("Workflow %s completed successfully", self.workflow_id)
+ if not self.deferrable:
+            self.log.info("Template instantiated. Workflow Id : %s", self.workflow_id)
+ operation.result()
+            self.log.info("Workflow %s completed successfully", self.workflow_id)
+ else:
+ self.defer(
+ trigger=DataprocWorkflowTrigger(
+ name=operation.operation.name,
+ project_id=self.project_id or hook.project_id,
+ region=self.region,
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ polling_interval_seconds=self.polling_interval_seconds,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context, event=None) -> None:
+ """
+ Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+ successful.
+ """
+ if event["status"] == "failed" or event["status"] == "error":
+ self.log.exception("Unexpected error in the operation.")
+ raise AirflowException(event["message"])
+
+        self.log.info("Workflow %s completed successfully", event["operation_name"])
class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
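
The defer() call above follows the standard deferrable-operator handshake: execute() raises TaskDeferred, the triggerer runs DataprocWorkflowTrigger until the workflow operation finishes, and the event the trigger yields is routed back into execute_complete() via method_name. A schematic of the event shape that handler checks; the field names come from the handler above, while the concrete values are illustrative only:

    # Illustrative payloads; real events are emitted by DataprocWorkflowTrigger.
    success_event = {
        "operation_name": "projects/my-project/regions/europe-west1/operations/abc123",
        "status": "success",
        "message": "Operation is successfully ended.",
    }
    failure_event = {
        "operation_name": "projects/my-project/regions/europe-west1/operations/abc123",
        "status": "failed",
        "message": "error details surfaced by the workflow",
    }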
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index c340b44efe..d896f1190d 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -290,16 +290,14 @@ class DataprocWorkflowTrigger(DataprocBaseTrigger):
Implementation leverages asynchronous transport.
"""
- def __init__(self, template_name: str, name: str, **kwargs: Any):
+ def __init__(self, name: str, **kwargs: Any):
super().__init__(**kwargs)
- self.template_name = template_name
self.name = name
def serialize(self):
return (
"airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger",
{
- "template_name": self.template_name,
"name": self.name,
"project_id": self.project_id,
"region": self.region,
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
index a3c1fc4198..f7775c6bfe 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -264,7 +264,7 @@ Once a workflow is created users can trigger it using
Also for all this action you can use operator in the deferrable mode:
-.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_trigger_workflow_template_async]
@@ -279,6 +279,15 @@ The inline operator is an alternative. It creates a workflow, run it, and delete
    :start-after: [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
    :end-before: [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
+Also for all this action you can use operator in the deferrable mode:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
+ :language: python
+ :dedent: 4
+   :start-after: [START how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+   :end-before: [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+
+
Create a Batch
--------------
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 6bf7e97489..a2d3f957d4 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -1464,6 +1464,33 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
metadata=METADATA,
)
+ @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+ @mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
+ def test_execute_call_defer_method(self, mock_trigger_hook, mock_hook):
+ operator = DataprocInstantiateInlineWorkflowTemplateOperator(
+ task_id=TASK_ID,
+ template={},
+ region=GCP_REGION,
+ project_id=GCP_PROJECT,
+ request_id=REQUEST_ID,
+ retry=RETRY,
+ timeout=TIMEOUT,
+ metadata=METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ deferrable=True,
+ )
+
+ with pytest.raises(TaskDeferred) as exc:
+ operator.execute(mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+
+        mock_hook.return_value.instantiate_inline_workflow_template.assert_called_once()
+
+ assert isinstance(exc.value.trigger, DataprocWorkflowTrigger)
+ assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
+
@pytest.mark.need_serialized_dag
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
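
The new test exercises the deferral itself; a companion sketch for the failure path of execute_complete(), reusing the module's existing constants, could look like this (the test name and event values are hypothetical):

    def test_execute_complete_error_event(self):
        # Hypothetical companion test: a "failed" event from the trigger
        # should surface as an AirflowException.
        operator = DataprocInstantiateInlineWorkflowTemplateOperator(
            task_id=TASK_ID,
            template={},
            region=GCP_REGION,
            project_id=GCP_PROJECT,
            deferrable=True,
        )
        event = {"status": "failed", "message": "workflow failed", "operation_name": "test"}
        with pytest.raises(AirflowException):
            operator.execute_complete(context=None, event=event)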
diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py b/tests/providers/google/cloud/triggers/test_dataproc.py
index 73da7c077c..ea75aae5cc 100644
--- a/tests/providers/google/cloud/triggers/test_dataproc.py
+++ b/tests/providers/google/cloud/triggers/test_dataproc.py
@@ -44,7 +44,6 @@ BATCH_CONFIG = {
TEST_CLUSTER_NAME = "cluster_name"
TEST_POLL_INTERVAL = 5
TEST_GCP_CONN_ID = "google_cloud_default"
-TEST_TEMPLATE_NAME = "template_name"
TEST_OPERATION_NAME = "name"
@@ -76,7 +75,6 @@ def batch_trigger():
@pytest.fixture
def workflow_trigger():
return DataprocWorkflowTrigger(
- template_name=TEST_TEMPLATE_NAME,
name=TEST_OPERATION_NAME,
project_id=TEST_PROJECT_ID,
region=TEST_REGION,
@@ -291,7 +289,6 @@ class TestDataprocWorkflowTrigger:
classpath, kwargs = workflow_trigger.serialize()
         assert classpath == "airflow.providers.google.cloud.triggers.dataproc.DataprocWorkflowTrigger"
assert kwargs == {
- "template_name": TEST_TEMPLATE_NAME,
"name": TEST_OPERATION_NAME,
"project_id": TEST_PROJECT_ID,
"region": TEST_REGION,
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
index 778b24342e..52a9f3094f 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
@@ -31,7 +31,7 @@ from airflow.providers.google.cloud.operators.dataproc import (
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_workflow"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
@@ -83,28 +83,13 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_trigger_workflow_template]
- # [START how_to_cloud_dataproc_trigger_workflow_template_async]
- trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
- task_id="trigger_workflow_async",
- region=REGION,
- project_id=PROJECT_ID,
- template_id=WORKFLOW_NAME,
- deferrable=True,
- )
- # [END how_to_cloud_dataproc_trigger_workflow_template_async]
-
# [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
     instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
         task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
# [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
- (
- create_workflow_template
- >> trigger_workflow
- >> instantiate_inline_workflow_template
- >> trigger_workflow_async
- )
+    (create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
similarity index 79%
copy from tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
copy to tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
index 778b24342e..d8dfab2267 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
@@ -31,7 +31,7 @@ from airflow.providers.google.cloud.operators.dataproc import (
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_workflow"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
@@ -68,20 +68,12 @@ with models.DAG(
catchup=False,
tags=["example", "dataproc"],
) as dag:
- # [START how_to_cloud_dataproc_create_workflow_template]
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
region=REGION,
)
- # [END how_to_cloud_dataproc_create_workflow_template]
-
- # [START how_to_cloud_dataproc_trigger_workflow_template]
- trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
-        task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
- )
- # [END how_to_cloud_dataproc_trigger_workflow_template]
# [START how_to_cloud_dataproc_trigger_workflow_template_async]
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
@@ -93,18 +85,16 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_trigger_workflow_template_async]
- # [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
-    instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
-        task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
+ # [START how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
+    instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
+ task_id="instantiate_inline_workflow_template_async",
+ template=WORKFLOW_TEMPLATE,
+ region=REGION,
+ deferrable=True,
)
- # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
+ # [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
- (
- create_workflow_template
- >> trigger_workflow
- >> instantiate_inline_workflow_template
- >> trigger_workflow_async
- )
+    (create_workflow_template >> trigger_workflow_async >> instantiate_inline_workflow_template_async)
from tests.system.utils.watcher import watcher