phanikumv commented on code in PR #32323:
URL: https://github.com/apache/airflow/pull/32323#discussion_r1251127914


##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -147,33 +162,47 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                 resource_group_name=self.resource_group_name,
                 factory_name=self.factory_name,
             )
+            executed_after_token_refresh = True
             if self.wait_for_termination:
                 while self.end_time > time.time():
-                    pipeline_status = await hook.get_adf_pipeline_run_status(
-                        run_id=self.run_id,
-                        resource_group_name=self.resource_group_name,
-                        factory_name=self.factory_name,
-                    )
-                    if pipeline_status in 
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
-                        yield TriggerEvent(
-                            {
-                                "status": "error",
-                                "message": f"The pipeline run {self.run_id} 
has {pipeline_status}.",
-                                "run_id": self.run_id,
-                            }
+                    try:
+                        pipeline_status = await 
hook.get_adf_pipeline_run_status(
+                            run_id=self.run_id,
+                            resource_group_name=self.resource_group_name,
+                            factory_name=self.factory_name,
                         )
-                    elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
-                        yield TriggerEvent(
-                            {
-                                "status": "success",
-                                "message": f"The pipeline run {self.run_id} 
has {pipeline_status}.",
-                                "run_id": self.run_id,
-                            }
+                        executed_after_token_refresh = True
+                        if pipeline_status in 
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+                            yield TriggerEvent(
+                                {
+                                    "status": "error",
+                                    "message": f"The pipeline run 
{self.run_id} has {pipeline_status}.",
+                                    "run_id": self.run_id,
+                                }
+                            )
+                        elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+                            yield TriggerEvent(
+                                {
+                                    "status": "success",
+                                    "message": f"The pipeline run 
{self.run_id} has {pipeline_status}.",
+                                    "run_id": self.run_id,
+                                }
+                            )
+                        self.log.info(
+                            "Sleeping for %s. The pipeline state is %s.", 
self.check_interval, pipeline_status
                         )
-                    self.log.info(
-                        "Sleeping for %s. The pipeline state is %s.", 
self.check_interval, pipeline_status
-                    )
-                    await asyncio.sleep(self.check_interval)
+                        await asyncio.sleep(self.check_interval)
+                    except Exception:
+                        # conn might expire during long running pipeline.
+                        # If expcetion is caught, it tries to refresh 
connection once.

Review Comment:
   ```suggestion
                           # If exception is caught, it tries to refresh 
connection once.
   ```



##########
airflow/providers/microsoft/azure/hooks/data_factory.py:
##########
@@ -828,7 +833,14 @@ def wait_for_pipeline_run_status(
             # Wait to check the status of the pipeline run based on the 
``check_interval`` configured.
             time.sleep(check_interval)
 
-            pipeline_run_status = 
self.get_pipeline_run_status(**pipeline_run_info)
+            try:
+                pipeline_run_status = 
self.get_pipeline_run_status(**pipeline_run_info)
+                executed_after_token_refresh = True
+            except Exception:

Review Comment:
   Can we try using a specific exception instead of using `Exception` class 
which is too generic



##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -68,24 +68,39 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline 
run status."""
         hook = 
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+        executed_after_token_refresh = False
         try:
             while True:
-                pipeline_status = await hook.get_adf_pipeline_run_status(
-                    run_id=self.run_id,
-                    resource_group_name=self.resource_group_name,
-                    factory_name=self.factory_name,
-                )
-                if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
-                    yield TriggerEvent(
-                        {"status": "error", "message": f"Pipeline run 
{self.run_id} has Failed."}
+                try:
+                    pipeline_status = await hook.get_adf_pipeline_run_status(
+                        run_id=self.run_id,
+                        resource_group_name=self.resource_group_name,
+                        factory_name=self.factory_name,
                     )
-                elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.CANCELLED:
-                    msg = f"Pipeline run {self.run_id} has been Cancelled."
-                    yield TriggerEvent({"status": "error", "message": msg})
-                elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
-                    msg = f"Pipeline run {self.run_id} has been Succeeded."
-                    yield TriggerEvent({"status": "success", "message": msg})
-                await asyncio.sleep(self.poke_interval)
+                    executed_after_token_refresh = False
+                    if pipeline_status == 
AzureDataFactoryPipelineRunStatus.FAILED:
+                        yield TriggerEvent(
+                            {"status": "error", "message": f"Pipeline run 
{self.run_id} has Failed."}
+                        )
+                    elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.CANCELLED:
+                        msg = f"Pipeline run {self.run_id} has been Cancelled."
+                        yield TriggerEvent({"status": "error", "message": msg})
+                    elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+                        msg = f"Pipeline run {self.run_id} has been Succeeded."
+                        yield TriggerEvent({"status": "success", "message": 
msg})
+                    await asyncio.sleep(self.poke_interval)
+
+                except Exception:
+                    # conn might expire during long running pipeline.
+                    # If expcetion is caught, it tries to refresh connection 
once.

Review Comment:
   ```suggestion
                       # If exception is caught, it tries to refresh connection 
once.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to