This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ceab840f31 Cancel airbyte job when timeout exceeded to prevent inconsistency among airflow and airbyte (#36241)
ceab840f31 is described below

commit ceab840f31e2dcf591390bbace0ff9d74c6fc8fd
Author: shohamy7 <[email protected]>
AuthorDate: Sat Dec 16 20:11:28 2023 +0200

    Cancel airbyte job when timeout exceeded to prevent inconsistency among airflow and airbyte (#36241)
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/providers/airbyte/hooks/airbyte.py    | 1 +
 tests/providers/airbyte/hooks/test_airbyte.py | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index f0b68e1ad5..a9ce336022 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -63,6 +63,7 @@ class AirbyteHook(HttpHook):
         start = time.monotonic()
         while True:
             if timeout and start + timeout < time.monotonic():
+                self.cancel_job(job_id=(int(job_id)))
                 raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
             time.sleep(wait_seconds)
             try:
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
index d4a954e2ad..400741820b 100644
--- a/tests/providers/airbyte/hooks/test_airbyte.py
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -112,7 +112,8 @@ class TestAirbyteHook:
         mock_get_job.assert_has_calls(calls)
 
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job")
-    def test_wait_for_job_timeout(self, mock_get_job):
+    @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
+    def test_wait_for_job_timeout(self, mock_cancel_job, mock_get_job):
         mock_get_job.side_effect = [
             self.return_value_get_job(self.hook.PENDING),
             self.return_value_get_job(self.hook.RUNNING),
@@ -122,7 +123,9 @@ class TestAirbyteHook:
 
         calls = [mock.call(job_id=self.job_id)]
         mock_get_job.assert_has_calls(calls)
+        mock_cancel_job.assert_has_calls(calls)
         assert mock_get_job.mock_calls == calls
+        assert mock_cancel_job.mock_calls == calls
 
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job")
     def test_wait_for_job_state_unrecognized(self, mock_get_job):

Reply via email to