This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5e0d53a643d [v3-1-test] Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368) (#58986)
5e0d53a643d is described below

commit 5e0d53a643d3fa15456b7be7b411dd2e83fa9389
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 3 10:22:04 2025 +0800

    [v3-1-test] Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368) (#58986)
    
    Co-authored-by: Mykola Shyshov <[email protected]>
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 15 +++++++++++----
 airflow-core/tests/unit/jobs/test_scheduler_job.py    |  8 ++++++--
 devel-common/src/tests_common/test_utils/db.py        |  4 ++++
 3 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index cc2968c8d82..10c511e8bb4 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -61,6 +61,7 @@ from airflow.models.asset import (
     AssetModel,
     DagScheduleAssetAliasReference,
     DagScheduleAssetReference,
+    TaskInletAssetReference,
     TaskOutletAssetReference,
     asset_trigger_association_table,
 )
@@ -2554,20 +2555,26 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         """
         Check assets orphanization and update their active entry.
 
-        An orphaned asset is no longer referenced in any DAG schedule 
parameters
-        or task outlets. Active assets (non-orphaned) have entries in 
AssetActive
-        and must have unique names and URIs.
+        An orphaned asset is no longer referenced in any DAG schedule 
parameters,
+        task outlets, or task inlets. Active assets (non-orphaned) have 
entries in
+        AssetActive and must have unique names and URIs.
 
         :seealso: :meth:`AssetModelOperation.activate_assets_if_possible`.
         """
         # Group assets into orphaned=True and orphaned=False groups.
         orphaned = (
-            (func.count(DagScheduleAssetReference.dag_id) + 
func.count(TaskOutletAssetReference.dag_id)) == 0
+            (
+                func.count(DagScheduleAssetReference.dag_id)
+                + func.count(TaskOutletAssetReference.dag_id)
+                + func.count(TaskInletAssetReference.dag_id)
+            )
+            == 0
         ).label("orphaned")
         asset_reference_query = session.execute(
             select(orphaned, AssetModel)
             .outerjoin(DagScheduleAssetReference)
             .outerjoin(TaskOutletAssetReference)
+            .outerjoin(TaskInletAssetReference)
             .group_by(AssetModel.id)
             .order_by(orphaned)
         )
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 1ad6ffb9984..7f815ccea73 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -6726,18 +6726,22 @@ class TestSchedulerJob:
         asset3 = Asset(uri="test://asset_3", name="test_asset_3", 
group="test_group")
         asset4 = Asset(uri="test://asset_4", name="test_asset_4", 
group="test_group")
         asset5 = Asset(uri="test://asset_5", name="test_asset_5", 
group="test_group")
+        asset6 = Asset(uri="test://asset_5", name="test_asset_5", 
group="test_group")
 
         with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], 
session=session):
-            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset3, asset4])
+            BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset3, asset4], inlets=[asset6])
 
         # asset5 is not registered (since it's not used anywhere).
         orphaned, active = self._find_assets_activation(session)
-        assert active == [asset1, asset2, asset3, asset4]
+        assert active == [asset1, asset2, asset3, asset4, asset6]
         assert orphaned == []
 
         self.job_runner._update_asset_orphanage(session=session)
         session.flush()
 
+        assert active == [asset1, asset2, asset3, asset4, asset6]
+        assert orphaned == []
+
         # Now remove 2 asset references and add asset5.
         with dag_maker(dag_id="assets-1", schedule=[asset1], session=session):
             BashOperator(task_id="task", bash_command="echo 1", 
outlets=[asset3, asset5])
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index 4c40cc74d00..63b9d93d3b0 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -215,6 +215,10 @@ def clear_db_assets():
         session.query(AssetDagRunQueue).delete()
         session.query(DagScheduleAssetReference).delete()
         session.query(TaskOutletAssetReference).delete()
+        if AIRFLOW_V_3_1_PLUS:
+            from airflow.models.asset import TaskInletAssetReference
+
+            session.query(TaskInletAssetReference).delete()
         from tests_common.test_utils.compat import AssetAliasModel, 
DagScheduleAssetAliasReference
 
         session.query(AssetAliasModel).delete()

Reply via email to