This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 349de0187b6 Fix deferrable execution_timeout handling in 
DbtCloudRunJobOperator (#67360)
349de0187b6 is described below

commit 349de0187b679f8f70affeee039f35f3c575f398
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon May 25 16:00:34 2026 +0100

    Fix deferrable execution_timeout handling in DbtCloudRunJobOperator (#67360)
    
    Prevent framework-level deferred timeouts from bypassing
    execute_complete() cancellation handling for dbt job runs.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../airflow/providers/dbt/cloud/operators/dbt.py   |  2 +-
 .../tests/unit/dbt/cloud/operators/test_dbt.py     | 54 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 9c519a78461..088e5f48de2 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -259,7 +259,7 @@ class DbtCloudRunJobOperator(BaseOperator):
             job_run_status = self.hook.get_job_run_status(**job_run_info)
             if not DbtCloudJobRunStatus.is_terminal(job_run_status):
                 self.defer(
-                    timeout=self.execution_timeout,
+                    timeout=None,
                     trigger=DbtCloudRunJobTrigger(
                         conn_id=self.dbt_cloud_conn_id,
                         run_id=self.run_id,
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index 2d2ed3c28f1..bb5988acdea 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import os
 from datetime import timedelta
-from unittest.mock import MagicMock, patch
+from unittest.mock import ANY, MagicMock, patch
 
 import pytest
 
@@ -180,6 +180,58 @@ class TestDbtCloudRunJobOperator:
             dbt_op.execute(MagicMock())
         assert not mock_defer.called
 
+    @patch(
+        
"airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_run_status",
+        return_value=DbtCloudJobRunStatus.QUEUED.value,
+    )
+    
@patch("airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobOperator.defer")
+    @patch("airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobTrigger")
+    @patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_connection")
+    @patch(
+        "airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.trigger_job_run",
+        return_value=mock_response_json(DEFAULT_ACCOUNT_JOB_RUN_RESPONSE),
+    )
+    def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+        self,
+        mock_trigger_job_run,
+        mock_dbt_hook,
+        mock_dbt_trigger,
+        mock_defer,
+        mock_job_run_status,
+    ):
+        dbt_op = DbtCloudRunJobOperator(
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            task_id=TASK_ID,
+            job_id=JOB_ID,
+            check_interval=1,
+            timeout=3,
+            dag=self.dag,
+            deferrable=True,
+            execution_timeout=timedelta(seconds=3),
+        )
+
+        dbt_op.execute(MagicMock())
+
+        # Explicitly pass timeout=None to defer() so Airflow's framework-level
+        # deferred timeout handling does not raise TaskDeferredTimeout before
+        # execute_complete() can perform dbt job cancellation.
+        mock_defer.assert_called_once_with(
+            method_name="execute_complete",
+            trigger=mock_dbt_trigger.return_value,
+            timeout=None,
+        )
+
+        # The dbt trigger should still receive the calculated execution 
deadline
+        # used for dbt job cancellation handling within execute_complete().
+        mock_dbt_trigger.assert_called_once_with(
+            conn_id=ACCOUNT_ID_CONN,
+            run_id=5555,
+            end_time=ANY,
+            execution_deadline=ANY,
+            account_id=None,
+            poll_interval=1,
+        )
+
     @pytest.mark.parametrize(
         "status",
         (

Reply via email to