This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 02828c2c6e3 Fix DbtCloudRunJobTrigger timeout error message and add final status check (#61979) (#62306)
02828c2c6e3 is described below
commit 02828c2c6e31d254769959d6ad732e622a6413b4
Author: deepinsight coder <[email protected]>
AuthorDate: Sun Feb 22 01:21:29 2026 -0800
Fix DbtCloudRunJobTrigger timeout error message and add final status check (#61979) (#62306)
Fix two bugs in DbtCloudRunJobTrigger.run():
1. The timeout error message displayed the raw epoch timestamp (self.end_time)
instead of a meaningful duration, producing nonsensical messages like
"after 1771200015.8 seconds". The message now ends with
"within the configured timeout." instead.
2. When the timeout expires, the trigger now performs a final is_still_running()
check before yielding a timeout error. This prevents false timeout failures
when a dbt Cloud job completes between the last poll and the timeout boundary.
Closes: #61979
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../airflow/providers/dbt/cloud/triggers/dbt.py | 8 ++++--
.../tests/unit/dbt/cloud/triggers/test_dbt.py | 32 +++++++++++++++++++++-
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index 9d4c59473b1..a63a516c85e 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -76,11 +76,15 @@ class DbtCloudRunJobTrigger(BaseTrigger):
try:
while await self.is_still_running(hook):
if self.end_time < time.time():
+ # Perform a final status check before declaring timeout,
in case the
+ # job completed between the last poll and the timeout
expiry.
+ if not await self.is_still_running(hook):
+ break
yield TriggerEvent(
{
"status": "error",
- "message": f"Job run {self.run_id} has not reached
a terminal status after "
- f"{self.end_time} seconds.",
+ "message": f"Job run {self.run_id} has not reached
a terminal status "
+ f"within the configured timeout.",
"run_id": self.run_id,
}
)
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index 57ca1848d15..aa4765a8387 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -219,7 +219,37 @@ class TestDbtCloudRunJobTrigger:
{
"status": "error",
"message": f"Job run {self.RUN_ID} has not reached a terminal
status "
- f"after {end_time} seconds.",
+ f"within the configured timeout.",
+ "run_id": self.RUN_ID,
+ }
+ )
+ assert expected == actual
+
+ @pytest.mark.asyncio
+
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_timeout_with_final_status_check(self,
mock_get_job_status):
+ """Assert that a final status check prevents false timeout when job
completes near timeout."""
+ mock_get_job_status.return_value = DbtCloudJobRunStatus.SUCCESS.value
+ # Simulate: first is_still_running call returns True (job running),
+ # then the timeout check fires, but the final is_still_running call
returns False
+ # (job just completed). The trigger should yield success, not a
timeout error.
+ end_time = time.time() # Already expired
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=end_time,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ with mock.patch.object(trigger, "is_still_running",
new_callable=AsyncMock) as mock_running:
+ # First call: still running; second call (final check): no longer
running
+ mock_running.side_effect = [True, False]
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ expected = TriggerEvent(
+ {
+ "status": "success",
+ "message": f"Job run {self.RUN_ID} has completed
successfully.",
"run_id": self.RUN_ID,
}
)