Lee-W commented on code in PR #58368:
URL: https://github.com/apache/airflow/pull/58368#discussion_r2576392777
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -6806,6 +6806,47 @@ def test_asset_orphaning_ignore_orphaned_assets(self,
dag_maker, session):
assert orphaned == [asset1]
assert [asset.updated_at for asset in orphaned] ==
updated_at_timestamps
+ def test_asset_orphaning_with_inlets_only(self, dag_maker, session):
+ """Test that assets used only as task inlets are NOT marked as
orphaned.
+
+ This is a regression test for issue #58303. Assets referenced only in
task
+ inlets should remain active, not be marked as orphaned.
+ """
+ self.job_runner = SchedulerJobRunner(job=Job())
+
+ asset_schedule = Asset(uri="test://asset_schedule",
name="asset_schedule")
+ asset_outlet = Asset(uri="test://asset_outlet", name="asset_outlet")
+ asset_inlet_only = Asset(uri="test://asset_inlet_only",
name="asset_inlet_only")
+
+ with dag_maker(dag_id="test-inlets", schedule=[asset_schedule],
session=session):
+ BashOperator(
+ task_id="task_with_inlet",
+ bash_command="echo 1",
+ inlets=[asset_inlet_only],
+ outlets=[asset_outlet],
+ )
+
+ # Initially, all referenced assets should be active
+ orphaned, active = self._find_assets_activation(session)
+ assert {a.uri for a in active} == {
+ asset_schedule.uri,
+ asset_outlet.uri,
+ asset_inlet_only.uri,
+ }
+ assert orphaned == []
+
+ self.job_runner._update_asset_orphanage(session=session)
+ session.flush()
+
+ # After orphanage check, inlet-only asset should still be active
+ orphaned, active = self._find_assets_activation(session)
Review Comment:
It would be nice to improve the test by making one of the assets inactive, to
ensure `_find_assets_activation` works correctly. If `_find_assets_activation` is
already covered elsewhere, we may be fine with what we have now.
##########
devel-common/src/tests_common/test_utils/db.py:
##########
@@ -227,6 +227,10 @@ def clear_db_assets():
session.query(AssetDagRunQueue).delete()
session.query(DagScheduleAssetReference).delete()
session.query(TaskOutletAssetReference).delete()
+ if AIRFLOW_V_3_1_PLUS:
Review Comment:
Why do we need to gate this on `v_3_1` plus? Would it raise an error on earlier
versions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]