This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 04a1309d033 Fix dbt Cloud trigger timeout deadline clock (#67626)
04a1309d033 is described below

commit 04a1309d03390e2acadda23bf6722de72ade46dd
Author: Revanth <[email protected]>
AuthorDate: Sun May 31 15:08:11 2026 -0500

    Fix dbt Cloud trigger timeout deadline clock (#67626)
---
 .../airflow/providers/dbt/cloud/operators/dbt.py   |  4 +--
 .../airflow/providers/dbt/cloud/triggers/dbt.py    |  2 +-
 .../tests/unit/dbt/cloud/triggers/test_dbt.py      | 39 +++++++++++++++++++---
 3 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 088e5f48de2..6547fadb0f9 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -250,10 +250,10 @@ class DbtCloudRunJobOperator(BaseOperator):
             # execution_timeout is a hard task-level limit (cancels the job),
             # while timeout only limits how long we wait for the job to finish.
             # If both are set, the earliest deadline wins.
-            end_time = time.monotonic() + self.timeout
+            end_time = time.time() + self.timeout
             execution_deadline = None
             if self.execution_timeout is not None:
-                execution_deadline = time.monotonic() + self.execution_timeout.total_seconds()
+                execution_deadline = time.time() + self.execution_timeout.total_seconds()
 
             job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id)
             job_run_status = self.hook.get_job_run_status(**job_run_info)
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index 8efd918780b..f2525406221 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -80,7 +80,7 @@ class DbtCloudRunJobTrigger(BaseTrigger):
         hook = DbtCloudHook(self.conn_id, **self.hook_params)
         try:
             while True:
-                now = time.monotonic()
+                now = time.time()
 
                 job_run_status = await hook.get_job_status(self.run_id, self.account_id)
 
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index 12c94d19b9e..63376e562e6 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import asyncio
 import time
+from contextlib import suppress
 from unittest import mock
 from unittest.mock import AsyncMock
 
@@ -38,11 +39,11 @@ class TestDbtCloudRunJobTrigger:
 
     @pytest.fixture
     def end_time(self):
-        return time.monotonic() + 60 * 60 * 24 * 7
+        return time.time() + 60 * 60 * 24 * 7
 
     @pytest.fixture
     def execution_deadline(self):
-        return time.monotonic() + 60 * 60 * 24 * 7
+        return time.time() + 60 * 60 * 24 * 7
 
     def test_serialization(self, end_time, execution_deadline):
         """Assert DbtCloudRunJobTrigger correctly serializes its arguments and classpath."""
@@ -87,6 +88,36 @@ class TestDbtCloudRunJobTrigger:
         assert task.done() is False
         asyncio.get_event_loop().stop()
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.time")
+    @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_run_job_trigger_uses_wall_clock_end_time(
+        self, mock_get_job_status, mock_time, end_time
+    ):
+        """Assert serialized end_time is compared to wall-clock time, not monotonic time."""
+
+        mock_time.time.return_value = end_time - 60
+        mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=end_time,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0)
+
+        assert task.done() is False
+        mock_time.time.assert_called()
+        mock_time.monotonic.assert_not_called()
+        mock_get_job_status.assert_called_once_with(self.RUN_ID, self.ACCOUNT_ID)
+
+        task.cancel()
+        with suppress(asyncio.CancelledError):
+            await task
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
         ("mock_value", "mock_status", "mock_message"),
@@ -151,7 +182,7 @@ class TestDbtCloudRunJobTrigger:
 
         mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
 
-        end_time = time.monotonic() - 1
+        end_time = time.time() - 1
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
@@ -178,7 +209,7 @@ class TestDbtCloudRunJobTrigger:
 
         mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
 
-        execution_deadline = time.monotonic() - 1
+        execution_deadline = time.time() - 1
 
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,

Reply via email to