This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ae3a55194 Add early job_id xcom_push for google provider Beam Pipeline operators (#42982)
7ae3a55194 is described below
commit 7ae3a5519446c6e3e090ec44036883764a3dffc5
Author: olegkachur-e <[email protected]>
AuthorDate: Mon Oct 14 03:49:35 2024 +0200
Add early job_id xcom_push for google provider Beam Pipeline operators (#42982)
- To let GCP Beam Sensor operators 'sense' the pipeline changes, the dataflow job_id is now xcom_pushed as soon as it is available.
Related issue: https://github.com/apache/airflow/issues/30007.
Co-authored-by: Oleg Kachur <[email protected]>
---
.../airflow/providers/apache/beam/operators/beam.py | 19 +++++++++++++++++--
providers/tests/apache/beam/operators/test_beam.py | 21 +++++++++++++++++++++
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/providers/src/airflow/providers/apache/beam/operators/beam.py b/providers/src/airflow/providers/apache/beam/operators/beam.py
index 41c55ede2a..65f2333658 100644
--- a/providers/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/src/airflow/providers/apache/beam/operators/beam.py
@@ -187,7 +187,20 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
         self.gcp_conn_id = gcp_conn_id
         self.beam_hook: BeamHook
         self.dataflow_hook: DataflowHook | None = None
-        self.dataflow_job_id: str | None = None
+        self._dataflow_job_id: str | None = None
+        self._execute_context: Context | None = None
+
+    @property
+    def dataflow_job_id(self):
+        return self._dataflow_job_id
+
+    @dataflow_job_id.setter
+    def dataflow_job_id(self, new_value):
+        if all([new_value, not self._dataflow_job_id, self._execute_context]):
+            # push the job_id as soon as it is ready, to let sensors work before
+            # the job finishes and the id is pushed as part of the return value
+            self.xcom_push(context=self._execute_context, key="dataflow_job_id", value=new_value)
+        self._dataflow_job_id = new_value

     def _cast_dataflow_config(self):
         if isinstance(self.dataflow_config, dict):
@@ -346,6 +359,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
     def execute(self, context: Context):
         """Execute the Apache Beam Python Pipeline."""
+        self._execute_context = context
         self._cast_dataflow_config()
         self.pipeline_options.setdefault("labels", {}).update(
             {"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
         )
@@ -540,6 +554,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
     def execute(self, context: Context):
         """Execute the Apache Beam Python Pipeline."""
+        self._execute_context = context
         self._cast_dataflow_config()
         (
             self.is_dataflow,
@@ -738,7 +753,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
"""Execute the Apache Beam Pipeline."""
if not exactly_one(self.go_file, self.launcher_binary):
raise ValueError("Exactly one of `go_file` and `launcher_binary`
must be set")
-
+ self._execute_context = context
self._cast_dataflow_config()
if self.dataflow_config.impersonation_chain:
self.log.warning(
diff --git a/providers/tests/apache/beam/operators/test_beam.py b/providers/tests/apache/beam/operators/test_beam.py
index 6d1b4b5d1b..fd2e706c29 100644
--- a/providers/tests/apache/beam/operators/test_beam.py
+++ b/providers/tests/apache/beam/operators/test_beam.py
@@ -110,6 +110,27 @@ class TestBeamBasePipelineOperator:
         )
         assert f"{TASK_ID} completed with response Pipeline has finished SUCCESSFULLY" in caplog.text

+    def test_early_dataflow_id_xcom_push(self, default_options, pipeline_options):
+        with mock.patch.object(BeamBasePipelineOperator, "xcom_push") as mock_xcom_push:
+            op = BeamBasePipelineOperator(
+                **self.default_op_kwargs,
+                default_pipeline_options=copy.deepcopy(default_options),
+                pipeline_options=copy.deepcopy(pipeline_options),
+                dataflow_config={},
+            )
+            sample_df_job_id = "sample_df_job_id_value"
+            op._execute_context = MagicMock()
+
+            assert op.dataflow_job_id is None
+
+            op.dataflow_job_id = sample_df_job_id
+            mock_xcom_push.assert_called_once_with(
+                context=op._execute_context, key="dataflow_job_id", value=sample_df_job_id
+            )
+            mock_xcom_push.reset_mock()
+            op.dataflow_job_id = "sample_df_job_same_value_id"
+            mock_xcom_push.assert_not_called()
+

 class TestBeamRunPythonPipelineOperator:
     @pytest.fixture(autouse=True)