kaxil commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3268775477
##########
airflow-core/src/airflow/serialization/decoders.py:
##########
@@ -202,3 +203,13 @@ def decode_partition_mapper(var: dict[str, Any]) -> PartitionMapper:
     else:
         partition_mapper_cls = find_registered_custom_partition_mapper(importable_string)
     return partition_mapper_cls.deserialize(var[Encoding.VAR])
+
+
+def decode_window(var: dict[str, Any]) -> Window:
+    """
+    Decode a previously serialized :class:`Window`.
+
+    :meta private:
+    """
+    window_cls: type[Window] = import_string(var[Encoding.TYPE])
Review Comment:
`decode_window` calls `import_string` on the serialized `Encoding.TYPE`
directly with no gate. Compare with the sibling `decode_partition_mapper` (line
202) which checks `is_core_partition_mapper_import_path` and falls back to
`find_registered_custom_partition_mapper`, and `decode_timetable` (line 185)
which uses `is_core_timetable_import_path`.
A tampered serialized DAG can name any importable Python class and have the
scheduler import it during deserialization. Mirror the partition-mapper pattern
with `is_core_window_import_path` + `find_registered_custom_window` (or fold
Windows under the partition-mapper registry).
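A minimal sketch of the gate suggested above, for illustration only. `CORE_WINDOWS`, `CUSTOM_WINDOWS`, the plain `"type"`/`"var"` keys, and the bodies of `is_core_window_import_path` and `find_registered_custom_window` are hypothetical stand-ins, not existing Airflow APIs; the real helper would presumably keep using `import_string` for core paths.

```python
from __future__ import annotations

from typing import Any


class Window:
    """Stand-in for the PR's Window base class."""

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Window:
        return cls()


class DayWindow(Window):
    pass


# Hypothetical allowlist of built-in windows and registry of plugin-provided ones.
CORE_WINDOWS: dict[str, type[Window]] = {
    "airflow.partition_mappers.window.DayWindow": DayWindow,
}
CUSTOM_WINDOWS: dict[str, type[Window]] = {}


def is_core_window_import_path(path: str) -> bool:
    return path in CORE_WINDOWS


def find_registered_custom_window(path: str) -> type[Window]:
    try:
        return CUSTOM_WINDOWS[path]
    except KeyError:
        raise ValueError(f"Window class {path!r} is not registered") from None


def decode_window(var: dict[str, Any]) -> Window:
    path = var["type"]
    # Only allowlisted or explicitly registered classes are ever resolved;
    # an arbitrary import path in a tampered payload is rejected.
    if is_core_window_import_path(path):
        window_cls = CORE_WINDOWS[path]
    else:
        window_cls = find_registered_custom_window(path)
    return window_cls.deserialize(var["var"])
```

With this shape, a payload naming an unknown path fails with a `ValueError` before anything is imported.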
##########
airflow-core/src/airflow/serialization/encoders.py:
##########
@@ -552,3 +596,25 @@ def encode_partition_mapper(var: PartitionMapper | CorePartitionMapper) -> dict[
         Encoding.TYPE: qn,
         Encoding.VAR: _serializer.serialize_partition_mapper(var),
     }
+
+
+def encode_window(var: Window | CoreWindow) -> dict[str, Any]:
+    """
+    Encode a :class:`Window` instance.
+
+    :meta private:
+    """
+    var_type = type(var)
+    importable_string = _serializer.BUILTIN_WINDOWS.get(var_type)
+    if importable_string is not None:
+        return {
+            Encoding.TYPE: importable_string,
+            Encoding.VAR: _serializer.serialize_window(var),
+        }
+
+    # Custom Window subclasses must live at an importable path so the
+    # scheduler can reconstruct them via import_string during deserialization.
+    return {
+        Encoding.TYPE: qualname(var),
Review Comment:
Asymmetry with `encode_partition_mapper` above: that helper calls
`find_registered_custom_partition_mapper(qn)` to require registration for
non-builtin types, but `encode_window` accepts any `qualname(var)` without
validation.
Combined with the unguarded `decode_window`, this means any `Window`
subclass at any import path round-trips silently. Moving a custom Window class
to a different module then breaks deserialization of older DagRuns with no
early error.
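A rough sketch of what an encode-side check could look like, so an unregistered custom class fails at serialization time rather than on a later deserialization. `REGISTERED_CUSTOM_WINDOWS`, the local `qualname` helper, and the `"type"`/`"var"` keys are assumptions made for this example, not the PR's actual code.

```python
from __future__ import annotations

from typing import Any


class Window:
    def serialize(self) -> dict[str, Any]:
        return {}


class HourWindow(Window):
    pass


class UnregisteredWindow(Window):
    pass


BUILTIN_WINDOWS: dict[type[Window], str] = {
    HourWindow: "airflow.partition_mappers.window.HourWindow",
}
# Hypothetical registry of custom window import paths (e.g. filled by plugins).
REGISTERED_CUSTOM_WINDOWS: set[str] = set()


def qualname(obj: object) -> str:
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def encode_window(var: Window) -> dict[str, Any]:
    path = BUILTIN_WINDOWS.get(type(var))
    if path is None:
        path = qualname(var)
        # Reject early: an unregistered class would otherwise round-trip
        # silently and only break once its module path changes.
        if path not in REGISTERED_CUSTOM_WINDOWS:
            raise ValueError(f"Window class {path!r} is not registered")
    return {"type": path, "var": var.serialize()}
```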
##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
+
+def _shift_months(dt: datetime, months: int) -> datetime:
+ """
+ Return *dt* shifted forward by *months*, wrapping the year as needed.
+
+ All built-in temporal upstream mappers normalise to day 1, so
+ :meth:`datetime.replace` on the new month is always valid.
+ """
+ total = dt.month - 1 + months
+ return dt.replace(year=dt.year + total // 12, month=total % 12 + 1)
+
+
+class Window(ABC):
+ """
+ Describes a rollup window: which decoded upstream items make up one
decoded downstream period.
+
+ Paired with a upstream mapper inside a :class:`RollupMapper`. The window
+ operates purely in the upstream mapper's *decoded* form (``datetime`` for
+ temporal mappers, domain-specific types for future segment / runtime
+ mappers). It does not touch key strings, timezones, or formats — those
+ belong to the upstream mapper. ``RollupMapper`` orchestrates the three:
+ decode the downstream key, expand via the window, encode each upstream.
+ """
+
+ @abstractmethod
+ def to_upstream(self, decoded_downstream: Any) -> Iterable[Any]:
+ """Yield each decoded upstream item composing *decoded_downstream*."""
+
+ def serialize(self) -> dict[str, Any]:
+ return {}
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Window:
+ return cls()
+
+
+class HourWindow(Window):
+ """Sixty consecutive minute period-starts making up one hour."""
+
+ def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+ return (period_start + timedelta(minutes=i) for i in range(60))
+
+
+class DayWindow(Window):
+ """
+ Twenty-four consecutive hourly period-starts making up one day.
+
+ Arithmetic is done on naive datetime steps so the 24-hour stride is
+ unambiguous across DST transitions; the upstream mapper handles timezone
+ awareness when it encodes each upstream member back to a key string.
+
+ .. warning:: **DST edge cases with local-timezone upstream mappers**
+
+ ``DayWindow`` always yields exactly 24 steps regardless of the local
+ calendar date. When the upstream mapper uses a local timezone
+ (e.g. ``StartOfDayMapper(timezone="America/New_York")``), DST gaps
+ and folds can cause a mismatch:
+
+ - **Spring-forward (clock skips ahead)**: the local day has fewer than
+ 24 real hours. One naive step falls in the gap (e.g. 02:00 ET on
+ spring-forward day does not exist), so the upstream mapper encodes it
+ to the *next* local hour. That key (e.g. ``"2024-03-10T03"``) does
+ not match any upstream event — the rollup window can never be fully
+ satisfied.
+ - **Fall-back (clock repeats)**: the local day has 25 real hours, but
+ ``DayWindow`` only enumerates 24 steps. The extra hour's upstream
+ events are never included in the expected set, so those events do not
+ contribute to any rollup.
+
+ **Mitigation**: use UTC ``input_format`` (e.g. ``%Y-%m-%dT%H%z``) and
+ ensure upstream producers emit UTC partition keys so local-clock
+ ambiguity never arises.
+ """
+
+ def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+ return (period_start + timedelta(hours=i) for i in range(24))
+
+
+class WeekWindow(Window):
+ """Seven consecutive daily period-starts making up one week."""
+
+ def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+ return (period_start + timedelta(days=i) for i in range(7))
+
+
+class MonthWindow(Window):
+ """
+ All daily period-starts making up one calendar month.
+
+ Assumes *period_start* falls on day 1 of the month (which all built-in
+ temporal upstream mappers normalise to). Iterates from day 1 to the last
+ day of that calendar month, so the yielded count varies between 28 and
+ 31 depending on the month.
+ """
+
+ def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+ next_month = period_start.month % 12 + 1
+ next_year = period_start.year + (1 if period_start.month == 12 else 0)
+ next_start = period_start.replace(year=next_year, month=next_month)
Review Comment:
`period_start.replace(year=..., month=...)` raises `ValueError` if
`period_start.day` isn't valid in the target month -- e.g.
`MonthWindow().to_upstream(datetime(2024, 1, 31))` raises `day 31 must be in
range 1..29 for month 2 in year 2024`.
The docstring says "assumes day 1" but nothing enforces it. A custom
`PartitionMapper.decode_downstream` returning a non-day-1 datetime crashes the
scheduler tick. Add an explicit `if period_start.day != 1: raise
ValueError(...)` (and the same precondition applies to `QuarterWindow` and
`YearWindow` via `_shift_months`).
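For illustration, a standalone version of the month expansion with the precondition made explicit. `month_days` is a hypothetical free function written for this comment, not the PR's `MonthWindow.to_upstream`; it returns a list so the check runs immediately rather than on first iteration.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def month_days(period_start: datetime) -> list[datetime]:
    """Return every daily period-start in the month beginning at *period_start*."""
    # Fail fast with a clear message instead of relying on datetime.replace
    # to reject an out-of-range day in the following month.
    if period_start.day != 1:
        raise ValueError(
            f"period_start must fall on day 1 of its month, got {period_start.isoformat()}"
        )
    next_month = period_start.month % 12 + 1
    next_year = period_start.year + (1 if period_start.month == 12 else 0)
    next_start = period_start.replace(year=next_year, month=next_month)
    day_count = (next_start - period_start).days
    return [period_start + timedelta(days=i) for i in range(day_count)]
```

Without the guard, the failure depends on the input: day 31 in January raises from `replace`, while day 15 would pass silently and yield a window that straddles two months.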
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -481,6 +491,38 @@ def __init__(self, **kwargs):
def __repr__(self):
return f"<DAG: {self.dag_id}>"
+ def is_rollup_asset(self, *, name: str, uri: str) -> bool:
+ """
+ Return whether the asset identified by *name*/*uri* uses a rollup
mapper.
+
+ Reads the cached ``partition_mapper_info`` populated during Dag
+ serialization, mirroring
``PartitionedAssetTimetable.get_partition_mapper``.
+
+ Entries come from three shapes:
+
+ - Regular ``Asset`` → ``{"name": ..., "uri": ..., "is_rollup": ...}``
+ - ``Asset.ref(name=...)`` (``SerializedAssetNameRef``) → ``{"name":
..., "is_rollup": ...}``
+ - ``Asset.ref(uri=...)`` (``SerializedAssetUriRef``) → ``{"uri": ...,
"is_rollup": ...}``
+
+ Name match wins over uri match (any name hit in the list outranks
+ any uri hit), so the first pass scans for a name match and the
+ second pass falls back to uri. The uri pass exists for uri-only
+ ref entries (which carry no ``name`` field) — without it, those
+ refs would never resolve.
+ """
+ for entry in self.partition_mapper_info:
+ if entry.get("name") == name:
+ return entry.get("is_rollup", False)
+ for entry in self.partition_mapper_info:
+ if entry.get("uri") == uri:
+ return entry.get("is_rollup", False)
+ return False
+
+ @property
+ def has_rollup_mappers(self) -> bool:
+ """Whether any cached partition mapper is a rollup mapper."""
+ return any(entry.get("is_rollup", False) for entry in
self.partition_mapper_info)
Review Comment:
`entry.get("is_rollup", False)` defaults silently when the field is missing.
The `PartitionMapperInfo` TypedDict declares `is_rollup: bool` as required, so
the default never triggers in normal use, but a future migration that adds a
new shape and forgets to backfill won't fail loudly -- it'll just report
`has_rollup_mappers=False`.
Use `entry["is_rollup"]` and let the `KeyError` surface; the migration in
this PR backfills all existing rows.
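A small sketch of the difference, using plain dicts in place of `PartitionMapperInfo` rows. The second entry is a made-up shape that lacks `is_rollup`, standing in for a future un-backfilled row.

```python
from __future__ import annotations

from typing import Any


def has_rollup_lenient(entries: list[dict[str, Any]]) -> bool:
    # A missing field is silently read as "not a rollup".
    return any(entry.get("is_rollup", False) for entry in entries)


def has_rollup_strict(entries: list[dict[str, Any]]) -> bool:
    # A missing field raises KeyError once that entry is reached.
    return any(entry["is_rollup"] for entry in entries)


entries = [
    {"name": "a", "uri": "s3://a", "is_rollup": False},
    {"name": "b"},  # hypothetical new shape that was never backfilled
]
```

Note that `any` short-circuits, so the strict form only raises if no earlier entry is already a rollup.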
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1859,40 +1876,175 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ if TYPE_CHECKING:
+ assert apdr.partition_key is not None
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ if TYPE_CHECKING:
+ assert apdr.target_dag_id is not None
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ self._partition_audit_seen.add(audit_key)
+ session.add(
+ Log(
+ event="failed to evaluate rollup status",
+ dag_id=apdr.target_dag_id,
+ extra=(
+ "Could not evaluate rollup status for
partition_key "
+ f"'{apdr.partition_key}' on asset (name='{name}',
uri='{uri}') "
+ f"in target Dag '{apdr.target_dag_id}'. This
likely indicates "
+ "that the rollup mapper is misconfigured or does
not support "
+ f"this partition key.\n{type(err).__name__}: {err}"
+ ),
+ )
+ )
+ return False
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # Note: with strict FIFO ordering, persistently-unsatisfied APDRs at
the head
+ # of the queue would block newer ones; switch to updated_at-based
ordering if
+ # that becomes an issue.
+ pending_apdrs = session.scalars(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at)
Review Comment:
The scheduler picks the oldest pending APDR (FIFO), but the UI route at
`api_fastapi/core_api/routes/ui/assets.py:145` picks the newest
(`order_by(...created_at.desc()).limit(1)`).
When more than one APDR is pending for the same DAG, the UI shows "X/Y
received" for `partition_key=newest` while the scheduler is about to fire
`partition_key=oldest`. Users will be confused about why the run that fires is
not the one shown. Pick the same ordering on both sides, or surface multiple
pending partitions in the UI.
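To make the mismatch concrete, here is a toy model with two pending rows for one Dag. The tuples are invented sample data; `min`/`max` stand in for `ORDER BY created_at` and `ORDER BY created_at DESC LIMIT 1`.

```python
from datetime import datetime

# (partition_key, created_at) pairs for two pending APDRs of the same Dag.
pending = [
    ("2024-01-01", datetime(2024, 1, 2, 0, 5)),
    ("2024-01-02", datetime(2024, 1, 3, 0, 5)),
]

# Scheduler side: oldest first.
scheduler_next = min(pending, key=lambda row: row[1])[0]
# UI side: newest first, limited to one row.
ui_shown = max(pending, key=lambda row: row[1])[0]
```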
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1859,40 +1876,175 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ if TYPE_CHECKING:
+ assert apdr.partition_key is not None
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ if TYPE_CHECKING:
+ assert apdr.target_dag_id is not None
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ self._partition_audit_seen.add(audit_key)
Review Comment:
The set is mutated before the `Log` row is added to the session, and
`_create_dagruns_for_partitioned_asset_dags` is invoked from
`_create_dagruns_for_dags`, which carries `@retry_db_transaction`. If the
transaction rolls back (a retried DB error or a downstream `OperationalError`),
`_partition_audit_seen` still says "logged" but no row hit the DB.
For the rest of that process lifetime, the operator never sees an audit Log
for that misconfigured mapper -- only the Dag silently held. Either flush the
Log row first and add to the set only after success, or write the audit row in
an independent `create_session(scoped=False)` transaction.
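A sketch of the "flush first, then remember" ordering. `FakeSession` and `write_audit_once` are hypothetical and model only `add`/`flush`; a successful flush does not guarantee that the surrounding transaction later commits, which is why the independent-transaction option is the stronger of the two.

```python
from __future__ import annotations

AuditKey = tuple[str, str, str]


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session; only add/flush are modelled."""

    def __init__(self, fail_flush: bool = False) -> None:
        self.fail_flush = fail_flush
        self.pending: list[str] = []
        self.flushed: list[str] = []

    def add(self, row: str) -> None:
        self.pending.append(row)

    def flush(self) -> None:
        if self.fail_flush:
            self.pending.clear()
            raise RuntimeError("simulated DB failure")
        self.flushed.extend(self.pending)
        self.pending.clear()


def write_audit_once(session: FakeSession, seen: set[AuditKey], key: AuditKey, row: str) -> bool:
    """Write *row* at most once per *key*; return True if it was written now."""
    if key in seen:
        return False
    session.add(row)
    session.flush()  # raises before the in-memory set is touched
    seen.add(key)
    return True
```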
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -134,32 +215,70 @@ def get_partitioned_dag_runs(
query = query.order_by(AssetPartitionDagRun.created_at.desc())
if not (rows := session.execute(query).all()):
+ if dag_id.value is not None:
+ dag_exists =
session.scalar(select(DagModel.dag_id).where(DagModel.dag_id == dag_id.value))
+ if dag_exists is None:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id
{dag_id.value} was not found")
return PartitionedDagRunCollectionResponse(partitioned_dag_runs=[],
total=0)
- if dag_id.value is not None:
- results = [_build_response(row, required_count) for row in rows]
- return
PartitionedDagRunCollectionResponse(partitioned_dag_runs=results,
total=len(results))
+ # Batch-fetch DagModels (for cached partition_mapper_info), required
assets,
+ # and APDR log entries in three single queries instead of N per-Dag
queries.
+ # Timetables are only loaded for Dags that actually have rollup mappers,
+ # since that's the only case where ``to_upstream`` evaluation is needed.
+ # A SQL count subquery for total_received cannot honour rollup windows
+ # without running the mapper, so the rollup-aware Python computation runs
+ # uniformly across single-Dag and global views.
+ unique_dag_ids = list({row.target_dag_id for row in rows})
+ dag_models: dict[str, DagModel] = {
+ dm.dag_id: dm
+ for dm in
session.scalars(select(DagModel).where(DagModel.dag_id.in_(unique_dag_ids))).all()
+ }
+ assets_by_dag = _fetch_active_assets_per_dag(unique_dag_ids, session)
+ rollup_timetables_by_dag: dict[str, PartitionedAssetTimetable | None] = {
+ d_id: (
+ load_partitioned_timetable(d_id, session)
Review Comment:
`load_partitioned_timetable` calls `SerializedDagModel.get(dag_id=...)` once
per DAG, which is one query per iteration. The PR's commit message claims to
remove N+1 queries, but this comprehension reintroduces them on the rollup path.
Use `SerializedDagModel.get_latest_serialized_dags(dag_ids=[...])` (the same
helper the scheduler uses at line 2004) to fetch in a single query.
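The query-count difference can be shown with a toy store. `FakeSerializedDagStore` is invented for this comment: `get` stands in for `SerializedDagModel.get` and `get_many` for a batched fetch such as `get_latest_serialized_dags`.

```python
from __future__ import annotations


class FakeSerializedDagStore:
    """Hypothetical store that counts how many queries it receives."""

    def __init__(self, dags: dict[str, str]) -> None:
        self._dags = dags
        self.query_count = 0

    def get(self, dag_id: str) -> str | None:
        self.query_count += 1  # one round trip per Dag
        return self._dags.get(dag_id)

    def get_many(self, dag_ids: list[str]) -> dict[str, str]:
        self.query_count += 1  # one round trip for the whole batch
        return {d: self._dags[d] for d in dag_ids if d in self._dags}


def load_per_dag(store: FakeSerializedDagStore, dag_ids: list[str]) -> dict[str, str | None]:
    return {dag_id: store.get(dag_id) for dag_id in dag_ids}


def load_batched(store: FakeSerializedDagStore, dag_ids: list[str]) -> dict[str, str]:
    return store.get_many(dag_ids)
```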
##########
airflow-core/src/airflow/api_fastapi/common/partition_helpers.py:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import structlog
+
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.simple import PartitionedAssetTimetable
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+
+log = structlog.get_logger(logger_name=__name__)
+
+
+def load_partitioned_timetable(dag_id: str, session: Session) -> PartitionedAssetTimetable | None:
+    """
+    Return the PartitionedAssetTimetable for *dag_id*, or None if absent or not partitioned.
+
+    Callers gate this behind ``DagModel.has_rollup_mappers``, which is only
+    populated for ``PartitionedAssetTimetable``. The ``TYPE_CHECKING`` assert
+    narrows the type for mypy without a runtime ``isinstance`` cost.
+    """
+    serdag = SerializedDagModel.get(dag_id=dag_id, session=session)
+    if serdag is None:
+        return None
+    try:
+        timetable = serdag.dag.timetable
+    except Exception:
Review Comment:
`except Exception` swallows everything raised by `serdag.dag.timetable`,
including config errors and plain programming bugs that aren't really
deserialization failures. Narrow to the classes timetable deserialization
actually raises (`KeyError`/`ValueError`/`ImportError`/`AttributeError`)
so unrelated bugs surface instead of silently downgrading to non-rollup.
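Roughly what the narrowed handler could look like. `timetable_or_none` and the two sample callables are hypothetical; the exception tuple is the one listed above and should be checked against what deserialization really raises.

```python
from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Any

log = logging.getLogger(__name__)

# Assumed set of deserialization failures, taken from the list in this comment.
TIMETABLE_DESERIALIZATION_ERRORS = (KeyError, ValueError, ImportError, AttributeError)


def timetable_or_none(get_timetable: Callable[[], Any]) -> Any | None:
    try:
        return get_timetable()
    except TIMETABLE_DESERIALIZATION_ERRORS:
        # Expected failure mode: log it and fall back to non-rollup behaviour.
        log.exception("Could not deserialize timetable; treating Dag as non-rollup")
        return None


def broken_payload() -> Any:
    raise KeyError("timetable")


def unrelated_bug() -> Any:
    return 1 / 0  # ZeroDivisionError is not in the tuple, so it propagates
```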
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:
##########
@@ -201,38 +320,67 @@ def get_pending_partitioned_dag_run(
f"No PartitionedDagRun for dag={dag_id} partition={partition_key}",
)
- received_subq = (
- select(PartitionedAssetKeyLog.asset_id).where(
- PartitionedAssetKeyLog.asset_partition_dag_run_id ==
partitioned_dag_run.id
- )
- ).correlate(AssetModel)
-
- received_expr = exists(received_subq.where(PartitionedAssetKeyLog.asset_id
== AssetModel.id))
+ # Collect received upstream partition keys per asset for this partition
run.
+ # Use a set to deduplicate: multiple events for the same key count as one.
+ received_keys_by_asset: dict[int, set[str]] = {}
+ for row in session.execute(
+ select(
+ PartitionedAssetKeyLog.asset_id,
+ PartitionedAssetKeyLog.source_partition_key,
+ ).where(PartitionedAssetKeyLog.asset_partition_dag_run_id ==
partitioned_dag_run.id)
+ ):
+ received_keys_by_asset.setdefault(row.asset_id,
set()).add(row.source_partition_key or "")
- asset_expression_subq = (
- select(DagModel.asset_expression).where(DagModel.dag_id ==
dag_id).scalar_subquery()
- )
+ dag_model = session.get(DagModel, dag_id)
asset_rows = session.execute(
select(
AssetModel.id,
AssetModel.uri,
AssetModel.name,
- received_expr.label("received"),
- asset_expression_subq.label("asset_expression"),
)
.join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id ==
AssetModel.id)
.where(DagScheduleAssetReference.dag_id == dag_id,
AssetModel.active.has())
- .order_by(received_expr.asc(), AssetModel.uri)
+ .order_by(AssetModel.uri)
).all()
- assets = [
- PartitionedDagRunAssetResponse(
- asset_id=row.id, asset_name=row.name, asset_uri=row.uri,
received=row.received
+ # Skip the timetable load when no rollup mapper is configured — the cached
+ # ``partition_mapper_info`` already tells us whether we will need
+ # ``to_upstream`` evaluation, which is the only thing the timetable adds
here.
+ has_rollup_mappers = dag_model is not None and dag_model.has_rollup_mappers
+ rollup_timetable = load_partitioned_timetable(dag_id, session) if
has_rollup_mappers else None
+
+ assets = []
+ for asset_row in asset_rows:
+ received_keys = sorted(received_keys_by_asset.get(asset_row.id, set()))
+ required_keys: list[str] = [partition_key]
+ is_rollup = (
+ has_rollup_mappers
+ and dag_model is not None
+ and dag_model.is_rollup_asset(name=asset_row.name,
uri=asset_row.uri)
)
- for row in asset_rows
- ]
- total_received = sum(1 for a in assets if a.received)
- asset_expression = asset_rows[0].asset_expression if asset_rows else None
+ if is_rollup and rollup_timetable is not None:
+ with suppress(Exception):
Review Comment:
`suppress(Exception)` here (and at `:115` and in `assets.py:192`) silently
falls back to non-rollup behaviour when the mapper raises. No log line, no
audit.
The scheduler at least writes a `Log` row once per process for the same
condition. Users will see "0/1 received" forever with no UI/log signal that the
mapper is broken. Add a `log.warning(...)` inside each suppress.
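One possible shape, replacing `suppress` with an explicit handler so the fallback stays but leaves a trace. `required_upstream_keys` and the sample mappers are hypothetical, and the stdlib logger stands in for whatever logger the route module already uses.

```python
from __future__ import annotations

import logging
from collections.abc import Callable, Iterable

log = logging.getLogger(__name__)


def required_upstream_keys(
    partition_key: str, to_upstream: Callable[[str], Iterable[str]]
) -> list[str]:
    try:
        return sorted(to_upstream(partition_key))
    except Exception:
        # Same fallback as suppress(Exception), but the failure is now visible.
        log.warning(
            "Rollup mapper failed for partition_key=%r; falling back to non-rollup display",
            partition_key,
            exc_info=True,
        )
        return [partition_key]


def hourly_mapper(key: str) -> set[str]:
    return {f"{key}T01", f"{key}T00"}


def broken_mapper(key: str) -> set[str]:
    raise ValueError(f"unsupported key {key!r}")
```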
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1859,40 +1876,175 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ if TYPE_CHECKING:
+ assert apdr.partition_key is not None
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied) and an audit log entry is written so the operator
+ can see why the Dag run is being held in the UI.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception as err:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ if TYPE_CHECKING:
+ assert apdr.target_dag_id is not None
+ audit_key = (apdr.target_dag_id, name, uri)
+ if audit_key not in self._partition_audit_seen:
+ self._partition_audit_seen.add(audit_key)
+ session.add(
+ Log(
+ event="failed to evaluate rollup status",
+ dag_id=apdr.target_dag_id,
+ extra=(
+ "Could not evaluate rollup status for
partition_key "
+ f"'{apdr.partition_key}' on asset (name='{name}',
uri='{uri}') "
+ f"in target Dag '{apdr.target_dag_id}'. This
likely indicates "
+ "that the rollup mapper is misconfigured or does
not support "
+ f"this partition key.\n{type(err).__name__}: {err}"
+ ),
+ )
+ )
+ return False
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # Note: with strict FIFO ordering, persistently-unsatisfied APDRs at
the head
+ # of the queue would block newer ones; switch to updated_at-based
ordering if
+ # that becomes an issue.
+ pending_apdrs = session.scalars(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at)
+ .limit(self._max_partition_dag_runs_per_loop)
+ ).all()
+ if not pending_apdrs:
+ return set()
+
partition_dag_ids: set[str] = set()
+ pending_apdr_ids = [apdr.id for apdr in pending_apdrs]
- evaluator = AssetEvaluator(session)
- for apdr in session.scalars(
-
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+ # Pre-fetch all required serialized Dags in one query.
+ dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs if
apdr.target_dag_id})
+ # {"dag_id": Serialized Dag}
+ serialized_dags: dict[str, SerializedDAG] = {}
+ for serdag in
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session):
+ try:
+ serdag.load_op_links = False
+ serialized_dags[serdag.dag_id] = serdag.dag
+ except Exception:
+ self.log.exception("Failed to deserialize Dag '%s'",
serdag.dag_id)
+
+ # {apdr_id: {asset_id: set(source_key, ...)}
+ source_key_by_asset_per_apdr: dict[int, dict[int, set[str]]] =
defaultdict(lambda: defaultdict(set))
+ # {apdr_id: {asset_id: (asset_name, asset_uri)}
+ asset_info_per_apdr: dict[int, dict[int, tuple[str, str]]] =
defaultdict(dict)
+ for apdr_id, asset_id, source_key, name, uri in session.execute(
+ select(
+ PartitionedAssetKeyLog.asset_partition_dag_run_id,
+ PartitionedAssetKeyLog.asset_id,
+ PartitionedAssetKeyLog.source_partition_key,
+ AssetModel.name,
+ AssetModel.uri,
+ )
+ .join(AssetModel, AssetModel.id == PartitionedAssetKeyLog.asset_id)
+
.where(PartitionedAssetKeyLog.asset_partition_dag_run_id.in_(pending_apdr_ids))
):
+ # ``source_partition_key`` is ``str | None`` in the schema; coerce
+ # NULL to ``""`` so the set stays ``set[str]`` and matches how the
+ # UI routes uniformly normalise the same column.
+ source_key_by_asset_per_apdr[apdr_id][asset_id].add(source_key or
"")
Review Comment:
`source_partition_key` is typed `Mapped[str | None]` but the column is
`nullable=False` in `models/asset.py:944` (pre-existing inconsistency).
The defensive `or ""` here masks two things: (a) a NULL slip-in via raw SQL
or migration would silently collapse all keys to `""` and falsely satisfy a
rollup, (b) an upstream emitting an empty-string `partition_key` is
indistinguishable from a buggy NULL. Worth fixing the column type hint to
`Mapped[str]` and dropping the `or ""` here (same pattern repeats in
`assets.py:453` and `partitioned_dag_runs.py:771`).
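A tiny demonstration of point (b). Both helpers are hypothetical. Once the hint is `Mapped[str]` and the column stays `nullable=False`, no runtime check should be needed; the strict variant only makes the difference visible.

```python
from __future__ import annotations

from collections.abc import Iterable


def collect_keys_lenient(source_keys: Iterable[str | None]) -> set[str]:
    # NULL and the empty string collapse into the same member.
    return {key or "" for key in source_keys}


def collect_keys_strict(source_keys: Iterable[str | None]) -> set[str]:
    keys: set[str] = set()
    for key in source_keys:
        if key is None:
            raise ValueError("source_partition_key must not be NULL")
        keys.add(key)
    return keys
```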