This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57e8a7a6a19 Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator (#67382)
57e8a7a6a19 is described below
commit 57e8a7a6a1979ff9d82e7c6a8c62f048027d2d8f
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon May 25 16:00:41 2026 +0100
Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator (#67382)
Prevent framework-level deferred timeouts from bypassing
execute_complete() cancellation handling for Airbyte jobs.
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../airflow/providers/airbyte/operators/airbyte.py | 2 +-
.../tests/unit/airbyte/operators/test_airbyte.py | 55 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index 7f8f80a1b0b..ae4e19e125f 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -111,7 +111,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
self.log.debug("Running in deferrable mode in job state %s...", state)
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
self.defer(
- timeout=self.execution_timeout,
+ timeout=None,
trigger=AirbyteSyncTrigger(
conn_id=self.airbyte_conn_id,
job_id=self.job_id,
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index 7cb792e960d..b9165dca06f 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import timedelta
from unittest import mock
import pytest
@@ -69,6 +70,60 @@ class TestAirbyteTriggerSyncOp:
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
)
+ @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
+ @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
+ @mock.patch("airbyte_api.jobs.Jobs.create_job")
+ def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+ self,
+ mock_create_job,
+ mock_airbyte_trigger,
+ mock_defer,
+ create_connection_without_db,
+ ):
+ conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+ create_connection_without_db(conn)
+
+ mock_response = mock.Mock()
+ mock_response.job_response = JobResponse(
+ connection_id="connection-mock",
+ job_id=1,
+ start_time="today",
+ job_type=JobTypeEnum.SYNC,
+ status=JobStatusEnum.RUNNING,
+ )
+ mock_create_job.return_value = mock_response
+
+ op = AirbyteTriggerSyncOperator(
+ task_id="test_airbyte_op",
+ airbyte_conn_id=self.airbyte_conn_id,
+ connection_id=self.connection_id,
+ wait_seconds=self.wait_seconds,
+ timeout=self.timeout,
+ deferrable=True,
+ execution_timeout=timedelta(seconds=30),
+ )
+
+ op.execute({})
+
+ # Explicitly pass timeout=None so Airflow's framework-level deferred
+ # timeout handling does not bypass execute_complete(), which is
+ # responsible for Airbyte job cancellation in deferrable mode.
+ mock_defer.assert_called_once_with(
+ method_name="execute_complete",
+ trigger=mock_airbyte_trigger.return_value,
+ timeout=None,
+ )
+
+ # Ensure the trigger still receives execution_deadline handling for
+ # Airbyte job timeout cancellation processing.
+ mock_airbyte_trigger.assert_called_once_with(
+ conn_id=self.airbyte_conn_id,
+ job_id=self.job_id,
+ end_time=mock.ANY,
+ execution_deadline=mock.ANY,
+ poll_interval=60,
+ )
+
@pytest.mark.parametrize(
("status", "should_raise", "expected_message"),
[