This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e076dceaf Fix catching 409 error (#33173)
0e076dceaf is described below

commit 0e076dceaf169174bbd4d8ee46911b60505eb098
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Aug 7 16:59:33 2023 +0200

    Fix catching 409 error (#33173)
---
 airflow/providers/google/cloud/hooks/datafusion.py | 27 +++++++++++++---------
 .../google/cloud/hooks/test_datafusion.py          | 22 ++++++++++++++++++
 2 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index de426bc242..38b0660212 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -39,6 +39,12 @@ from airflow.providers.google.common.hooks.base_google import (
 Operation = Dict[str, Any]
 
 
+class ConflictException(AirflowException):
+    """Exception to catch 409 error."""
+
+    pass
+
+
 class PipelineStates:
     """Data Fusion pipeline states."""
 
@@ -163,6 +169,8 @@ class DataFusionHook(GoogleBaseHook):
     def _check_response_status_and_data(response, message: str) -> None:
         if response.status == 404:
             raise AirflowNotFoundException(message)
+        elif response.status == 409:
+            raise ConflictException("Conflict: Resource is still in use.")
         elif response.status != 200:
             raise AirflowException(message)
         if response.data is None:
@@ -356,21 +364,18 @@ class DataFusionHook(GoogleBaseHook):
         if version_id:
             url = os.path.join(url, "versions", version_id)
 
-        response = self._cdap_request(url=url, method="DELETE", body=None)
-        # Check for 409 error: the previous step for starting/stopping pipeline could still be in progress.
-        # Waiting some time before retry.
-        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+        for time_to_wait in exponential_sleep_generator(initial=1, maximum=10):
             try:
+                response = self._cdap_request(url=url, method="DELETE", body=None)
                 self._check_response_status_and_data(
                     response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
                 )
-                break
-            except AirflowException as exc:
-                if "409" in str(exc):
-                    sleep(time_to_wait)
-                    response = self._cdap_request(url=url, method="DELETE", body=None)
-                else:
-                    raise
+                if response.status == 200:
+                    break
+            except ConflictException as exc:
+                self.log.info(exc)
+                sleep(time_to_wait)
+                continue
 
     def list_pipelines(
         self,
diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py b/tests/providers/google/cloud/hooks/test_datafusion.py
index 58e1c048d0..50ea6b19d0 100644
--- a/tests/providers/google/cloud/hooks/test_datafusion.py
+++ b/tests/providers/google/cloud/hooks/test_datafusion.py
@@ -52,6 +52,12 @@ CONSTRUCTED_PIPELINE_URL_GET = (
 )
 
 
+class MockResponse:
+    def __init__(self, status, data=None):
+        self.status = status
+        self.data = data
+
+
 @pytest.fixture
 def hook():
     with mock.patch(
@@ -255,6 +261,22 @@ class TestDataFusionHook:
             body=None,
         )
 
+    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+    def test_delete_pipeline_should_fail_if_status_409(self, mock_request, hook, caplog):
+        mock_request.side_effect = [
+            MockResponse(status=409, data="Conflict: Resource is still in use."),
+            MockResponse(status=200, data="Success"),
+        ]
+        hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
+
+        assert mock_request.call_count == 2
+        assert "Conflict: Resource is still in use." in caplog.text
+        mock_request.assert_called_with(
+            url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
+            method="DELETE",
+            body=None,
+        )
+
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_list_pipelines(self, mock_request, hook):
         data = {"data": "test"}

Reply via email to