This is an automated email from the ASF dual-hosted git repository.

joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d3015731fc Implement execution_timeout semantics for 
DbtCloudRunJobOperator in deferrable mode (#61472)
5d3015731fc is described below

commit 5d3015731fc3822142b177abf142327143c2b0e5
Author: SameerMesiah97 <[email protected]>
AuthorDate: Wed Mar 11 19:21:28 2026 +0000

    Implement execution_timeout semantics for DbtCloudRunJobOperator in 
deferrable mode (#61472)
    
    * Enforce execution_timeout in deferrable DbtCloudRunJobOperator
    
    Restore execution_timeout semantics in deferrable mode by propagating
    timeouts through the trigger and explicitly cancelling dbt Cloud jobs
    when the task exceeds its execution deadline.
    
    This preserves behavior parity with non-deferrable execution and avoids
    leaking dbt jobs.
    
    * Update 
providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
    
    Co-authored-by: Josh Fell <[email protected]>
    
    * Add docstring entry for execution_timeout parameter.
    
    ---------
    
    Co-authored-by: Sameer Mesiah <[email protected]>
    Co-authored-by: Josh Fell <[email protected]>
---
 .../airflow/providers/dbt/cloud/operators/dbt.py   | 27 +++++++++++++--
 .../airflow/providers/dbt/cloud/triggers/dbt.py    | 16 +++++++++
 .../tests/unit/dbt/cloud/operators/test_dbt.py     | 38 +++++++++++++++++++++-
 .../tests/unit/dbt/cloud/triggers/test_dbt.py      | 33 +++++++++++++++++++
 4 files changed, 110 insertions(+), 4 deletions(-)

diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index ad5f1418807..4d036f7c36b 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -23,7 +23,7 @@ from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
-from airflow.providers.common.compat.sdk import BaseOperator, 
BaseOperatorLink, XCom, conf
+from airflow.providers.common.compat.sdk import AirflowException, 
BaseOperator, BaseOperatorLink, XCom, conf
 from airflow.providers.dbt.cloud.hooks.dbt import (
     DbtCloudHook,
     DbtCloudJobRunException,
@@ -70,7 +70,9 @@ class DbtCloudRunJobOperator(BaseOperator):
         enabled but could be disabled to perform an asynchronous wait for a 
long-running job run execution
         using the ``DbtCloudJobRunSensor``.
     :param timeout: Time in seconds to wait for a job run to reach a terminal 
status for non-asynchronous
-        waits. Used only if ``wait_for_termination`` is True. Defaults to 7 
days.
+        waits. Used only if ``wait_for_termination`` is True. This limits how 
long the operator waits for the
+        job to complete and does not imply job cancellation. Task-level 
timeouts should be
+        enforced via ``execution_timeout``. Defaults to 7 days.
     :param check_interval: Time in seconds to check on a job run's status for 
non-asynchronous waits.
         Used only if ``wait_for_termination`` is True. Defaults to 60 seconds.
     :param additional_run_config: Optional. Any additional parameters that 
should be included in the API
@@ -83,6 +85,9 @@ class DbtCloudRunJobOperator(BaseOperator):
         
https://docs.getdbt.com/dbt-cloud/api-v2#/operations/Retry%20Failed%20Job
     :param deferrable: Run operator in the deferrable mode
     :param hook_params: Extra arguments passed to the DbtCloudHook constructor.
+    :param execution_timeout: Maximum time allowed for the task to run. If 
exceeded, the dbt Cloud
+        job will be cancelled and the task will fail. When both 
``execution_timeout`` and
+        ``timeout`` are set, the earlier deadline takes precedence.
     :return: The ID of the triggered dbt Cloud job run.
     """
 
@@ -212,16 +217,26 @@ class DbtCloudRunJobOperator(BaseOperator):
                     raise DbtCloudJobRunException(f"Job run {self.run_id} has 
failed or has been cancelled.")
 
                 return self.run_id
+
+            # Derive absolute deadlines for deferrable execution.
+            # execution_timeout is a hard task-level limit (cancels the job),
+            # while timeout only limits how long we wait for the job to finish.
+            # If both are set, the earliest deadline wins.
             end_time = time.time() + self.timeout
+            execution_deadline = None
+            if self.execution_timeout:
+                execution_deadline = time.time() + 
self.execution_timeout.total_seconds()
+
             job_run_info = JobRunInfo(account_id=self.account_id, 
run_id=self.run_id)
             job_run_status = self.hook.get_job_run_status(**job_run_info)
             if not DbtCloudJobRunStatus.is_terminal(job_run_status):
                 self.defer(
-                    timeout=self.execution_timeout,
+                    timeout=None,
                     trigger=DbtCloudRunJobTrigger(
                         conn_id=self.dbt_cloud_conn_id,
                         run_id=self.run_id,
                         end_time=end_time,
+                        execution_deadline=execution_deadline,
                         account_id=self.account_id,
                         poll_interval=self.check_interval,
                     ),
@@ -252,6 +267,12 @@ class DbtCloudRunJobOperator(BaseOperator):
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been 
cancelled.")
         if event["status"] == "error":
             raise DbtCloudJobRunException(f"Job run {self.run_id} has failed.")
+
+        # Enforce execution_timeout semantics in deferrable mode by cancelling 
the job.
+        if event["status"] == "timeout":
+            self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+            raise AirflowException(f"Job run {self.run_id} has timed out.")
+
         self.log.info(event["message"])
         return int(event["run_id"])
 
diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index a63a516c85e..a0bb91861a8 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -34,6 +34,8 @@ class DbtCloudRunJobTrigger(BaseTrigger):
     :param conn_id: The connection identifier for connecting to Dbt.
     :param run_id: The ID of a dbt Cloud job.
     :param end_time: Time in seconds to wait for a job run to reach a terminal 
status. Defaults to 7 days.
+    :param execution_deadline: Optional absolute timestamp (in seconds since 
the epoch) after which
+        the task is considered timed out.
     :param account_id: The ID of a dbt Cloud account.
     :param poll_interval: Polling period in seconds to check for the status.
     :param hook_params: Extra arguments passed to the DbtCloudHook constructor.
@@ -47,12 +49,14 @@ class DbtCloudRunJobTrigger(BaseTrigger):
         poll_interval: float,
         account_id: int | None,
         hook_params: dict[str, Any] | None = None,
+        execution_deadline: float | None = None,
     ):
         super().__init__()
         self.run_id = run_id
         self.account_id = account_id
         self.conn_id = conn_id
         self.end_time = end_time
+        self.execution_deadline = execution_deadline
         self.poll_interval = poll_interval
         self.hook_params = hook_params or {}
 
@@ -65,6 +69,7 @@ class DbtCloudRunJobTrigger(BaseTrigger):
                 "account_id": self.account_id,
                 "conn_id": self.conn_id,
                 "end_time": self.end_time,
+                "execution_deadline": self.execution_deadline,
                 "poll_interval": self.poll_interval,
                 "hook_params": self.hook_params,
             },
@@ -75,6 +80,17 @@ class DbtCloudRunJobTrigger(BaseTrigger):
         hook = DbtCloudHook(self.conn_id, **self.hook_params)
         try:
             while await self.is_still_running(hook):
+                if self.execution_deadline is not None:
+                    if self.execution_deadline < time.time():
+                        yield TriggerEvent(
+                            {
+                                "status": "timeout",
+                                "message": f"Job run {self.run_id} has timed 
out.",
+                                "run_id": self.run_id,
+                            }
+                        )
+                        return
+
                 if self.end_time < time.time():
                     # Perform a final status check before declaring timeout, 
in case the
                     # job completed between the last poll and the timeout 
expiry.
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index e41089c9647..784cb62695e 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 
 from airflow.models import DAG, Connection
-from airflow.providers.common.compat.sdk import TaskDeferred, timezone
+from airflow.providers.common.compat.sdk import AirflowException, 
TaskDeferred, timezone
 from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, 
DbtCloudJobRunException, DbtCloudJobRunStatus
 from airflow.providers.dbt.cloud.operators.dbt import (
     DbtCloudGetJobRunArtifactOperator,
@@ -214,6 +214,42 @@ class TestDbtCloudRunJobOperator:
             dbt_op.execute(MagicMock())
         assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger 
is not a DbtCloudRunJobTrigger"
 
+    def test_execute_complete_timeout_cancels_job(self):
+        """
+        Verify that when a deferrable dbt job emits a timeout event,
+        the operator cancels the job and fails.
+        """
+        operator = DbtCloudRunJobOperator(
+            task_id=TASK_ID,
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            job_id=JOB_ID,
+            dag=self.dag,
+            deferrable=True,
+        )
+
+        # Pretend the job was already triggered.
+        operator.run_id = RUN_ID
+
+        # Mock the hook so we can assert cancellation.
+        operator.hook = MagicMock()
+
+        timeout_event = {
+            "status": "timeout",
+            "run_id": RUN_ID,
+            "message": "Job run timed out.",
+        }
+
+        with pytest.raises(AirflowException, match="has timed out"):
+            operator.execute_complete(
+                context=self.mock_context,
+                event=timeout_event,
+            )
+
+        operator.hook.cancel_job_run.assert_called_once_with(
+            account_id=operator.account_id,
+            run_id=RUN_ID,
+        )
+
     @patch(
         "airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_by_name",
         return_value=mock_response_json(DEFAULT_ACCOUNT_JOB_RESPONSE),
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index aa4765a8387..4d1d971f60d 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -35,6 +35,7 @@ class TestDbtCloudRunJobTrigger:
     CONN_ID = "dbt_cloud_default"
     ACCOUNT_ID = 12340
     END_TIME = time.time() + 60 * 60 * 24 * 7
+    EXECUTION_DEADLINE = time.time() + 60 * 60 * 24 * 7
     POLL_INTERVAL = 3.0
 
     def test_serialization(self):
@@ -43,6 +44,7 @@ class TestDbtCloudRunJobTrigger:
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
             end_time=self.END_TIME,
+            execution_deadline=self.EXECUTION_DEADLINE,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
             hook_params={"retry_delay": 10},
@@ -54,6 +56,7 @@ class TestDbtCloudRunJobTrigger:
             "account_id": self.ACCOUNT_ID,
             "conn_id": self.CONN_ID,
             "end_time": self.END_TIME,
+            "execution_deadline": self.EXECUTION_DEADLINE,
             "poll_interval": self.POLL_INTERVAL,
             "hook_params": {"retry_delay": 10},
         }
@@ -255,6 +258,36 @@ class TestDbtCloudRunJobTrigger:
         )
         assert expected == actual
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    async def test_dbt_job_run_execution_timeout(self, 
mocked_is_still_running):
+        """Assert that run emits timeout event after execution_deadline 
elapsed"""
+        mocked_is_still_running.return_value = True
+
+        execution_deadline = time.time()
+
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=time.time() + 60,
+            execution_deadline=execution_deadline,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+
+        expected = TriggerEvent(
+            {
+                "status": "timeout",
+                "message": f"Job run {self.RUN_ID} has timed out.",
+                "run_id": self.RUN_ID,
+            }
+        )
+
+        assert expected == actual
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
         ("mock_response", "expected_status"),

Reply via email to