This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e076dceaf Fix catching 409 error (#33173)
0e076dceaf is described below
commit 0e076dceaf169174bbd4d8ee46911b60505eb098
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Aug 7 16:59:33 2023 +0200
Fix catching 409 error (#33173)
---
airflow/providers/google/cloud/hooks/datafusion.py | 27 +++++++++++++---------
.../google/cloud/hooks/test_datafusion.py | 22 ++++++++++++++++++
2 files changed, 38 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index de426bc242..38b0660212 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -39,6 +39,12 @@ from airflow.providers.google.common.hooks.base_google import (
Operation = Dict[str, Any]
+class ConflictException(AirflowException):
+ """Exception to catch 409 error."""
+
+ pass
+
+
class PipelineStates:
"""Data Fusion pipeline states."""
@@ -163,6 +169,8 @@ class DataFusionHook(GoogleBaseHook):
def _check_response_status_and_data(response, message: str) -> None:
if response.status == 404:
raise AirflowNotFoundException(message)
+ elif response.status == 409:
+ raise ConflictException("Conflict: Resource is still in use.")
elif response.status != 200:
raise AirflowException(message)
if response.data is None:
@@ -356,21 +364,18 @@ class DataFusionHook(GoogleBaseHook):
if version_id:
url = os.path.join(url, "versions", version_id)
- response = self._cdap_request(url=url, method="DELETE", body=None)
- # Check for 409 error: the previous step for starting/stopping
pipeline could still be in progress.
- # Waiting some time before retry.
- for time_to_wait in exponential_sleep_generator(initial=10,
maximum=120):
+ for time_to_wait in exponential_sleep_generator(initial=1, maximum=10):
try:
+ response = self._cdap_request(url=url, method="DELETE",
body=None)
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code
{response.status}: {response.data}"
)
- break
- except AirflowException as exc:
- if "409" in str(exc):
- sleep(time_to_wait)
- response = self._cdap_request(url=url, method="DELETE",
body=None)
- else:
- raise
+ if response.status == 200:
+ break
+ except ConflictException as exc:
+ self.log.info(exc)
+ sleep(time_to_wait)
+ continue
def list_pipelines(
self,
diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py b/tests/providers/google/cloud/hooks/test_datafusion.py
index 58e1c048d0..50ea6b19d0 100644
--- a/tests/providers/google/cloud/hooks/test_datafusion.py
+++ b/tests/providers/google/cloud/hooks/test_datafusion.py
@@ -52,6 +52,12 @@ CONSTRUCTED_PIPELINE_URL_GET = (
)
+class MockResponse:
+ def __init__(self, status, data=None):
+ self.status = status
+ self.data = data
+
+
@pytest.fixture
def hook():
with mock.patch(
@@ -255,6 +261,22 @@ class TestDataFusionHook:
body=None,
)
+ @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+ def test_delete_pipeline_should_fail_if_status_409(self, mock_request,
hook, caplog):
+ mock_request.side_effect = [
+ MockResponse(status=409, data="Conflict: Resource is still in
use."),
+ MockResponse(status=200, data="Success"),
+ ]
+ hook.delete_pipeline(pipeline_name=PIPELINE_NAME,
instance_url=INSTANCE_URL)
+
+ assert mock_request.call_count == 2
+ assert "Conflict: Resource is still in use." in caplog.text
+ mock_request.assert_called_with(
+ url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
+ method="DELETE",
+ body=None,
+ )
+
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines(self, mock_request, hook):
data = {"data": "test"}