This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 06ad86039da2d34fd372c9885f9a0974ec34e3b1 Author: Wei Lee <[email protected]> AuthorDate: Thu Sep 18 07:22:57 2025 +0800 refactor: Remove inactive asset check in scheduler (#55714) * refactor: remove inactive asset check in scheduler. This was an additional safeguard for a situation that is unlikely to happen even in edge cases. This check now happens in task_runner before actually running a task. Now, whether a task instance is affected by inactive assets is only checked in task_runner.run, so this test no longer makes sense. (cherry picked from commit 3165db4dfe7568dbbe6b991bac35febc2027325d) --- airflow-core/src/airflow/models/taskinstance.py | 24 +++++++-------- .../versions/head/test_task_instances.py | 34 -------------------- .../tests/unit/models/test_taskinstance.py | 36 ---------------------- 3 files changed, 12 insertions(+), 82 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5f3eb91f865..e9806696e24 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -70,9 +70,6 @@ from airflow import settings from airflow._shared.timezones import timezone from airflow.assets.manager import asset_manager from airflow.configuration import conf -from airflow.exceptions import ( - AirflowInactiveAssetInInletOrOutletException, -) from airflow.listeners.listener import get_listener_manager from airflow.models.asset import AssetEvent, AssetModel from airflow.models.base import Base, StringID, TaskInstanceDependencies @@ -120,7 +117,7 @@ if TYPE_CHECKING: from airflow.models.mappedoperator import MappedOperator from airflow.sdk import DAG from airflow.sdk.api.datamodels._generated import AssetProfile - from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef + from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.sdk.types import RuntimeTaskInstanceProtocol from airflow.serialization.definitions.taskgroup
import SerializedTaskGroup from airflow.serialization.serialized_objects import SerializedBaseOperator @@ -1328,13 +1325,15 @@ class TaskInstance(Base, LoggingMixin): if "source_alias_name" not in event } - bad_asset_keys: set[AssetUniqueKey | AssetNameRef | AssetUriRef] = set() - for key in asset_keys: try: am = asset_models[key] except KeyError: - bad_asset_keys.add(key) + ti.log.warning( + 'Task has inactive assets "Asset(name=%s, uri=%s)" in inlets or outlets', + key.name, + key.uri, + ) continue ti.log.debug("register event for asset %s", am) asset_manager.register_asset_change( @@ -1351,7 +1350,9 @@ class TaskInstance(Base, LoggingMixin): try: am = asset_models_by_name[nref.name] except KeyError: - bad_asset_keys.add(nref) + ti.log.warning( + 'Task has inactive assets "Asset.ref(name=%s)" in inlets or outlets', nref.name + ) continue ti.log.debug("register event for asset name ref %s", am) asset_manager.register_asset_change( @@ -1367,7 +1368,9 @@ class TaskInstance(Base, LoggingMixin): try: am = asset_models_by_uri[uref.uri] except KeyError: - bad_asset_keys.add(uref) + ti.log.warning( + 'Task has inactive assets "Asset.ref(uri=%s)" in inlets or outlets', uref.uri + ) continue ti.log.debug("register event for asset uri ref %s", am) asset_manager.register_asset_change( @@ -1414,9 +1417,6 @@ class TaskInstance(Base, LoggingMixin): session=session, ) - if bad_asset_keys: - raise AirflowInactiveAssetInInletOrOutletException(bad_asset_keys) - @provide_session def update_rtif(self, rendered_fields, session: Session = NEW_SESSION): from airflow.models.renderedtifields import RenderedTaskInstanceFields diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 4efaaa75eee..66ce18e2d35 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -782,40 +782,6 @@ class TestTIUpdateState: assert event[0].asset == AssetModel(name="my-task", uri="s3://bucket/my-task", extra={}) assert event[0].extra == expected_extra - def test_ti_update_state_to_failed_with_inactive_asset(self, client, session, create_task_instance): - # inactive - asset = AssetModel( - id=1, - name="my-task-2", - uri="s3://bucket/my-task", - group="asset", - extra={}, - ) - session.add(asset) - - ti = create_task_instance( - task_id="test_ti_update_state_to_success_with_asset_events", - start_date=DEFAULT_START_DATE, - state=State.RUNNING, - ) - session.commit() - - response = client.patch( - f"/execution/task-instances/{ti.id}/state", - json={ - "state": "success", - "end_date": DEFAULT_END_DATE.isoformat(), - "task_outlets": [{"name": "my-task-2", "uri": "s3://bucket/my-task", "type": "Asset"}], - "outlet_events": [], - }, - ) - - assert response.status_code == 204 - session.expire_all() - - ti = session.get(TaskInstance, ti.id) - assert ti.state == State.FAILED - @pytest.mark.parametrize( "outlet_events, expected_extra", [ diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 315d3c4cc7a..74260a05a11 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -38,7 +38,6 @@ from airflow._shared.timezones import timezone from airflow.exceptions import ( AirflowException, AirflowFailException, - AirflowInactiveAssetInInletOrOutletException, AirflowSkipException, ) from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel @@ -2688,41 +2687,6 @@ class TestTaskInstance: # the new try_id should be different from what's recorded in tih assert str(tih[0].task_instance_id) == try_id - @pytest.mark.skip( - reason="This test has some issues that were surfaced when dag_maker started allowing 
multiple serdag versions. Issue #48539 will track fixing this." - ) - def test_run_with_inactive_assets(self, dag_maker, session): - from airflow.sdk.definitions.asset import Asset - - with dag_maker(schedule=None, serialized=True, session=session): - - @task(outlets=Asset("asset_first")) - def first_asset_task(*, outlet_events): - outlet_events[Asset("asset_first")].extra = {"foo": "bar"} - - first_asset_task() - - with dag_maker(schedule=None, serialized=True, session=session): - - @task(inlets=Asset("asset_second")) - def asset_task_in_inlet(): - pass - - @task(outlets=Asset(name="asset_first", uri="test://asset"), inlets=Asset("asset_second")) - def duplicate_asset_task_in_outlet(*, outlet_events): - outlet_events[Asset(name="asset_first", uri="test://asset")].extra = {"foo": "bar"} - - duplicate_asset_task_in_outlet() >> asset_task_in_inlet() - - tis = {ti.task_id: ti for ti in dag_maker.create_dagrun().task_instances} - - tis["asset_task_in_inlet"].run(session=session) - with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: - tis["duplicate_asset_task_in_outlet"].run(session=session) - - assert "Asset(name='asset_second', uri='asset_second')" in str(exc.value) - assert "Asset(name='asset_first', uri='test://asset/')" in str(exc.value) - @pytest.mark.parametrize("pool_override", [None, "test_pool2"]) @pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])
