This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 508ced8e82d Fix TriggerDagRunOperator fail_when_dag_is_paused on 
Airflow 3.2+ (#67726)
508ced8e82d is described below

commit 508ced8e82db10c786b511b5b4e73f5f1748fb6f
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 29 19:08:49 2026 +0100

    Fix TriggerDagRunOperator fail_when_dag_is_paused on Airflow 3.2+ (#67726)
    
    On Airflow 3.x the paused check called DagModel.get_current(), which
    accesses the ORM directly and is blocked from task execution, so
    fail_when_dag_is_paused=True raised "Direct database access via the ORM
    is not allowed". It was previously stubbed to raise NotImplementedError
    on Airflow 3.x.
    
    Resolve the target DAG's paused state through the task-SDK get_dag()
    accessor (backed by the GetDag execution-API endpoint) and raise
    DagIsPaused when paused. This works on Airflow 3.2.0+, where the endpoint
    and accessor exist. On Airflow 3.0/3.1 the operator still fails fast at
    construction with a clearer NotImplementedError pointing to the 3.2.0
    requirement. The Airflow 2.x path is unchanged.
    
    Fixes #56954.
---
 .../providers/standard/operators/trigger_dagrun.py | 37 ++++++++++-----
 .../unit/standard/operators/test_trigger_dagrun.py | 54 ++++++++++++++++++++--
 2 files changed, 75 insertions(+), 16 deletions(-)

diff --git 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 73884433af5..4aca1bfc4c1 100644
--- 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -43,7 +43,12 @@ from airflow.providers.common.compat.sdk import (
 )
 from airflow.providers.standard.triggers.external_task import DagStateTrigger
 from airflow.providers.standard.utils.openlineage import 
safe_inject_openlineage_properties_into_dagrun_conf
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, 
BaseOperator, is_arg_set
+from airflow.providers.standard.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+    BaseOperator,
+    is_arg_set,
+)
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
@@ -145,7 +150,9 @@ class TriggerDagRunOperator(BaseOperator):
         Default is ``[DagRunState.FAILED]``.
     :param skip_when_already_exists: Set to true to mark the task as SKIPPED 
if a DAG run of the triggered
         DAG for the same logical date already exists.
-    :param fail_when_dag_is_paused: If the dag to trigger is paused, 
DagIsPaused will be raised.
+    :param fail_when_dag_is_paused: If the dag to trigger is paused, 
DagIsPaused will be raised. On
+        Airflow 3.x this requires Airflow 3.2.0+ (it relies on the task-SDK 
DAG state endpoint added then);
+        on Airflow 3.0/3.1 setting this raises ``NotImplementedError``.
     :param deferrable: If waiting for completion, whether to defer the task 
until done, default is ``False``.
     :param openlineage_inject_parent_info: whether to include OpenLineage 
metadata about the parent task
         in the triggered DAG run's conf, enabling improved lineage tracking. 
The metadata is only injected
@@ -218,8 +225,11 @@ class TriggerDagRunOperator(BaseOperator):
         run_after = _validate_datetime_param("run_after", run_after)
         self.logical_date = logical_date
         self.run_after = run_after
-        if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
-            raise NotImplementedError("Setting `fail_when_dag_is_paused` not 
yet supported for Airflow 3.x")
+        if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS and not 
AIRFLOW_V_3_2_PLUS:
+            raise NotImplementedError(
+                "Setting `fail_when_dag_is_paused` requires Airflow 3.2.0+ on 
Airflow 3.x "
+                "(it relies on the task-SDK DAG state endpoint added in 
3.2.0)."
+            )
 
     def execute(self, context: Context):
         if self.logical_date is NOTSET:
@@ -267,14 +277,17 @@ class TriggerDagRunOperator(BaseOperator):
         self.trigger_run_id = run_id
 
         if self.fail_when_dag_is_paused:
-            dag_model = DagModel.get_current(self.trigger_dag_id)
-            if not dag_model:
-                raise ValueError(f"Dag {self.trigger_dag_id} is not found")
-            if dag_model.is_paused:
-                # TODO: enable this when dag state endpoint available from 
task sdk
-                # if AIRFLOW_V_3_0_PLUS:
-                #     raise DagIsPaused(dag_id=self.trigger_dag_id)
-                raise AirflowException(f"Dag {self.trigger_dag_id} is paused")
+            if AIRFLOW_V_3_0_PLUS:
+                # Tasks cannot access the ORM directly in Airflow 3.x; fetch 
the DAG state via the
+                # task-SDK supervisor (GetDag execution-API endpoint, 
available from Airflow 3.2.0).
+                if context["ti"].get_dag(self.trigger_dag_id).is_paused:
+                    raise DagIsPaused(dag_id=self.trigger_dag_id)
+            else:
+                dag_model = DagModel.get_current(self.trigger_dag_id)
+                if not dag_model:
+                    raise ValueError(f"Dag {self.trigger_dag_id} is not found")
+                if dag_model.is_paused:
+                    raise AirflowException(f"Dag {self.trigger_dag_id} is 
paused")
 
         if AIRFLOW_V_3_0_PLUS:
             self._trigger_dag_af_3(
diff --git 
a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py 
b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
index bcc5ad6153c..f4908336ab3 100644
--- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
+++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
@@ -38,7 +38,11 @@ from airflow.utils.state import DagRunState, 
TaskInstanceState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.db import parse_and_sync_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.providers.common.compat.sdk import DagRunTriggerException
@@ -332,10 +336,13 @@ class TestDagRunOperator:
                 {}, {"dag_id": "dag_id", "run_ids": ["run_id_1"], 
"poll_interval": 15, "run_id_1": "success"}
             )
 
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is 
different for Airflow 2 & 3")
-    def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
+    @pytest.mark.skipif(
+        not (AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_2_PLUS),
+        reason="On Airflow 3.0/3.1 the worker cannot resolve the DAG paused 
state",
+    )
+    def 
test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail_below_3_2(self):
         with pytest.raises(
-            NotImplementedError, match="Setting `fail_when_dag_is_paused` not 
yet supported for Airflow 3.x"
+            NotImplementedError, match="Setting `fail_when_dag_is_paused` 
requires Airflow 3.2.0"
         ):
             TriggerDagRunOperator(
                 task_id="test_task",
@@ -344,6 +351,45 @@ class TestDagRunOperator:
                 fail_when_dag_is_paused=True,
             )
 
+    @pytest.mark.skipif(
+        not AIRFLOW_V_3_2_PLUS, reason="Needs the task-SDK GetDag endpoint 
added in Airflow 3.2.0"
+    )
+    def test_trigger_dagrun_fails_when_target_dag_is_paused(self):
+        from airflow.providers.standard.operators.trigger_dagrun import 
DagIsPaused
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            fail_when_dag_is_paused=True,
+            openlineage_inject_parent_info=False,
+        )
+        mock_ti = mock.MagicMock()
+        mock_ti.get_dag.return_value.is_paused = True
+
+        with pytest.raises(DagIsPaused, match=f"Dag {TRIGGERED_DAG_ID} is 
paused"):
+            task.execute(context={"ti": mock_ti})
+
+        mock_ti.get_dag.assert_called_once_with(TRIGGERED_DAG_ID)
+
+    @pytest.mark.skipif(
+        not AIRFLOW_V_3_2_PLUS, reason="Needs the task-SDK GetDag endpoint 
added in Airflow 3.2.0"
+    )
+    def test_trigger_dagrun_proceeds_when_target_dag_is_not_paused(self):
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            fail_when_dag_is_paused=True,
+            openlineage_inject_parent_info=False,
+        )
+        mock_ti = mock.MagicMock()
+        mock_ti.get_dag.return_value.is_paused = False
+
+        with pytest.raises(DagRunTriggerException) as exc_info:
+            task.execute(context={"ti": mock_ti})
+
+        assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID
+        mock_ti.get_dag.assert_called_once_with(TRIGGERED_DAG_ID)
+
     @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is 
different for Airflow 2 & 3")
     def test_trigger_dagrun_with_str_conf(self):
         """

Reply via email to