pankajastro commented on code in PR #32238:
URL: https://github.com/apache/airflow/pull/32238#discussion_r1245664110


##########
airflow/providers/microsoft/azure/operators/data_factory.py:
##########
@@ -158,24 +160,39 @@ def __init__(
         self.check_interval = check_interval
         self.deferrable = deferrable
 
+        self.run_id: str = ""
+        self.pipeline_run_exists = False
+
     def execute(self, context: Context) -> None:
         self.hook = 
AzureDataFactoryHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         self.log.info("Executing the %s pipeline.", self.pipeline_name)
-        response = self.hook.run_pipeline(
-            pipeline_name=self.pipeline_name,
-            resource_group_name=self.resource_group_name,
-            factory_name=self.factory_name,
-            reference_pipeline_run_id=self.reference_pipeline_run_id,
-            is_recovery=self.is_recovery,
-            start_activity_name=self.start_activity_name,
-            start_from_failure=self.start_from_failure,
-            parameters=self.parameters,
-        )
-        self.run_id = vars(response)["run_id"]
-        # Push the ``run_id`` value to XCom regardless of what happens during 
execution. This allows for
-        # retrieval the executed pipeline's ``run_id`` for downstream tasks 
especially if performing an
-        # asynchronous wait.
-        context["ti"].xcom_push(key="run_id", value=self.run_id)
+
+        skip_pipeline_run = False
+        if self.retries and self.retries > 1:
+            self.run_id = context["ti"].xcom_pull(key="run_id")
+            if self.run_id:
+                pipeline_run: PipelineRun = self.hook.get_pipeline_run(
+                    run_id=self.run_id,
+                    resource_group_name=self.resource_group_name,
+                    factory_name=self.factory_name,
+                )
+
+                if pipeline_run.statuts not in 
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+                    skip_pipeline_run = True
+
+        if not skip_pipeline_run:
+            response = self.hook.run_pipeline(
+                pipeline_name=self.pipeline_name,
+                resource_group_name=self.resource_group_name,
+                factory_name=self.factory_name,
+                reference_pipeline_run_id=self.reference_pipeline_run_id,
+                is_recovery=self.is_recovery,
+                start_activity_name=self.start_activity_name,
+                start_from_failure=self.start_from_failure,
+                parameters=self.parameters,
+            )
+            self.run_id = vars(response)["run_id"]
+            context["ti"].xcom_push(key="run_id", value=self.run_id)

Review Comment:
   if you return it from execute method it will get stored with the 
"return_value" key but with `context["ti"].xcom_push(...)` you can have a 
custom key name for example run_id in this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to