This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 59fc9e284dd Skip asset-change registration for tasks with no outlets
(#68687)
59fc9e284dd is described below
commit 59fc9e284dd7d190b6e2dfed0e27daec2644a16a
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jun 18 08:40:21 2026 -0700
Skip asset-change registration for tasks with no outlets (#68687)
TaskInstance.register_asset_changes_in_db runs on every task-success
state transition. For a task that declares no outlet assets and emits no
outlet events -- the common case -- the method still executed its body
and issued an AssetModel SELECT with empty IN () clauses before doing
nothing useful. Return early when both task_outlets and outlet_events are
empty, avoiding that round-trip on the hot path that gates scheduling the
next task.
Signed-off-by: Daniel Standish <[email protected]>
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 8 ++++++++
airflow-core/tests/unit/models/test_taskinstance.py | 18 ++++++++++++++++++
2 files changed, 26 insertions(+)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index bbf4b0db923..5388b4b6b48 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1524,6 +1524,14 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
*,
session: Session = NEW_SESSION,
) -> None:
+ # Fast path: a task with no outlets and no outlet events has nothing to
+ # register. Returning early avoids the AssetModel lookup below (which
+ # would run with empty IN () clauses) and all downstream work. This is
+ # the common case -- most tasks declare no outlets -- and it sits on
the
+ # task-success path that gates scheduling the next task.
+ if not task_outlets and not outlet_events:
+ return
+
from airflow.serialization.definitions.assets import (
SerializedAsset,
SerializedAssetNameRef,
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index 4e3fc709028..97ec0242235 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -113,6 +113,7 @@ from airflow.utils.state import DagRunState, State,
TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils import db
+from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs
from tests_common.test_utils.mock_operators import MockOperator
@@ -3548,6 +3549,23 @@ def
test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
assert result == {"t1"}
+def test_register_asset_changes_in_db_no_outlets_is_a_noop(dag_maker, session):
+ """A task with no outlets and no outlet events must not issue any
queries."""
+ with dag_maker(dag_id="no_outlets", schedule=None, session=session):
+ EmptyOperator(task_id="hi")
+ dr = dag_maker.create_dagrun(session=session)
+ [ti] = dr.get_task_instances(session=session)
+ session.commit()
+
+ with assert_queries_count(0):
+ TaskInstance.register_asset_changes_in_db(
+ ti=ti,
+ task_outlets=[],
+ outlet_events=[],
+ session=session,
+ )
+
+
def test_when_dag_run_has_partition_then_asset_does(dag_maker, session):
asset = Asset(name="hello")
with dag_maker(dag_id="asset_event_tester", schedule=PartitionAtRuntime())
as dag: