This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ce51ac5d3 Cancel pipeline if unexpected exception caught (#32238)
2ce51ac5d3 is described below

commit 2ce51ac5d3b1e3bcb253b04bd72c04cfb2af700a
Author: Wei Lee <[email protected]>
AuthorDate: Thu Jun 29 23:46:18 2023 +0800

    Cancel pipeline if unexpected exception caught (#32238)
---
 .../microsoft/azure/hooks/data_factory.py          | 22 ++++++++++++++++++++++
 .../microsoft/azure/triggers/data_factory.py       |  7 +++++++
 .../azure/operators/test_azure_data_factory.py     |  2 +-
 .../azure/triggers/test_azure_data_factory.py      |  6 +++++-
 4 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/microsoft/azure/hooks/data_factory.py b/airflow/providers/microsoft/azure/hooks/data_factory.py
index da8455daa2..b9e7feea0a 100644
--- a/airflow/providers/microsoft/azure/hooks/data_factory.py
+++ b/airflow/providers/microsoft/azure/hooks/data_factory.py
@@ -1175,3 +1175,25 @@ class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
             return status
         except Exception as e:
             raise AirflowException(e)
+
+    @provide_targeted_factory_async
+    async def cancel_pipeline_run(
+        self,
+        run_id: str,
+        resource_group_name: str | None = None,
+        factory_name: str | None = None,
+        **config: Any,
+    ) -> None:
+        """
+        Cancel the pipeline run.
+
+        :param run_id: The pipeline run identifier.
+        :param resource_group_name: The resource group name.
+        :param factory_name: The factory name.
+        :param config: Extra parameters for the ADF client.
+        """
+        client = await self.get_async_conn()
+        try:
+            await client.pipeline_runs.cancel(resource_group_name, factory_name, run_id)
+        except Exception as e:
+            raise AirflowException(e)
diff --git a/airflow/providers/microsoft/azure/triggers/data_factory.py b/airflow/providers/microsoft/azure/triggers/data_factory.py
index c68d6e96b7..40b9555940 100644
--- a/airflow/providers/microsoft/azure/triggers/data_factory.py
+++ b/airflow/providers/microsoft/azure/triggers/data_factory.py
@@ -191,4 +191,11 @@ class AzureDataFactoryTrigger(BaseTrigger):
                     }
                 )
         except Exception as e:
+            if self.run_id:
+                await hook.cancel_pipeline_run(
+                    run_id=self.run_id,
+                    resource_group_name=self.resource_group_name,
+                    factory_name=self.factory_name,
+                )
+                self.log.info("Unexpected error %s caught. Cancel pipeline run %s", str(e), self.run_id)
             yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id})
diff --git a/tests/providers/microsoft/azure/operators/test_azure_data_factory.py b/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
index 461c1502c6..2a545cd137 100644
--- a/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/operators/test_azure_data_factory.py
@@ -363,7 +363,7 @@ class TestAzureDataFactoryRunPipelineOperatorWithDeferrable:
     )
     @mock.patch("airflow.providers.microsoft.azure.hooks.data_factory.AzureDataFactoryHook.run_pipeline")
     def test_azure_data_factory_run_pipeline_operator_async(self, mock_run_pipeline, mock_get_status, status):
-        """Assert that AzureDataFactoryRunPipelineOperatorAsync deferred"""
+        """Assert that AzureDataFactoryRunPipelineOperator(..., deferrable=True) deferred"""
 
         class CreateRunResponse:
             pass
diff --git a/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py b/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
index c7c31d2818..9df51bae71 100644
--- a/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
+++ b/tests/providers/microsoft/azure/triggers/test_azure_data_factory.py
@@ -316,8 +316,11 @@ class TestAzureDataFactoryTrigger:
         assert expected == actual
 
     @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.cancel_pipeline_run")
     @mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.get_adf_pipeline_run_status")
-    async def test_azure_data_factory_trigger_run_exception(self, mock_pipeline_run_status):
+    async def test_azure_data_factory_trigger_run_exception(
+        self, mock_pipeline_run_status, mock_cancel_pipeline_run
+    ):
         """Assert that run catch exception if Azure API throw exception"""
         mock_pipeline_run_status.side_effect = Exception("Test exception")
 
@@ -331,6 +334,7 @@ class TestAzureDataFactoryTrigger:
         )
         assert len(task) == 1
         assert response in task
+        mock_cancel_pipeline_run.assert_called_once()
 
     @pytest.mark.asyncio
     @mock.patch(f"{MODULE}.hooks.data_factory.AzureDataFactoryAsyncHook.get_adf_pipeline_run_status")

Reply via email to