This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 31dded5fee0 Fix deferrable execution timeout handling for Airbyte jobs. (#67816)
31dded5fee0 is described below

commit 31dded5fee05cfd80194c01adead0366cf153966
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Jun 1 21:07:18 2026 +0100

    Fix deferrable execution timeout handling for Airbyte jobs. (#67816)
    
    Use wall-clock deadlines instead of monotonic time, enforce timeout checks
    before job state evaluation, and add a defer timeout buffer to allow
    execute_complete() to cancel timed-out jobs before task termination.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../airflow/providers/airbyte/operators/airbyte.py | 20 ++++--
 .../airflow/providers/airbyte/triggers/airbyte.py  | 44 ++++++------
 .../tests/unit/airbyte/operators/test_airbyte.py   | 81 +++++++++++++++++++---
 .../tests/unit/airbyte/triggers/test_airbyte.py    | 81 ++++++++++++++++++++--
 4 files changed, 184 insertions(+), 42 deletions(-)

diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index ae4e19e125f..5cf7926f2fb 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import datetime
 import time
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
@@ -87,16 +88,27 @@ class AirbyteTriggerSyncOperator(BaseOperator):
         job_object = hook.submit_sync_connection(connection_id=self.connection_id)
         self.job_id = job_object.job_id
         state = job_object.status
+        trigger_poll_interval = 60
+
+        now = time.time()
 
         # Derive absolute deadlines for deferrable execution.
         # execution_timeout is a hard task-level limit (cancels the job),
         # while timeout only limits how long we wait for the job to finish.
         # If both are set, the earliest deadline wins.
-        end_time = time.monotonic() + self.timeout
+        end_time = now + self.timeout
         execution_deadline = None
 
+        defer_timeout: datetime.timedelta | None = None
+
         if self.execution_timeout is not None:
-            execution_deadline = time.monotonic() + self.execution_timeout.total_seconds()
+            execution_deadline = now + self.execution_timeout.total_seconds()
+
+            # Allow two trigger polling cycles for timeout events to be
+            # processed and Airbyte job cancellation to be initiated before
+            # the framework defer timeout expires.
+            poll_buffer = 2 * trigger_poll_interval
+            defer_timeout = datetime.timedelta(seconds=self.execution_timeout.total_seconds() + poll_buffer)
 
         self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
 
@@ -111,13 +123,13 @@ class AirbyteTriggerSyncOperator(BaseOperator):
             self.log.debug("Running in deferrable mode in job state %s...", state)
             if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
                 self.defer(
-                    timeout=None,
+                    timeout=defer_timeout,
                     trigger=AirbyteSyncTrigger(
                         conn_id=self.airbyte_conn_id,
                         job_id=self.job_id,
                         end_time=end_time,
                         execution_deadline=execution_deadline,
-                        poll_interval=60,
+                        poll_interval=trigger_poll_interval,
                     ),
                     method_name="execute_complete",
                 )
diff --git a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
index 7ff8a5e2794..6461993f8dd 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
@@ -76,28 +76,7 @@ class AirbyteSyncTrigger(BaseTrigger):
         hook = AirbyteHook(airbyte_conn_id=self.conn_id)
         try:
             while True:
-                now = time.monotonic()
-
-                job_run_status = hook.get_job_status(self.job_id)
-
-                if job_run_status == JobStatusEnum.SUCCEEDED:
-                    yield TriggerEvent(
-                        {
-                            "status": "success",
-                            "message": f"Job run {self.job_id} has completed successfully.",
-                            "job_id": self.job_id,
-                        }
-                    )
-                    return
-                elif job_run_status == JobStatusEnum.CANCELLED:
-                    yield TriggerEvent(
-                        {
-                            "status": "cancelled",
-                            "message": f"Job run {self.job_id} has been cancelled.",
-                            "job_id": self.job_id,
-                        }
-                    )
-                    return
+                now = time.time()
 
                 if self.execution_deadline is not None:
                     if self.execution_deadline <= now:
@@ -121,6 +100,27 @@ class AirbyteSyncTrigger(BaseTrigger):
                     )
                     return
 
+                job_run_status = hook.get_job_status(self.job_id)
+
+                if job_run_status == JobStatusEnum.SUCCEEDED:
+                    yield TriggerEvent(
+                        {
+                            "status": "success",
+                            "message": f"Job run {self.job_id} has completed successfully.",
+                            "job_id": self.job_id,
+                        }
+                    )
+                    return
+                elif job_run_status == JobStatusEnum.CANCELLED:
+                    yield TriggerEvent(
+                        {
+                            "status": "cancelled",
+                            "message": f"Job run {self.job_id} has been cancelled.",
+                            "job_id": self.job_id,
+                        }
+                    )
+                    return
+
                 if job_run_status in (
                     JobStatusEnum.RUNNING,
                     JobStatusEnum.PENDING,
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index b9165dca06f..9f2d4080369 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -39,9 +39,12 @@ class TestAirbyteTriggerSyncOp:
     wait_seconds = 0
     timeout = 360
 
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
     @mock.patch("airbyte_api.jobs.Jobs.create_job")
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job", return_value=None)
-    def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_connection_without_db):
+    def test_execute(
+        self, mock_wait_for_job, mock_submit_sync_connection, mock_time, create_connection_without_db
+    ):
         conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
         create_connection_without_db(conn)
         mock_response = mock.Mock()
@@ -70,19 +73,27 @@ class TestAirbyteTriggerSyncOp:
             job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
         )
 
+        # Ensure that wall-clock time is used during operator execution flow.
+        mock_time.time.assert_called()
+        mock_time.monotonic.assert_not_called()
+
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
     @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
     @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
     @mock.patch("airbyte_api.jobs.Jobs.create_job")
-    def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+    def test_execute_deferrable_without_execution_timeout(
         self,
         mock_create_job,
         mock_airbyte_trigger,
         mock_defer,
+        mock_time,
         create_connection_without_db,
     ):
         conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
         create_connection_without_db(conn)
 
+        mock_time.time.return_value = 1000.0
+
         mock_response = mock.Mock()
         mock_response.job_response = JobResponse(
             connection_id="connection-mock",
@@ -97,30 +108,78 @@ class TestAirbyteTriggerSyncOp:
             task_id="test_airbyte_op",
             airbyte_conn_id=self.airbyte_conn_id,
             connection_id=self.connection_id,
-            wait_seconds=self.wait_seconds,
             timeout=self.timeout,
             deferrable=True,
-            execution_timeout=timedelta(seconds=30),
+            execution_timeout=None,
         )
 
         op.execute({})
 
-        # Explicitly pass timeout=None so Airflow's framework-level deferred
-        # timeout handling does not bypass execute_complete(), which is
-        # responsible for Airbyte job cancellation in deferrable mode.
         mock_defer.assert_called_once_with(
             method_name="execute_complete",
             trigger=mock_airbyte_trigger.return_value,
             timeout=None,
         )
 
-        # Ensure the trigger still receives execution_deadline handling for
-        # Airbyte job timeout cancellation processing.
         mock_airbyte_trigger.assert_called_once_with(
             conn_id=self.airbyte_conn_id,
             job_id=self.job_id,
-            end_time=mock.ANY,
-            execution_deadline=mock.ANY,
+            end_time=1000.0 + self.timeout,
+            execution_deadline=None,
+            poll_interval=60,
+        )
+
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
+    @mock.patch("airbyte_api.jobs.Jobs.create_job")
+    def test_execute_deferrable_with_execution_timeout(
+        self,
+        mock_create_job,
+        mock_airbyte_trigger,
+        mock_defer,
+        mock_time,
+        create_connection_without_db,
+    ):
+        conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+        create_connection_without_db(conn)
+
+        mock_time.time.return_value = 1000.0
+
+        mock_response = mock.Mock()
+        mock_response.job_response = JobResponse(
+            connection_id="connection-mock",
+            job_id=1,
+            start_time="today",
+            job_type=JobTypeEnum.SYNC,
+            status=JobStatusEnum.RUNNING,
+        )
+        mock_create_job.return_value = mock_response
+
+        execution_timeout = timedelta(seconds=60)
+
+        op = AirbyteTriggerSyncOperator(
+            task_id="test_airbyte_op",
+            airbyte_conn_id=self.airbyte_conn_id,
+            connection_id=self.connection_id,
+            timeout=self.timeout,
+            deferrable=True,
+            execution_timeout=execution_timeout,
+        )
+
+        op.execute({})
+
+        mock_defer.assert_called_once_with(
+            method_name="execute_complete",
+            trigger=mock_airbyte_trigger.return_value,
+            timeout=timedelta(seconds=180),  # 60s timeout + 120s buffer
+        )
+
+        mock_airbyte_trigger.assert_called_once_with(
+            conn_id=self.airbyte_conn_id,
+            job_id=self.job_id,
+            end_time=1000.0 + self.timeout,
+            execution_deadline=1060.0,
             poll_interval=60,
         )
 
diff --git a/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
index c1777b834b9..ab554037cfe 100644
--- a/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import asyncio
 import time
+from contextlib import suppress
 from unittest import mock
 
 import pytest
@@ -44,11 +45,11 @@ class TestAirbyteSyncTrigger:
 
     @pytest.fixture
     def end_time(self):
-        return time.monotonic() + 60 * 60 * 24 * 7
+        return time.time() + 60 * 60 * 24 * 7
 
     @pytest.fixture
     def execution_deadline(self):
-        return time.monotonic() + 60 * 60 * 24 * 7
+        return time.time() + 60 * 60 * 24 * 7
 
     def test_serialization(self, end_time, execution_deadline):
         """Assert TestAirbyteSyncTrigger correctly serializes its arguments and classpath."""
@@ -205,7 +206,7 @@ class TestAirbyteSyncTrigger:
         """Assert that run timeout after end_time elapsed"""
         mock_get_job_status.side_effect = JobStatusEnum.RUNNING
 
-        end_time = time.monotonic()
+        end_time = time.time()
         trigger = AirbyteSyncTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
@@ -232,7 +233,7 @@ class TestAirbyteSyncTrigger:
     async def test_airbyte_job_run_execution_timeout(self, mock_get_job_status, end_time):
         """Assert that run timeout after execution_deadline has elapsed"""
         mock_get_job_status.side_effect = JobStatusEnum.RUNNING
-        execution_deadline = time.monotonic() - 1
+        execution_deadline = time.time() - 1
 
         trigger = AirbyteSyncTrigger(
             conn_id=self.CONN_ID,
@@ -255,6 +256,76 @@ class TestAirbyteSyncTrigger:
         assert len(events) == 1
         assert expected_result in events
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
+    async def test_execution_timeout_takes_precedence_over_success_status(
+        self,
+        mock_get_job_status,
+        end_time,
+    ):
+        """Execution timeout should take precedence over terminal job states."""
+
+        mock_get_job_status.return_value = JobStatusEnum.SUCCEEDED
+
+        trigger = AirbyteSyncTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=end_time,
+            execution_deadline=time.time() - 1,
+            job_id=self.JOB_ID,
+        )
+
+        events = [e async for e in trigger.run()]
+
+        expected_result = TriggerEvent(
+            {
+                "status": "timeout",
+                "message": f"Job run {self.JOB_ID} has reached execution timeout.",
+                "job_id": self.JOB_ID,
+            }
+        )
+
+        assert len(events) == 1
+        assert expected_result in events
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.airbyte.triggers.airbyte.time")
+    @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
+    async def test_airbyte_job_run_trigger_uses_wall_clock_end_time(
+        self,
+        mock_get_job_status,
+        mock_time,
+        end_time,
+        execution_deadline,
+    ):
+        """Assert serialized deadlines are compared to wall-clock time."""
+
+        mock_time.time.return_value = end_time - 60
+
+        mock_get_job_status.return_value = JobStatusEnum.RUNNING
+
+        trigger = AirbyteSyncTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=end_time,
+            execution_deadline=execution_deadline,
+            job_id=self.JOB_ID,
+        )
+
+        task = asyncio.create_task(trigger.run().__anext__())
+
+        await asyncio.sleep(0)
+
+        assert task.done() is False
+
+        mock_time.time.assert_called()
+        mock_time.monotonic.assert_not_called()
+        mock_get_job_status.assert_called_once_with(self.JOB_ID)
+
+        task.cancel()
+        with suppress(asyncio.CancelledError):
+            await task
+
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
     async def test_terminal_yields_only_once(self, mock_get_job_status):
@@ -263,7 +334,7 @@ class TestAirbyteSyncTrigger:
         trigger = AirbyteSyncTrigger(
             conn_id="airbyte_default",
             poll_interval=1,
-            end_time=time.monotonic() + 100,
+            end_time=time.time() + 100,
             job_id=1234,
         )
 

Reply via email to