Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3279680845
##########
airflow-core/src/airflow/partition_mappers/temporal.py:
##########
@@ -98,18 +98,23 @@ def normalize(self, dt: datetime) -> datetime:
         return dt.replace(hour=0, minute=0, second=0, microsecond=0)
-class StartOfWeekMapper(_BaseTemporalMapper):
-    """Map a time-based partition key to week."""
+class StartOfWeekMapper(_BaseTemporalMapper, RollupMapper):
+    """Map a time-based partition key to week (Mon–Sun), requiring all 7 daily keys."""
     default_output_format = "%Y-%m-%d (W%V)"
     def normalize(self, dt: datetime) -> datetime:
         start = dt - timedelta(days=dt.weekday())
         return start.replace(hour=0, minute=0, second=0, microsecond=0)
+    def to_upstream(self, downstream_key: str) -> frozenset[str]:
+        # The output format starts with %Y-%m-%d which is always the Monday of the week.
+        week_start = datetime.strptime(downstream_key[:10], "%Y-%m-%d")
+        return frozenset((week_start + timedelta(days=i)).strftime(self.input_format) for i in range(7))
Review Comment:
Fixed. `decode_downstream` no longer slices `downstream_key[:10]` and parses
it with a hard-coded format. It now uses a compiled named-group regex derived
from `output_format` via `_compile_output_format_regex`.
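
For readers following along, here is a minimal sketch of the general idea: turning a strftime-style `output_format` into a named-group regex so the week start can be recovered wherever it sits in the key. This is not the PR's actual `_compile_output_format_regex`. The helper names `compile_output_format_regex` and `week_keys`, the directive table, and the group names are all hypothetical, and only the directives used by the default format are supported.

```python
import re
from datetime import datetime, timedelta

# Hypothetical directive table: only what "%Y-%m-%d (W%V)" needs.
_DIRECTIVE_PATTERNS = {
    "Y": r"(?P<year>\d{4})",
    "m": r"(?P<month>\d{2})",
    "d": r"(?P<day>\d{2})",
    "V": r"(?P<iso_week>\d{2})",
}


def compile_output_format_regex(output_format: str) -> re.Pattern:
    """Translate a strftime-style format into a regex with named groups."""
    parts = []
    i = 0
    while i < len(output_format):
        ch = output_format[i]
        if ch == "%" and i + 1 < len(output_format):
            directive = output_format[i + 1]
            if directive == "%":
                parts.append(re.escape("%"))  # "%%" is a literal percent sign
            elif directive in _DIRECTIVE_PATTERNS:
                parts.append(_DIRECTIVE_PATTERNS[directive])
            else:
                raise ValueError(f"unsupported directive: %{directive}")
            i += 2
        else:
            parts.append(re.escape(ch))  # literal text is matched verbatim
            i += 1
    return re.compile("".join(parts))


def week_keys(
    downstream_key: str,
    output_format: str = "%Y-%m-%d (W%V)",
    input_format: str = "%Y-%m-%d",
) -> frozenset:
    """Return the 7 daily keys of the week encoded in downstream_key."""
    match = compile_output_format_regex(output_format).fullmatch(downstream_key)
    if match is None:
        raise ValueError(f"{downstream_key!r} does not match {output_format!r}")
    week_start = datetime(int(match["year"]), int(match["month"]), int(match["day"]))
    return frozenset(
        (week_start + timedelta(days=i)).strftime(input_format) for i in range(7)
    )
```

With this shape, a custom format such as `"W%V starting %Y-%m-%d"` decodes the same way as the default, because the date is located by group name rather than by position in the string.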