uranusjr commented on code in PR #58368:
URL: https://github.com/apache/airflow/pull/58368#discussion_r2545159188
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -6711,6 +6711,43 @@ def test_asset_orphaning_ignore_orphaned_assets(self,
dag_maker, session):
assert orphaned == [asset1]
assert [asset.updated_at for asset in orphaned] ==
updated_at_timestamps
+ def test_asset_orphaning_with_inlets_only(self, dag_maker, session):
+ """Test that assets used only as task inlets are NOT marked as
orphaned.
+
+ This is a regression test for issue #58303. Assets referenced only in
task
+ inlets should remain active, not be marked as orphaned.
+ """
+ self.job_runner = SchedulerJobRunner(job=Job())
+
+ asset_schedule = Asset(uri="test://asset_schedule",
name="asset_schedule")
+ asset_outlet = Asset(uri="test://asset_outlet", name="asset_outlet")
+ asset_inlet_only = Asset(uri="test://asset_inlet_only",
name="asset_inlet_only")
+
+ with dag_maker(dag_id="test-inlets", schedule=[asset_schedule],
session=session):
+ BashOperator(
+ task_id="task_with_inlet",
+ bash_command="echo 1",
+ inlets=[asset_inlet_only],
+ outlets=[asset_outlet],
+ )
+
+ # Initially, all referenced assets should be active
+ orphaned, active = self._find_assets_activation(session)
+ assert sorted(active, key=lambda a: a.uri) == sorted(
+ [asset_schedule, asset_outlet, asset_inlet_only], key=lambda a:
a.uri
+ )
Review Comment:
Please just write the right hand side list in the correct order instead of
needing to rely on `sorted()`. Same for the check below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]