dabla commented on code in PR #60650:
URL: https://github.com/apache/airflow/pull/60650#discussion_r2704780853


##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -74,39 +74,40 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         hook = 
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         executed_after_token_refresh = False
         try:
-            while True:
-                try:
-                    pipeline_status = await hook.get_adf_pipeline_run_status(
-                        run_id=self.run_id,
-                        resource_group_name=self.resource_group_name,
-                        factory_name=self.factory_name,
-                    )
-                    executed_after_token_refresh = False
-                    if pipeline_status == 
AzureDataFactoryPipelineRunStatus.FAILED:
-                        yield TriggerEvent(
-                            {"status": "error", "message": f"Pipeline run 
{self.run_id} has Failed."}
+            async with hook:
+                while True:
+                    try:
+                        pipeline_status = await 
hook.get_adf_pipeline_run_status(
+                            run_id=self.run_id,
+                            resource_group_name=self.resource_group_name,
+                            factory_name=self.factory_name,
                         )
-                        return
-                    elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.CANCELLED:
-                        msg = f"Pipeline run {self.run_id} has been Cancelled."
-                        yield TriggerEvent({"status": "error", "message": msg})
-                        return
-                    elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
-                        msg = f"Pipeline run {self.run_id} has been Succeeded."
-                        yield TriggerEvent({"status": "success", "message": 
msg})
-                        return
-                    await asyncio.sleep(self.poke_interval)
-                except ServiceRequestError:
-                    # conn might expire during long running pipeline.
-                    # If exception is caught, it tries to refresh connection 
once.
-                    # If it still doesn't fix the issue,
-                    # than the execute_after_token_refresh would still be False
-                    # and an exception will be raised
-                    if executed_after_token_refresh:
-                        await hook.refresh_conn()
                         executed_after_token_refresh = False
-                    else:
-                        raise
+                        if pipeline_status == 
AzureDataFactoryPipelineRunStatus.FAILED:

Review Comment:
   Maybe write a helper method which constructs the TriggerEvent based on the 
pipeline_status, so you could then yield the resulting event from it and 
encapsulate the if/else in a dedicated method.
   
   ```
   def trigger_event_from_pipeline_status(self, pipeline_status: AzureDataFactoryPipelineRunStatus) -> TriggerEvent:
       if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
           return TriggerEvent(
               {"status": "error", "message": f"Pipeline run {self.run_id} has Failed."}
           )
       if pipeline_status == AzureDataFactoryPipelineRunStatus.CANCELLED:
           msg = f"Pipeline run {self.run_id} has been Cancelled."
           return TriggerEvent({"status": "error", "message": msg})
       if pipeline_status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
           msg = f"Pipeline run {self.run_id} has been Succeeded."
           return TriggerEvent({"status": "success", "message": msg})
           
   ...
   
           event = self.trigger_event_from_pipeline_status(pipeline_status)
           yield event
   ```



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -164,70 +165,73 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline 
run status."""
         hook = 
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         try:
-            pipeline_status = await hook.get_adf_pipeline_run_status(
-                run_id=self.run_id,
-                resource_group_name=self.resource_group_name,
-                factory_name=self.factory_name,
-            )
-            executed_after_token_refresh = True
-            if self.wait_for_termination:
-                while self.end_time > time.time():
-                    try:
-                        pipeline_status = await 
hook.get_adf_pipeline_run_status(
-                            run_id=self.run_id,
-                            resource_group_name=self.resource_group_name,
-                            factory_name=self.factory_name,
-                        )
-                        executed_after_token_refresh = True
-                        if pipeline_status in 
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
-                            yield TriggerEvent(
-                                {
-                                    "status": "error",
-                                    "message": f"The pipeline run 
{self.run_id} has {pipeline_status}.",
-                                    "run_id": self.run_id,
-                                }
+            async with hook:

Review Comment:
   Same here:
   
   `async with AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id) as hook:`



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -240,4 +244,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                     )
                 except Exception:
                     self.log.exception("Failed to cancel pipeline run %s", 
self.run_id)
+                finally:
+                    await hook.close()

Review Comment:
   Do we still need to close when we use the async context manager? I would 
expect this to now be handled by the context manager.



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -74,39 +74,40 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         hook = 
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         executed_after_token_refresh = False
         try:
-            while True:
-                try:
-                    pipeline_status = await hook.get_adf_pipeline_run_status(
-                        run_id=self.run_id,
-                        resource_group_name=self.resource_group_name,
-                        factory_name=self.factory_name,
-                    )
-                    executed_after_token_refresh = False
-                    if pipeline_status == 
AzureDataFactoryPipelineRunStatus.FAILED:
-                        yield TriggerEvent(
-                            {"status": "error", "message": f"Pipeline run 
{self.run_id} has Failed."}
+            async with hook:

Review Comment:
   Wouldn't it be nicer to write this like:
   
   `async with AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id) as hook:`



##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -164,70 +165,73 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to Azure Data Factory, polls for the pipeline 
run status."""
         hook = 
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
         try:
-            pipeline_status = await hook.get_adf_pipeline_run_status(
-                run_id=self.run_id,
-                resource_group_name=self.resource_group_name,
-                factory_name=self.factory_name,
-            )
-            executed_after_token_refresh = True
-            if self.wait_for_termination:
-                while self.end_time > time.time():
-                    try:
-                        pipeline_status = await 
hook.get_adf_pipeline_run_status(
-                            run_id=self.run_id,
-                            resource_group_name=self.resource_group_name,
-                            factory_name=self.factory_name,
-                        )
-                        executed_after_token_refresh = True
-                        if pipeline_status in 
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
-                            yield TriggerEvent(
-                                {
-                                    "status": "error",
-                                    "message": f"The pipeline run 
{self.run_id} has {pipeline_status}.",
-                                    "run_id": self.run_id,
-                                }
+            async with hook:
+                pipeline_status = await hook.get_adf_pipeline_run_status(
+                    run_id=self.run_id,
+                    resource_group_name=self.resource_group_name,
+                    factory_name=self.factory_name,
+                )
+                executed_after_token_refresh = True
+                if self.wait_for_termination:
+                    while self.end_time > time.time():
+                        try:
+                            pipeline_status = await 
hook.get_adf_pipeline_run_status(
+                                run_id=self.run_id,
+                                resource_group_name=self.resource_group_name,
+                                factory_name=self.factory_name,
                             )
-                            return
-                        elif pipeline_status == 
AzureDataFactoryPipelineRunStatus.SUCCEEDED:

Review Comment:
   Same remark here regarding building trigger event



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to