This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b7418576cc7 Raise on unexpected terminal dbt Cloud job run states (#61300)
b7418576cc7 is described below
commit b7418576cc7fb0d5298b8a54be76390fbada0360
Author: SameerMesiah97 <[email protected]>
AuthorDate: Fri Feb 27 15:17:46 2026 +0000
Raise on unexpected terminal dbt Cloud job run states (#61300)
DbtCloudHook.wait_for_job_run_status previously returned False when a job run
reached a terminal failure state (ERROR or CANCELLED), which could allow
Airflow tasks to succeed silently when dbt Cloud jobs failed.
This change updates the helper to raise DbtCloudJobRunException when a job run
reaches an unexpected terminal state before the expected status is reached,
ensuring that task failure semantics correctly reflect external job failures.
Call sites are updated accordingly, and on_kill now guards against propagated
exceptions, since cancellation confirmation is best-effort and should not
affect task termination behavior.
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../src/airflow/providers/dbt/cloud/hooks/dbt.py | 26 ++++++++++++++--------
.../airflow/providers/dbt/cloud/operators/dbt.py | 16 +++++++++++--
.../cloud/tests/unit/dbt/cloud/hooks/test_dbt.py | 6 ++---
.../tests/unit/dbt/cloud/operators/test_dbt.py | 2 +-
4 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
index ca20480abd2..8910f9c8ebc 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
@@ -797,20 +797,32 @@ class DbtCloudHook(HttpHook):
:param check_interval: Time in seconds to check on a pipeline run's
status.
:param timeout: Time in seconds to wait for a pipeline to reach a
terminal status or the expected
status.
- :return: Boolean indicating if the job run has reached the
``expected_status``.
+ :return: ``True`` if the job run has reached the ``expected_status``.
+ :raises: ``DbtCloudJobRunException`` If the job run reaches an
unexpected terminal status
+ or does not reach an expected status within the timeout.
"""
expected_statuses = (expected_statuses,) if
isinstance(expected_statuses, int) else expected_statuses
DbtCloudJobRunStatus.check_is_valid(expected_statuses)
job_run_info = JobRunInfo(account_id=account_id, run_id=run_id)
- job_run_status = self.get_job_run_status(**job_run_info)
start_time = time.monotonic()
- while (
- not DbtCloudJobRunStatus.is_terminal(job_run_status) and
job_run_status not in expected_statuses
- ):
+ while True:
+ job_run_status = self.get_job_run_status(**job_run_info)
+
+ if job_run_status in expected_statuses:
+ return True
+
+ # Reached terminal failure before expected state.
+ if DbtCloudJobRunStatus.is_terminal(job_run_status):
+ raise DbtCloudJobRunException(
+ f"Job run {run_id} reached terminal status "
+ f"{DbtCloudJobRunStatus(job_run_status).name} "
+ f"before reaching expected statuses {expected_statuses}"
+ )
+
# Check if the job-run duration has exceeded the ``timeout``
configured.
if start_time + timeout < time.monotonic():
raise DbtCloudJobRunException(
@@ -820,10 +832,6 @@ class DbtCloudHook(HttpHook):
# Wait to check the status of the job run based on the
``check_interval`` configured.
time.sleep(check_interval)
- job_run_status = self.get_job_run_status(**job_run_info)
-
- return job_run_status in expected_statuses
-
@fallback_to_default_account
def cancel_job_run(self, run_id: int, account_id: int | None = None) ->
None:
"""
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 63ce7955697..ad5f1418807 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -256,9 +256,14 @@ class DbtCloudRunJobOperator(BaseOperator):
return int(event["run_id"])
def on_kill(self) -> None:
- if self.run_id:
- self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
+ if not self.run_id:
+ return
+ self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
+
+ # Attempt best-effort confirmation of cancellation.
+ try:
+ # This can raise a DbtCloudJobRunException under normal operation.
if self.hook.wait_for_job_run_status(
run_id=self.run_id,
account_id=self.account_id,
@@ -268,6 +273,13 @@ class DbtCloudRunJobOperator(BaseOperator):
):
self.log.info("Job run %s has been cancelled successfully.",
self.run_id)
+ except DbtCloudJobRunException as exc:
+ self.log.warning(
+ "Failed to confirm cancellation of job run %s during task
kill: %s",
+ self.run_id,
+ exc,
+ )
+
@cached_property
def hook(self):
"""Returns DBT Cloud hook."""
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
b/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
index 45f940236b7..6cddddd4bf1 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
@@ -904,8 +904,8 @@ class TestDbtCloudHook:
wait_for_job_run_status_test_args = [
(DbtCloudJobRunStatus.SUCCESS.value,
DbtCloudJobRunStatus.SUCCESS.value, True),
- (DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value,
False),
- (DbtCloudJobRunStatus.CANCELLED.value,
DbtCloudJobRunStatus.SUCCESS.value, False),
+ (DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value,
"exception"),
+ (DbtCloudJobRunStatus.CANCELLED.value,
DbtCloudJobRunStatus.SUCCESS.value, "exception"),
(DbtCloudJobRunStatus.RUNNING.value,
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
(DbtCloudJobRunStatus.QUEUED.value,
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
(DbtCloudJobRunStatus.STARTING.value,
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
@@ -943,7 +943,7 @@ class TestDbtCloudHook:
):
mock_job_run_status.return_value = job_run_status
- if expected_output != "timeout":
+ if expected_output not in ("timeout", "exception"):
assert hook.wait_for_job_run_status(**config) ==
expected_output
else:
with pytest.raises(DbtCloudJobRunException):
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index 4aa7a72f962..e41089c9647 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -376,7 +376,7 @@ class TestDbtCloudRunJobOperator:
assert mock_run_job.return_value.data["id"] == RUN_ID
elif expected_output == "exception":
# The operator should fail if the job run fails or is
cancelled.
- error_message = r"has failed or has been cancelled\.$"
+ error_message = r"reached terminal status (ERROR|CANCELLED)"
with pytest.raises(DbtCloudJobRunException,
match=error_message):
operator.execute(context=self.mock_context)
else: