Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3323214549


##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any, ClassVar
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
+
+def _require_day_one(dt: datetime, window_cls: type) -> None:
+    """
+    Raise ``ValueError`` if *dt* is not on day 1 of its month.
+
+    Month-aligned windows expand by month-step arithmetic, which is only
+    safe when the period starts on day 1 (otherwise e.g.
+    ``replace(month=feb)`` on Jan 31 raises ``ValueError: day is out of
+    range for month``). Built-in temporal upstream mappers normalise to
+    day 1, but a custom ``PartitionMapper.decode_downstream`` could return
+    any day — checking here turns a confusing scheduler-tick crash into
+    an explicit signal about the upstream-mapper contract.
+    """
+    if dt.day != 1:
+        raise ValueError(
+            f"{window_cls.__name__} expects a period start on day 1 of the 
month, "
+            f"got {dt.isoformat()}. The paired upstream mapper's 
decode_downstream "
+            "must normalise to day 1."
+        )
+
+
+def _shift_months(dt: datetime, months: int) -> datetime:
+    """
+    Return *dt* shifted forward by *months*, wrapping the year as needed.
+
+    Caller is responsible for ensuring *dt* is on day 1 (see
+    :func:`_require_day_one`) so that ``replace(month=...)`` is always valid.
+    """
+    total = dt.month - 1 + months
+    return dt.replace(year=dt.year + total // 12, month=total % 12 + 1)
+
+
+class Window(ABC):

Review Comment:
   "Complex Schedule-Partition Mapping" is future work and not part of 3.3. No 
plan for this yet. Added a docstring note on the Window ABC (core + SDK) 
describing the contiguous-only contract — to_upstream is only one side; the 
other (which downstreams an upstream feeds) lives on 
PartitionMapper.to_downstream, which already accepts an iterable for fan-out. A 
determined user can wire sliding today by subclassing both. Built-ins shipped 
here stay 1:1 contiguous; the API leaves room to consolidate both directions on 
Window later without breaking existing subclasses.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to