kaxil commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3317834411
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ # The audit Log row is committed on its own session so it
+ # survives even when the outer ``_create_dagruns_for_dags``
+ # transaction is rolled back (the caller is wrapped in
+ # ``@retry_db_transaction``, and a downstream
``OperationalError``
+ # or scheduler crash mid-tick would otherwise drop the row
while
+ # the in-memory set still suppressed the next attempt).
+ # The dedup-set update is gated on the independent commit
+ # succeeding; a transient DB failure on the audit session
leaves
+ # the key unmarked so the next tick retries.
+ if self._record_partition_audit_log(apdr=apdr, name=name,
uri=uri, err=err):
+ self._partition_audit_seen.add(audit_key)
+ return False
+
+ def _record_partition_audit_log(
+ self,
+ *,
+ apdr: AssetPartitionDagRun,
+ name: str,
+ uri: str,
+ err: BaseException,
+ ) -> bool:
+ """
+ Persist a misconfigured-rollup audit Log row on an independent session.
+
+ Returns ``True`` on commit success, ``False`` when the write fails.
+ Failures are logged and swallowed: the audit row is advisory (the
+ warning above already captures the same information for operators
+ reading scheduler logs), and propagating a Log-table failure would
+ taint the scheduler tick.
+
+ APDR attributes are captured into locals before opening the audit
+ session so the new session never has to lazy-load through an instance
+ attached to the outer session.
+ """
+ target_dag_id = apdr.target_dag_id
+ extra = (
+ "Could not evaluate rollup status for partition_key "
+ f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+ f"in target Dag '{target_dag_id}'. This likely indicates "
+ "that the rollup mapper is misconfigured or does not support "
+ f"this partition key.\n{type(err).__name__}: {err}"
+ )
+ try:
+ # ``scoped=False`` so this really is a separate connection /
session
+ # rather than the thread-scoped one shared with the outer
+ # transaction — otherwise an inner commit would close the outer
+ # session's state too.
+ with create_session(scoped=False) as audit_session:
+ audit_session.add(
+ Log(
+ event="failed to evaluate rollup status",
+ dag_id=target_dag_id,
+ extra=extra,
+ )
+ )
+ except Exception:
+ self.log.warning(
+ "Failed to write audit Log row for misconfigured rollup
mapper",
+ dag_id=target_dag_id,
+ asset_name=name,
+ asset_uri=uri,
+ exc_info=True,
+ )
+ return False
+ return True
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+
+ Asset deactivation freezes pending APDRs: when an asset becomes
inactive
+ (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog``
rows
+ stop contributing to the rollup. If the consumer Dag still depends on
that
+ asset, firing on stale history would conflict with the declared
topology,
+ so the APDR waits. Reactivating the asset resumes evaluation
automatically.
+ This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # FIFO is intentional: the oldest pending APDR fires first. A
persistently
+ # unsatisfiable APDR at the head (e.g. broken mapper, upstream that
will
+ # never arrive) blocks newer ones until an operator removes it or fixes
+ # the underlying mapper. We surface the stuck state rather than
silently
+ # rotating past it.
+ pending_apdrs = session.scalars(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at)
Review Comment:
No row lock or atomic claim on the pending-APDR select, so in HA two
schedulers can both grab the same satisfied APDR, both create asset-triggered
DagRuns (different `run_id`s), then race the `created_dag_run_id` UPDATE.
Whichever update lands last leaves the earlier DagRun orphaned -- still running
on the cluster, no APDR row pointing at it.
Tangentially, `order_by(AssetPartitionDagRun.created_at)` has no tiebreaker
-- two rows with identical `created_at` (which can happen under bulk
asset-event ingestion) can land in different orders between scheduler
instances. The two schedulers then disagree on which APDRs the LIMIT picks even
before the lock issue kicks in.
Easiest path is `.with_for_update(skip_locked=True)` + `.order_by(...,
AssetPartitionDagRun.id)`. An atomic `UPDATE ... WHERE created_dag_run_id IS
NULL RETURNING` would also work and might be cheaper on PG -- haven't measured.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ # The audit Log row is committed on its own session so it
+ # survives even when the outer ``_create_dagruns_for_dags``
+ # transaction is rolled back (the caller is wrapped in
+ # ``@retry_db_transaction``, and a downstream
``OperationalError``
+ # or scheduler crash mid-tick would otherwise drop the row
while
+ # the in-memory set still suppressed the next attempt).
+ # The dedup-set update is gated on the independent commit
+ # succeeding; a transient DB failure on the audit session
leaves
+ # the key unmarked so the next tick retries.
+ if self._record_partition_audit_log(apdr=apdr, name=name,
uri=uri, err=err):
+ self._partition_audit_seen.add(audit_key)
+ return False
+
+ def _record_partition_audit_log(
+ self,
+ *,
+ apdr: AssetPartitionDagRun,
+ name: str,
+ uri: str,
+ err: BaseException,
+ ) -> bool:
+ """
+ Persist a misconfigured-rollup audit Log row on an independent session.
+
+ Returns ``True`` on commit success, ``False`` when the write fails.
+ Failures are logged and swallowed: the audit row is advisory (the
+ warning above already captures the same information for operators
+ reading scheduler logs), and propagating a Log-table failure would
+ taint the scheduler tick.
+
+ APDR attributes are captured into locals before opening the audit
+ session so the new session never has to lazy-load through an instance
+ attached to the outer session.
+ """
+ target_dag_id = apdr.target_dag_id
+ extra = (
+ "Could not evaluate rollup status for partition_key "
+ f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+ f"in target Dag '{target_dag_id}'. This likely indicates "
+ "that the rollup mapper is misconfigured or does not support "
+ f"this partition key.\n{type(err).__name__}: {err}"
+ )
+ try:
+ # ``scoped=False`` so this really is a separate connection /
session
+ # rather than the thread-scoped one shared with the outer
+ # transaction — otherwise an inner commit would close the outer
+ # session's state too.
+ with create_session(scoped=False) as audit_session:
+ audit_session.add(
+ Log(
+ event="failed to evaluate rollup status",
+ dag_id=target_dag_id,
+ extra=extra,
+ )
+ )
+ except Exception:
+ self.log.warning(
+ "Failed to write audit Log row for misconfigured rollup
mapper",
+ dag_id=target_dag_id,
+ asset_name=name,
+ asset_uri=uri,
+ exc_info=True,
+ )
+ return False
+ return True
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+
+ Asset deactivation freezes pending APDRs: when an asset becomes
inactive
+ (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog``
rows
+ stop contributing to the rollup. If the consumer Dag still depends on
that
+ asset, firing on stale history would conflict with the declared
topology,
+ so the APDR waits. Reactivating the asset resumes evaluation
automatically.
+ This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # FIFO is intentional: the oldest pending APDR fires first. A
persistently
+ # unsatisfiable APDR at the head (e.g. broken mapper, upstream that
will
+ # never arrive) blocks newer ones until an operator removes it or fixes
+ # the underlying mapper. We surface the stuck state rather than
silently
+ # rotating past it.
+ pending_apdrs = session.scalars(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at)
+ .limit(self._max_partition_dag_runs_per_loop)
Review Comment:
The comment above acknowledges intra-Dag head-blocking, but the LIMIT here
is global across all `dag_id`s, not per-Dag -- which means DagA's stuck head
can starve DagB. Say DagA has a broken mapper and a few hundred pending APDRs
piled up at the head (oldest `created_at`); those rows consume the entire
per-tick quota and DagB's freshly satisfied APDRs never get a turn until
someone manually unclogs DagA.
A per-Dag bound on the LIMIT (window function with
`partition_by(target_dag_id)` + per-Dag cap) would fix it. Persisting some kind
of `blocked_until` / try-counter so the stuck head can be skipped after N
attempts would handle the operator-isn't-watching case too. Or maybe wait for
#65921 / #66520 to land and rely on the operator escape hatch -- depends how
acceptable the cross-Dag interference is in the meantime.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ # The audit Log row is committed on its own session so it
+ # survives even when the outer ``_create_dagruns_for_dags``
+ # transaction is rolled back (the caller is wrapped in
+ # ``@retry_db_transaction``, and a downstream
``OperationalError``
+ # or scheduler crash mid-tick would otherwise drop the row
while
+ # the in-memory set still suppressed the next attempt).
+ # The dedup-set update is gated on the independent commit
+ # succeeding; a transient DB failure on the audit session
leaves
+ # the key unmarked so the next tick retries.
+ if self._record_partition_audit_log(apdr=apdr, name=name,
uri=uri, err=err):
+ self._partition_audit_seen.add(audit_key)
+ return False
+
+ def _record_partition_audit_log(
+ self,
+ *,
+ apdr: AssetPartitionDagRun,
+ name: str,
+ uri: str,
+ err: BaseException,
+ ) -> bool:
+ """
+ Persist a misconfigured-rollup audit Log row on an independent session.
+
+ Returns ``True`` on commit success, ``False`` when the write fails.
+ Failures are logged and swallowed: the audit row is advisory (the
+ warning above already captures the same information for operators
+ reading scheduler logs), and propagating a Log-table failure would
+ taint the scheduler tick.
+
+ APDR attributes are captured into locals before opening the audit
+ session so the new session never has to lazy-load through an instance
+ attached to the outer session.
+ """
+ target_dag_id = apdr.target_dag_id
+ extra = (
+ "Could not evaluate rollup status for partition_key "
+ f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+ f"in target Dag '{target_dag_id}'. This likely indicates "
+ "that the rollup mapper is misconfigured or does not support "
+ f"this partition key.\n{type(err).__name__}: {err}"
+ )
+ try:
+ # ``scoped=False`` so this really is a separate connection /
session
+ # rather than the thread-scoped one shared with the outer
+ # transaction — otherwise an inner commit would close the outer
+ # session's state too.
+ with create_session(scoped=False) as audit_session:
+ audit_session.add(
+ Log(
+ event="failed to evaluate rollup status",
+ dag_id=target_dag_id,
+ extra=extra,
+ )
+ )
+ except Exception:
+ self.log.warning(
+ "Failed to write audit Log row for misconfigured rollup
mapper",
+ dag_id=target_dag_id,
+ asset_name=name,
+ asset_uri=uri,
+ exc_info=True,
+ )
+ return False
+ return True
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+
+ Asset deactivation freezes pending APDRs: when an asset becomes
inactive
+ (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog``
rows
+ stop contributing to the rollup. If the consumer Dag still depends on
that
+ asset, firing on stale history would conflict with the declared
topology,
+ so the APDR waits. Reactivating the asset resumes evaluation
automatically.
+ This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # FIFO is intentional: the oldest pending APDR fires first. A
persistently
+ # unsatisfiable APDR at the head (e.g. broken mapper, upstream that
will
+ # never arrive) blocks newer ones until an operator removes it or fixes
+ # the underlying mapper. We surface the stuck state rather than
silently
+ # rotating past it.
+ pending_apdrs = session.scalars(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at)
+ .limit(self._max_partition_dag_runs_per_loop)
+ ).all()
+ if not pending_apdrs:
+ return set()
+
partition_dag_ids: set[str] = set()
+ pending_apdr_ids = [apdr.id for apdr in pending_apdrs]
- evaluator = AssetEvaluator(session)
- for apdr in session.scalars(
-
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+ # Pre-fetch all required serialized Dags in one query.
+ dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs})
+ # {"dag_id": Serialized Dag}
+ serialized_dags: dict[str, SerializedDAG] = {}
+ for serdag in
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session):
Review Comment:
The `get_latest_serialized_dags` call here rehydrates whichever Dag version
is current at evaluation time, not the version that was current when the APDR
row was created. APDR and `PartitionedAssetKeyLog` rows store target/source
keys but no pointer to the mapper or window contract that emitted them.
This bites with AIP-66 versioning. If someone switches
`RollupMapper(window=DayWindow())` to `IdentityMapper` after 12 of 24 hourly
events have arrived -- the APDR was queued under "need all 24" but now
evaluates under "any 1 satisfies" and fires immediately on the partial rollup.
The other direction is uglier: `IdentityMapper` ->
`RollupMapper(window=DayWindow())` after the single upstream arrived, APDR
queued under "any 1" but now needs 24 that will never come.
Pinning the contract at APDR creation would fix it. Either store the
required upstream key set directly on the APDR (simple, denormalized), or stamp
a `dag_version_id` and resolve the matching serialized version at evaluation.
Not sure which is preferable -- the key set means you can evaluate without
re-deserializing the Dag at all, but it locks the row to one version's mapper
output.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -38,23 +45,185 @@
)
from airflow.models import DagModel
from airflow.models.asset import (
+ AssetActive,
AssetModel,
AssetPartitionDagRun,
DagScheduleAssetReference,
PartitionedAssetKeyLog,
)
from airflow.models.dagrun import DagRun
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+ from airflow.partition_mappers.base import RollupMapper
+ from airflow.timetables.simple import PartitionedAssetTimetable
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+AssetNameUri: TypeAlias = tuple[str, str]
+"""A ``(name, uri)`` pair identifying an asset."""
+
+
+def _fetch_active_assets_per_dag(
+ dag_ids: list[str], session: Session
+) -> dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]]:
+ """
+ Batch-fetch required assets for multiple Dags in a single query.
+
+ Returns ``{dag_id: ([(name, uri), ...], {asset_id: (name, uri)})}``.
+ Dags with no references are still included with empty containers
+ so callers can index by ``dag_id`` without ``KeyError``.
+
+ Inactive (deactivated) assets are still included so list-route totals stay
+ symmetric with the detail-route response; the per-asset ``asset_inactive``
+ flag (detail route only) surfaces the freeze state.
+ """
+ rows = session.execute(
+ select(
+ DagScheduleAssetReference.dag_id,
+ AssetModel.id,
+ AssetModel.name,
+ AssetModel.uri,
+ )
+ .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id ==
AssetModel.id)
+ .where(DagScheduleAssetReference.dag_id.in_(dag_ids))
+ ).all()
+ result: dict[str, tuple[list[AssetNameUri], dict[int, AssetNameUri]]] = {
+ dag_id: ([], {}) for dag_id in dag_ids
+ }
+ for row in rows:
+ info, id_to_info = result[row.dag_id]
+ info.append((row.name, row.uri))
+ id_to_info[row.id] = (row.name, row.uri)
+ return result
+
+
+class _RollupResolution(NamedTuple):
+ """
+ Outcome of resolving an asset's upstream-key requirement for one partition
key.
+
+ Three states, distinguished so callers can match the scheduler's
+ ``_resolve_asset_partition_status`` semantics:
+
+ - ``keys`` is a ``frozenset`` and ``mapper_failed`` is ``False``: rollup
+ asset, mapper succeeded — use ``keys`` as the required set.
+ - ``keys`` is ``None`` and ``mapper_failed`` is ``False``: not a rollup
+ asset — a single received event satisfies it.
+ - ``keys`` is ``None`` and ``mapper_failed`` is ``True``: rollup asset
+ whose mapper raised — the scheduler treats it as not-yet-satisfied; the
+ UI must not credit any received event either, otherwise progress would
+ silently show "ready" for a run the scheduler will never fire.
+ """
+
+ keys: frozenset[str] | None = None
+ mapper_failed: bool = False
+
+
+def _resolve_rollup_status(
+ dag_model: DagModel | None,
+ rollup_timetable: PartitionedAssetTimetable | None,
+ name: str,
+ uri: str,
+ partition_key: str,
+) -> _RollupResolution:
+ """
+ Resolve the rollup state for *(name, uri)* under the given partition key.
+
+ The ``dag_model is None`` / ``rollup_timetable is None`` cases
short-circuit
+ to "not rollup" because there is nothing to evaluate against, not because
+ the asset is mis-configured.
+ """
+ if dag_model is None or rollup_timetable is None or not
dag_model.is_rollup_asset(name=name, uri=uri):
+ return _RollupResolution()
+ try:
+ mapper = rollup_timetable.get_partition_mapper(name=name, uri=uri)
+ return _RollupResolution(keys=frozenset(cast("RollupMapper",
mapper).to_upstream(partition_key)))
+ except Exception:
+ # Mismatch with the scheduler's rollup contract. The scheduler writes a
+ # Log row for the same condition (once per misconfig); this path is
+ # per-request and lighter.
+ log.warning(
+ "Failed to evaluate rollup mapper; treating asset as
not-yet-satisfied",
+ dag_id=dag_model.dag_id,
+ asset_name=name,
+ asset_uri=uri,
+ partition_key=partition_key,
+ exc_info=True,
+ )
+ return _RollupResolution(mapper_failed=True)
+
+
+def _compute_total_required(
+ dag_model: DagModel | None,
+ rollup_timetable: PartitionedAssetTimetable | None,
+ asset_info: list[AssetNameUri],
+ partition_key: str,
+) -> int:
+ """
+ Sum required upstream events across all assets, using to_upstream for
rollup mappers.
+
+ Non-rollup assets and broken-mapper assets both count as 1: non-rollup
needs
+ one event to satisfy, broken-mapper counts as 1 unit of "blocked" so the
+ asset still contributes to the totals (received side credits 0, keeping the
+ progress short of "ready" as the scheduler intends).
+ """
+ total = 0
+ for name, uri in asset_info:
+ res = _resolve_rollup_status(dag_model, rollup_timetable, name, uri,
partition_key)
Review Comment:
`_compute_total_required` (this loop) and `_compute_received_count` below
both run the same `_resolve_rollup_status(name, uri, partition_key)` per asset
per row -- so the same `(dag_id, partition_key, name, uri)` is resolved twice
for every row. Each call hits `get_partition_mapper` and then builds a
`frozenset` from `to_upstream(partition_key)` (DayWindow yields 24, MonthWindow
12, HourWindow 60).
On a busy page the duplication piles up. Worst-ish case I can think of --
500-row page on a Dag with ~5 rollup assets -- is around ~5k mapper calls per
request, half pure duplication. Probably worth hoisting a per-row `dict[(name,
uri), _RollupResolution]` and passing it into both helpers.
The scheduler does the same shape in
`_create_dagruns_for_partitioned_asset_dags` (one `get_partition_mapper` per
(apdr, asset)), but it's bounded by `_max_partition_dag_runs_per_loop` so the
cost is smaller there.
##########
airflow-core/src/airflow/api_fastapi/common/partition_helpers.py:
##########
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import structlog
+
+from airflow.exceptions import DeserializationError
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.simple import PartitionedAssetTimetable
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+def _extract_partitioned_timetable(serdag: SerializedDagModel) ->
PartitionedAssetTimetable | None:
+ """Return the ``PartitionedAssetTimetable`` carried by *serdag*, or
``None``."""
+ try:
+ timetable = serdag.dag.timetable
+ except (KeyError, ValueError, ImportError, AttributeError,
DeserializationError):
Review Comment:
Two things are off here. The catch list grabs `KeyError` and `AttributeError`
alongside `DeserializationError`, but those two are exactly the classes that
mask refactor bugs (renamed attr on `serdag.dag`, missing dict key) -- the
opposite of what the comment above says it's doing. A bad refactor would just
degrade to "non-partitioned" with a log line and we wouldn't notice.
It's also missing `TypeError`, which the core `RollupMapper.__init__` now
raises for a mis-paired upstream/window (see the comment I left on
`task-sdk/.../partition_mappers/base.py:49`). So a misconfigured rollup Dag
makes every UI call through `load_partitioned_timetable` fail with a 500
rather than degrading.
Maybe just `(DeserializationError, TypeError)`?
##########
task-sdk/src/airflow/sdk/definitions/partition_mappers/base.py:
##########
@@ -16,10 +16,34 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING, ClassVar
+
+if TYPE_CHECKING:
+ from airflow.sdk.definitions.partition_mappers.window import Window
+
class PartitionMapper:
"""
Base partition mapper class.
- Maps keys from asset events to target dag run partitions.
+ Maps keys from asset events to target Dag run partitions.
+ """
+
+ is_rollup: ClassVar[bool] = False
+
+
+class RollupMapper(PartitionMapper):
+ """
+ Partition mapper that rolls up many upstream keys into one downstream key.
+
+ Compose a ``upstream_mapper`` (which normalizes each upstream key to the
+ downstream granularity) with a ``window`` that declares the full set of
+ upstream keys required for a given downstream key. The scheduler holds
+ the Dag run until every upstream key in the window has arrived.
"""
+
+ is_rollup: ClassVar[bool] = True
+
+ def __init__(self, *, upstream_mapper: PartitionMapper, window: Window) ->
None:
+ self.upstream_mapper = upstream_mapper
+ self.window = window
Review Comment:
The SDK constructor here just stores `upstream_mapper` and `window`. The
core constructor at `airflow-core/.../partition_mappers/base.py:115` does the
actual validation -- `decode_downstream` override check, plus the
`window.expected_decoded_type` check that catches
`RollupMapper(upstream_mapper=IdentityMapper(), window=DayWindow())`.
The problem is that user code only ever touches the SDK class (`from airflow.sdk
import RollupMapper`). So a mis-paired upstream/window slips through Dag parsing
with no error. The core `TypeError` only fires once the scheduler deserializes
the Dag -- where it gets swallowed by the broad `except Exception` in
`_create_dagruns_for_partitioned_asset_dags` and shows up as "Failed to
deserialize Dag" spam every tick. The author has no idea what's wrong.
Could just mirror the same check in the SDK constructor. Bit of a chore
though -- `expected_decoded_type` is a `ClassVar` on the core `Window` but not
on the SDK one, so it'd need adding to the SDK `Window` ABC and all 6
subclasses too.
##########
airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx:
##########
@@ -38,12 +37,17 @@ export const AssetProgressCell = ({ dagId, partitionKey, totalReceived, totalReq
const assetExpression = data?.asset_expression as ExpressionType | undefined;
Review Comment:
This `as ExpressionType` cast is here (and at
`pages/DagsList/AssetSchedule.tsx:331`) because the generated OpenAPI types
declare `asset_expression` as `{ [k: string]: unknown } | null` -- the Python
annotation is just `dict | None`, no shape. The `ExpressionType` discriminated
union is hand-written in `src/components/AssetExpression/` with no runtime
check, so the consumers are basically just trusting the server.
If the asset-expression shape ever changes server-side, the TS build won't
catch it -- you'd find out via a blank render or a discriminator-missing crash
at runtime. Worth pinning down the shape on the Python side (a Pydantic model
in `datamodels/ui/assets.py`, or a `oneOf` override on the OpenAPI schema) so
`ExpressionType` ends up in `openapi-gen/queries/types.gen.ts` and the cast can
go away on its own.
##########
airflow-core/newsfragments/64571.significant.rst:
##########
@@ -29,8 +29,33 @@ an hourly partition).
Mappers can be set globally on a ``PartitionedAssetTimetable`` or overridden
per upstream asset via ``partition_mapper_config``.
+**Rollup (one downstream run per window of upstream partitions)**:
+
+- ``RollupMapper`` — wraps a ``upstream_mapper`` (which normalises the upstream key to the
Review Comment:
Newsfragment looks good. But the narrative doc side feels thin --
`airflow-core/docs/authoring-and-scheduling/assets.rst` § "Asset partitions"
(lines 496-630) covers the non-rollup mappers but doesn't reach the rollup
case. No subsection showing how to compose
`RollupMapper(upstream_mapper=StartOfDayMapper(), window=DayWindow())` onto a
`PartitionedAssetTimetable`, no "hourly -> daily summary" worked example, and
the DST mitigation lives in the newsfragment but not anywhere a user is likely
to read while building this.
Autoapi entries cover the reference but they won't help someone landing on
assets.rst trying to figure out if the feature exists. Either drop a section in
here before the 3.3 freeze or open a follow-up doc PR -- not necessarily
blocking but probably wants to ship in the same release.
(Also, the existing partitions section is at
[`authoring-and-scheduling/assets.rst#asset-partitions`](https://github.com/apache/airflow/blob/main/airflow-core/docs/authoring-and-scheduling/assets.rst)
for context.)
##########
task-sdk/docs/api.rst:
##########
@@ -239,10 +239,29 @@ Partition Mapper
.. autoapiclass:: airflow.sdk.StartOfYearMapper
+.. autoapiclass:: airflow.sdk.RollupMapper
Review Comment:
Noticing the AIP-76 wiki and the code have drifted on naming -- the [AIP
spec](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311626969)
uses `PartitionByInterval` / `PartitionBySequence` / `PartitionByProduct` /
`PartitionAtRuntime`, but the shipped surface here is `CronPartitionTimetable`
/ `PartitionedAssetTimetable` / `StartOfDayMapper` / `RollupMapper` /
`ProductMapper` / `AllowedKeyMapper` / `ChainMapper`.
Not a code concern but external readers landing on the AIP and searching for
`PartitionByInterval` won't find anything in `airflow.sdk`. The May 2026
management deck even has a typo carried over (`CronPartitionedTimetable` --
doesn't exist). Probably worth a follow-up to either refresh the AIP wiki
against the shipped names + amendment, or add a mapping note somewhere visible.
cc TP, Wei since they're owning this AIP.
##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any, ClassVar
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
+
+def _require_day_one(dt: datetime, window_cls: type) -> None:
+ """
+ Raise ``ValueError`` if *dt* is not on day 1 of its month.
+
+ Month-aligned windows expand by month-step arithmetic, which is only
+ safe when the period starts on day 1 (otherwise e.g.
+ ``replace(month=feb)`` on Jan 31 raises ``ValueError: day is out of
+ range for month``). Built-in temporal upstream mappers normalise to
+ day 1, but a custom ``PartitionMapper.decode_downstream`` could return
+ any day — checking here turns a confusing scheduler-tick crash into
+ an explicit signal about the upstream-mapper contract.
+ """
+ if dt.day != 1:
+ raise ValueError(
+ f"{window_cls.__name__} expects a period start on day 1 of the month, "
+ f"got {dt.isoformat()}. The paired upstream mapper's decode_downstream "
+ "must normalise to day 1."
+ )
+
+
+def _shift_months(dt: datetime, months: int) -> datetime:
+ """
+ Return *dt* shifted forward by *months*, wrapping the year as needed.
+
+ Caller is responsible for ensuring *dt* is on day 1 (see
+ :func:`_require_day_one`) so that ``replace(month=...)`` is always valid.
+ """
+ total = dt.month - 1 + months
+ return dt.replace(year=dt.year + total // 12, month=total % 12 + 1)
+
+
+class Window(ABC):
Review Comment:
Question on scope -- the [AIP-76
spec](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311626969)
under "Complex Schedule-Partition Mapping" specifically cites "a
hourly-materialised asset that modifies data for the past **two** hours (the
new hour, and the hour before it that was also covered by the previous
partition -- a.k.a. a slicing window)" as the motivating example for the
pluggable `Window` interface. The 6 built-ins shipped here are all contiguous
full-period (Hour/Day/Week/Month/Quarter/Year). A rolling 7-day window every
day, or the past-two-hours sliding case the AIP names, can't be expressed by
subclassing this ABC -- `to_upstream` returns the full set for one downstream
period, with no notion of overlap.
Is sliding deliberate follow-up (3.4?) or in-scope for 3.3 alongside
`FanOutMapper` (#65654)? If it's follow-up, probably worth a sentence in the
`Window` docstring so users don't go subclass it for sliding semantics and hit
non-obvious limitations. If it's in-scope, worth coordinating the API shape now
before this lands.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py:
##########
@@ -55,10 +69,15 @@ def next_run_assets(
pending_partition_count: int | None = None
queued_expr: ColumnElement[int]
- if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
+ if is_partitioned := dag_model.timetable_partitioned:
pending_partition_count = session.scalar(
- select(func.count())
+ select(func.count(AssetPartitionDagRun.id.distinct()))
.select_from(AssetPartitionDagRun)
+ .join(
+ DagScheduleAssetReference,
+ DagScheduleAssetReference.dag_id == AssetPartitionDagRun.target_dag_id,
+ )
+ .join(AssetModel, AssetModel.id == DagScheduleAssetReference.asset_id)
Review Comment:
The joins to `DagScheduleAssetReference` and `AssetModel` aren't referenced
anywhere in the `WHERE` clause, so `count(AssetPartitionDagRun.id.distinct())`
ends up collapsing a cartesian product. For a Dag with N
`DagScheduleAssetReference` rows and M pending APDRs, that's N*M rows scanned to
produce the same count that the plain query below gets by scanning only M:
```python
select(func.count())
.select_from(AssetPartitionDagRun)
.where(
AssetPartitionDagRun.target_dag_id == dag_id,
AssetPartitionDagRun.created_dag_run_id.is_(None),
)
```
This runs once per partitioned Dag inside `next_run_assets`, which the UI calls
on every home/Dags-list render. Both joins look droppable.
##########
airflow-core/src/airflow/ui/src/components/AssetExpression/AssetNode.tsx:
##########
@@ -16,47 +16,130 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Box, Text, HStack } from "@chakra-ui/react";
-import { FiDatabase } from "react-icons/fi";
+import { Box, Button, HStack, Text, VStack } from "@chakra-ui/react";
+import { FiCheck, FiDatabase, FiMinus } from "react-icons/fi";
import { PiRectangleDashed } from "react-icons/pi";
-import { RouterLink } from "src/components/ui";
+import type { NextRunAssetEventResponse } from "openapi/requests/types.gen";
+import { Popover, RouterLink } from "src/components/ui";
import Time from "../Time";
-import type { AssetSummary, NextRunEvent } from "./types";
+import type { AssetSummary } from "./types";
+
+const RollupKeyChecklistPopover = ({
Review Comment:
This `RollupKeyChecklistPopover` is a local `const` in this file -- not
exported. The same `Popover` + `VStack` + `FiCheck`/`FiMinus` block is
reimplemented by hand in three other places:
- `components/AssetProgressCell.tsx` (rebuilds the checklist as its popover
body)
- `pages/DagsList/AssetSchedule.tsx` -- twice, once for the single-asset
branch and once for multi-asset, each redoing the whole Popover wrapper.
So four copies of the same render logic. Hate to be that person but the next
icon swap or a11y label tweak is going to land in three of them and miss the
fourth. Worth exporting from this file? `AssetProgressCell` would need the body
without the Popover (since it's already inside its own), so maybe split: a
`RollupKeyChecklist` for the content + the existing wrapper around it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]