This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2ce51ac5d3 Cancel pipeline if unexpected exception caught (#32238)
2ce51ac5d3 is described below
commit 2ce51ac5d3b1e3bcb253b04bd72c04cfb2af700a
Author: Wei Lee <[email protected]>
AuthorDate: Thu Jun 29 23:46:18 2023 +0800
Cancel pipeline if unexpected exception caught (#32238)
---
.../microsoft/azure/hooks/data_factory.py | 22 ++++++++++++++++++++++
.../microsoft/azure/triggers/data_factory.py | 7 +++++++
.../azure/operators/test_azure_data_factory.py | 2 +-
.../azure/triggers/test_azure_data_factory.py | 6 +++++-
4 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/microsoft/azure/hooks/data_factory.py b/airflow/providers/microsoft/azure/hooks/data_factory.py
index da8455daa2..b9e7feea0a 100644
--- a/airflow/providers/microsoft/azure/hooks/data_factory.py
+++ b/airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -1175,3 +1175,25 @@ class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
return status
except Exception as e:
raise AirflowException(e)
+
+ @provide_targeted_factory_async
+ async def cancel_pipeline_run(
+ self,
+ run_id: str,
+ resource_group_name: str | None = None,
+ factory_name: str | None = None,
+ **config: Any,
+ ) -> None:
+ """
+ Cancel the pipeline run.
+
+ :param run_id: The pipeline run identifier.
+ :param resource_group_name: The resource group name.
+ :param factory_name: The factory name.
+ :param config: Extra parameters for the ADF client.
+ """
+ client = await self.get_async_conn()
+ try:
+ await client.pipeline_runs.cancel(resource_group_name, factory_name, run_id)
+ except Exception as e:
+ raise AirflowException(e)
diff --git a/airflow/providers/microsoft/azure/triggers/data_factory.py
b/airflow/providers/microsoft/azure/triggers/data_factory.py
index c68d6e96b7..40b9555940 100644
--- a/airflow/providers/microsoft/azure/triggers/data_factory.py
+++ b/airflow/providers/microsoft/azure/triggers/data_factory.py
@@ -191,4 +191,11 @@ class AzureDataFactoryTrigger(BaseTrigger):
}
)
except Exception as e:
+ if self.run_id:
+ await hook.cancel_pipeline_run(
+ run_id=self.run_id,
+ resource_group_name=self.resource_group_name,
+ factory_name=self.factory_name,
+ )
+ self.log.info("Unexpected error %s caught. Cancel pipeline run %s", str(e), self.run_id)
yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id})
diff --git a/tests/providers/microsoft/azure/operators/test_azure_data_factory.py b/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
index 461c1502c6..2a545cd137 100644
--- a/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
@@ -363,7 +363,7 @@ class TestAzureDataFactoryRunPipelineOperatorWithDeferrable:
)
@mock.patch("airflow.providers.microsoft.azure.hooks.data_factory.AzureDataFactoryHook.run_pipeline")
def test_azure_data_factory_run_pipeline_operator_async(self, mock_run_pipeline, mock_get_status, status):
- """Assert that AzureDataFactoryRunPipelineOperatorAsync deferred"""
+ """Assert that AzureDataFactoryRunPipelineOperator(..., deferrable=True) deferred"""
class CreateRunResponse:
pass
diff --git a/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py b/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
index c7c31d2818..9df51bae71 100644
--- a/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
@@ -316,8 +316,11 @@ class TestAzureDataFactoryTrigger:
assert expected == actual
@pytest.mark.asyncio
+ @mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.cancel_pipeline_run")
@mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.get_adf_pipeline_run_status")
- async def test_azure_data_factory_trigger_run_exception(self, mock_pipeline_run_status):
+ async def test_azure_data_factory_trigger_run_exception(
+ self, mock_pipeline_run_status, mock_cancel_pipeline_run
+ ):
"""Assert that run catch exception if Azure API throw exception"""
mock_pipeline_run_status.side_effect = Exception("Test exception")
@@ -331,6 +334,7 @@ class TestAzureDataFactoryTrigger:
)
assert len(task) == 1
assert response in task
+ mock_cancel_pipeline_run.assert_called_once()
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.get_adf_pipeline_run_status")