kaxil commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3307554819


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py:
##########
@@ -55,13 +68,19 @@ def next_run_assets(
     pending_partition_count: int | None = None
 
     queued_expr: ColumnElement[int]
-    if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
+    if is_partitioned := dag_model.timetable_partitioned:
         pending_partition_count = session.scalar(
-            select(func.count())
+            select(func.count(AssetPartitionDagRun.id.distinct()))
             .select_from(AssetPartitionDagRun)
+            .join(
+                DagScheduleAssetReference,
+                DagScheduleAssetReference.dag_id == 
AssetPartitionDagRun.target_dag_id,
+            )
+            .join(AssetModel, AssetModel.id == 
DagScheduleAssetReference.asset_id)
             .where(
                 AssetPartitionDagRun.target_dag_id == dag_id,
                 AssetPartitionDagRun.created_dag_run_id.is_(None),
+                AssetModel.active.has(),

Review Comment:
   Adding `AssetModel.active.has()` makes `pending_partition_count` return 0 
when all of a Dag's scheduled assets are inactive, even if pending APDRs exist. 
That matches the scheduler's freeze contract 
(`_create_dagruns_for_partitioned_asset_dags` similarly skips inactive-asset 
logs), but the UI consumer reading "0 pending" has no way to distinguish 
"nothing pending" from "frozen due to deactivated upstream." Worth either 
dropping the `active` filter for the count (count reflects actual pending rows, 
surface the frozen state separately) or adding a follow-up field like 
`frozen_partition_count` so the UI can show why nothing's progressing.



##########
airflow-core/src/airflow/partition_mappers/temporal.py:
##########
@@ -62,6 +180,29 @@ def format(self, dt: datetime) -> str:
         """Format the normalized datetime."""
         return dt.strftime(self.output_format)
 
+    def decode_downstream(self, downstream_key: str) -> datetime:
+        """
+        Recover the period-start datetime from a previously formatted 
downstream key.
+
+        Inverse of ``format``. The default implementation uses ``strptime`` 
with
+        ``output_format``, which works for any format made of standard strptime
+        directives. Subclasses with custom format markers (e.g. ``{quarter}``) 
or
+        ambiguous directives (e.g. bare ``%V``) override this.
+        """
+        return datetime.strptime(downstream_key, self.output_format)
+
+    def encode_upstream(self, dt: datetime) -> str:
+        """
+        Format *dt* as an upstream partition key string.
+
+        Pair of :meth:`decode_downstream`: takes a (decoded) period-start
+        datetime and produces a key string in the upstream's ``input_format``
+        with ``timezone`` applied. Used by :class:`RollupMapper` to render each
+        upstream member yielded by the window back into the form upstream
+        producers actually emit.
+        """
+        return make_aware(dt, self._timezone).strftime(self.input_format)

Review Comment:
   `make_aware()` raises `ValueError` on aware input, but 
`_BaseTemporalMapper.decode_downstream` (line 192) uses the default 
`strptime(downstream_key, self.output_format)` and returns a tz-aware datetime 
when `output_format` contains `%z`. That aware dt flows through 
`window.to_upstream(decoded)` (preserved across `timedelta` arithmetic) and 
lands here. The scheduler's outer `try/except` catches it and writes an audit 
log, but the failure mode is non-obvious. Operators see "failed to evaluate 
rollup status" with no hint that the cause is `output_format` containing a 
timezone directive. Either strip tzinfo before `make_aware` 
(`make_aware(dt.replace(tzinfo=None), self._timezone)`) so aware inputs 
round-trip, or document the constraint alongside the other `output_format` 
requirements on the temporal mappers.



##########
airflow-core/src/airflow/partition_mappers/base.py:
##########
@@ -18,26 +18,128 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, ClassVar
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
 
+    from airflow.partition_mappers.window import Window
+
 
 class PartitionMapper(ABC):
     """
     Base partition mapper class.
 
-    Maps keys from asset events to target dag run partitions.
+    Maps keys from asset events to target Dag run partitions.
     """
 
+    is_rollup: ClassVar[bool] = False
+
+    def __init_subclass__(cls, **kwargs: Any) -> None:
+        super().__init_subclass__(**kwargs)
+        decode_overridden = cls.decode_downstream is not 
PartitionMapper.decode_downstream
+        encode_overridden = cls.encode_upstream is not 
PartitionMapper.encode_upstream
+        if decode_overridden ^ encode_overridden:
+            raise TypeError(
+                f"{cls.__qualname__} overrides only one side of the 
decode/encode pair. "
+                "Both 'decode_downstream' and 'encode_upstream' must be 
overridden together "
+                "or not at all. Overriding only one side leaves 
RollupMapper.to_upstream "
+                "producing non-str members, causing the scheduler's 
upstream-window check "
+                "to silently never satisfy and the Dag run to be held forever."
+            )
+
     @abstractmethod
     def to_downstream(self, key: str) -> str | Iterable[str]:
         """Return the target key that the given source partition key maps 
to."""
 
+    def decode_downstream(self, downstream_key: str) -> Any:
+        """
+        Recover the canonical decoded form of *downstream_key*.
+
+        Used by :class:`RollupMapper` to hand the window an opaque "anchor"
+        for the downstream period; the window iterates in this decoded space
+        and the mapper re-encodes each expected upstream via
+        :meth:`encode_upstream`. Default is identity (string in, string out)
+        — temporal mappers override to return ``datetime``, future segment
+        mappers will return whatever shape suits them.
+
+        .. warning::
+
+            ``decode_downstream`` and :meth:`encode_upstream` form an inverse
+            pair. If you override this method to return a non-str (e.g.
+            ``datetime``, ``tuple``), you **must** also override
+            ``encode_upstream`` to convert the decoded form back to an
+            upstream key string. Overriding only one side leaves
+            :class:`RollupMapper.to_upstream` producing non-str members, the
+            scheduler's upstream-window check silently never satisfies, and
+            the Dag run is held forever with no audit log entry.
+        """
+        return downstream_key
+
+    def encode_upstream(self, decoded: Any) -> str:
+        """
+        Encode an expected upstream object back into a key string.
+
+        Pair of :meth:`decode_downstream`. Default is identity. Temporal
+        mappers override to apply timezone + ``input_format``.
+
+        .. warning::
+
+            The default identity implementation is only correct when
+            :meth:`decode_downstream` also uses the identity default (str in,
+            str out). See :meth:`decode_downstream` for the consequences of
+            overriding only one side of the pair.
+        """
+        return decoded
+
     def serialize(self) -> dict[str, Any]:
         return {}
 
     @classmethod
     def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
         return cls()
+
+
+class RollupMapper(PartitionMapper):
+    """
+    Partition mapper that rolls up many upstream keys into one downstream key.
+
+    Compose a ``upstream_mapper`` (which normalizes each upstream key to the
+    downstream granularity) with a ``window`` that declares the full set of
+    upstream keys required for a given downstream key. The scheduler holds
+    the Dag run until every upstream key in the window has arrived.
+    """
+
+    is_rollup: ClassVar[bool] = True
+
+    def __init__(self, *, upstream_mapper: PartitionMapper, window: Window) -> 
None:

Review Comment:
   `RollupMapper.__init__` accepts any `(upstream_mapper, window)` pair, but 
the pairs are coupled: a `DayWindow`/`MonthWindow`/etc. needs 
`upstream_mapper.decode_downstream` to return a `datetime`, and an 
`IdentityMapper`-style mapper that doesn't override the decode/encode pair will 
hand the window a `str`. The `__init_subclass__` check at line 38 catches 
"overrides one side but not both" within a single mapper, but doesn't catch 
"didn't override either side, paired with a window that needs the override." 
Today the failure surfaces as a runtime `TypeError` deep inside 
`window.to_upstream(decoded)` on the first scheduler tick, gets swallowed by 
`_resolve_asset_partition_status`, and the Dag is silently held with only an 
audit log row. Consider validating compatibility at construction. For example, 
windows could declare an expected decoded type and `RollupMapper.__init__` 
would raise when the upstream mapper's `decode_downstream` is the base identity 
but the window isn't string-friendly.



##########
airflow-core/src/airflow/partition_mappers/temporal.py:
##########
@@ -27,7 +28,124 @@
     from pendulum import FixedTimezone, Timezone
 
 
-class _BaseTemporalMapper(PartitionMapper, ABC):
+_STRPTIME_PATTERNS: dict[str, str] = {
+    "%Y": r"\d{4}",
+    "%m": r"\d{2}",
+    "%d": r"\d{2}",
+    "%H": r"\d{2}",
+    "%M": r"\d{2}",
+    "%S": r"\d{2}",
+    "%V": r"\d{2}",
+    "%U": r"\d{2}",
+    "%W": r"\d{2}",
+}
+
+
+def _compile_output_format_regex(
+    fmt: str, placeholder_patterns: dict[str, str] | None = None
+) -> re.Pattern[str]:
+    r"""
+    Compile *fmt* into a regex with named groups so a formatted key can be 
parsed back.
+
+    Walks *fmt* left-to-right, classifying each position into one of three
+    branches:
+
+    1. A two-character strftime directive listed in ``_STRPTIME_PATTERNS``
+       (``%Y``, ``%m``, ``%V`` …) becomes a named group keyed on the directive
+       letter — e.g. ``%Y`` → ``(?P<Y>\d{4})``. The ``i + 1 < len(fmt)`` guard
+       lets a trailing bare ``%`` fall through to the literal branch (escaped
+       as ``\%``), and unrecognised directives like ``%X`` likewise fall
+       through (matched literally; the format will not round-trip through
+       strftime output that uses them).
+    2. A FastAPI-style ``{name}`` placeholder becomes a named group keyed on
+       *name*. The pattern defaults to ``\w+``; pass *placeholder_patterns*
+       to narrow it (e.g. ``{"quarter": r"[1-4]"}``). A ``{`` with no
+       matching ``}`` falls through to the literal branch (so a stray ``{``
+       is escaped, not raised).
+    3. Anything else is escaped via :func:`re.escape` and emitted verbatim,
+       so separators, literal hyphens, ``Q``/``W`` prefix letters, and so on
+       all participate in the match.
+
+    The compiled regex is consumed via ``.search`` (not ``.fullmatch``) so the
+    caller does not need to anchor on the literal prefix/suffix length.
+
+    Raises :exc:`ValueError` at compile time when *fmt* is structurally
+    malformed — empty ``{}``, a placeholder name that is not a valid Python
+    identifier, the same strftime directive used twice, the same
+    placeholder used twice, or two adjacent default-pattern placeholders
+    (the greedy ``\w+`` default would consume the previous group's tail,
+    yielding a parse the caller almost never intends — insert a separator
+    or narrow at least one placeholder via *placeholder_patterns*).
+    Catching these at construction keeps a misconfigured mapper from
+    surfacing as an opaque ``re.error`` deep inside the scheduler tick or
+    a UI route.
+
+    Known limitation: locale-sensitive strftime directives (``%A``, ``%a``,
+    ``%B``, ``%b``, ``%p``) are not in ``_STRPTIME_PATTERNS`` and are
+    treated as literals; formats relying on them are not invertible by
+    this helper.
+    """
+    placeholder_patterns = placeholder_patterns or {}
+    parts: list[str] = []
+    seen_groups: set[str] = set()
+    prev_was_default_placeholder = False
+    i = 0
+    while i < len(fmt):
+        if fmt[i] == "%" and i + 1 < len(fmt) and fmt[i : i + 2] in 
_STRPTIME_PATTERNS:
+            directive = fmt[i : i + 2]
+            name = directive[1]
+            if name in seen_groups:
+                raise ValueError(
+                    f"output_format {fmt!r} reuses directive {directive!r}; "
+                    "each strftime directive may appear at most once."
+                )
+            seen_groups.add(name)
+            parts.append(f"(?P<{name}>{_STRPTIME_PATTERNS[directive]})")
+            i += 2
+            prev_was_default_placeholder = False
+            continue
+        if fmt[i] == "{":
+            end = fmt.find("}", i)
+            if end != -1:
+                name = fmt[i + 1 : end]
+                if not name.isidentifier():
+                    raise ValueError(
+                        f"output_format {fmt!r} has invalid placeholder 
{{{name}}}; "
+                        "placeholder names must be valid Python identifiers."
+                    )
+                if name in seen_groups:
+                    raise ValueError(
+                        f"output_format {fmt!r} reuses placeholder {{{name}}}; 
"
+                        "each placeholder may appear at most once."
+                    )
+                seen_groups.add(name)
+                is_default = name not in placeholder_patterns
+                if is_default and prev_was_default_placeholder:
+                    # Two adjacent default-pattern (``\w+``) placeholders.
+                    # The greedy first group consumes everything except the
+                    # minimum length the second needs, producing a parse the
+                    # caller almost never intends (e.g. ``{a}{b}`` on "foo123"
+                    # yields a="foo12", b="3"). Either insert a literal
+                    # separator between them or narrow at least one pattern
+                    # via *placeholder_patterns*.
+                    raise ValueError(
+                        f"output_format {fmt!r} has adjacent default-pattern 
placeholders "
+                        f"ending at {{{name}}}; the greedy ``\\w+`` default 
would consume "
+                        "the previous group's tail. Insert a separator between 
the "
+                        "placeholders, or narrow at least one of them via 
placeholder_patterns."
+                    )
+                pattern = placeholder_patterns.get(name, r"\w+")
+                parts.append(f"(?P<{name}>{pattern})")
+                i = end + 1
+                prev_was_default_placeholder = is_default
+                continue
+        parts.append(re.escape(fmt[i]))
+        i += 1
+        prev_was_default_placeholder = False
+    return re.compile("".join(parts))

Review Comment:
   `_compile_output_format_regex` returns an unanchored pattern and the 
subclass `decode_downstream` overrides use `.search()`, so a malformed key with 
extra prefix/suffix matches the first numeric run and silently succeeds. 
Example: `output_format="%Y-%m-%d (W%V)"` would happily parse `"2024-03-11 
(W11) trailing-garbage"` as a valid week key. The compile-time validations 
(duplicate directives, adjacent default placeholders) catch construction-side 
bugs, but the runtime decode path doesn't notice malformed inputs. Worth 
anchoring with `re.fullmatch` (or wrapping the compiled pattern with `^...$`) 
so a key the mapper can't actually produce raises instead of silently parsing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to