This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f9b1e81185 Further simplify asset selection (#47089)
2f9b1e81185 is described below
commit 2f9b1e81185f5610f9a81ad442e267d9fd5be4ce
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Feb 27 02:13:58 2025 +0800
Further simplify asset selection (#47089)
---
airflow/dag_processing/collection.py | 39 +++++++++---------------------------
1 file changed, 9 insertions(+), 30 deletions(-)
diff --git a/airflow/dag_processing/collection.py
b/airflow/dag_processing/collection.py
index 8010e12d4eb..0e411465416 100644
--- a/airflow/dag_processing/collection.py
+++ b/airflow/dag_processing/collection.py
@@ -31,13 +31,12 @@ import logging
import traceback
from typing import TYPE_CHECKING, NamedTuple, cast
-from sqlalchemy import and_, delete, exists, func, insert, select, tuple_
+from sqlalchemy import delete, func, insert, select, tuple_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, load_only
from airflow.assets.manager import asset_manager
from airflow.models.asset import (
- AssetActive,
AssetAliasModel,
AssetModel,
DagScheduleAssetAliasReference,
@@ -495,8 +494,8 @@ def _find_all_assets(dags: Iterable[MaybeSerializedDAG]) ->
Iterator[Asset]:
for dag in dags:
for _, asset in dag.timetable.asset_condition.iter_assets():
yield asset
- for _, alias in dag.get_task_assets(of_type=Asset):
- yield alias
+ for _, asset in dag.get_task_assets(of_type=Asset):
+ yield asset
def _find_all_asset_aliases(dags: Iterable[MaybeSerializedDAG]) ->
Iterator[AssetAlias]:
@@ -507,34 +506,14 @@ def _find_all_asset_aliases(dags:
Iterable[MaybeSerializedDAG]) -> Iterator[Asse
yield alias
-def _find_active_assets(name_uri_assets, session: Session):
- active_dags = {
- dm.dag_id
- for dm in
session.scalars(select(DagModel).where(DagModel.is_active).where(~DagModel.is_paused))
- }
-
+def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session:
Session) -> set[tuple[str, str]]:
return set(
session.execute(
- select(
- AssetModel.name,
- AssetModel.uri,
- ).where(
+ select(AssetModel.name, AssetModel.uri).where(
tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
- exists(
- select(1).where(
- and_(
- AssetActive.name == AssetModel.name,
- AssetActive.uri == AssetModel.uri,
- ),
- )
- ),
- exists(
- select(1).where(
- and_(
- DagScheduleAssetReference.asset_id ==
AssetModel.id,
- DagScheduleAssetReference.dag_id.in_(active_dags),
- )
- )
+ AssetModel.active.has(),
+ AssetModel.consuming_dags.any(
+ DagScheduleAssetReference.dag.has(DagModel.is_active &
~DagModel.is_paused)
),
)
)
@@ -749,7 +728,7 @@ class AssetModelOperation(NamedTuple):
triggers: dict[int, dict] = {}
# Optimization: if no asset collected, skip fetching active assets
- active_assets = _find_active_assets(self.assets.keys(),
session=session) if self.assets else {}
+ active_assets = _find_active_assets(self.assets, session=session) if
self.assets else {}
for name_uri, asset in self.assets.items():
# If the asset belong to a DAG not active or paused, consider
there is no watcher associated to it