This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 31dded5fee0 Fix deferrable execution timeout handling for Airbyte
jobs. (#67816)
31dded5fee0 is described below
commit 31dded5fee05cfd80194c01adead0366cf153966
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Jun 1 21:07:18 2026 +0100
Fix deferrable execution timeout handling for Airbyte jobs. (#67816)
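Use wall-clock deadlines instead of monotonic time, so that deadlines computed
by the operator remain meaningful when the trigger later compares them after
serialization (monotonic readings are only comparable within one process).
Also enforce timeout checks before job state evaluation, and add a defer
timeout buffer so that execute_complete() can cancel timed-out jobs before
the task is terminated.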
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../airflow/providers/airbyte/operators/airbyte.py | 20 ++++--
.../airflow/providers/airbyte/triggers/airbyte.py | 44 ++++++------
.../tests/unit/airbyte/operators/test_airbyte.py | 81 +++++++++++++++++++---
.../tests/unit/airbyte/triggers/test_airbyte.py | 81 ++++++++++++++++++++--
4 files changed, 184 insertions(+), 42 deletions(-)
diff --git
a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index ae4e19e125f..5cf7926f2fb 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import datetime
import time
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
@@ -87,16 +88,27 @@ class AirbyteTriggerSyncOperator(BaseOperator):
job_object =
hook.submit_sync_connection(connection_id=self.connection_id)
self.job_id = job_object.job_id
state = job_object.status
+ trigger_poll_interval = 60
+
+ now = time.time()
# Derive absolute deadlines for deferrable execution.
# execution_timeout is a hard task-level limit (cancels the job),
# while timeout only limits how long we wait for the job to finish.
# If both are set, the earliest deadline wins.
- end_time = time.monotonic() + self.timeout
+ end_time = now + self.timeout
execution_deadline = None
+ defer_timeout: datetime.timedelta | None = None
+
if self.execution_timeout is not None:
- execution_deadline = time.monotonic() +
self.execution_timeout.total_seconds()
+ execution_deadline = now + self.execution_timeout.total_seconds()
+
+ # Allow two trigger polling cycles for timeout events to be
+ # processed and Airbyte job cancellation to be initiated before
+ # the framework defer timeout expires.
+ poll_buffer = 2 * trigger_poll_interval
+ defer_timeout =
datetime.timedelta(seconds=self.execution_timeout.total_seconds() + poll_buffer)
self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
@@ -111,13 +123,13 @@ class AirbyteTriggerSyncOperator(BaseOperator):
self.log.debug("Running in deferrable mode in job state %s...",
state)
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING,
JobStatusEnum.INCOMPLETE):
self.defer(
- timeout=None,
+ timeout=defer_timeout,
trigger=AirbyteSyncTrigger(
conn_id=self.airbyte_conn_id,
job_id=self.job_id,
end_time=end_time,
execution_deadline=execution_deadline,
- poll_interval=60,
+ poll_interval=trigger_poll_interval,
),
method_name="execute_complete",
)
diff --git
a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
index 7ff8a5e2794..6461993f8dd 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py
@@ -76,28 +76,7 @@ class AirbyteSyncTrigger(BaseTrigger):
hook = AirbyteHook(airbyte_conn_id=self.conn_id)
try:
while True:
- now = time.monotonic()
-
- job_run_status = hook.get_job_status(self.job_id)
-
- if job_run_status == JobStatusEnum.SUCCEEDED:
- yield TriggerEvent(
- {
- "status": "success",
- "message": f"Job run {self.job_id} has completed
successfully.",
- "job_id": self.job_id,
- }
- )
- return
- elif job_run_status == JobStatusEnum.CANCELLED:
- yield TriggerEvent(
- {
- "status": "cancelled",
- "message": f"Job run {self.job_id} has been
cancelled.",
- "job_id": self.job_id,
- }
- )
- return
+ now = time.time()
if self.execution_deadline is not None:
if self.execution_deadline <= now:
@@ -121,6 +100,27 @@ class AirbyteSyncTrigger(BaseTrigger):
)
return
+ job_run_status = hook.get_job_status(self.job_id)
+
+ if job_run_status == JobStatusEnum.SUCCEEDED:
+ yield TriggerEvent(
+ {
+ "status": "success",
+ "message": f"Job run {self.job_id} has completed
successfully.",
+ "job_id": self.job_id,
+ }
+ )
+ return
+ elif job_run_status == JobStatusEnum.CANCELLED:
+ yield TriggerEvent(
+ {
+ "status": "cancelled",
+ "message": f"Job run {self.job_id} has been
cancelled.",
+ "job_id": self.job_id,
+ }
+ )
+ return
+
if job_run_status in (
JobStatusEnum.RUNNING,
JobStatusEnum.PENDING,
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index b9165dca06f..9f2d4080369 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -39,9 +39,12 @@ class TestAirbyteTriggerSyncOp:
wait_seconds = 0
timeout = 360
+ @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
@mock.patch("airbyte_api.jobs.Jobs.create_job")
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job",
return_value=None)
- def test_execute(self, mock_wait_for_job, mock_submit_sync_connection,
create_connection_without_db):
+ def test_execute(
+ self, mock_wait_for_job, mock_submit_sync_connection, mock_time,
create_connection_without_db
+ ):
conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte",
host="airbyte.com")
create_connection_without_db(conn)
mock_response = mock.Mock()
@@ -70,19 +73,27 @@ class TestAirbyteTriggerSyncOp:
job_id=self.job_id, wait_seconds=self.wait_seconds,
timeout=self.timeout
)
+ # Ensure that wall-clock time is used during operator execution flow.
+ mock_time.time.assert_called()
+ mock_time.monotonic.assert_not_called()
+
+ @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
@mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
@mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
@mock.patch("airbyte_api.jobs.Jobs.create_job")
- def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+ def test_execute_deferrable_without_execution_timeout(
self,
mock_create_job,
mock_airbyte_trigger,
mock_defer,
+ mock_time,
create_connection_without_db,
):
conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte",
host="airbyte.com")
create_connection_without_db(conn)
+ mock_time.time.return_value = 1000.0
+
mock_response = mock.Mock()
mock_response.job_response = JobResponse(
connection_id="connection-mock",
@@ -97,30 +108,78 @@ class TestAirbyteTriggerSyncOp:
task_id="test_airbyte_op",
airbyte_conn_id=self.airbyte_conn_id,
connection_id=self.connection_id,
- wait_seconds=self.wait_seconds,
timeout=self.timeout,
deferrable=True,
- execution_timeout=timedelta(seconds=30),
+ execution_timeout=None,
)
op.execute({})
- # Explicitly pass timeout=None so Airflow's framework-level deferred
- # timeout handling does not bypass execute_complete(), which is
- # responsible for Airbyte job cancellation in deferrable mode.
mock_defer.assert_called_once_with(
method_name="execute_complete",
trigger=mock_airbyte_trigger.return_value,
timeout=None,
)
- # Ensure the trigger still receives execution_deadline handling for
- # Airbyte job timeout cancellation processing.
mock_airbyte_trigger.assert_called_once_with(
conn_id=self.airbyte_conn_id,
job_id=self.job_id,
- end_time=mock.ANY,
- execution_deadline=mock.ANY,
+ end_time=1000.0 + self.timeout,
+ execution_deadline=None,
+ poll_interval=60,
+ )
+
+ @mock.patch("airflow.providers.airbyte.operators.airbyte.time")
+
@mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
+
@mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
+ @mock.patch("airbyte_api.jobs.Jobs.create_job")
+ def test_execute_deferrable_with_execution_timeout(
+ self,
+ mock_create_job,
+ mock_airbyte_trigger,
+ mock_defer,
+ mock_time,
+ create_connection_without_db,
+ ):
+ conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte",
host="airbyte.com")
+ create_connection_without_db(conn)
+
+ mock_time.time.return_value = 1000.0
+
+ mock_response = mock.Mock()
+ mock_response.job_response = JobResponse(
+ connection_id="connection-mock",
+ job_id=1,
+ start_time="today",
+ job_type=JobTypeEnum.SYNC,
+ status=JobStatusEnum.RUNNING,
+ )
+ mock_create_job.return_value = mock_response
+
+ execution_timeout = timedelta(seconds=60)
+
+ op = AirbyteTriggerSyncOperator(
+ task_id="test_airbyte_op",
+ airbyte_conn_id=self.airbyte_conn_id,
+ connection_id=self.connection_id,
+ timeout=self.timeout,
+ deferrable=True,
+ execution_timeout=execution_timeout,
+ )
+
+ op.execute({})
+
+ mock_defer.assert_called_once_with(
+ method_name="execute_complete",
+ trigger=mock_airbyte_trigger.return_value,
+ timeout=timedelta(seconds=180), # 60s timeout + 120s buffer
+ )
+
+ mock_airbyte_trigger.assert_called_once_with(
+ conn_id=self.airbyte_conn_id,
+ job_id=self.job_id,
+ end_time=1000.0 + self.timeout,
+ execution_deadline=1060.0,
poll_interval=60,
)
diff --git a/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
b/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
index c1777b834b9..ab554037cfe 100644
--- a/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import time
+from contextlib import suppress
from unittest import mock
import pytest
@@ -44,11 +45,11 @@ class TestAirbyteSyncTrigger:
@pytest.fixture
def end_time(self):
- return time.monotonic() + 60 * 60 * 24 * 7
+ return time.time() + 60 * 60 * 24 * 7
@pytest.fixture
def execution_deadline(self):
- return time.monotonic() + 60 * 60 * 24 * 7
+ return time.time() + 60 * 60 * 24 * 7
def test_serialization(self, end_time, execution_deadline):
"""Assert TestAirbyteSyncTrigger correctly serializes its arguments
and classpath."""
@@ -205,7 +206,7 @@ class TestAirbyteSyncTrigger:
"""Assert that run timeout after end_time elapsed"""
mock_get_job_status.side_effect = JobStatusEnum.RUNNING
- end_time = time.monotonic()
+ end_time = time.time()
trigger = AirbyteSyncTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
@@ -232,7 +233,7 @@ class TestAirbyteSyncTrigger:
async def test_airbyte_job_run_execution_timeout(self,
mock_get_job_status, end_time):
"""Assert that run timeout after execution_deadline has elapsed"""
mock_get_job_status.side_effect = JobStatusEnum.RUNNING
- execution_deadline = time.monotonic() - 1
+ execution_deadline = time.time() - 1
trigger = AirbyteSyncTrigger(
conn_id=self.CONN_ID,
@@ -255,6 +256,76 @@ class TestAirbyteSyncTrigger:
assert len(events) == 1
assert expected_result in events
+ @pytest.mark.asyncio
+
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
+ async def test_execution_timeout_takes_precedence_over_success_status(
+ self,
+ mock_get_job_status,
+ end_time,
+ ):
+ """Execution timeout should take precedence over terminal job
states."""
+
+ mock_get_job_status.return_value = JobStatusEnum.SUCCEEDED
+
+ trigger = AirbyteSyncTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=end_time,
+ execution_deadline=time.time() - 1,
+ job_id=self.JOB_ID,
+ )
+
+ events = [e async for e in trigger.run()]
+
+ expected_result = TriggerEvent(
+ {
+ "status": "timeout",
+ "message": f"Job run {self.JOB_ID} has reached execution
timeout.",
+ "job_id": self.JOB_ID,
+ }
+ )
+
+ assert len(events) == 1
+ assert expected_result in events
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.airbyte.triggers.airbyte.time")
+
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
+ async def test_airbyte_job_run_trigger_uses_wall_clock_end_time(
+ self,
+ mock_get_job_status,
+ mock_time,
+ end_time,
+ execution_deadline,
+ ):
+ """Assert serialized deadlines are compared to wall-clock time."""
+
+ mock_time.time.return_value = end_time - 60
+
+ mock_get_job_status.return_value = JobStatusEnum.RUNNING
+
+ trigger = AirbyteSyncTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=end_time,
+ execution_deadline=execution_deadline,
+ job_id=self.JOB_ID,
+ )
+
+ task = asyncio.create_task(trigger.run().__anext__())
+
+ await asyncio.sleep(0)
+
+ assert task.done() is False
+
+ mock_time.time.assert_called()
+ mock_time.monotonic.assert_not_called()
+ mock_get_job_status.assert_called_once_with(self.JOB_ID)
+
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+
@pytest.mark.asyncio
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
async def test_terminal_yields_only_once(self, mock_get_job_status):
@@ -263,7 +334,7 @@ class TestAirbyteSyncTrigger:
trigger = AirbyteSyncTrigger(
conn_id="airbyte_default",
poll_interval=1,
- end_time=time.monotonic() + 100,
+ end_time=time.time() + 100,
job_id=1234,
)