phanikumv commented on code in PR #32323:
URL: https://github.com/apache/airflow/pull/32323#discussion_r1251127914
##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -147,33 +162,47 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
)
+ executed_after_token_refresh = True
if self.wait_for_termination:
while self.end_time > time.time():
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- if pipeline_status in
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
- yield TriggerEvent(
- {
- "status": "error",
- "message": f"The pipeline run {self.run_id}
has {pipeline_status}.",
- "run_id": self.run_id,
- }
+ try:
+ pipeline_status = await
hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
)
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
- yield TriggerEvent(
- {
- "status": "success",
- "message": f"The pipeline run {self.run_id}
has {pipeline_status}.",
- "run_id": self.run_id,
- }
+ executed_after_token_refresh = True
+ if pipeline_status in
AzureDataFactoryPipelineRunStatus.FAILURE_STATES:
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"The pipeline run
{self.run_id} has {pipeline_status}.",
+ "run_id": self.run_id,
+ }
+ )
+ elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+ yield TriggerEvent(
+ {
+ "status": "success",
+ "message": f"The pipeline run
{self.run_id} has {pipeline_status}.",
+ "run_id": self.run_id,
+ }
+ )
+ self.log.info(
+ "Sleeping for %s. The pipeline state is %s.",
self.check_interval, pipeline_status
)
- self.log.info(
- "Sleeping for %s. The pipeline state is %s.",
self.check_interval, pipeline_status
- )
- await asyncio.sleep(self.check_interval)
+ await asyncio.sleep(self.check_interval)
+ except Exception:
+ # conn might expire during long running pipeline.
+ # If expcetion is caught, it tries to refresh
connection once.
Review Comment:
```suggestion
# If exception is caught, it tries to refresh
connection once.
```
##########
airflow/providers/microsoft/azure/hooks/data_factory.py:
##########
@@ -828,7 +833,14 @@ def wait_for_pipeline_run_status(
# Wait to check the status of the pipeline run based on the
``check_interval`` configured.
time.sleep(check_interval)
- pipeline_run_status =
self.get_pipeline_run_status(**pipeline_run_info)
+ try:
+ pipeline_run_status =
self.get_pipeline_run_status(**pipeline_run_info)
+ executed_after_token_refresh = True
+ except Exception:
Review Comment:
Can we use a specific exception instead of the `Exception` class,
which is too generic?
##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -68,24 +68,39 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline
run status."""
hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+ executed_after_token_refresh = False
try:
while True:
- pipeline_status = await hook.get_adf_pipeline_run_status(
- run_id=self.run_id,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- )
- if pipeline_status == AzureDataFactoryPipelineRunStatus.FAILED:
- yield TriggerEvent(
- {"status": "error", "message": f"Pipeline run
{self.run_id} has Failed."}
+ try:
+ pipeline_status = await hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
)
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.CANCELLED:
- msg = f"Pipeline run {self.run_id} has been Cancelled."
- yield TriggerEvent({"status": "error", "message": msg})
- elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
- msg = f"Pipeline run {self.run_id} has been Succeeded."
- yield TriggerEvent({"status": "success", "message": msg})
- await asyncio.sleep(self.poke_interval)
+ executed_after_token_refresh = False
+ if pipeline_status ==
AzureDataFactoryPipelineRunStatus.FAILED:
+ yield TriggerEvent(
+ {"status": "error", "message": f"Pipeline run
{self.run_id} has Failed."}
+ )
+ elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.CANCELLED:
+ msg = f"Pipeline run {self.run_id} has been Cancelled."
+ yield TriggerEvent({"status": "error", "message": msg})
+ elif pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED:
+ msg = f"Pipeline run {self.run_id} has been Succeeded."
+ yield TriggerEvent({"status": "success", "message":
msg})
+ await asyncio.sleep(self.poke_interval)
+
+ except Exception:
+ # conn might expire during long running pipeline.
+ # If expcetion is caught, it tries to refresh connection
once.
Review Comment:
```suggestion
# If exception is caught, it tries to refresh connection
once.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]