dstandish commented on code in PR #44796:
URL: https://github.com/apache/airflow/pull/44796#discussion_r1876534069
##########
airflow/dag_processing/collection.py:
##########
@@ -281,18 +281,30 @@ def _find_active_assets(name_uri_assets, session: Session):
for dm in
session.scalars(select(DagModel).where(DagModel.is_active).where(~DagModel.is_paused))
}
- return {
- (asset_model.name, asset_model.uri)
- for asset_model in session.scalars(
- select(AssetModel)
- .join(AssetActive, (AssetActive.name == AssetModel.name) &
(AssetActive.uri == AssetModel.uri))
- .where(tuple_(AssetActive.name,
AssetActive.uri).in_(name_uri_assets))
-
.where(AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(active_dags)))
- .options(
-
joinedload(AssetModel.consuming_dags).joinedload(DagScheduleAssetReference.dag),
+ # Check that a record in AssetActive exists
+ statement_active_exists = (
+ select(1).where(and_(AssetActive.name == AssetModel.name,
AssetActive.uri == AssetModel.uri)).exists()
+ )
+ # Check that the asset is used to schedule a non-paused DAG
+ statement_asset_references = (
+ select(1)
+ .where(
+ and_(
+ DagScheduleAssetReference.asset_id == AssetModel.id,
+ DagScheduleAssetReference.dag_id.in_(active_dags),
)
- ).unique()
- }
+ )
+ .exists()
+ )
+
+ return set(
+ session.execute(
+ select(AssetModel.name, AssetModel.uri)
+ .where(tuple_(AssetModel.name,
AssetModel.uri).in_(name_uri_assets))
+ .where(statement_active_exists)
+ .where(statement_asset_references)
+ ).all()
+ )
Review Comment:
Totally stylistic, so ignore away, but I think it may be more readable with
those `exists` clauses inline, like so:
```python
return set(
session.execute(
select(
AssetModel.name,
AssetModel.uri,
).where(
tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
exists(
select(1).where(
and_(
AssetActive.name == AssetModel.name,
AssetActive.uri == AssetModel.uri,
),
)
),
exists(
select(1).where(
and_(
                        DagScheduleAssetReference.asset_id == AssetModel.id,
DagScheduleAssetReference.dag_id.in_(active_dags),
)
)
),
)
    )
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]