pankajastro commented on code in PR #30074:
URL: https://github.com/apache/airflow/pull/30074#discussion_r1136610185
##########
airflow/providers/microsoft/azure/operators/data_factory.py:
##########
@@ -169,21 +176,55 @@ def execute(self, context: Context) -> None:
context["ti"].xcom_push(key="run_id", value=self.run_id)
if self.wait_for_termination:
- self.log.info("Waiting for pipeline run %s to terminate.",
self.run_id)
-
- if self.hook.wait_for_pipeline_run_status(
- run_id=self.run_id,
- expected_statuses=AzureDataFactoryPipelineRunStatus.SUCCEEDED,
- check_interval=self.check_interval,
- timeout=self.timeout,
- resource_group_name=self.resource_group_name,
- factory_name=self.factory_name,
- ):
- self.log.info("Pipeline run %s has completed successfully.",
self.run_id)
+ if self.deferrable is False:
+ self.log.info("Waiting for pipeline run %s to terminate.",
self.run_id)
+
+ if self.hook.wait_for_pipeline_run_status(
+ run_id=self.run_id,
+
expected_statuses=AzureDataFactoryPipelineRunStatus.SUCCEEDED,
+ check_interval=self.check_interval,
+ timeout=self.timeout,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ ):
+ self.log.info("Pipeline run %s has completed
successfully.", self.run_id)
+ else:
+ raise AzureDataFactoryPipelineRunException(
+ f"Pipeline run {self.run_id} has failed or has been
cancelled."
+ )
else:
- raise AzureDataFactoryPipelineRunException(
- f"Pipeline run {self.run_id} has failed or has been
cancelled."
+ end_time = time.time() + self.timeout
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=AzureDataFactoryTrigger(
+
azure_data_factory_conn_id=self.azure_data_factory_conn_id,
+ run_id=self.run_id,
+ wait_for_termination=self.wait_for_termination,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ check_interval=self.check_interval,
+ end_time=end_time,
+ ),
+ method_name="execute_complete",
+ )
+ else:
+ if self.deferrable is True:
+ warnings.warn(
Review Comment:
So if `wait_for_termination` is false and `deferrable` is true, then the
operator will succeed without actually running the pipeline? This behaviour
does not look right to me.
I think we should instead allow users to set only one of
`wait_for_termination` or `deferrable` to true at a time, and raise an
error/warning if both are set to true: if `wait_for_termination` is true,
run in sync mode, and if `deferrable` is true, run in async mode. WDYT?
##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -89,3 +89,119 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class AzureDataFactoryTrigger(BaseTrigger):
+ """
+    AzureDataFactoryTrigger is fired when an Azure Data Factory pipeline run
job succeeds or fails.
+    When wait_for_termination is set to False, it fires immediately with a
success status.
+
+ :param run_id: Run id of a Azure data pipeline run job.
+ :param azure_data_factory_conn_id: The connection identifier for
connecting to Azure Data Factory.
+    :param end_time: Time in seconds at which the trigger will time out.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The data factory name.
+ :param wait_for_termination: Flag to wait on a pipeline run's termination.
+ :param check_interval: Time in seconds to check on a pipeline run's status.
+ """
+
+ QUEUED = "Queued"
+ IN_PROGRESS = "InProgress"
+ SUCCEEDED = "Succeeded"
+ FAILED = "Failed"
+ CANCELING = "Canceling"
+ CANCELLED = "Cancelled"
+
+ INTERMEDIATE_STATES: list[str] = [QUEUED, IN_PROGRESS, CANCELING]
+ FAILURE_STATES: list[str] = [FAILED, CANCELLED]
+ SUCCESS_STATES: list[str] = [SUCCEEDED]
+ TERMINAL_STATUSES: list[str] = [CANCELLED, FAILED, SUCCEEDED]
+
+ def __init__(
+ self,
+ run_id: str,
+ azure_data_factory_conn_id: str,
+ end_time: float,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ wait_for_termination: bool = True,
+ check_interval: int = 60,
+ ):
+ super().__init__()
+ self.azure_data_factory_conn_id = azure_data_factory_conn_id
+ self.check_interval = check_interval
+ self.run_id = run_id
+ self.wait_for_termination = wait_for_termination
+ self.resource_group_name = resource_group_name
+ self.factory_name = factory_name
+ self.end_time = end_time
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serializes AzureDataFactoryTrigger arguments and classpath."""
+ return (
+
"airflow.providers.microsoft.azure.triggers.data_factory.AzureDataFactoryTrigger",
+ {
+ "azure_data_factory_conn_id": self.azure_data_factory_conn_id,
+ "check_interval": self.check_interval,
+ "run_id": self.run_id,
+ "wait_for_termination": self.wait_for_termination,
+ "resource_group_name": self.resource_group_name,
+ "factory_name": self.factory_name,
+ "end_time": self.end_time,
+ },
+ )
+
+ async def run(self) -> AsyncIterator["TriggerEvent"]:
+ """Make async connection to Azure Data Factory, polls for the pipeline
run status"""
+ hook =
AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
+ try:
+ pipeline_status = await hook.get_adf_pipeline_run_status(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ )
+ if self.wait_for_termination:
Review Comment:
Since this is a deferrable operator, it does not make sense to make waiting
conditional — shouldn't `self.wait_for_termination` always be true here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]