TheNeuralBit commented on code in PR #17384: URL: https://github.com/apache/beam/pull/17384#discussion_r863260992
########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fastest way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): Review Comment: These are used in `PerWindowInvoker._invoke_process_batch_per_window` to populate DoFn params -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org