This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f9b1e81185 Further simplify asset selection (#47089)
2f9b1e81185 is described below

commit 2f9b1e81185f5610f9a81ad442e267d9fd5be4ce
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Feb 27 02:13:58 2025 +0800

    Further simplify asset selection (#47089)
---
 airflow/dag_processing/collection.py | 39 +++++++++---------------------------
 1 file changed, 9 insertions(+), 30 deletions(-)

diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py
index 8010e12d4eb..0e411465416 100644
--- a/airflow/dag_processing/collection.py
+++ b/airflow/dag_processing/collection.py
@@ -31,13 +31,12 @@ import logging
 import traceback
 from typing import TYPE_CHECKING, NamedTuple, cast
 
-from sqlalchemy import and_, delete, exists, func, insert, select, tuple_
+from sqlalchemy import delete, func, insert, select, tuple_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import joinedload, load_only
 
 from airflow.assets.manager import asset_manager
 from airflow.models.asset import (
-    AssetActive,
     AssetAliasModel,
     AssetModel,
     DagScheduleAssetAliasReference,
@@ -495,8 +494,8 @@ def _find_all_assets(dags: Iterable[MaybeSerializedDAG]) -> Iterator[Asset]:
     for dag in dags:
         for _, asset in dag.timetable.asset_condition.iter_assets():
             yield asset
-        for _, alias in dag.get_task_assets(of_type=Asset):
-            yield alias
+        for _, asset in dag.get_task_assets(of_type=Asset):
+            yield asset
 
 
 def _find_all_asset_aliases(dags: Iterable[MaybeSerializedDAG]) -> Iterator[AssetAlias]:
@@ -507,34 +506,14 @@ def _find_all_asset_aliases(dags: Iterable[MaybeSerializedDAG]) -> Iterator[Asse
             yield alias
 
 
-def _find_active_assets(name_uri_assets, session: Session):
-    active_dags = {
-        dm.dag_id
-        for dm in session.scalars(select(DagModel).where(DagModel.is_active).where(~DagModel.is_paused))
-    }
-
+def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Session) -> set[tuple[str, str]]:
     return set(
         session.execute(
-            select(
-                AssetModel.name,
-                AssetModel.uri,
-            ).where(
+            select(AssetModel.name, AssetModel.uri).where(
                 tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
-                exists(
-                    select(1).where(
-                        and_(
-                            AssetActive.name == AssetModel.name,
-                            AssetActive.uri == AssetModel.uri,
-                        ),
-                    )
-                ),
-                exists(
-                    select(1).where(
-                        and_(
-                            DagScheduleAssetReference.asset_id == AssetModel.id,
-                            DagScheduleAssetReference.dag_id.in_(active_dags),
-                        )
-                    )
+                AssetModel.active.has(),
+                AssetModel.consuming_dags.any(
+                    DagScheduleAssetReference.dag.has(DagModel.is_active & ~DagModel.is_paused)
                 ),
             )
         )
@@ -749,7 +728,7 @@ class AssetModelOperation(NamedTuple):
         triggers: dict[int, dict] = {}
 
         # Optimization: if no asset collected, skip fetching active assets
-        active_assets = _find_active_assets(self.assets.keys(), session=session) if self.assets else {}
+        active_assets = _find_active_assets(self.assets, session=session) if self.assets else {}
 
         for name_uri, asset in self.assets.items():
             # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it

Reply via email to