This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e5064705cb6 Add separate property to decide if task is to be scheduled or marked straight as success (#56039)
e5064705cb6 is described below

commit e5064705cb664c4be27d6f134301f783c2d279a6
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Oct 28 14:40:10 2025 +0100

    Add separate property to decide if task is to be scheduled or marked straight as success (#56039)
    
    The check for whether or not to schedule a task is currently done within the
    schedule_tis method; it would be beneficial to extract that logic into a separate
    public method.
    
    If a task is not scheduled, listeners are never called for it, and some listeners
    would like to know that ahead of time; a dedicated method for that decision makes
    this easier. Until now the OpenLineage (OL) listener re-created the logic from
    Airflow core, but it became complex and hard to keep in sync. I'll have another PR
    for the OpenLineage provider to check for the existence of this new
    is_task_schedulable method and call it instead.
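
    As a rough, illustrative sketch (not part of this change), a listener could then
    rely on the new method instead of duplicating the core check; this assumes the
    existing listener plugin API and that dag_run.dag is populated:

        from airflow.listeners import hookimpl
        from airflow.models.taskinstance import TaskInstance

        @hookimpl
        def on_dag_run_running(dag_run, msg):
            # Decide up front which tasks will actually be scheduled, instead of
            # re-implementing the EmptyOperator short-circuit logic locally.
            for task in dag_run.dag.tasks:
                if not TaskInstance.is_task_schedulable(task):
                    # This task will be marked success directly, so task-level
                    # listener hooks will never fire for it.
                    ...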
---
 airflow-core/src/airflow/models/dagrun.py          | 18 ++------
 airflow-core/src/airflow/models/taskinstance.py    | 35 ++++++++++++++
 .../tests/unit/models/test_taskinstance.py         | 42 +++++++++++++++++
 .../unit/serialization/test_dag_serialization.py   | 54 ++++++++++++++++++++++
 devel-common/src/tests_common/pytest_plugin.py     |  6 +++
 5 files changed, 142 insertions(+), 13 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 43d7f3803bd..8d6a03756e2 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -2014,7 +2014,8 @@ class DagRun(Base, LoggingMixin):
 
         Each element of ``schedulable_tis`` should have its ``task`` attribute already set.
 
-        Any EmptyOperator without callbacks or outlets is instead set straight to the success state.
+        Any EmptyOperator without ``on_execute_callback`` or ``on_success_callback`` or ``inlets`` or
+        ``outlets`` is instead set straight to the success state, without execution.
 
         All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
         is the caller's responsibility to call this function only with TIs from a single dag run.
@@ -2024,17 +2025,8 @@ class DagRun(Base, LoggingMixin):
         empty_ti_ids: list[str] = []
         schedulable_ti_ids: list[str] = []
         for ti in schedulable_tis:
-            task = ti.task
-            if TYPE_CHECKING:
-                assert isinstance(task, Operator)
-            if (
-                task.inherits_from_empty_operator
-                and not task.has_on_execute_callback
-                and not task.has_on_success_callback
-                and not task.outlets
-                and not task.inlets
-            ):
-                empty_ti_ids.append(ti.id)
+            if ti.is_schedulable:
+                schedulable_ti_ids.append(ti.id)
             # Check "start_trigger_args" to see whether the operator supports
             # start execution from triggerer. If so, we'll check "start_from_trigger"
             # to see whether this feature is turned on and defer this task.
@@ -2053,7 +2045,7 @@ class DagRun(Base, LoggingMixin):
             #     else:
             #         schedulable_ti_ids.append(ti.id)
             else:
-                schedulable_ti_ids.append(ti.id)
+                empty_ti_ids.append(ti.id)
 
         count = 0
 
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 3fbb937f773..0a080ca4ca7 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2183,6 +2183,41 @@ class TaskInstance(Base, LoggingMixin):
             }
         )
 
+    @property
+    def is_schedulable(self):
+        """Determine if the task_instance should be scheduled or 
short-circuited to ``success``."""
+        return self.is_task_schedulable(self.task)
+
+    @staticmethod
+    def is_task_schedulable(task: Operator) -> bool:
+        """
+        Determine if the task should be scheduled instead of being short-circuited to ``success``.
+
+        A task requires scheduling if it is not a trivial EmptyOperator, i.e. one of the
+        following conditions holds:
+
+        * it does **not** inherit from ``EmptyOperator``
+        * it defines an ``on_execute_callback``
+        * it defines an ``on_success_callback``
+        * it declares any ``outlets``
+        * it declares any ``inlets``
+
+        If none of these are true, the task is considered empty and is immediately marked
+        successful without being scheduled.
+
+        Note: keeping this check as a separate public method is important so it can also be used
+        by listeners (when a task is not scheduled, listeners are never called). For example,
+        the OpenLineage listener checks all tasks at DAG start, and using this method lets
+        it consistently determine whether the listener will run for each task.
+        """
+        return bool(
+            not task.inherits_from_empty_operator
+            or task.has_on_execute_callback
+            or task.has_on_success_callback
+            or task.outlets
+            or task.inlets
+        )
+
 
 def _find_common_ancestor_mapped_group(node1: Operator, node2: Operator) -> SerializedTaskGroup | None:
     """Given two operators, find their innermost common mapped task group."""
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index d2ddd32848b..3a166e14330 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -1568,6 +1568,48 @@ class TestTaskInstance:
         expected_url = "http://localhost:8080/dags/my_dag/runs/test/tasks/op/mapped/1?try_number=2"
         assert ti.log_url == expected_url
 
+    @pytest.mark.parametrize(
+        "kwargs",
+        [
+            {"inlets": [Asset(uri="file://some.txt")]},
+            {"outlets": [Asset(uri="file://some.txt")]},
+            {"on_success_callback": lambda *args, **kwargs: None},
+            {"on_execute_callback": lambda *args, **kwargs: None},
+        ],
+    )
+    def test_is_schedulable_task_empty_operator_evaluates_true(self, kwargs, create_task_instance):
+        ti = create_task_instance(
+            dag_id="my_dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1), **kwargs
+        )
+        assert ti.is_schedulable
+
+    @pytest.mark.parametrize(
+        "kwargs",
+        [
+            {},
+            {"on_failure_callback": lambda *args, **kwargs: None},
+            {"on_skipped_callback": lambda *args, **kwargs: None},
+            {"on_retry_callback": lambda *args, **kwargs: None},
+        ],
+    )
+    def test_is_schedulable_task_empty_operator_evaluates_false(self, kwargs, create_task_instance):
+        ti = create_task_instance(
+            dag_id="my_dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1), **kwargs
+        )
+        assert not ti.is_schedulable
+
+    def test_is_schedulable_task_non_empty_operator(self):
+        dag = DAG(dag_id="test_dag")
+
+        regular_task = BashOperator(task_id="regular", bash_command="echo test", dag=dag)
+        mapped_task = BashOperator.partial(task_id="mapped", dag=dag).expand(bash_command=["echo 1"])
+
+        regular_ti = TaskInstance(task=regular_task, dag_version_id=mock.MagicMock())
+        mapped_ti = TaskInstance(task=mapped_task, dag_version_id=mock.MagicMock())
+
+        assert regular_ti.is_schedulable
+        assert mapped_ti.is_schedulable
+
     def test_mark_success_url(self, create_task_instance):
         now = pendulum.now("Europe/Brussels")
         ti = create_task_instance(dag_id="dag", task_id="op", logical_date=now)
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 0d070bfbdef..9c2926bd342 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -56,6 +56,7 @@ from airflow.exceptions import (
 from airflow.models.asset import AssetModel
 from airflow.models.connection import Connection
 from airflow.models.mappedoperator import MappedOperator
+from airflow.models.taskinstance import TaskInstance as TI
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.providers.standard.operators.bash import BashOperator
@@ -3820,6 +3821,59 @@ def test_task_callback_boolean_optimization(callback_config, expected_flags, is_
             assert getattr(deserialized, flag) is expected
 
 
[email protected](
+    "kwargs",
+    [
+        {"inlets": [Asset(uri="file://some.txt")]},
+        {"outlets": [Asset(uri="file://some.txt")]},
+        {"on_success_callback": lambda *args, **kwargs: None},
+        {"on_execute_callback": lambda *args, **kwargs: None},
+    ],
+)
+def test_is_schedulable_task_empty_operator_evaluates_true(kwargs):
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    dag = DAG(dag_id="test_dag")
+    task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)
+
+    serialized_task = BaseSerialization.deserialize(BaseSerialization.serialize(task))
+
+    assert TI.is_task_schedulable(serialized_task)
+
+
[email protected](
+    "kwargs",
+    [
+        {},
+        {"on_failure_callback": lambda *args, **kwargs: None},
+        {"on_skipped_callback": lambda *args, **kwargs: None},
+        {"on_retry_callback": lambda *args, **kwargs: None},
+    ],
+)
+def test_is_schedulable_task_empty_operator_evaluates_false(kwargs):
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    dag = DAG(dag_id="test_dag")
+    task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)
+
+    serialized_task = BaseSerialization.deserialize(BaseSerialization.serialize(task))
+
+    assert not TI.is_task_schedulable(serialized_task)
+
+
+def test_is_schedulable_task_non_empty_operator():
+    dag = DAG(dag_id="test_dag")
+
+    regular_task = BashOperator(task_id="regular", bash_command="echo test", dag=dag)
+    mapped_task = BashOperator.partial(task_id="mapped", dag=dag).expand(bash_command=["echo 1"])
+
+    serialized_regular = BaseSerialization.deserialize(BaseSerialization.serialize(regular_task))
+    serialized_mapped = BaseSerialization.deserialize(BaseSerialization.serialize(mapped_task))
+
+    assert TI.is_task_schedulable(serialized_regular)
+    assert TI.is_task_schedulable(serialized_mapped)
+
+
 def test_task_callback_properties_exist():
     """Test that all callback boolean properties exist on both regular and 
mapped operators."""
     dag = DAG(dag_id="test_dag")
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index 8047c383fb7..c800df50643 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1487,6 +1487,9 @@ def create_task_instance(dag_maker: DagMaker, create_dummy_dag: CreateDummyDAG)
         on_execute_callback=None,
         on_failure_callback=None,
         on_retry_callback=None,
+        on_skipped_callback=None,
+        inlets=None,
+        outlets=None,
         email=None,
         map_index=-1,
         hostname=None,
@@ -1512,6 +1515,9 @@ def create_task_instance(dag_maker: DagMaker, create_dummy_dag: CreateDummyDAG)
                 on_execute_callback=on_execute_callback,
                 on_failure_callback=on_failure_callback,
                 on_retry_callback=on_retry_callback,
+                on_skipped_callback=on_skipped_callback,
+                inlets=inlets,
+                outlets=outlets,
                 email=email,
                 pool=pool,
                 trigger_rule=trigger_rule,
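
A minimal usage sketch of the extended create_task_instance fixture (illustrative only;
the test name is made up and the Asset import path is an assumption, mirroring the unit
tests added above):

    from airflow.sdk import Asset  # import path assumed here

    def test_outlet_keeps_empty_task_schedulable(create_task_instance):
        # An EmptyOperator that declares an outlet should still be scheduled
        # (and therefore executed) rather than short-circuited to success.
        ti = create_task_instance(
            dag_id="example_dag",
            task_id="empty_with_outlet",
            outlets=[Asset(uri="file://example.txt")],
        )
        assert ti.is_schedulable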
