This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 24e682c593c Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368)
24e682c593c is described below
commit 24e682c593ce81a92917488f5c9a12e8eff9d471
Author: Mykola Shyshov <[email protected]>
AuthorDate: Wed Dec 3 03:13:22 2025 +0200
Fix assets used only as inlets being incorrectly orphaned (#58303) (#58368)
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 15 +++++++++++----
airflow-core/tests/unit/jobs/test_scheduler_job.py | 8 ++++++--
devel-common/src/tests_common/test_utils/db.py | 4 ++++
3 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b0a8ef53f9f..87ed4646774 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -62,6 +62,7 @@ from airflow.models.asset import (
AssetWatcherModel,
DagScheduleAssetAliasReference,
DagScheduleAssetReference,
+ TaskInletAssetReference,
TaskOutletAssetReference,
)
from airflow.models.backfill import Backfill
@@ -2733,20 +2734,26 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"""
Check assets orphanization and update their active entry.
- An orphaned asset is no longer referenced in any DAG schedule parameters
- or task outlets. Active assets (non-orphaned) have entries in AssetActive
- and must have unique names and URIs.
+ An orphaned asset is no longer referenced in any DAG schedule parameters,
+ task outlets, or task inlets. Active assets (non-orphaned) have entries in
+ AssetActive and must have unique names and URIs.
:seealso: :meth:`AssetModelOperation.activate_assets_if_possible`.
"""
# Group assets into orphaned=True and orphaned=False groups.
orphaned = (
- (func.count(DagScheduleAssetReference.dag_id) + func.count(TaskOutletAssetReference.dag_id)) == 0
+ (
+ func.count(DagScheduleAssetReference.dag_id)
+ + func.count(TaskOutletAssetReference.dag_id)
+ + func.count(TaskInletAssetReference.dag_id)
+ )
+ == 0
).label("orphaned")
asset_reference_query = session.execute(
select(orphaned, AssetModel)
.outerjoin(DagScheduleAssetReference)
.outerjoin(TaskOutletAssetReference)
+ .outerjoin(TaskInletAssetReference)
.group_by(AssetModel.id)
.order_by(orphaned)
)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 0bfd3128fe0..7779e3b519d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -6823,18 +6823,22 @@ class TestSchedulerJob:
asset3 = Asset(uri="test://asset_3", name="test_asset_3", group="test_group")
asset4 = Asset(uri="test://asset_4", name="test_asset_4", group="test_group")
asset5 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
+ asset6 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], session=session):
- BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4])
+ BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4], inlets=[asset6])
# asset5 is not registered (since it's not used anywhere).
orphaned, active = self._find_assets_activation(session)
- assert active == [asset1, asset2, asset3, asset4]
+ assert active == [asset1, asset2, asset3, asset4, asset6]
assert orphaned == []
self.job_runner._update_asset_orphanage(session=session)
session.flush()
+ assert active == [asset1, asset2, asset3, asset4, asset6]
+ assert orphaned == []
+
# Now remove 2 asset references and add asset5.
with dag_maker(dag_id="assets-1", schedule=[asset1], session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset5])
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index 12a8edb40b6..89a79f91216 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -227,6 +227,10 @@ def clear_db_assets():
session.query(AssetDagRunQueue).delete()
session.query(DagScheduleAssetReference).delete()
session.query(TaskOutletAssetReference).delete()
+ if AIRFLOW_V_3_1_PLUS:
+ from airflow.models.asset import TaskInletAssetReference
+
+ session.query(TaskInletAssetReference).delete()
from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference
session.query(AssetAliasModel).delete()