This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 59fc9e284dd Skip asset-change registration for tasks with no outlets 
(#68687)
59fc9e284dd is described below

commit 59fc9e284dd7d190b6e2dfed0e27daec2644a16a
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jun 18 08:40:21 2026 -0700

    Skip asset-change registration for tasks with no outlets (#68687)
    
    TaskInstance.register_asset_changes_in_db runs on every task-success
    state transition. For a task that declares no outlet assets and emits no
    outlet events -- the common case -- the method still executed its body
    and issued an AssetModel SELECT with empty IN () clauses before doing
    nothing useful. Return early when both task_outlets and outlet_events are
    empty, avoiding that round-trip on the hot path that gates scheduling the
    next task.
    
    Signed-off-by: Daniel Standish <[email protected]>
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py     |  8 ++++++++
 airflow-core/tests/unit/models/test_taskinstance.py | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index bbf4b0db923..5388b4b6b48 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1524,6 +1524,14 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
         *,
         session: Session = NEW_SESSION,
     ) -> None:
+        # Fast path: a task with no outlets and no outlet events has nothing to
+        # register. Returning early avoids the AssetModel lookup below (which
+        # would run with empty IN () clauses) and all downstream work. This is
+        # the common case -- most tasks declare no outlets -- and it sits on 
the
+        # task-success path that gates scheduling the next task.
+        if not task_outlets and not outlet_events:
+            return
+
         from airflow.serialization.definitions.assets import (
             SerializedAsset,
             SerializedAssetNameRef,
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index 4e3fc709028..97ec0242235 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -113,6 +113,7 @@ from airflow.utils.state import DagRunState, State, 
TaskInstanceState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils import db
+from tests_common.test_utils.asserts import assert_queries_count
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_runs
 from tests_common.test_utils.mock_operators import MockOperator
@@ -3548,6 +3549,23 @@ def 
test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
     assert result == {"t1"}
 
 
+def test_register_asset_changes_in_db_no_outlets_is_a_noop(dag_maker, session):
+    """A task with no outlets and no outlet events must not issue any 
queries."""
+    with dag_maker(dag_id="no_outlets", schedule=None, session=session):
+        EmptyOperator(task_id="hi")
+    dr = dag_maker.create_dagrun(session=session)
+    [ti] = dr.get_task_instances(session=session)
+    session.commit()
+
+    with assert_queries_count(0):
+        TaskInstance.register_asset_changes_in_db(
+            ti=ti,
+            task_outlets=[],
+            outlet_events=[],
+            session=session,
+        )
+
+
 def test_when_dag_run_has_partition_then_asset_does(dag_maker, session):
     asset = Asset(name="hello")
     with dag_maker(dag_id="asset_event_tester", schedule=PartitionAtRuntime()) 
as dag:

Reply via email to