This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 24e682c593c Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368)
24e682c593c is described below

commit 24e682c593ce81a92917488f5c9a12e8eff9d471
Author: Mykola Shyshov <[email protected]>
AuthorDate: Wed Dec 3 03:13:22 2025 +0200

    Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368)
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 15 +++++++++++----
 airflow-core/tests/unit/jobs/test_scheduler_job.py    |  8 ++++++--
 devel-common/src/tests_common/test_utils/db.py        |  4 ++++
 3 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b0a8ef53f9f..87ed4646774 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -62,6 +62,7 @@ from airflow.models.asset import (
     AssetWatcherModel,
     DagScheduleAssetAliasReference,
     DagScheduleAssetReference,
+    TaskInletAssetReference,
     TaskOutletAssetReference,
 )
 from airflow.models.backfill import Backfill
@@ -2733,20 +2734,26 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         """
         Check assets orphanization and update their active entry.
 
-        An orphaned asset is no longer referenced in any DAG schedule parameters
-        or task outlets. Active assets (non-orphaned) have entries in AssetActive
-        and must have unique names and URIs.
+        An orphaned asset is no longer referenced in any DAG schedule parameters,
+        task outlets, or task inlets. Active assets (non-orphaned) have entries in
+        AssetActive and must have unique names and URIs.
 
         :seealso: :meth:`AssetModelOperation.activate_assets_if_possible`.
         """
         # Group assets into orphaned=True and orphaned=False groups.
         orphaned = (
-            (func.count(DagScheduleAssetReference.dag_id) + func.count(TaskOutletAssetReference.dag_id)) == 0
+            (
+                func.count(DagScheduleAssetReference.dag_id)
+                + func.count(TaskOutletAssetReference.dag_id)
+                + func.count(TaskInletAssetReference.dag_id)
+            )
+            == 0
         ).label("orphaned")
         asset_reference_query = session.execute(
             select(orphaned, AssetModel)
             .outerjoin(DagScheduleAssetReference)
             .outerjoin(TaskOutletAssetReference)
+            .outerjoin(TaskInletAssetReference)
             .group_by(AssetModel.id)
             .order_by(orphaned)
         )
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0bfd3128fe0..7779e3b519d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -6823,18 +6823,22 @@ class TestSchedulerJob:
         asset3 = Asset(uri="test://asset_3", name="test_asset_3", group="test_group")
         asset4 = Asset(uri="test://asset_4", name="test_asset_4", group="test_group")
         asset5 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
+        asset6 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
 
         with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], session=session):
-            BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4])
+            BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4], inlets=[asset6])
 
         # asset5 is not registered (since it's not used anywhere).
         orphaned, active = self._find_assets_activation(session)
-        assert active == [asset1, asset2, asset3, asset4]
+        assert active == [asset1, asset2, asset3, asset4, asset6]
         assert orphaned == []
 
         self.job_runner._update_asset_orphanage(session=session)
         session.flush()
 
+        assert active == [asset1, asset2, asset3, asset4, asset6]
+        assert orphaned == []
+
         # Now remove 2 asset references and add asset5.
         with dag_maker(dag_id="assets-1", schedule=[asset1], session=session):
             BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset5])
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index 12a8edb40b6..89a79f91216 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -227,6 +227,10 @@ def clear_db_assets():
         session.query(AssetDagRunQueue).delete()
         session.query(DagScheduleAssetReference).delete()
         session.query(TaskOutletAssetReference).delete()
+        if AIRFLOW_V_3_1_PLUS:
+            from airflow.models.asset import TaskInletAssetReference
+
+            session.query(TaskInletAssetReference).delete()
         from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference
 
         session.query(AssetAliasModel).delete()

Reply via email to