[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765188&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765188
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/May/22 23:00
Start Date: 02/May/22 23:00
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863246975
##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows,
pane_info=PANE_INFO_UNKNOWN):
return wv
+class BatchingMode(Enum):
+ CONCRETE = 1
+ HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+ """A batch of N windowed values, each having a value, a timestamp and set of
+ windows."""
+ def with_values(self, new_values):
+ # type: (Any) -> WindowedBatch
+
+ """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+ This is the fastest way to create a new WindowedBatch.
+ """
+ raise NotImplementedError
+
+ def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+ raise NotImplementedError
+
+ @staticmethod
+ def from_windowed_values(
+ windowed_values: Sequence[WindowedValue],
+ *,
+ produce_fn: Callable,
+ mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+ if mode == BatchingMode.HOMOGENEOUS:
+ import collections
+ grouped = collections.defaultdict(lambda: [])
+ for wv in windowed_values:
+ grouped[(wv.timestamp, tuple(wv.windows),
+ wv.pane_info)].append(wv.value)
+
+ for key, values in grouped.items():
+ timestamp, windows, pane_info = key
+ yield HomogeneousWindowedBatch.of(
+ produce_fn(values), timestamp, windows, pane_info)
+ elif mode == BatchingMode.CONCRETE:
+ yield ConcreteWindowedBatch(
+ produce_fn([wv.value for wv in windowed_values]),
+ [wv.timestamp
+ for wv in windowed_values], [wv.windows for wv in windowed_values],
+ [wv.pane_info for wv in windowed_values])
+ else:
+ raise AssertionError(
+ "Unrecognized BatchingMode in "
+ f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+ """A WindowedBatch with Homogeneous event-time information, represented
+ internally as a WindowedValue.
+ """
+ def __init__(self, wv):
+ self._wv = wv
+
+ @staticmethod
+ def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+ return HomogeneousWindowedBatch(
+ WindowedValue(values, timestamp, windows, pane_info))
+
+ @property
+ def values(self):
+ return self._wv.value
+
+ @property
+ def timestamp(self):
+ return self._wv.timestamp
+
+ @property
+ def pane_info(self):
+ return self._wv.pane_info
+
+ @property
+ def windows(self):
+ return self._wv.windows
+
+ @windows.setter
+ def windows(self, value):
+ self._wv.windows = value
+
+ def with_values(self, new_values):
+ # type: (Any) -> WindowedBatch
+ return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+ def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+ for value in explode_fn(self._wv.value):
+ yield self._wv.with_value(value)
+
+ def __eq__(self, other):
+ if isinstance(other, HomogeneousWindowedBatch):
+ return self._wv == other._wv
+ return NotImplemented
+
+ def __hash__(self):
+ return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+ """A concrete WindowedBatch where all event-time information is stored
+ independently for each element.
+
+ Attributes:
+ values: The underlying values of the windowed batch.
+ timestamp: An iterable of timestamps associated with the value as seconds
+ since Unix epoch.
+ windows: An iterable with a set (iterable) of window objects for each value.
+ The window objects are descendants of the BoundedWindow class.
+ pane_info: An iterable of PaneInfo descriptors describing the triggering
+ information for the pane that contained each value. Alternatively, a
+ single PaneInfo may be specified to use for every value. If None, will be
+ set to PANE_INFO_UNKNOWN.
+ """
+ def __init__(
+ self,
+ values,
+ timestamps, # type: Sequence[TimestampTypes]
+ windows, # type: Iterable[Tuple[BoundedWindow, ...]]
+ pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo]
+ ):
+ self.values = values
+
+ def convert_timestamp(timestamp: TimestampTypes) -> int:
+ if isinstance(timestamp, int):
+ return timestamp * 1000000
+ else:
+ # TODO: Cache Timestamp object as in WindowedValue?
+ timestamp_object = (
+ timestamp
+ if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+ return timestamp_object.micros
+
+ self.timestamp_objects: Optional[List[Timestamp]] = None
+ self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+ self.windows = windows
+ #TODO: Should we store length?
+ #self.length = length
+ self.pane_infos = pane_infos
+
+ @property
+ def timestamps(self) -> Sequence[Timestamp]:
+ if self.timestamp_objects is None:
+ self.timestamp_objects = [
+ Timestamp(0, micros) for micros in self.timestamps_micros
+ ]
+
+ return self.timestamp_objects
+
+ def with_values(self, new_values):
Review Comment:
No, this implementation is never used, since everything is built on HomogeneousWindowedBatch for now.
Issue Time Tracking
-------------------
Worklog Id: (was: 765188)
Time Spent: 1h 50m (was: 1h 40m)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping might not be supported). These cases should fail early.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)