Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3279683750
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -193,13 +225,17 @@ def get_pending_partitioned_dag_run(
f"No PartitionedDagRun for dag={dag_id} partition={partition_key}",
)
- received_subq = (
- select(PartitionedAssetKeyLog.asset_id).where(
- PartitionedAssetKeyLog.asset_partition_dag_run_id == partitioned_dag_run.id
+ # Count received PartitionedAssetKeyLog entries per asset for this partition
+ received_count_col = (
+ select(func.count(PartitionedAssetKeyLog.id))
+ .where(
+ PartitionedAssetKeyLog.asset_partition_dag_run_id == partitioned_dag_run.id,
+ PartitionedAssetKeyLog.asset_id == AssetModel.id,
)
- ).correlate(AssetModel)
-
- received_expr = exists(received_subq.where(PartitionedAssetKeyLog.asset_id == AssetModel.id))
+ .correlate(AssetModel)
+ .scalar_subquery()
+ .label("received_count")
+ )
Review Comment:
outdated
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py:
##########
@@ -115,6 +119,62 @@ def next_run_assets(
if not event.pop("queued", None):
event["lastUpdate"] = None
+ # For partitioned DAGs: enrich events with per-asset received/required counts,
+ # using to_upstream for rollup mappers, and fix lastUpdate for partial receipt.
+ if is_partitioned:
+ pending_apdr = session.execute(
+ select(AssetPartitionDagRun.id, AssetPartitionDagRun.partition_key)
+ .where(
+ AssetPartitionDagRun.target_dag_id == dag_id,
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ )
+ .order_by(AssetPartitionDagRun.created_at.desc())
+ .limit(1)
+ ).one_or_none()
+
+ if pending_apdr is not None:
+ # Count received log entries per asset for this partition
+ received_by_asset: dict[int, int] = dict(
+ session.execute(
+ select(
+ PartitionedAssetKeyLog.asset_id,
+ func.count(PartitionedAssetKeyLog.id).label("cnt"),
+ )
+ .where(PartitionedAssetKeyLog.asset_partition_dag_run_id == pending_apdr.id)
+ .group_by(PartitionedAssetKeyLog.asset_id)
+ ).all()
Review Comment:
outdated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]