josh-fell commented on code in PR #30074:
URL: https://github.com/apache/airflow/pull/30074#discussion_r1138581704
##########
docs/apache-airflow-providers-microsoft-azure/operators/adf_run_pipeline.rst:
##########
@@ -37,6 +37,15 @@ Below is an example of using this operator to execute an
Azure Data Factory pipe
:start-after: [START howto_operator_adf_run_pipeline]
:end-before: [END howto_operator_adf_run_pipeline]
+Below is an example of using this operator to execute an Azure Data Factory pipeline with a deferrable flag
+so that polling for the status of the pipeline occurs on the Airflow Triggerer.
+
+ .. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_adf_run_pipeline.py
+ :language: python
+ :dedent: 0
Review Comment:
```suggestion
:dedent: 4
```
So the code snippet doesn't have extra whitespace on its left side in the
docs.
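
As a rough illustration of why the dedent width matters: Sphinx's `:dedent: N` option strips N leading characters from each included line. The sketch below only approximates that behaviour; `strip_left` and the snippet contents are hypothetical and not taken from the example DAG.

```python
from __future__ import annotations


def strip_left(lines: list[str], width: int) -> list[str]:
    """Drop `width` leading characters from every non-blank line."""
    return [line[width:] if line.strip() else line for line in lines]


# Hypothetical snippet as it might sit inside a `with DAG(...)` block,
# i.e. indented by four spaces in the source file.
snippet = [
    "    run_pipeline = make_task()",
    "",
    "    run_pipeline >> done",
]

for line in strip_left(snippet, 4):
    print(line)
```

With a width of 0 the four-space indent would be carried into the rendered docs unchanged.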
##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -30,7 +31,6 @@ class ADFPipelineRunStatusSensorTrigger(BaseTrigger):
"""
ADFPipelineRunStatusSensorTrigger is fired as deferred class with params to run the
task in trigger worker, when ADF Pipeline is running
-
Review Comment:
There needs to be an empty line between the docstring description and the
parameter directives so that the params are rendered correctly in the Python
API docs.
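
For reference, a minimal sketch of the layout being asked for. `ExampleTrigger` and its parameter are hypothetical; the point is only the blank line that separates the description from the `:param` field list.

```python
class ExampleTrigger:
    """
    Short description of what the trigger does.

    :param run_id: Run id of the pipeline run (hypothetical parameter).
    """
```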
##########
airflow/providers/microsoft/azure/triggers/data_factory.py:
##########
@@ -89,3 +89,119 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class AzureDataFactoryTrigger(BaseTrigger):
+ """
+ AzureDataFactoryTrigger is triggered when Azure data factory pipeline job succeeded or failed.
+ When wait_for_termination is set to False it triggered immediately with success status
+
+ :param run_id: Run id of a Azure data pipeline run job.
+ :param azure_data_factory_conn_id: The connection identifier for connecting to Azure Data Factory.
+ :param end_time: Time in seconds when triggers will timeout.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The data factory name.
+ :param wait_for_termination: Flag to wait on a pipeline run's termination.
+ :param check_interval: Time in seconds to check on a pipeline run's status.
+ """
+
+ QUEUED = "Queued"
+ IN_PROGRESS = "InProgress"
+ SUCCEEDED = "Succeeded"
+ FAILED = "Failed"
+ CANCELING = "Canceling"
+ CANCELLED = "Cancelled"
+
+ INTERMEDIATE_STATES: list[str] = [QUEUED, IN_PROGRESS, CANCELING]
+ FAILURE_STATES: list[str] = [FAILED, CANCELLED]
+ SUCCESS_STATES: list[str] = [SUCCEEDED]
+ TERMINAL_STATUSES: list[str] = [CANCELLED, FAILED, SUCCEEDED]
Review Comment:
[`AzureDataFactoryPipelineRunStatus`](https://github.com/apache/airflow/blob/f6472c83eb5c50b567d51e3a8869df5442f6953c/airflow/providers/microsoft/azure/hooks/data_factory.py#L111)
already exists, so let's use that. No problem augmenting the status groups with
`INTERMEDIATE_STATUSES` and `FAILURE_STATUSES` too. I don't think
`SUCCESS_STATES` is needed though, since the logic could be `pipeline_status ==
AzureDataFactoryPipelineRunStatus.SUCCEEDED`. `TERMINAL_STATUSES` already exists
in `AzureDataFactoryPipelineRunStatus`.
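
A minimal sketch of what that could look like. `PipelineRunStatus` is a local stand-in for the hook's `AzureDataFactoryPipelineRunStatus` so the snippet runs without Airflow installed; its shape is assumed from the constants in the diff above, and `classify` is a hypothetical helper, not part of the PR.

```python
from __future__ import annotations


class PipelineRunStatus:
    """Stand-in for the hook's status class (assumed shape)."""

    QUEUED = "Queued"
    IN_PROGRESS = "InProgress"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    CANCELING = "Canceling"
    CANCELLED = "Cancelled"

    TERMINAL_STATUSES = {CANCELLED, FAILED, SUCCEEDED}
    # Groups proposed in this review comment.
    INTERMEDIATE_STATUSES = {QUEUED, IN_PROGRESS, CANCELING}
    FAILURE_STATUSES = {FAILED, CANCELLED}


def classify(pipeline_status: str) -> str:
    """Map a pipeline run status to the outcome a trigger might report."""
    # A single equality check replaces the one-element SUCCESS_STATES list.
    if pipeline_status == PipelineRunStatus.SUCCEEDED:
        return "success"
    if pipeline_status in PipelineRunStatus.FAILURE_STATUSES:
        return "error"
    if pipeline_status in PipelineRunStatus.INTERMEDIATE_STATUSES:
        return "pending"
    return "unknown"
```

Keeping the groups on the shared status class means the trigger, operator, and sensor all read from one definition.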