TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863280259


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, 
pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each 
value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]

Review Comment:
   Sorry, which suggestion above?
   
   I went ahead and removed all the defaults for pane_infos in the
   WindowedBatch implementations.



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, 
pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each 
value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    return create_batch(
+        new_values, self.timestamps_micros, self.windows, self.pane_infos)
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value, timestamp, windows, pane_info in zip(explode_fn(self.values),
+                                                    self.timestamps_micros,
+                                                    self.windows,
+                                                    self._pane_infos_iter()):
+      yield create(value, timestamp, windows, pane_info)
+
+  def _pane_infos_iter(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      return itertools.repeat(self.pane_infos, len(self.timestamps_micros))
+    else:
+      return self.pane_infos
+
+  def __eq__(self, other):
+    if isinstance(other, ConcreteWindowedBatch):
+      return (
+          type(self) == type(other) and
+          self.timestamps_micros == other.timestamps_micros and
+          self.values == other.values and self.windows == other.windows and
+          self.pane_infos == other.pane_infos)
+    return NotImplemented
+
+  def __hash__(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      pane_infos_hash = hash(self.pane_infos)
+    else:
+      pane_infos_hash = sum(hash(p) for p in self.pane_infos)
+
+    return ((hash(self.values) & 0xFFFFFFFFFFFFFFF) + 3 *
+            (sum(self.timestamps_micros) & 0xFFFFFFFFFFFFFF) + 7 *
+            (sum(hash(w) for w in self.windows) & 0xFFFFFFFFFFFFF) + 11 *
+            (pane_infos_hash & 0xFFFFFFFFFFFFF))
+
+
+def create_batch(

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to