dabla commented on code in PR #60650:
URL: https://github.com/apache/airflow/pull/60650#discussion_r2709518476


##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -69,44 +69,51 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
         )
 
+    def _build_trigger_event(self, pipeline_status: str) -> TriggerEvent | None:
+        """Build TriggerEvent based on pipeline status. Returns None if status 
is not terminal."""
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
+            return TriggerEvent({"status": "error", "message": f"Pipeline run {self.run_id} has Failed."})
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
+            return TriggerEvent(
+                {"status": "error", "message": f"Pipeline run {self.run_id} 
has been Cancelled."}
+            )
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+            return TriggerEvent(
+                {"status": "success", "message": f"Pipeline run {self.run_id} 
has been Succeeded."}
+            )
+        return None
+
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline 
run status."""
-        hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         executed_after_token_refresh = False
         try:
-            while True:
-                try:
-                    pipeline_status = await hook.get_adf_pipeline_run_status(
-                        run_id=self.run_id,
-                        resource_group_name=self.resource_group_name,
-                        factory_name=self.factory_name,
-                    )
-                    executed_after_token_refresh = False
-                    if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
-                        yield TriggerEvent(
-                            {"status": "error", "message": f"Pipeline run 
{self.run_id} has Failed."}
+            async with AzureDataFactoryAsyncHook(
+                azure_data_factory_conn_id=self.azure_data_factory_conn_id
+            ) as hook:
+                while True:
+                    try:
+                        pipeline_status = await hook.get_adf_pipeline_run_status(
+                            run_id=self.run_id,
+                            resource_group_name=self.resource_group_name,
+                            factory_name=self.factory_name,
                         )
-                        return
-                    elif pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
-                        msg = f"Pipeline run {self.run_id} has been Cancelled."
-                        yield TriggerEvent({"status": "error", "message": msg})
-                        return
-                    elif pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
-                        msg = f"Pipeline run {self.run_id} has been Succeeded."
-                        yield TriggerEvent({"status": "success", "message": 
msg})
-                        return
-                    await asyncio.sleep(self.poke_interval)
-                except ServiceRequestError:
-                    # conn might expire during long running pipeline.
-                    # If exception is caught, it tries to refresh connection once.
-                    # If it still doesn't fix the issue,
-                    # than the execute_after_token_refresh would still be False
-                    # and an exception will be raised
-                    if executed_after_token_refresh:
-                        await hook.refresh_conn()
                         executed_after_token_refresh = False
-                    else:
-                        raise
+                        event = self._build_trigger_event(pipeline_status)
+                        if event:
+                            yield event
+                            return
+                        await asyncio.sleep(self.poke_interval)
+                    except ServiceRequestError:
+                        # conn might expire during long running pipeline.
+                        # If exception is caught, it tries to refresh connection once.
+                        # If it still doesn't fix the issue,
+                        # than the execute_after_token_refresh would still be False
+                        # and an exception will be raised
+                        if executed_after_token_refresh:

Review Comment:
   Nit, but I would write it like this:
   
   ```
   if not executed_after_token_refresh:
       raise
   await hook.refresh_conn()
   executed_after_token_refresh = False
   ```
   
   
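   In context, the whole `except` block would then read roughly like this (shown dedented; a sketch of the suggestion, not necessarily the final wording in the PR):

   ```
   except ServiceRequestError:
       # The connection might expire during a long-running pipeline:
       # refresh it and let the loop retry once, otherwise re-raise.
       if not executed_after_token_refresh:
           raise
       await hook.refresh_conn()
       executed_after_token_refresh = False
   ```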



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -160,84 +167,94 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
         )
 
+    def _build_trigger_event(self, pipeline_status: str) -> TriggerEvent | None:
+        """Build TriggerEvent based on pipeline status. Returns None if status 
is not terminal."""
+        if pipeline_status in AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+            return TriggerEvent(
+                {
+                    "status": "error",
+                    "message": f"The pipeline run {self.run_id} has 
{pipeline_status}.",
+                    "run_id": self.run_id,
+                }
+            )
+        if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+            return TriggerEvent(
+                {
+                    "status": "success",
+                    "message": f"The pipeline run {self.run_id} has 
{pipeline_status}.",
+                    "run_id": self.run_id,
+                }
+            )
+        return None
+
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline 
run status."""
-        hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
-        try:
-            pipeline_status = await hook.get_adf_pipeline_run_status(
-                run_id=self.run_id,
-                resource_group_name=self.resource_group_name,
-                factory_name=self.factory_name,
-            )
-            executed_after_token_refresh = True
-            if self.wait_for_termination:
-                while self.end_time > time.time():
+        async with AzureDataFactoryAsyncHook(
+            azure_data_factory_conn_id=self.azure_data_factory_conn_id
+        ) as hook:
+            try:
+                pipeline_status = await hook.get_adf_pipeline_run_status(
+                    run_id=self.run_id,
+                    resource_group_name=self.resource_group_name,
+                    factory_name=self.factory_name,
+                )
+                executed_after_token_refresh = True
+                if self.wait_for_termination:
+                    while self.end_time > time.time():
+                        try:
+                            pipeline_status = await hook.get_adf_pipeline_run_status(
+                                run_id=self.run_id,
+                                resource_group_name=self.resource_group_name,
+                                factory_name=self.factory_name,
+                            )
+                            executed_after_token_refresh = True
+                            event = self._build_trigger_event(pipeline_status)
+                            if event:
+                                yield event
+                                return
+                            self.log.info(
+                                "Sleeping for %s. The pipeline state is %s.",
+                                self.check_interval,
+                                pipeline_status,
+                            )
+                            await asyncio.sleep(self.check_interval)
+                        except ServiceRequestError:
+                            # conn might expire during long running pipeline.
+                            # If exception is caught, it tries to refresh connection once.
+                            # If it still doesn't fix the issue,
+                            # than the execute_after_token_refresh would still be False
+                            # and an exception will be raised
+                            if executed_after_token_refresh:

Review Comment:
   Same here:
   
   ```
   if not executed_after_token_refresh:
       raise
   await hook.refresh_conn()
   executed_after_token_refresh = False
   ```
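
   For completeness, a self-contained sketch of the retry-once control flow this guard clause produces; `StubHook`, `get_status` and `ConnectionError` are placeholders for illustration, not the Azure hook or exception from the PR:

   ```
   import asyncio


   class StubHook:
       """Placeholder for the async hook: the first status call fails, the second succeeds."""

       def __init__(self) -> None:
           self.calls = 0

       async def refresh_conn(self) -> None:
           print("refreshing connection")

       async def get_status(self) -> str:
           self.calls += 1
           if self.calls == 1:
               raise ConnectionError("token expired")
           return "Succeeded"


   async def poll(hook: StubHook) -> str:
       executed_after_token_refresh = True
       while True:
           try:
               return await hook.get_status()
           except ConnectionError:
               # Re-raise unless one refresh attempt is still available.
               if not executed_after_token_refresh:
                   raise
               await hook.refresh_conn()
               executed_after_token_refresh = False


   print(asyncio.run(poll(StubHook())))  # refreshes once, then prints "Succeeded"
   ```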


