Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3322957313


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -38,23 +45,185 @@
 )
 from airflow.models import DagModel
 from airflow.models.asset import (
+    AssetActive,
     AssetModel,
     AssetPartitionDagRun,
     DagScheduleAssetReference,
     PartitionedAssetKeyLog,
 )
 from airflow.models.dagrun import DagRun
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.partition_mappers.base import RollupMapper
+    from airflow.timetables.simple import PartitionedAssetTimetable
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+AssetNameUri: TypeAlias = tuple[str, str]
+"""A ``(name, uri)`` pair identifying an asset."""
+
+
+def _fetch_active_assets_per_dag(
+    dag_ids: list[str], session: Session
+) -> dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]]:
+    """
+    Batch-fetch required assets for multiple Dags in a single query.
+
+    Returns ``{dag_id: ([(name, uri), ...], {asset_id: (name, uri)})}``.
+    Dags with no references are still included with empty containers
+    so callers can index by ``dag_id`` without ``KeyError``.
+
+    Inactive (deactivated) assets are still included so list-route totals stay
+    symmetric with the detail-route response; the per-asset ``asset_inactive``
+    flag (detail route only) surfaces the freeze state.
+    """
+    rows = session.execute(
+        select(
+            DagScheduleAssetReference.dag_id,
+            AssetModel.id,
+            AssetModel.name,
+            AssetModel.uri,
+        )
+        .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == 
AssetModel.id)
+        .where(DagScheduleAssetReference.dag_id.in_(dag_ids))
+    ).all()
+    result: dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]] = {
+        dag_id: ([], {}) for dag_id in dag_ids
+    }
+    for row in rows:
+        info, id_to_info = result[row.dag_id]
+        info.append((row.name, row.uri))
+        id_to_info[row.id] = (row.name, row.uri)
+    return result
+
+
+class _RollupResolution(NamedTuple):
+    """
+    Outcome of resolving an asset's upstream-key requirement for one partition 
key.
+
+    Three states, distinguished so callers can match the scheduler's
+    ``_resolve_asset_partition_status`` semantics:
+
+    - ``keys`` is a ``frozenset`` and ``mapper_failed`` is ``False``: rollup
+      asset, mapper succeeded — use ``keys`` as the required set.
+    - ``keys`` is ``None`` and ``mapper_failed`` is ``False``: not a rollup
+      asset — a single received event satisfies it.
+    - ``keys`` is ``None`` and ``mapper_failed`` is ``True``: rollup asset
+      whose mapper raised — the scheduler treats it as not-yet-satisfied; the
+      UI must not credit any received event either, otherwise progress would
+      silently show "ready" for a run the scheduler will never fire.
+    """
+
+    keys: frozenset[str] | None = None
+    mapper_failed: bool = False
+
+
+def _resolve_rollup_status(
+    dag_model: DagModel | None,
+    rollup_timetable: PartitionedAssetTimetable | None,
+    name: str,
+    uri: str,
+    partition_key: str,
+) -> _RollupResolution:
+    """
+    Resolve the rollup state for *(name, uri)* under the given partition key.
+
+    The ``dag_model is None`` / ``rollup_timetable is None`` cases 
short-circuit
+    to "not rollup" because there is nothing to evaluate against, not because
+    the asset is mis-configured.
+    """
+    if dag_model is None or rollup_timetable is None or not 
dag_model.is_rollup_asset(name=name, uri=uri):
+        return _RollupResolution()
+    try:
+        mapper = rollup_timetable.get_partition_mapper(name=name, uri=uri)
+        return _RollupResolution(keys=frozenset(cast("RollupMapper", 
mapper).to_upstream(partition_key)))
+    except Exception:
+        # Mismatch with the scheduler's rollup contract. The scheduler writes a
+        # Log row for the same condition (once per misconfig); this path is
+        # per-request and lighter.
+        log.warning(
+            "Failed to evaluate rollup mapper; treating asset as 
not-yet-satisfied",
+            dag_id=dag_model.dag_id,
+            asset_name=name,
+            asset_uri=uri,
+            partition_key=partition_key,
+            exc_info=True,
+        )
+        return _RollupResolution(mapper_failed=True)
+
+
+def _compute_total_required(
+    dag_model: DagModel | None,
+    rollup_timetable: PartitionedAssetTimetable | None,
+    asset_info: list[AssetNameUri],
+    partition_key: str,
+) -> int:
+    """
+    Sum required upstream events across all assets, using to_upstream for 
rollup mappers.
+
+    Non-rollup assets and broken-mapper assets both count as 1: non-rollup 
needs
+    one event to satisfy, broken-mapper counts as 1 unit of "blocked" so the
+    asset still contributes to the totals (received side credits 0, keeping the
+    progress short of "ready" as the scheduler intends).
+    """
+    total = 0
+    for name, uri in asset_info:
+        res = _resolve_rollup_status(dag_model, rollup_timetable, name, uri, 
partition_key)

Review Comment:
   Added a new `_build_asset_resolutions` helper that resolves each
   `(name, uri, partition_key)` once per row; `_compute_total_required` and
   `_compute_received_count` now both consume the resulting shared dict.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to