This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3165db4dfe7 refactor: Remove inactive asset check in scheduler (#55714)
3165db4dfe7 is described below
commit 3165db4dfe7568dbbe6b991bac35febc2027325d
Author: Wei Lee <[email protected]>
AuthorDate: Thu Sep 18 07:22:57 2025 +0800
refactor: Remove inactive asset check in scheduler (#55714)
* refactor: remove inactive asset check in scheduler
this was an additional safeguard and unlikely to happen even in edge cases
this check now happens in task_runner before actually running a task
now whether a task instance is affected by inactive assets is only checked
in task_runner.run
this test does not make sense anymore
---
airflow-core/src/airflow/models/taskinstance.py | 24 +++++++--------
.../versions/head/test_task_instances.py | 34 --------------------
.../tests/unit/models/test_taskinstance.py | 36 ----------------------
3 files changed, 12 insertions(+), 82 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 5f3eb91f865..e9806696e24 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -70,9 +70,6 @@ from airflow import settings
from airflow._shared.timezones import timezone
from airflow.assets.manager import asset_manager
from airflow.configuration import conf
-from airflow.exceptions import (
- AirflowInactiveAssetInInletOrOutletException,
-)
from airflow.listeners.listener import get_listener_manager
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.base import Base, StringID, TaskInstanceDependencies
@@ -120,7 +117,7 @@ if TYPE_CHECKING:
from airflow.models.mappedoperator import MappedOperator
from airflow.sdk import DAG
from airflow.sdk.api.datamodels._generated import AssetProfile
- from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey,
AssetUriRef
+ from airflow.sdk.definitions.asset import AssetUniqueKey
from airflow.sdk.types import RuntimeTaskInstanceProtocol
from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
from airflow.serialization.serialized_objects import SerializedBaseOperator
@@ -1328,13 +1325,15 @@ class TaskInstance(Base, LoggingMixin):
if "source_alias_name" not in event
}
- bad_asset_keys: set[AssetUniqueKey | AssetNameRef | AssetUriRef] =
set()
-
for key in asset_keys:
try:
am = asset_models[key]
except KeyError:
- bad_asset_keys.add(key)
+ ti.log.warning(
+ 'Task has inactive assets "Asset(name=%s, uri=%s)" in
inlets or outlets',
+ key.name,
+ key.uri,
+ )
continue
ti.log.debug("register event for asset %s", am)
asset_manager.register_asset_change(
@@ -1351,7 +1350,9 @@ class TaskInstance(Base, LoggingMixin):
try:
am = asset_models_by_name[nref.name]
except KeyError:
- bad_asset_keys.add(nref)
+ ti.log.warning(
+ 'Task has inactive assets "Asset.ref(name=%s)" in
inlets or outlets', nref.name
+ )
continue
ti.log.debug("register event for asset name ref %s", am)
asset_manager.register_asset_change(
@@ -1367,7 +1368,9 @@ class TaskInstance(Base, LoggingMixin):
try:
am = asset_models_by_uri[uref.uri]
except KeyError:
- bad_asset_keys.add(uref)
+ ti.log.warning(
+ 'Task has inactive assets "Asset.ref(uri=%s)" in
inlets or outlets', uref.uri
+ )
continue
ti.log.debug("register event for asset uri ref %s", am)
asset_manager.register_asset_change(
@@ -1414,9 +1417,6 @@ class TaskInstance(Base, LoggingMixin):
session=session,
)
- if bad_asset_keys:
- raise AirflowInactiveAssetInInletOrOutletException(bad_asset_keys)
-
@provide_session
def update_rtif(self, rendered_fields, session: Session = NEW_SESSION):
from airflow.models.renderedtifields import RenderedTaskInstanceFields
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 4efaaa75eee..66ce18e2d35 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -782,40 +782,6 @@ class TestTIUpdateState:
assert event[0].asset == AssetModel(name="my-task",
uri="s3://bucket/my-task", extra={})
assert event[0].extra == expected_extra
- def test_ti_update_state_to_failed_with_inactive_asset(self, client,
session, create_task_instance):
- # inactive
- asset = AssetModel(
- id=1,
- name="my-task-2",
- uri="s3://bucket/my-task",
- group="asset",
- extra={},
- )
- session.add(asset)
-
- ti = create_task_instance(
- task_id="test_ti_update_state_to_success_with_asset_events",
- start_date=DEFAULT_START_DATE,
- state=State.RUNNING,
- )
- session.commit()
-
- response = client.patch(
- f"/execution/task-instances/{ti.id}/state",
- json={
- "state": "success",
- "end_date": DEFAULT_END_DATE.isoformat(),
- "task_outlets": [{"name": "my-task-2", "uri":
"s3://bucket/my-task", "type": "Asset"}],
- "outlet_events": [],
- },
- )
-
- assert response.status_code == 204
- session.expire_all()
-
- ti = session.get(TaskInstance, ti.id)
- assert ti.state == State.FAILED
-
@pytest.mark.parametrize(
"outlet_events, expected_extra",
[
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index 315d3c4cc7a..74260a05a11 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -38,7 +38,6 @@ from airflow._shared.timezones import timezone
from airflow.exceptions import (
AirflowException,
AirflowFailException,
- AirflowInactiveAssetInInletOrOutletException,
AirflowSkipException,
)
from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent,
AssetModel
@@ -2688,41 +2687,6 @@ class TestTaskInstance:
# the new try_id should be different from what's recorded in tih
assert str(tih[0].task_instance_id) == try_id
- @pytest.mark.skip(
- reason="This test has some issues that were surfaced when dag_maker
started allowing multiple serdag versions. Issue #48539 will track fixing this."
- )
- def test_run_with_inactive_assets(self, dag_maker, session):
- from airflow.sdk.definitions.asset import Asset
-
- with dag_maker(schedule=None, serialized=True, session=session):
-
- @task(outlets=Asset("asset_first"))
- def first_asset_task(*, outlet_events):
- outlet_events[Asset("asset_first")].extra = {"foo": "bar"}
-
- first_asset_task()
-
- with dag_maker(schedule=None, serialized=True, session=session):
-
- @task(inlets=Asset("asset_second"))
- def asset_task_in_inlet():
- pass
-
- @task(outlets=Asset(name="asset_first", uri="test://asset"),
inlets=Asset("asset_second"))
- def duplicate_asset_task_in_outlet(*, outlet_events):
- outlet_events[Asset(name="asset_first",
uri="test://asset")].extra = {"foo": "bar"}
-
- duplicate_asset_task_in_outlet() >> asset_task_in_inlet()
-
- tis = {ti.task_id: ti for ti in
dag_maker.create_dagrun().task_instances}
-
- tis["asset_task_in_inlet"].run(session=session)
- with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as
exc:
- tis["duplicate_asset_task_in_outlet"].run(session=session)
-
- assert "Asset(name='asset_second', uri='asset_second')" in
str(exc.value)
- assert "Asset(name='asset_first', uri='test://asset/')" in
str(exc.value)
-
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
@pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])