This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ceab840f31 Cancel airbyte job when timeout exceeded to prevent inconsistency among airflow and airbyte (#36241)
ceab840f31 is described below
commit ceab840f31e2dcf591390bbace0ff9d74c6fc8fd
Author: shohamy7 <[email protected]>
AuthorDate: Sat Dec 16 20:11:28 2023 +0200
Cancel airbyte job when timeout exceeded to prevent inconsistency among airflow and airbyte (#36241)
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/providers/airbyte/hooks/airbyte.py | 1 +
tests/providers/airbyte/hooks/test_airbyte.py | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index f0b68e1ad5..a9ce336022 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -63,6 +63,7 @@ class AirbyteHook(HttpHook):
start = time.monotonic()
while True:
if timeout and start + timeout < time.monotonic():
+ self.cancel_job(job_id=(int(job_id)))
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
time.sleep(wait_seconds)
try:
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
index d4a954e2ad..400741820b 100644
--- a/tests/providers/airbyte/hooks/test_airbyte.py
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -112,7 +112,8 @@ class TestAirbyteHook:
mock_get_job.assert_has_calls(calls)
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job")
- def test_wait_for_job_timeout(self, mock_get_job):
+ @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
+ def test_wait_for_job_timeout(self, mock_cancel_job, mock_get_job):
mock_get_job.side_effect = [
self.return_value_get_job(self.hook.PENDING),
self.return_value_get_job(self.hook.RUNNING),
@@ -122,7 +123,9 @@ class TestAirbyteHook:
calls = [mock.call(job_id=self.job_id)]
mock_get_job.assert_has_calls(calls)
+ mock_cancel_job.assert_has_calls(calls)
assert mock_get_job.mock_calls == calls
+ assert mock_cancel_job.mock_calls == calls
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job")
def test_wait_for_job_state_unrecognized(self, mock_get_job):