This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3165db4dfe7 refactor: Remove inactive asset check in scheduler (#55714)
3165db4dfe7 is described below

commit 3165db4dfe7568dbbe6b991bac35febc2027325d
Author: Wei Lee <[email protected]>
AuthorDate: Thu Sep 18 07:22:57 2025 +0800

    refactor: Remove inactive asset check in scheduler (#55714)
    
    * refactor: remove inactive asset check in scheduler
    
    This check was an additional safeguard, and the situation it guards against
    is unlikely to happen even in edge cases. The check now happens in
    task_runner before actually running a task.
    
    
    Whether a task instance is affected by inactive assets is now only checked
    in task_runner.run, so this test does not make sense anymore.
---
 airflow-core/src/airflow/models/taskinstance.py    | 24 +++++++--------
 .../versions/head/test_task_instances.py           | 34 --------------------
 .../tests/unit/models/test_taskinstance.py         | 36 ----------------------
 3 files changed, 12 insertions(+), 82 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 5f3eb91f865..e9806696e24 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -70,9 +70,6 @@ from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.assets.manager import asset_manager
 from airflow.configuration import conf
-from airflow.exceptions import (
-    AirflowInactiveAssetInInletOrOutletException,
-)
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import AssetEvent, AssetModel
 from airflow.models.base import Base, StringID, TaskInstanceDependencies
@@ -120,7 +117,7 @@ if TYPE_CHECKING:
     from airflow.models.mappedoperator import MappedOperator
     from airflow.sdk import DAG
     from airflow.sdk.api.datamodels._generated import AssetProfile
-    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, 
AssetUriRef
+    from airflow.sdk.definitions.asset import AssetUniqueKey
     from airflow.sdk.types import RuntimeTaskInstanceProtocol
     from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
     from airflow.serialization.serialized_objects import SerializedBaseOperator
@@ -1328,13 +1325,15 @@ class TaskInstance(Base, LoggingMixin):
             if "source_alias_name" not in event
         }
 
-        bad_asset_keys: set[AssetUniqueKey | AssetNameRef | AssetUriRef] = 
set()
-
         for key in asset_keys:
             try:
                 am = asset_models[key]
             except KeyError:
-                bad_asset_keys.add(key)
+                ti.log.warning(
+                    'Task has inactive assets "Asset(name=%s, uri=%s)" in 
inlets or outlets',
+                    key.name,
+                    key.uri,
+                )
                 continue
             ti.log.debug("register event for asset %s", am)
             asset_manager.register_asset_change(
@@ -1351,7 +1350,9 @@ class TaskInstance(Base, LoggingMixin):
                 try:
                     am = asset_models_by_name[nref.name]
                 except KeyError:
-                    bad_asset_keys.add(nref)
+                    ti.log.warning(
+                        'Task has inactive assets "Asset.ref(name=%s)" in 
inlets or outlets', nref.name
+                    )
                     continue
                 ti.log.debug("register event for asset name ref %s", am)
                 asset_manager.register_asset_change(
@@ -1367,7 +1368,9 @@ class TaskInstance(Base, LoggingMixin):
                 try:
                     am = asset_models_by_uri[uref.uri]
                 except KeyError:
-                    bad_asset_keys.add(uref)
+                    ti.log.warning(
+                        'Task has inactive assets "Asset.ref(uri=%s)" in 
inlets or outlets', uref.uri
+                    )
                     continue
                 ti.log.debug("register event for asset uri ref %s", am)
                 asset_manager.register_asset_change(
@@ -1414,9 +1417,6 @@ class TaskInstance(Base, LoggingMixin):
                         session=session,
                     )
 
-        if bad_asset_keys:
-            raise AirflowInactiveAssetInInletOrOutletException(bad_asset_keys)
-
     @provide_session
     def update_rtif(self, rendered_fields, session: Session = NEW_SESSION):
         from airflow.models.renderedtifields import RenderedTaskInstanceFields
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 4efaaa75eee..66ce18e2d35 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -782,40 +782,6 @@ class TestTIUpdateState:
         assert event[0].asset == AssetModel(name="my-task", 
uri="s3://bucket/my-task", extra={})
         assert event[0].extra == expected_extra
 
-    def test_ti_update_state_to_failed_with_inactive_asset(self, client, 
session, create_task_instance):
-        # inactive
-        asset = AssetModel(
-            id=1,
-            name="my-task-2",
-            uri="s3://bucket/my-task",
-            group="asset",
-            extra={},
-        )
-        session.add(asset)
-
-        ti = create_task_instance(
-            task_id="test_ti_update_state_to_success_with_asset_events",
-            start_date=DEFAULT_START_DATE,
-            state=State.RUNNING,
-        )
-        session.commit()
-
-        response = client.patch(
-            f"/execution/task-instances/{ti.id}/state",
-            json={
-                "state": "success",
-                "end_date": DEFAULT_END_DATE.isoformat(),
-                "task_outlets": [{"name": "my-task-2", "uri": 
"s3://bucket/my-task", "type": "Asset"}],
-                "outlet_events": [],
-            },
-        )
-
-        assert response.status_code == 204
-        session.expire_all()
-
-        ti = session.get(TaskInstance, ti.id)
-        assert ti.state == State.FAILED
-
     @pytest.mark.parametrize(
         "outlet_events, expected_extra",
         [
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index 315d3c4cc7a..74260a05a11 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -38,7 +38,6 @@ from airflow._shared.timezones import timezone
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
-    AirflowInactiveAssetInInletOrOutletException,
     AirflowSkipException,
 )
 from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, 
AssetModel
@@ -2688,41 +2687,6 @@ class TestTaskInstance:
         # the new try_id should be different from what's recorded in tih
         assert str(tih[0].task_instance_id) == try_id
 
-    @pytest.mark.skip(
-        reason="This test has some issues that were surfaced when dag_maker 
started allowing multiple serdag versions. Issue #48539 will track fixing this."
-    )
-    def test_run_with_inactive_assets(self, dag_maker, session):
-        from airflow.sdk.definitions.asset import Asset
-
-        with dag_maker(schedule=None, serialized=True, session=session):
-
-            @task(outlets=Asset("asset_first"))
-            def first_asset_task(*, outlet_events):
-                outlet_events[Asset("asset_first")].extra = {"foo": "bar"}
-
-            first_asset_task()
-
-        with dag_maker(schedule=None, serialized=True, session=session):
-
-            @task(inlets=Asset("asset_second"))
-            def asset_task_in_inlet():
-                pass
-
-            @task(outlets=Asset(name="asset_first", uri="test://asset"), 
inlets=Asset("asset_second"))
-            def duplicate_asset_task_in_outlet(*, outlet_events):
-                outlet_events[Asset(name="asset_first", 
uri="test://asset")].extra = {"foo": "bar"}
-
-            duplicate_asset_task_in_outlet() >> asset_task_in_inlet()
-
-        tis = {ti.task_id: ti for ti in 
dag_maker.create_dagrun().task_instances}
-
-        tis["asset_task_in_inlet"].run(session=session)
-        with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as 
exc:
-            tis["duplicate_asset_task_in_outlet"].run(session=session)
-
-        assert "Asset(name='asset_second', uri='asset_second')" in 
str(exc.value)
-        assert "Asset(name='asset_first', uri='test://asset/')" in 
str(exc.value)
-
 
 @pytest.mark.parametrize("pool_override", [None, "test_pool2"])
 @pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])

Reply via email to