kaxil commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3317834411


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                # The audit Log row is committed on its own session so it
+                # survives even when the outer ``_create_dagruns_for_dags``
+                # transaction is rolled back (the caller is wrapped in
+                # ``@retry_db_transaction``, and a downstream 
``OperationalError``
+                # or scheduler crash mid-tick would otherwise drop the row 
while
+                # the in-memory set still suppressed the next attempt).
+                # The dedup-set update is gated on the independent commit
+                # succeeding; a transient DB failure on the audit session 
leaves
+                # the key unmarked so the next tick retries.
+                if self._record_partition_audit_log(apdr=apdr, name=name, 
uri=uri, err=err):
+                    self._partition_audit_seen.add(audit_key)
+            return False
+
+    def _record_partition_audit_log(
+        self,
+        *,
+        apdr: AssetPartitionDagRun,
+        name: str,
+        uri: str,
+        err: BaseException,
+    ) -> bool:
+        """
+        Persist a misconfigured-rollup audit Log row on an independent session.
+
+        Returns ``True`` on commit success, ``False`` when the write fails.
+        Failures are logged and swallowed: the audit row is advisory (the
+        warning above already captures the same information for operators
+        reading scheduler logs), and propagating a Log-table failure would
+        taint the scheduler tick.
+
+        APDR attributes are captured into locals before opening the audit
+        session so the new session never has to lazy-load through an instance
+        attached to the outer session.
+        """
+        target_dag_id = apdr.target_dag_id
+        extra = (
+            "Could not evaluate rollup status for partition_key "
+            f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+            f"in target Dag '{target_dag_id}'. This likely indicates "
+            "that the rollup mapper is misconfigured or does not support "
+            f"this partition key.\n{type(err).__name__}: {err}"
+        )
+        try:
+            # ``scoped=False`` so this really is a separate connection / 
session
+            # rather than the thread-scoped one shared with the outer
+            # transaction — otherwise an inner commit would close the outer
+            # session's state too.
+            with create_session(scoped=False) as audit_session:
+                audit_session.add(
+                    Log(
+                        event="failed to evaluate rollup status",
+                        dag_id=target_dag_id,
+                        extra=extra,
+                    )
+                )
+        except Exception:
+            self.log.warning(
+                "Failed to write audit Log row for misconfigured rollup 
mapper",
+                dag_id=target_dag_id,
+                asset_name=name,
+                asset_uri=uri,
+                exc_info=True,
+            )
+            return False
+        return True
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+
+        Asset deactivation freezes pending APDRs: when an asset becomes 
inactive
+        (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog`` 
rows
+        stop contributing to the rollup. If the consumer Dag still depends on 
that
+        asset, firing on stale history would conflict with the declared 
topology,
+        so the APDR waits. Reactivating the asset resumes evaluation 
automatically.
+        This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # FIFO is intentional: the oldest pending APDR fires first. A 
persistently
+        # unsatisfiable APDR at the head (e.g. broken mapper, upstream that 
will
+        # never arrive) blocks newer ones until an operator removes it or fixes
+        # the underlying mapper. We surface the stuck state rather than 
silently
+        # rotating past it.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)

Review Comment:
   No row lock or atomic claim on the pending-APDR select, so in HA two 
schedulers can both grab the same satisfied APDR, both create asset-triggered 
DagRuns (different `run_id`s), then race the `created_dag_run_id` UPDATE. 
Whichever update lands last leaves the earlier DagRun orphaned -- still running 
on the cluster, no APDR row pointing at it.
   
   Tangentially, `order_by(AssetPartitionDagRun.created_at)` has no tiebreaker 
-- two rows with identical `created_at` (which can happen under bulk 
asset-event ingestion) can land in different orders between scheduler 
instances. The two schedulers then disagree on which APDRs the LIMIT picks even 
before the lock issue kicks in.
   
   Easiest path is `.with_for_update(skip_locked=True)` + `.order_by(..., 
AssetPartitionDagRun.id)`. An atomic `UPDATE ... WHERE created_dag_run_id IS 
NULL RETURNING` would also work and might be cheaper on PG -- haven't measured.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                # The audit Log row is committed on its own session so it
+                # survives even when the outer ``_create_dagruns_for_dags``
+                # transaction is rolled back (the caller is wrapped in
+                # ``@retry_db_transaction``, and a downstream 
``OperationalError``
+                # or scheduler crash mid-tick would otherwise drop the row 
while
+                # the in-memory set still suppressed the next attempt).
+                # The dedup-set update is gated on the independent commit
+                # succeeding; a transient DB failure on the audit session 
leaves
+                # the key unmarked so the next tick retries.
+                if self._record_partition_audit_log(apdr=apdr, name=name, 
uri=uri, err=err):
+                    self._partition_audit_seen.add(audit_key)
+            return False
+
+    def _record_partition_audit_log(
+        self,
+        *,
+        apdr: AssetPartitionDagRun,
+        name: str,
+        uri: str,
+        err: BaseException,
+    ) -> bool:
+        """
+        Persist a misconfigured-rollup audit Log row on an independent session.
+
+        Returns ``True`` on commit success, ``False`` when the write fails.
+        Failures are logged and swallowed: the audit row is advisory (the
+        warning above already captures the same information for operators
+        reading scheduler logs), and propagating a Log-table failure would
+        taint the scheduler tick.
+
+        APDR attributes are captured into locals before opening the audit
+        session so the new session never has to lazy-load through an instance
+        attached to the outer session.
+        """
+        target_dag_id = apdr.target_dag_id
+        extra = (
+            "Could not evaluate rollup status for partition_key "
+            f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+            f"in target Dag '{target_dag_id}'. This likely indicates "
+            "that the rollup mapper is misconfigured or does not support "
+            f"this partition key.\n{type(err).__name__}: {err}"
+        )
+        try:
+            # ``scoped=False`` so this really is a separate connection / 
session
+            # rather than the thread-scoped one shared with the outer
+            # transaction — otherwise an inner commit would close the outer
+            # session's state too.
+            with create_session(scoped=False) as audit_session:
+                audit_session.add(
+                    Log(
+                        event="failed to evaluate rollup status",
+                        dag_id=target_dag_id,
+                        extra=extra,
+                    )
+                )
+        except Exception:
+            self.log.warning(
+                "Failed to write audit Log row for misconfigured rollup 
mapper",
+                dag_id=target_dag_id,
+                asset_name=name,
+                asset_uri=uri,
+                exc_info=True,
+            )
+            return False
+        return True
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+
+        Asset deactivation freezes pending APDRs: when an asset becomes 
inactive
+        (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog`` 
rows
+        stop contributing to the rollup. If the consumer Dag still depends on 
that
+        asset, firing on stale history would conflict with the declared 
topology,
+        so the APDR waits. Reactivating the asset resumes evaluation 
automatically.
+        This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # FIFO is intentional: the oldest pending APDR fires first. A 
persistently
+        # unsatisfiable APDR at the head (e.g. broken mapper, upstream that 
will
+        # never arrive) blocks newer ones until an operator removes it or fixes
+        # the underlying mapper. We surface the stuck state rather than 
silently
+        # rotating past it.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)
+            .limit(self._max_partition_dag_runs_per_loop)

Review Comment:
   The comment above acknowledges intra-Dag head-blocking, but the LIMIT here 
is global across all `dag_id`s, not per-Dag -- which means DagA's stuck head 
can starve DagB. Say DagA has a broken mapper and a few hundred pending APDRs 
piled up at the head (oldest `created_at`); those rows consume the entire 
per-tick quota and DagB's freshly satisfied APDRs never get a turn until 
someone manually unclogs DagA.
   
   A per-Dag bound on the LIMIT (window function with 
`partition_by(target_dag_id)` + per-Dag cap) would fix it. Persisting some kind 
of `blocked_until` / try-counter so the stuck head can be skipped after N 
attempts would handle the operator-isn't-watching case too. Or maybe wait for 
#65921 / #66520 to land and rely on the operator escape hatch -- depends how 
acceptable the cross-Dag interference is in the meantime.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                # The audit Log row is committed on its own session so it
+                # survives even when the outer ``_create_dagruns_for_dags``
+                # transaction is rolled back (the caller is wrapped in
+                # ``@retry_db_transaction``, and a downstream 
``OperationalError``
+                # or scheduler crash mid-tick would otherwise drop the row 
while
+                # the in-memory set still suppressed the next attempt).
+                # The dedup-set update is gated on the independent commit
+                # succeeding; a transient DB failure on the audit session 
leaves
+                # the key unmarked so the next tick retries.
+                if self._record_partition_audit_log(apdr=apdr, name=name, 
uri=uri, err=err):
+                    self._partition_audit_seen.add(audit_key)
+            return False
+
+    def _record_partition_audit_log(
+        self,
+        *,
+        apdr: AssetPartitionDagRun,
+        name: str,
+        uri: str,
+        err: BaseException,
+    ) -> bool:
+        """
+        Persist a misconfigured-rollup audit Log row on an independent session.
+
+        Returns ``True`` on commit success, ``False`` when the write fails.
+        Failures are logged and swallowed: the audit row is advisory (the
+        warning above already captures the same information for operators
+        reading scheduler logs), and propagating a Log-table failure would
+        taint the scheduler tick.
+
+        APDR attributes are captured into locals before opening the audit
+        session so the new session never has to lazy-load through an instance
+        attached to the outer session.
+        """
+        target_dag_id = apdr.target_dag_id
+        extra = (
+            "Could not evaluate rollup status for partition_key "
+            f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+            f"in target Dag '{target_dag_id}'. This likely indicates "
+            "that the rollup mapper is misconfigured or does not support "
+            f"this partition key.\n{type(err).__name__}: {err}"
+        )
+        try:
+            # ``scoped=False`` so this really is a separate connection / 
session
+            # rather than the thread-scoped one shared with the outer
+            # transaction — otherwise an inner commit would close the outer
+            # session's state too.
+            with create_session(scoped=False) as audit_session:
+                audit_session.add(
+                    Log(
+                        event="failed to evaluate rollup status",
+                        dag_id=target_dag_id,
+                        extra=extra,
+                    )
+                )
+        except Exception:
+            self.log.warning(
+                "Failed to write audit Log row for misconfigured rollup 
mapper",
+                dag_id=target_dag_id,
+                asset_name=name,
+                asset_uri=uri,
+                exc_info=True,
+            )
+            return False
+        return True
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+
+        Asset deactivation freezes pending APDRs: when an asset becomes 
inactive
+        (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog`` 
rows
+        stop contributing to the rollup. If the consumer Dag still depends on 
that
+        asset, firing on stale history would conflict with the declared 
topology,
+        so the APDR waits. Reactivating the asset resumes evaluation 
automatically.
+        This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # FIFO is intentional: the oldest pending APDR fires first. A 
persistently
+        # unsatisfiable APDR at the head (e.g. broken mapper, upstream that 
will
+        # never arrive) blocks newer ones until an operator removes it or fixes
+        # the underlying mapper. We surface the stuck state rather than 
silently
+        # rotating past it.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)
+            .limit(self._max_partition_dag_runs_per_loop)
+        ).all()
+        if not pending_apdrs:
+            return set()
+
         partition_dag_ids: set[str] = set()
+        pending_apdr_ids = [apdr.id for apdr in pending_apdrs]
 
-        evaluator = AssetEvaluator(session)
-        for apdr in session.scalars(
-            
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        # Pre-fetch all required serialized Dags in one query.
+        dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs})
+        # {"dag_id": Serialized Dag}
+        serialized_dags: dict[str, SerializedDAG] = {}
+        for serdag in 
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session):

Review Comment:
   The `get_latest_serialized_dags` call here rehydrates whichever Dag version 
is current at evaluation time, not the version that was current when the APDR 
row was created. APDR and `PartitionedAssetKeyLog` rows store target/source 
keys but no pointer to the mapper or window contract that emitted them.
   
   Bites with AIP-66 versioning. If someone switches 
`RollupMapper(window=DayWindow())` to `IdentityMapper` after 12 of 24 hourly 
events have arrived -- the APDR was queued under "need all 24" but now 
evaluates under "any 1 satisfies" and fires immediately on the partial rollup. 
The other direction is uglier: `IdentityMapper` -> 
`RollupMapper(window=DayWindow())` after the single upstream arrived, APDR 
queued under "any 1" but now needs 24 that will never come.
   
   Pinning the contract at APDR creation would fix it. Either store the 
required upstream key set directly on the APDR (simple, denormalized), or stamp 
a `dag_version_id` and resolve the matching serialized version at evaluation. 
Not sure which is preferable -- the key set means you can evaluate without 
re-deserializing the Dag at all, but it locks the row to one version's mapper 
output.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -38,23 +45,185 @@
 )
 from airflow.models import DagModel
 from airflow.models.asset import (
+    AssetActive,
     AssetModel,
     AssetPartitionDagRun,
     DagScheduleAssetReference,
     PartitionedAssetKeyLog,
 )
 from airflow.models.dagrun import DagRun
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.partition_mappers.base import RollupMapper
+    from airflow.timetables.simple import PartitionedAssetTimetable
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+AssetNameUri: TypeAlias = tuple[str, str]
+"""A ``(name, uri)`` pair identifying an asset."""
+
+
+def _fetch_active_assets_per_dag(
+    dag_ids: list[str], session: Session
+) -> dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]]:
+    """
+    Batch-fetch required assets for multiple Dags in a single query.
+
+    Returns ``{dag_id: ([(name, uri), ...], {asset_id: (name, uri)})}``.
+    Dags with no references are still included with empty containers
+    so callers can index by ``dag_id`` without ``KeyError``.
+
+    Inactive (deactivated) assets are still included so list-route totals stay
+    symmetric with the detail-route response; the per-asset ``asset_inactive``
+    flag (detail route only) surfaces the freeze state.
+    """
+    rows = session.execute(
+        select(
+            DagScheduleAssetReference.dag_id,
+            AssetModel.id,
+            AssetModel.name,
+            AssetModel.uri,
+        )
+        .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == 
AssetModel.id)
+        .where(DagScheduleAssetReference.dag_id.in_(dag_ids))
+    ).all()
+    result: dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]] = {
+        dag_id: ([], {}) for dag_id in dag_ids
+    }
+    for row in rows:
+        info, id_to_info = result[row.dag_id]
+        info.append((row.name, row.uri))
+        id_to_info[row.id] = (row.name, row.uri)
+    return result
+
+
+class _RollupResolution(NamedTuple):
+    """
+    Outcome of resolving an asset's upstream-key requirement for one partition 
key.
+
+    Three states, distinguished so callers can match the scheduler's
+    ``_resolve_asset_partition_status`` semantics:
+
+    - ``keys`` is a ``frozenset`` and ``mapper_failed`` is ``False``: rollup
+      asset, mapper succeeded — use ``keys`` as the required set.
+    - ``keys`` is ``None`` and ``mapper_failed`` is ``False``: not a rollup
+      asset — a single received event satisfies it.
+    - ``keys`` is ``None`` and ``mapper_failed`` is ``True``: rollup asset
+      whose mapper raised — the scheduler treats it as not-yet-satisfied; the
+      UI must not credit any received event either, otherwise progress would
+      silently show "ready" for a run the scheduler will never fire.
+    """
+
+    keys: frozenset[str] | None = None
+    mapper_failed: bool = False
+
+
+def _resolve_rollup_status(
+    dag_model: DagModel | None,
+    rollup_timetable: PartitionedAssetTimetable | None,
+    name: str,
+    uri: str,
+    partition_key: str,
+) -> _RollupResolution:
+    """
+    Resolve the rollup state for *(name, uri)* under the given partition key.
+
+    The ``dag_model is None`` / ``rollup_timetable is None`` cases 
short-circuit
+    to "not rollup" because there is nothing to evaluate against, not because
+    the asset is mis-configured.
+    """
+    if dag_model is None or rollup_timetable is None or not 
dag_model.is_rollup_asset(name=name, uri=uri):
+        return _RollupResolution()
+    try:
+        mapper = rollup_timetable.get_partition_mapper(name=name, uri=uri)
+        return _RollupResolution(keys=frozenset(cast("RollupMapper", 
mapper).to_upstream(partition_key)))
+    except Exception:
+        # Mismatch with the scheduler's rollup contract. The scheduler writes a
+        # Log row for the same condition (once per misconfig); this path is
+        # per-request and lighter.
+        log.warning(
+            "Failed to evaluate rollup mapper; treating asset as 
not-yet-satisfied",
+            dag_id=dag_model.dag_id,
+            asset_name=name,
+            asset_uri=uri,
+            partition_key=partition_key,
+            exc_info=True,
+        )
+        return _RollupResolution(mapper_failed=True)
+
+
+def _compute_total_required(
+    dag_model: DagModel | None,
+    rollup_timetable: PartitionedAssetTimetable | None,
+    asset_info: list[AssetNameUri],
+    partition_key: str,
+) -> int:
+    """
+    Sum required upstream events across all assets, using to_upstream for 
rollup mappers.
+
+    Non-rollup assets and broken-mapper assets both count as 1: non-rollup 
needs
+    one event to satisfy, broken-mapper counts as 1 unit of "blocked" so the
+    asset still contributes to the totals (received side credits 0, keeping the
+    progress short of "ready" as the scheduler intends).
+    """
+    total = 0
+    for name, uri in asset_info:
+        res = _resolve_rollup_status(dag_model, rollup_timetable, name, uri, 
partition_key)

Review Comment:
   `_compute_total_required` (this loop) and `_compute_received_count` below 
both run the same `_resolve_rollup_status(name, uri, partition_key)` per asset 
per row -- so the same `(dag_id, partition_key, name, uri)` resolves twice in a 
row. Each call hits `get_partition_mapper` then builds a `frozenset` from 
`to_upstream(partition_key)` (DayWindow yields 24, MonthWindow 12, HourWindow 
60).
   
   On a busy page the duplication piles up. Worst-ish case I can think of -- 
500-row page on a Dag with ~5 rollup assets -- is around ~5k mapper calls per 
request, half pure duplication. Probably worth hoisting a per-row `dict[(name, 
uri), _RollupResolution]` and passing it into both helpers.
   
   The scheduler does the same shape in 
`_create_dagruns_for_partitioned_asset_dags` (one `get_partition_mapper` per 
(apdr, asset)), but it's bounded by `_max_partition_dag_runs_per_loop` so the 
cost is smaller there.



##########
airflow-core/src/airflow/api_fastapi/common/partition_helpers.py:
##########
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import structlog
+
+from airflow.exceptions import DeserializationError
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.simple import PartitionedAssetTimetable
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+def _extract_partitioned_timetable(serdag: SerializedDagModel) -> 
PartitionedAssetTimetable | None:
+    """Return the ``PartitionedAssetTimetable`` carried by *serdag*, or 
``None``."""
+    try:
+        timetable = serdag.dag.timetable
+    except (KeyError, ValueError, ImportError, AttributeError, 
DeserializationError):

Review Comment:
   Two things off here. The catch list grabs `KeyError` and `AttributeError` 
alongside `DeserializationError`, but those two are exactly the classes that 
mask refactor bugs (renamed attr on `serdag.dag`, missing dict key) -- the 
opposite of what the comment above says it's doing. A bad refactor would just 
degrade to "non-partitioned" with a log line and we'd not notice.
   
   And it's missing `TypeError`, which the core `RollupMapper.__init__` now 
throws for mis-paired upstream/window (see the comment I left on 
`task-sdk/.../partition_mappers/base.py:49`). So a misconfigured rollup Dag 
500s every UI call through `load_partitioned_timetable` rather than degrading.
   
   Maybe just `(DeserializationError, TypeError)`?



##########
task-sdk/src/airflow/sdk/definitions/partition_mappers/base.py:
##########
@@ -16,10 +16,34 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING, ClassVar
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.partition_mappers.window import Window
+
 
 class PartitionMapper:
     """
     Base partition mapper class.
 
-    Maps keys from asset events to target dag run partitions.
+    Maps keys from asset events to target Dag run partitions.
+    """
+
+    is_rollup: ClassVar[bool] = False
+
+
+class RollupMapper(PartitionMapper):
+    """
+    Partition mapper that rolls up many upstream keys into one downstream key.
+
+    Compose a ``upstream_mapper`` (which normalizes each upstream key to the
+    downstream granularity) with a ``window`` that declares the full set of
+    upstream keys required for a given downstream key. The scheduler holds
+    the Dag run until every upstream key in the window has arrived.
     """
+
+    is_rollup: ClassVar[bool] = True
+
+    def __init__(self, *, upstream_mapper: PartitionMapper, window: Window) -> 
None:
+        self.upstream_mapper = upstream_mapper
+        self.window = window

Review Comment:
   The SDK constructor here just stores `upstream_mapper` and `window`. The 
core constructor at `airflow-core/.../partition_mappers/base.py:115` does the 
actual validation -- `decode_downstream` override check, plus the 
`window.expected_decoded_type` check that catches 
`RollupMapper(upstream_mapper=IdentityMapper(), window=DayWindow())`.
   
   Problem is, user code only ever touches the SDK class (`from airflow.sdk 
import RollupMapper`). So a mis-paired upstream/window slips through DAG parse 
with no error. The core `TypeError` only fires once the scheduler deserializes 
-- where it gets swallowed by the bare `except Exception` in 
`_create_dagruns_for_partitioned_asset_dags` and shows up as "Failed to 
deserialize Dag" spam every tick. The author has no idea what's wrong.
   
   Could just mirror the same check in the SDK constructor. Bit of a chore 
though -- `expected_decoded_type` is a `ClassVar` on the core `Window` but not 
on the SDK one, so it'd need adding to the SDK `Window` ABC and all 6 
subclasses too.



##########
airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx:
##########
@@ -38,12 +37,17 @@ export const AssetProgressCell = ({ dagId, partitionKey, 
totalReceived, totalReq
   const assetExpression = data?.asset_expression as ExpressionType | undefined;

Review Comment:
   This `as ExpressionType` cast is here (and at 
`pages/DagsList/AssetSchedule.tsx:331`) because the generated OpenAPI types 
declare `asset_expression` as `{ [k: string]: unknown } | null` -- the Python 
annotation is just `dict | None`, no shape. The `ExpressionType` discriminated 
union is hand-written in `src/components/AssetExpression/` with no runtime 
check, so the consumers are basically just trusting the server.
   
   If the asset-expression shape ever changes server-side, the TS build won't 
catch it -- you'd find out via a blank render or a discriminator-missing crash 
at runtime. Worth pinning down the shape on the Python side (a Pydantic model 
in `datamodels/ui/assets.py`, or a `oneOf` override on the OpenAPI schema) so 
`ExpressionType` ends up in `openapi-gen/queries/types.gen.ts` and the cast can 
go away on its own.



##########
airflow-core/newsfragments/64571.significant.rst:
##########
@@ -29,8 +29,33 @@ an hourly partition).
 
 Mappers can be set globally on a ``PartitionedAssetTimetable`` or overridden 
per upstream asset via ``partition_mapper_config``.
 
+**Rollup (one downstream run per window of upstream partitions)**:
+
+- ``RollupMapper`` — wraps a ``upstream_mapper`` (which normalises the 
upstream key to the

Review Comment:
   Newsfragment looks good. But the narrative doc side feels thin -- 
`airflow-core/docs/authoring-and-scheduling/assets.rst` § "Asset partitions" 
(lines 496-630) covers the non-rollup mappers but doesn't reach the rollup 
case. No subsection showing how to compose 
`RollupMapper(upstream_mapper=StartOfDayMapper(), window=DayWindow())` onto a 
`PartitionedAssetTimetable`, no "hourly -> daily summary" worked example, and 
the DST mitigation lives in the newsfragment but not anywhere a user is likely 
to read while building this.
   
   Autoapi entries cover the reference but they won't help someone landing on 
assets.rst trying to figure out if the feature exists. Either drop a section in 
here before the 3.3 freeze or open a follow-up doc PR -- not necessarily 
blocking but probably wants to ship in the same release.
   
   (Also, the existing partitions section is at 
[`authoring-and-scheduling/assets.rst#asset-partitions`](https://github.com/apache/airflow/blob/main/airflow-core/docs/authoring-and-scheduling/assets.rst)
 for context.)



##########
task-sdk/docs/api.rst:
##########
@@ -239,10 +239,29 @@ Partition Mapper
 
 .. autoapiclass:: airflow.sdk.StartOfYearMapper
 
+.. autoapiclass:: airflow.sdk.RollupMapper

Review Comment:
   Noticing the AIP-76 wiki and the code have drifted on naming -- the [AIP 
spec](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311626969)
 uses `PartitionByInterval` / `PartitionBySequence` / `PartitionByProduct` / 
`PartitionAtRuntime`, but the shipped surface here is `CronPartitionTimetable` 
/ `PartitionedAssetTimetable` / `StartOfDayMapper` / `RollupMapper` / 
`ProductMapper` / `AllowedKeyMapper` / `ChainMapper`.
   
   Not a code concern but external readers landing on the AIP and searching for 
`PartitionByInterval` won't find anything in `airflow.sdk`. The May 2026 
management deck even has a typo carried over (`CronPartitionedTimetable` -- 
doesn't exist). Probably worth a follow-up to either refresh the AIP wiki 
against the shipped names + amendment, or add a mapping note somewhere visible. 
cc TP, Wei since they're owning this AIP.



##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any, ClassVar
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
+
+def _require_day_one(dt: datetime, window_cls: type) -> None:
+    """
+    Raise ``ValueError`` if *dt* is not on day 1 of its month.
+
+    Month-aligned windows expand by month-step arithmetic, which is only
+    safe when the period starts on day 1 (otherwise e.g.
+    ``replace(month=feb)`` on Jan 31 raises ``ValueError: day is out of
+    range for month``). Built-in temporal upstream mappers normalise to
+    day 1, but a custom ``PartitionMapper.decode_downstream`` could return
+    any day — checking here turns a confusing scheduler-tick crash into
+    an explicit signal about the upstream-mapper contract.
+    """
+    if dt.day != 1:
+        raise ValueError(
+            f"{window_cls.__name__} expects a period start on day 1 of the 
month, "
+            f"got {dt.isoformat()}. The paired upstream mapper's 
decode_downstream "
+            "must normalise to day 1."
+        )
+
+
+def _shift_months(dt: datetime, months: int) -> datetime:
+    """
+    Return *dt* shifted forward by *months*, wrapping the year as needed.
+
+    Caller is responsible for ensuring *dt* is on day 1 (see
+    :func:`_require_day_one`) so that ``replace(month=...)`` is always valid.
+    """
+    total = dt.month - 1 + months
+    return dt.replace(year=dt.year + total // 12, month=total % 12 + 1)
+
+
+class Window(ABC):

Review Comment:
   Question on scope -- the [AIP-76 
spec](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311626969)
 under "Complex Schedule-Partition Mapping" specifically cites "a 
hourly-materialised asset that modifies data for the past **two** hours (the 
new hour, and the hour before it that was also covered by the previous 
partition -- a.k.a. a slicing window)" as the motivating example for the 
pluggable `Window` interface. The 6 built-ins shipped here are all contiguous 
full-period (Hour/Day/Week/Month/Quarter/Year). A rolling 7-day window every 
day, or the past-two-hours sliding case the AIP names, can't be expressed by 
subclassing this ABC -- `to_upstream` returns the full set for one downstream 
period, with no notion of overlap.
   
   Is sliding deliberate follow-up (3.4?) or in-scope for 3.3 alongside 
`FanOutMapper` (#65654)? If it's follow-up, probably worth a sentence in the 
`Window` docstring so users don't go subclass it for sliding semantics and hit 
non-obvious limitations. If it's in-scope, worth coordinating the API shape now 
before this lands.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py:
##########
@@ -55,10 +69,15 @@ def next_run_assets(
     pending_partition_count: int | None = None
 
     queued_expr: ColumnElement[int]
-    if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
+    if is_partitioned := dag_model.timetable_partitioned:
         pending_partition_count = session.scalar(
-            select(func.count())
+            select(func.count(AssetPartitionDagRun.id.distinct()))
             .select_from(AssetPartitionDagRun)
+            .join(
+                DagScheduleAssetReference,
+                DagScheduleAssetReference.dag_id == 
AssetPartitionDagRun.target_dag_id,
+            )
+            .join(AssetModel, AssetModel.id == 
DagScheduleAssetReference.asset_id)

Review Comment:
   The joins to `DagScheduleAssetReference` and `AssetModel` aren't referenced 
anywhere in the `WHERE` clause, so `count(AssetPartitionDagRun.id.distinct())` 
ends up collapsing a cartesian product. For a Dag with N 
`DagScheduleAssetReference` rows and M pending APDRs that's N*M rows scanned to 
produce a count that
   
   ```python
   select(func.count())
       .select_from(AssetPartitionDagRun)
       .where(
           AssetPartitionDagRun.target_dag_id == dag_id,
           AssetPartitionDagRun.created_dag_run_id.is_(None),
       )
   ```
   
   would do by scanning M. This runs once per partitioned Dag inside 
`next_run_assets` which the UI calls on every home/Dags-list render. Both joins 
look droppable.



##########
airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx:
##########
@@ -16,47 +16,130 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { Box, Text, HStack } from "@chakra-ui/react";
-import { FiDatabase } from "react-icons/fi";
+import { Box, Button, HStack, Text, VStack } from "@chakra-ui/react";
+import { FiCheck, FiDatabase, FiMinus } from "react-icons/fi";
 import { PiRectangleDashed } from "react-icons/pi";
 
-import { RouterLink } from "src/components/ui";
+import type { NextRunAssetEventResponse } from "openapi/requests/types.gen";
+import { Popover, RouterLink } from "src/components/ui";
 
 import Time from "../Time";
-import type { AssetSummary, NextRunEvent } from "./types";
+import type { AssetSummary } from "./types";
+
+const RollupKeyChecklistPopover = ({

Review Comment:
   This `RollupKeyChecklistPopover` is a local `const` in this file -- not 
exported. The same `Popover` + `VStack` + `FiCheck`/`FiMinus` block is 
reimplemented by hand in three other places:
   
   - `components/AssetProgressCell.tsx` (rebuilds the checklist as its popover 
body)
   - `pages/DagsList/AssetSchedule.tsx` -- twice, once for the single-asset 
branch and once for multi-asset, each redoing the whole Popover wrapper.
   
   So four copies of the same render logic. Hate to be that person but the next 
icon swap or a11y label tweak is going to land in three of them and miss the 
fourth. Worth exporting from this file? `AssetProgressCell` would need the body 
without the Popover (since it's already inside its own), so maybe split: a 
`RollupKeyChecklist` for the content + the existing wrapper around it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to