kyungjunleeme commented on code in PR #52607:
URL: https://github.com/apache/airflow/pull/52607#discussion_r2176192598
##########
providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py:
##########
@@ -116,25 +116,24 @@ def
test_async_execute_logging_should_execute_successfully(self, caplog):
assert f"{TASK_ID} completed with response Pipeline has finished
SUCCESSFULLY" in caplog.text
def test_early_dataflow_id_xcom_push(self, default_options,
pipeline_options):
- with mock.patch.object(BeamBasePipelineOperator, "xcom_push") as
mock_xcom_push:
- op = BeamBasePipelineOperator(
- **self.default_op_kwargs,
- default_pipeline_options=copy.deepcopy(default_options),
- pipeline_options=copy.deepcopy(pipeline_options),
- dataflow_config={},
- )
- sample_df_job_id = "sample_df_job_id_value"
- op._execute_context = MagicMock()
-
- assert op.dataflow_job_id is None
-
- op.dataflow_job_id = sample_df_job_id
- mock_xcom_push.assert_called_once_with(
- context=op._execute_context, key="dataflow_job_id",
value=sample_df_job_id
- )
- mock_xcom_push.reset_mock()
- op.dataflow_job_id = "sample_df_job_same_value_id"
- mock_xcom_push.assert_not_called()
+ op = BeamBasePipelineOperator(
+ **self.default_op_kwargs,
+ default_pipeline_options=copy.deepcopy(default_options),
+ pipeline_options=copy.deepcopy(pipeline_options),
+ dataflow_config={},
+ )
+ sample_df_job_id = "sample_df_job_id_value"
+ # Mock the task instance with xcom_push method
+ mock_ti = MagicMock()
+ op._execute_context = {"ti": mock_ti}
+
+ assert op.dataflow_job_id is None
+
+ op.dataflow_job_id = sample_df_job_id
+ mock_ti.xcom_push.assert_called_once_with(key="dataflow_job_id",
value=sample_df_job_id)
+ mock_ti.xcom_push.reset_mock()
+ op.dataflow_job_id = "sample_df_job_same_value_id"
+ mock_ti.xcom_push.assert_not_called()
Review Comment:
```suggestion
# Mock TaskInstance and inject into execution context
mock_ti = MagicMock()
op._execute_context = {"ti": mock_ti}
assert op.dataflow_job_id is None
op.dataflow_job_id = sample_df_job_id
assert any(
kwargs.get("key") == "dataflow_job_id" and kwargs.get("value")
== sample_df_job_id
for _, kwargs in mock_ti.xcom_push.call_args_list
)
# If the same value is set again, it should not push to XCom again
mock_ti.reset_mock()
op.dataflow_job_id = sample_df_job_id
assert mock_ti.xcom_push.call_count == 0
```
I do not test, but I think like that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]