Adaverse commented on code in PR #32238:
URL: https://github.com/apache/airflow/pull/32238#discussion_r1245384687
##########
airflow/providers/microsoft/azure/operators/data_factory.py:
##########
@@ -158,24 +160,39 @@ def __init__(
self.check_interval = check_interval
self.deferrable = deferrable
+ self.run_id: str = ""
+ self.pipeline_run_exists = False
+
def execute(self, context: Context) -> None:
self.hook =
AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
self.log.info("Executing the %s pipeline.", self.pipeline_name)
- response = self.hook.run_pipeline(
- pipeline_name=self.pipeline_name,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- reference_pipeline_run_id=self.reference_pipeline_run_id,
- is_recovery=self.is_recovery,
- start_activity_name=self.start_activity_name,
- start_from_failure=self.start_from_failure,
- parameters=self.parameters,
- )
- self.run_id = vars(response)["run_id"]
- # Push the ``run_id`` value to XCom regardless of what happens during
execution. This allows for
- # retrieval the executed pipeline's ``run_id`` for downstream tasks
especially if performing an
- # asynchronous wait.
- context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+ skip_pipeline_run = False
+ if self.retries and self.retries > 1:
+ self.run_id = context["ti"].xcom_pull(key="run_id")
+ if self.run_id:
+ pipeline_run: PipelineRun = self.hook.get_pipeline_run(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ )
+
+ if pipeline_run.statuts not in
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+ skip_pipeline_run = True
+
+ if not skip_pipeline_run:
+ response = self.hook.run_pipeline(
+ pipeline_name=self.pipeline_name,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ reference_pipeline_run_id=self.reference_pipeline_run_id,
+ is_recovery=self.is_recovery,
+ start_activity_name=self.start_activity_name,
+ start_from_failure=self.start_from_failure,
+ parameters=self.parameters,
+ )
+ self.run_id = vars(response)["run_id"]
+ context["ti"].xcom_push(key="run_id", value=self.run_id)
Review Comment:
In a few `execute` methods of Operators, I see we `return` the value to push it to `xcom` instead of calling `xcom_push` explicitly.
I was wondering if there is any advantage of one approach over the other.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]