[ https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765596 ]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 18:06
Start Date: 03/May/22 18:06
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864048437
##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows,
pane_info=PANE_INFO_UNKNOWN):
return wv
+ class BatchingMode(Enum):
+   CONCRETE = 1
+   HOMOGENEOUS = 2
+
+
+ class WindowedBatch(object):
+   """A batch of N windowed values, each having a value, a timestamp and a set of
+   windows."""
+   def with_values(self, new_values):
+     # type: (Any) -> WindowedBatch
+
+     """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+     This is the fastest way to create a new WindowedBatch.
+     """
+     raise NotImplementedError
+
+   def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+     raise NotImplementedError
+
+   @staticmethod
+   def from_windowed_values(
+       windowed_values: Sequence[WindowedValue],
+       *,
+       produce_fn: Callable,
+       mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+     if mode == BatchingMode.HOMOGENEOUS:
+       import collections
+       grouped = collections.defaultdict(lambda: [])
+       for wv in windowed_values:
+         grouped[(wv.timestamp, tuple(wv.windows),
+                  wv.pane_info)].append(wv.value)
+
+       for key, values in grouped.items():
+         timestamp, windows, pane_info = key
+         yield HomogeneousWindowedBatch.of(
+             produce_fn(values), timestamp, windows, pane_info)
+     elif mode == BatchingMode.CONCRETE:
+       yield ConcreteWindowedBatch(
+           produce_fn([wv.value for wv in windowed_values]),
+           [wv.timestamp for wv in windowed_values],
+           [wv.windows for wv in windowed_values],
+           [wv.pane_info for wv in windowed_values])
+     else:
+       raise AssertionError(
+           "Unrecognized BatchingMode in "
+           f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+ class HomogeneousWindowedBatch(WindowedBatch):
+   """A WindowedBatch with homogeneous event-time information, represented
+   internally as a WindowedValue.
+   """
+   def __init__(self, wv):
+     self._wv = wv
+
+   @staticmethod
+   def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+     return HomogeneousWindowedBatch(
+         WindowedValue(values, timestamp, windows, pane_info))
+
+   @property
+   def values(self):
+     return self._wv.value
+
+   @property
+   def timestamp(self):
+     return self._wv.timestamp
+
+   @property
+   def pane_info(self):
+     return self._wv.pane_info
+
+   @property
+   def windows(self):
+     return self._wv.windows
+
+   @windows.setter
Review Comment:
I'll resolve this comment; let's keep discussing this in your comment where you discovered window assignment.
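The difference between the two BatchingMode branches of from_windowed_values can be illustrated without Beam. This is a minimal standalone sketch, not Beam's implementation. FakeWindowedValue, homogeneous_batches and concrete_batch are hypothetical names. The functions return plain tuples rather than HomogeneousWindowedBatch or ConcreteWindowedBatch objects.

```python
import collections
from typing import Any, Callable, Iterable, List, NamedTuple, Sequence, Tuple


class FakeWindowedValue(NamedTuple):
  """Hypothetical stand-in for Beam's WindowedValue (not the real class)."""
  value: Any
  timestamp: int
  windows: Tuple[str, ...]
  pane_info: str = 'UNKNOWN'


def homogeneous_batches(
    windowed_values: Sequence[FakeWindowedValue],
    produce_fn: Callable[[List[Any]], Any]
) -> Iterable[Tuple[Any, int, Tuple[str, ...], str]]:
  # Group values by their full event-time metadata, mirroring the
  # BatchingMode.HOMOGENEOUS branch in the diff.
  grouped = collections.defaultdict(list)
  for wv in windowed_values:
    grouped[(wv.timestamp, tuple(wv.windows), wv.pane_info)].append(wv.value)
  # Each group becomes one batch sharing a single timestamp/windows/pane.
  for (timestamp, windows, pane_info), values in grouped.items():
    yield produce_fn(values), timestamp, windows, pane_info


def concrete_batch(windowed_values, produce_fn):
  # A single batch for all elements, with metadata kept per element,
  # mirroring the BatchingMode.CONCRETE branch.
  return (
      produce_fn([wv.value for wv in windowed_values]),
      [wv.timestamp for wv in windowed_values],
      [wv.windows for wv in windowed_values],
      [wv.pane_info for wv in windowed_values])


elements = [
    FakeWindowedValue('a', 10, ('w1',)),
    FakeWindowedValue('b', 10, ('w1',)),
    FakeWindowedValue('c', 20, ('w2',)),
]

print(list(homogeneous_batches(elements, produce_fn=list)))
# [(['a', 'b'], 10, ('w1',), 'UNKNOWN'), (['c'], 20, ('w2',), 'UNKNOWN')]
print(concrete_batch(elements, produce_fn=list))
# (['a', 'b', 'c'], [10, 10, 20], [('w1',), ('w1',), ('w2',)], ['UNKNOWN', 'UNKNOWN', 'UNKNOWN'])
```

In this sketch, HOMOGENEOUS mode can emit several batches, one per distinct (timestamp, windows, pane_info) combination. CONCRETE mode always emits exactly one batch and carries the metadata element by element.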
Issue Time Tracking
-------------------
Worklog Id: (was: 765596)
Time Spent: 5h 20m (was: 5h 10m)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.,
> @yields_element on process_batch, or batch-to-batch processing without a 1:1
> input:output mapping, might not be supported). These cases should fail early.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)