Asquator commented on code in PR #62114:
URL: https://github.com/apache/airflow/pull/62114#discussion_r2879544521
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3028,44 +3021,41 @@ def _update_asset_orphanage(self, session: Session =
NEW_SESSION) -> None:
)
== 0
).label("orphaned")
- asset_reference_query = session.execute(
- select(orphaned, AssetModel)
+ asset_reference_query = (
+ select(AssetModel)
.outerjoin(DagScheduleAssetReference)
.outerjoin(TaskOutletAssetReference)
.outerjoin(TaskInletAssetReference)
.group_by(AssetModel.id)
- .order_by(orphaned)
Review Comment:
Same comment as in https://github.com/apache/airflow/pull/62114/changes#r2879534892,
but I see your point.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3028,44 +3021,41 @@ def _update_asset_orphanage(self, session: Session =
NEW_SESSION) -> None:
)
== 0
).label("orphaned")
- asset_reference_query = session.execute(
- select(orphaned, AssetModel)
+ asset_reference_query = (
+ select(AssetModel)
.outerjoin(DagScheduleAssetReference)
.outerjoin(TaskOutletAssetReference)
.outerjoin(TaskInletAssetReference)
.group_by(AssetModel.id)
- .order_by(orphaned)
)
- asset_orphanation: dict[bool, Collection[AssetModel]] = {
- orphaned: [asset for _, asset in group]
- for orphaned, group in itertools.groupby(asset_reference_query,
key=operator.itemgetter(0))
- }
- self._orphan_unreferenced_assets(asset_orphanation.get(True, ()),
session=session)
- self._activate_referenced_assets(asset_orphanation.get(False, ()),
session=session)
+
+ orphan_query = asset_reference_query.having(orphaned).cte()
+ activate_query = asset_reference_query.having(~orphaned).cte()
+
+ self._orphan_unreferenced_assets(orphan_query, session=session)
+ self._activate_referenced_assets(activate_query, session=session)
@staticmethod
- def _orphan_unreferenced_assets(assets: Collection[AssetModel], *,
session: Session) -> None:
- if assets:
- session.execute(
- delete(AssetActive).where(
- tuple_(AssetActive.name, AssetActive.uri).in_((a.name,
a.uri) for a in assets)
+ def _orphan_unreferenced_assets(assets_query: CTE, *, session: Session) ->
None:
Review Comment:
I'd prioritize scheduler simplicity over avoiding an additional patch, but
if you insist, we can ask for others' opinions.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3028,44 +3021,41 @@ def _update_asset_orphanage(self, session: Session =
NEW_SESSION) -> None:
)
== 0
).label("orphaned")
- asset_reference_query = session.execute(
- select(orphaned, AssetModel)
+ asset_reference_query = (
+ select(AssetModel)
.outerjoin(DagScheduleAssetReference)
.outerjoin(TaskOutletAssetReference)
.outerjoin(TaskInletAssetReference)
.group_by(AssetModel.id)
- .order_by(orphaned)
)
- asset_orphanation: dict[bool, Collection[AssetModel]] = {
- orphaned: [asset for _, asset in group]
- for orphaned, group in itertools.groupby(asset_reference_query,
key=operator.itemgetter(0))
- }
- self._orphan_unreferenced_assets(asset_orphanation.get(True, ()),
session=session)
- self._activate_referenced_assets(asset_orphanation.get(False, ()),
session=session)
+
+ orphan_query = asset_reference_query.having(orphaned).cte()
+ activate_query = asset_reference_query.having(~orphaned).cte()
+
+ self._orphan_unreferenced_assets(orphan_query, session=session)
+ self._activate_referenced_assets(activate_query, session=session)
@staticmethod
- def _orphan_unreferenced_assets(assets: Collection[AssetModel], *,
session: Session) -> None:
- if assets:
- session.execute(
- delete(AssetActive).where(
- tuple_(AssetActive.name, AssetActive.uri).in_((a.name,
a.uri) for a in assets)
+ def _orphan_unreferenced_assets(assets_query: CTE, *, session: Session) ->
None:
+ deleted_orphaned_assets = session.execute(
+ delete(AssetActive).where(
+ exists().where(
+ and_(AssetActive.name == assets_query.c.name,
AssetActive.uri == assets_query.c.uri)
)
)
- Stats.gauge("asset.orphaned", len(assets))
+ )
- @staticmethod
- def _activate_referenced_assets(assets: Collection[AssetModel], *,
session: Session) -> None:
- if not assets:
- return
+ Stats.gauge("asset.orphaned", getattr(deleted_orphaned_assets,
"rowcount") or 0)
- active_assets = set(
- session.execute(
- select(AssetActive.name, AssetActive.uri).where(
- tuple_(AssetActive.name, AssetActive.uri).in_((a.name,
a.uri) for a in assets)
- )
- )
+ @staticmethod
+ def _activate_referenced_assets(assets_query: CTE, *, session: Session) ->
None:
+ active_assets_query = select(AssetActive.name, AssetActive.uri).join(
+ assets_query,
+ and_(AssetActive.name == assets_query.c.name, AssetActive.uri ==
assets_query.c.uri),
Review Comment:
Same comment as in https://github.com/apache/airflow/pull/62114/changes#r2879534892,
but I see your point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]