dstandish commented on code in PR #44796:
URL: https://github.com/apache/airflow/pull/44796#discussion_r1876534069


##########
airflow/dag_processing/collection.py:
##########
@@ -281,18 +281,30 @@ def _find_active_assets(name_uri_assets, session: Session):
         for dm in 
session.scalars(select(DagModel).where(DagModel.is_active).where(~DagModel.is_paused))
     }
 
-    return {
-        (asset_model.name, asset_model.uri)
-        for asset_model in session.scalars(
-            select(AssetModel)
-            .join(AssetActive, (AssetActive.name == AssetModel.name) & 
(AssetActive.uri == AssetModel.uri))
-            .where(tuple_(AssetActive.name, 
AssetActive.uri).in_(name_uri_assets))
-            
.where(AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(active_dags)))
-            .options(
-                
joinedload(AssetModel.consuming_dags).joinedload(DagScheduleAssetReference.dag),
+    # Check that a record in AssetActive exists
+    statement_active_exists = (
+        select(1).where(and_(AssetActive.name == AssetModel.name, 
AssetActive.uri == AssetModel.uri)).exists()
+    )
+    # Check that the asset is used to schedule a non-paused DAG
+    statement_asset_references = (
+        select(1)
+        .where(
+            and_(
+                DagScheduleAssetReference.asset_id == AssetModel.id,
+                DagScheduleAssetReference.dag_id.in_(active_dags),
             )
-        ).unique()
-    }
+        )
+        .exists()
+    )
+
+    return set(
+        session.execute(
+            select(AssetModel.name, AssetModel.uri)
+            .where(tuple_(AssetModel.name, 
AssetModel.uri).in_(name_uri_assets))
+            .where(statement_active_exists)
+            .where(statement_asset_references)
+        ).all()
+    )

Review Comment:
   Totally stylistic, so feel free to ignore, but I think it may be more readable with those `exists` clauses inline, like so:
   
   ```python
       return set(
           session.execute(
               select(
                   AssetModel.name,
                   AssetModel.uri,
               ).where(
                   tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
                   exists(
                       select(1).where(
                           and_(
                               AssetActive.name == AssetModel.name,
                               AssetActive.uri == AssetModel.uri,
                           ),
                       )
                   ),
                   exists(
                       select(1).where(
                           and_(
                               DagScheduleAssetReference.asset_id == 
AssetModel.id,
                               
DagScheduleAssetReference.dag_id.in_(active_dags),
                           )
                       )
                   ),
               )
           )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to