[ 
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765240
 ]

ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/22 00:35
            Start Date: 03/May/22 00:35
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863280259


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, 
pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each 
value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]

Review Comment:
   Sorry, which suggestion above?
   
   I went ahead and removed all the defaults for pane_infos in the WindowedBatch implementations.



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, 
pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each 
value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as 
this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    return create_batch(
+        new_values, self.timestamps_micros, self.windows, self.pane_infos)
+
+  def as_windowed_values(self, explode_fn: Callable) -> 
Iterable[WindowedValue]:
+    for value, timestamp, windows, pane_info in zip(explode_fn(self.values),
+                                                    self.timestamps_micros,
+                                                    self.windows,
+                                                    self._pane_infos_iter()):
+      yield create(value, timestamp, windows, pane_info)
+
+  def _pane_infos_iter(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      return itertools.repeat(self.pane_infos, len(self.timestamps_micros))
+    else:
+      return self.pane_infos
+
+  def __eq__(self, other):
+    if isinstance(other, ConcreteWindowedBatch):
+      return (
+          type(self) == type(other) and
+          self.timestamps_micros == other.timestamps_micros and
+          self.values == other.values and self.windows == other.windows and
+          self.pane_infos == other.pane_infos)
+    return NotImplemented
+
+  def __hash__(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      pane_infos_hash = hash(self.pane_infos)
+    else:
+      pane_infos_hash = sum(hash(p) for p in self.pane_infos)
+
+    return ((hash(self.values) & 0xFFFFFFFFFFFFFFF) + 3 *
+            (sum(self.timestamps_micros) & 0xFFFFFFFFFFFFFF) + 7 *
+            (sum(hash(w) for w in self.windows) & 0xFFFFFFFFFFFFF) + 11 *
+            (pane_infos_hash & 0xFFFFFFFFFFFFF))
+
+
+def create_batch(

Review Comment:
   Done





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765240)
    Time Spent: 3h 20m  (was: 3h 10m)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances; for example,
> @yields_element on process_batch, or batch-to-batch processing without a 1:1
> input:output mapping, might not be supported. These cases should fail early.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to