TheNeuralBit commented on code in PR #17384: URL: https://github.com/apache/beam/pull/17384#discussion_r863281360
########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fastest way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), Review Comment: great idea, done! To make this work I also had to update WindowedValue.__hash__ to coerce windows to a tuple -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org