This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e5064705cb6 Add separate property to decide if task is to be scheduled
or marked straight as success (#56039)
e5064705cb6 is described below
commit e5064705cb664c4be27d6f134301f783c2d279a6
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Oct 28 14:40:10 2025 +0100
Add separate property to decide if task is to be scheduled or marked
straight as success (#56039)
Previously, the check whether or not to schedule a task was done inline within the
schedule_tis method; I think it is beneficial to extract that logic into a separate
public method.
If a task is not scheduled, listeners are not called for it, and some listeners
would like to know that ahead of time, which is easier when there is a separate
method for that decision. Until now, the OpenLineage (OL) listener re-created the
logic from Airflow core, but it became complex and hard to keep in sync. I'll open
another PR for the OpenLineage provider that checks for the existence of the new
TaskInstance.is_task_schedulable static method (also exposed on task instances as
the is_schedulable property) and calls it instead.
---
airflow-core/src/airflow/models/dagrun.py | 18 ++------
airflow-core/src/airflow/models/taskinstance.py | 35 ++++++++++++++
.../tests/unit/models/test_taskinstance.py | 42 +++++++++++++++++
.../unit/serialization/test_dag_serialization.py | 54 ++++++++++++++++++++++
devel-common/src/tests_common/pytest_plugin.py | 6 +++
5 files changed, 142 insertions(+), 13 deletions(-)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 43d7f3803bd..8d6a03756e2 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -2014,7 +2014,8 @@ class DagRun(Base, LoggingMixin):
Each element of ``schedulable_tis`` should have its ``task`` attribute
already set.
- Any EmptyOperator without callbacks or outlets is instead set straight
to the success state.
+ Any EmptyOperator without ``on_execute_callback`` or
``on_success_callback`` or ``inlets`` or
+ ``outlets`` is instead set straight to the success state, without
execution.
All the TIs should belong to this DagRun, but this code is in the
hot-path, this is not checked -- it
is the caller's responsibility to call this function only with TIs
from a single dag run.
@@ -2024,17 +2025,8 @@ class DagRun(Base, LoggingMixin):
empty_ti_ids: list[str] = []
schedulable_ti_ids: list[str] = []
for ti in schedulable_tis:
- task = ti.task
- if TYPE_CHECKING:
- assert isinstance(task, Operator)
- if (
- task.inherits_from_empty_operator
- and not task.has_on_execute_callback
- and not task.has_on_success_callback
- and not task.outlets
- and not task.inlets
- ):
- empty_ti_ids.append(ti.id)
+ if ti.is_schedulable:
+ schedulable_ti_ids.append(ti.id)
# Check "start_trigger_args" to see whether the operator supports
# start execution from triggerer. If so, we'll check
"start_from_trigger"
# to see whether this feature is turned on and defer this task.
@@ -2053,7 +2045,7 @@ class DagRun(Base, LoggingMixin):
# else:
# schedulable_ti_ids.append(ti.id)
else:
- schedulable_ti_ids.append(ti.id)
+ empty_ti_ids.append(ti.id)
count = 0
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 3fbb937f773..0a080ca4ca7 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -2183,6 +2183,41 @@ class TaskInstance(Base, LoggingMixin):
}
)
+ @property
+ def is_schedulable(self):
+ """Determine if the task_instance should be scheduled or
short-circuited to ``success``."""
+ return self.is_task_schedulable(self.task)
+
+ @staticmethod
+ def is_task_schedulable(task: Operator) -> bool:
+ """
+ Determine if the task should be scheduled instead of being
short-circuited to ``success``.
+
+ A task requires scheduling if it is not a trivial EmptyOperator, i.e.
one of the
+ following conditions holds:
+
+ * it does **not** inherit from ``EmptyOperator``
+ * it defines an ``on_execute_callback``
+ * it defines an ``on_success_callback``
+ * it declares any ``outlets``
+ * it declares any ``inlets``
+
+ If none of these are true, the task is considered empty and is
immediately marked
+ successful without being scheduled.
+
+ Note: keeping this check as a separate public method is important so
it can also be used
+ by listeners (when a task is not scheduled, listeners are never
called). For example,
+ the OpenLineage listener checks all tasks at DAG start, and using this
method lets
+ it consistently determine whether the listener will run for each task.
+ """
+ return bool(
+ not task.inherits_from_empty_operator
+ or task.has_on_execute_callback
+ or task.has_on_success_callback
+ or task.outlets
+ or task.inlets
+ )
+
def _find_common_ancestor_mapped_group(node1: Operator, node2: Operator) ->
SerializedTaskGroup | None:
"""Given two operators, find their innermost common mapped task group."""
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index d2ddd32848b..3a166e14330 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -1568,6 +1568,48 @@ class TestTaskInstance:
expected_url =
"http://localhost:8080/dags/my_dag/runs/test/tasks/op/mapped/1?try_number=2"
assert ti.log_url == expected_url
+ @pytest.mark.parametrize(
+ "kwargs",
+ [
+ {"inlets": [Asset(uri="file://some.txt")]},
+ {"outlets": [Asset(uri="file://some.txt")]},
+ {"on_success_callback": lambda *args, **kwargs: None},
+ {"on_execute_callback": lambda *args, **kwargs: None},
+ ],
+ )
+ def test_is_schedulable_task_empty_operator_evaluates_true(self, kwargs,
create_task_instance):
+ ti = create_task_instance(
+ dag_id="my_dag", task_id="op",
logical_date=timezone.datetime(2018, 1, 1), **kwargs
+ )
+ assert ti.is_schedulable
+
+ @pytest.mark.parametrize(
+ "kwargs",
+ [
+ {},
+ {"on_failure_callback": lambda *args, **kwargs: None},
+ {"on_skipped_callback": lambda *args, **kwargs: None},
+ {"on_retry_callback": lambda *args, **kwargs: None},
+ ],
+ )
+ def test_is_schedulable_task_empty_operator_evaluates_false(self, kwargs,
create_task_instance):
+ ti = create_task_instance(
+ dag_id="my_dag", task_id="op",
logical_date=timezone.datetime(2018, 1, 1), **kwargs
+ )
+ assert not ti.is_schedulable
+
+ def test_is_schedulable_task_non_empty_operator(self):
+ dag = DAG(dag_id="test_dag")
+
+ regular_task = BashOperator(task_id="regular", bash_command="echo
test", dag=dag)
+ mapped_task = BashOperator.partial(task_id="mapped",
dag=dag).expand(bash_command=["echo 1"])
+
+ regular_ti = TaskInstance(task=regular_task,
dag_version_id=mock.MagicMock())
+ mapped_ti = TaskInstance(task=mapped_task,
dag_version_id=mock.MagicMock())
+
+ assert regular_ti.is_schedulable
+ assert mapped_ti.is_schedulable
+
def test_mark_success_url(self, create_task_instance):
now = pendulum.now("Europe/Brussels")
ti = create_task_instance(dag_id="dag", task_id="op", logical_date=now)
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 0d070bfbdef..9c2926bd342 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -56,6 +56,7 @@ from airflow.exceptions import (
from airflow.models.asset import AssetModel
from airflow.models.connection import Connection
from airflow.models.mappedoperator import MappedOperator
+from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
@@ -3820,6 +3821,59 @@ def
test_task_callback_boolean_optimization(callback_config, expected_flags, is_
assert getattr(deserialized, flag) is expected
[email protected](
+ "kwargs",
+ [
+ {"inlets": [Asset(uri="file://some.txt")]},
+ {"outlets": [Asset(uri="file://some.txt")]},
+ {"on_success_callback": lambda *args, **kwargs: None},
+ {"on_execute_callback": lambda *args, **kwargs: None},
+ ],
+)
+def test_is_schedulable_task_empty_operator_evaluates_true(kwargs):
+ from airflow.providers.standard.operators.empty import EmptyOperator
+
+ dag = DAG(dag_id="test_dag")
+ task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)
+
+ serialized_task =
BaseSerialization.deserialize(BaseSerialization.serialize(task))
+
+ assert TI.is_task_schedulable(serialized_task)
+
+
[email protected](
+ "kwargs",
+ [
+ {},
+ {"on_failure_callback": lambda *args, **kwargs: None},
+ {"on_skipped_callback": lambda *args, **kwargs: None},
+ {"on_retry_callback": lambda *args, **kwargs: None},
+ ],
+)
+def test_is_schedulable_task_empty_operator_evaluates_false(kwargs):
+ from airflow.providers.standard.operators.empty import EmptyOperator
+
+ dag = DAG(dag_id="test_dag")
+ task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)
+
+ serialized_task =
BaseSerialization.deserialize(BaseSerialization.serialize(task))
+
+ assert not TI.is_task_schedulable(serialized_task)
+
+
+def test_is_schedulable_task_non_empty_operator():
+ dag = DAG(dag_id="test_dag")
+
+ regular_task = BashOperator(task_id="regular", bash_command="echo test",
dag=dag)
+ mapped_task = BashOperator.partial(task_id="mapped",
dag=dag).expand(bash_command=["echo 1"])
+
+ serialized_regular =
BaseSerialization.deserialize(BaseSerialization.serialize(regular_task))
+ serialized_mapped =
BaseSerialization.deserialize(BaseSerialization.serialize(mapped_task))
+
+ assert TI.is_task_schedulable(serialized_regular)
+ assert TI.is_task_schedulable(serialized_mapped)
+
+
def test_task_callback_properties_exist():
"""Test that all callback boolean properties exist on both regular and
mapped operators."""
dag = DAG(dag_id="test_dag")
diff --git a/devel-common/src/tests_common/pytest_plugin.py
b/devel-common/src/tests_common/pytest_plugin.py
index 8047c383fb7..c800df50643 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1487,6 +1487,9 @@ def create_task_instance(dag_maker: DagMaker,
create_dummy_dag: CreateDummyDAG)
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
+ on_skipped_callback=None,
+ inlets=None,
+ outlets=None,
email=None,
map_index=-1,
hostname=None,
@@ -1512,6 +1515,9 @@ def create_task_instance(dag_maker: DagMaker,
create_dummy_dag: CreateDummyDAG)
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
+ on_skipped_callback=on_skipped_callback,
+ inlets=inlets,
+ outlets=outlets,
email=email,
pool=pool,
trigger_rule=trigger_rule,