This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04a1309d033 Fix dbt Cloud trigger timeout deadline clock (#67626)
04a1309d033 is described below
commit 04a1309d03390e2acadda23bf6722de72ade46dd
Author: Revanth <[email protected]>
AuthorDate: Sun May 31 15:08:11 2026 -0500
Fix dbt Cloud trigger timeout deadline clock (#67626)
---
.../airflow/providers/dbt/cloud/operators/dbt.py | 4 +--
.../airflow/providers/dbt/cloud/triggers/dbt.py | 2 +-
.../tests/unit/dbt/cloud/triggers/test_dbt.py | 39 +++++++++++++++++++---
3 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 088e5f48de2..6547fadb0f9 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -250,10 +250,10 @@ class DbtCloudRunJobOperator(BaseOperator):
# execution_timeout is a hard task-level limit (cancels the job),
# while timeout only limits how long we wait for the job to finish.
# If both are set, the earliest deadline wins.
- end_time = time.monotonic() + self.timeout
+ end_time = time.time() + self.timeout
execution_deadline = None
if self.execution_timeout is not None:
- execution_deadline = time.monotonic() + self.execution_timeout.total_seconds()
+ execution_deadline = time.time() + self.execution_timeout.total_seconds()
job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id)
job_run_status = self.hook.get_job_run_status(**job_run_info)
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index 8efd918780b..f2525406221 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -80,7 +80,7 @@ class DbtCloudRunJobTrigger(BaseTrigger):
hook = DbtCloudHook(self.conn_id, **self.hook_params)
try:
while True:
- now = time.monotonic()
+ now = time.time()
job_run_status = await hook.get_job_status(self.run_id, self.account_id)
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index 12c94d19b9e..63376e562e6 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import time
+from contextlib import suppress
from unittest import mock
from unittest.mock import AsyncMock
@@ -38,11 +39,11 @@ class TestDbtCloudRunJobTrigger:
@pytest.fixture
def end_time(self):
- return time.monotonic() + 60 * 60 * 24 * 7
+ return time.time() + 60 * 60 * 24 * 7
@pytest.fixture
def execution_deadline(self):
- return time.monotonic() + 60 * 60 * 24 * 7
+ return time.time() + 60 * 60 * 24 * 7
def test_serialization(self, end_time, execution_deadline):
"""Assert DbtCloudRunJobTrigger correctly serializes its arguments and classpath."""
@@ -87,6 +88,36 @@ class TestDbtCloudRunJobTrigger:
assert task.done() is False
asyncio.get_event_loop().stop()
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.time")
+ @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_run_job_trigger_uses_wall_clock_end_time(
+ self, mock_get_job_status, mock_time, end_time
+ ):
+ """Assert serialized end_time is compared to wall-clock time, not monotonic time."""
+
+ mock_time.time.return_value = end_time - 60
+ mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=end_time,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0)
+
+ assert task.done() is False
+ mock_time.time.assert_called()
+ mock_time.monotonic.assert_not_called()
+ mock_get_job_status.assert_called_once_with(self.RUN_ID, self.ACCOUNT_ID)
+
+ task.cancel()
+ with suppress(asyncio.CancelledError):
+ await task
+
@pytest.mark.asyncio
@pytest.mark.parametrize(
("mock_value", "mock_status", "mock_message"),
@@ -151,7 +182,7 @@ class TestDbtCloudRunJobTrigger:
mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
- end_time = time.monotonic() - 1
+ end_time = time.time() - 1
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
@@ -178,7 +209,7 @@ class TestDbtCloudRunJobTrigger:
mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
- execution_deadline = time.monotonic() - 1
+ execution_deadline = time.time() - 1
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,