robertwb commented on code in PR #17384: URL: https://github.com/apache/beam/pull/17384#discussion_r863163035
########## sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py: ########## @@ -121,6 +126,148 @@ def test_pardo(self): | beam.Map(lambda e: e + 'x')) assert_that(res, equal_to(['aax', 'bcbcx'])) + def test_batch_pardo(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types( + np.int64) + | beam.ParDo(ArrayMultiplyDoFn()) + | beam.Map(lambda x: x * 3)) + + assert_that(res, equal_to([6, 12, 18])) + + def test_batch_rebatch_pardos(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types( + np.int64) + | beam.ParDo(ArrayMultiplyDoFn()) + | beam.ParDo(ListPlusOneDoFn()) + | beam.Map(lambda x: x * 3)) + + assert_that(res, equal_to([9, 15, 21])) + + def test_batch_pardo_fusion_break(self): + class NormalizeDoFn(beam.DoFn): + @no_type_check + def process_batch( + self, + batch: np.ndarray, + mean: np.float64, + ) -> Iterator[np.ndarray]: + assert isinstance(batch, np.ndarray) + yield batch - mean + + # infer_output_type must be defined (when there's no process method), Review Comment: Does it not just fall back to Any? ########## sdks/python/apache_beam/runners/worker/operations.py: ########## @@ -223,6 +235,120 @@ def current_element_progress(self): return self.consumer.current_element_progress() +class GeneralPurposeConsumerSet(ConsumerSet): + """ConsumerSet implementation that handles all combinations of possible edges. 
+ """ + def __init__(self, + counter_factory, + step_name, # type: str + output_index, + coder, + producer_type_hints, + consumers, # type: List[Operation] + producer_batch_converter): + super().__init__( + counter_factory, + step_name, + output_index, + consumers, + coder, + producer_type_hints) + + self.producer_batch_converter = producer_batch_converter + + # Partition consumers into three groups: + # - consumers that will be passed elements + # - consumers that will be passed batches (where their input batch type + # matches the output of the producer) + # - consumers that will be passed converted batches + self.element_consumers: List[Operation] = [] + self.passthrough_batch_consumers: List[Operation] = [] + other_batch_consumers: DefaultDict[ + BatchConverter, List[Operation]] = collections.defaultdict(lambda: []) + + for consumer in consumers: + if not consumer.get_batching_preference().supports_batches: + self.element_consumers.append(consumer) + elif (consumer.get_input_batch_converter() == + self.producer_batch_converter): + self.passthrough_batch_consumers.append(consumer) + else: + # Batch consumer with a mismatched batch type + if consumer.get_batching_preference().supports_elements: + # Pass it elements if we can + self.element_consumers.append(consumer) + else: + # As a last resort, explode and rebatch + consumer_batch_converter = consumer.get_input_batch_converter() + # This consumer supports batches, it must have a batch converter + assert consumer_batch_converter is not None + other_batch_consumers[consumer_batch_converter].append(consumer) + + self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict( + other_batch_consumers) + + self.has_batch_consumers = ( + self.passthrough_batch_consumers or self.other_batch_consumers) + self._batched_elements: List[Any] = [] + + def receive(self, windowed_value): + # type: (WindowedValue) -> None + self.update_counters_start(windowed_value) + + for consumer in self.element_consumers: + 
cython.cast(Operation, consumer).process(windowed_value) + + # TODO: Do this branching when contstructing ConsumerSet + if self.has_batch_consumers: + self._batched_elements.append(windowed_value) + + self.update_counters_finish() Review Comment: This is not going to properly record the sizes of lazily observed values that were (only?) placed into a batch. ########## sdks/python/apache_beam/runners/worker/operations.py: ########## @@ -223,6 +235,120 @@ def current_element_progress(self): return self.consumer.current_element_progress() +class GeneralPurposeConsumerSet(ConsumerSet): + """ConsumerSet implementation that handles all combinations of possible edges. + """ + def __init__(self, + counter_factory, + step_name, # type: str + output_index, + coder, + producer_type_hints, + consumers, # type: List[Operation] + producer_batch_converter): + super().__init__( + counter_factory, + step_name, + output_index, + consumers, + coder, + producer_type_hints) + + self.producer_batch_converter = producer_batch_converter + + # Partition consumers into three groups: + # - consumers that will be passed elements + # - consumers that will be passed batches (where their input batch type + # matches the output of the producer) + # - consumers that will be passed converted batches + self.element_consumers: List[Operation] = [] + self.passthrough_batch_consumers: List[Operation] = [] + other_batch_consumers: DefaultDict[ + BatchConverter, List[Operation]] = collections.defaultdict(lambda: []) + + for consumer in consumers: + if not consumer.get_batching_preference().supports_batches: + self.element_consumers.append(consumer) + elif (consumer.get_input_batch_converter() == + self.producer_batch_converter): + self.passthrough_batch_consumers.append(consumer) + else: + # Batch consumer with a mismatched batch type + if consumer.get_batching_preference().supports_elements: + # Pass it elements if we can + self.element_consumers.append(consumer) + else: + # As a last resort, explode and 
rebatch + consumer_batch_converter = consumer.get_input_batch_converter() + # This consumer supports batches, it must have a batch converter + assert consumer_batch_converter is not None + other_batch_consumers[consumer_batch_converter].append(consumer) + + self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict( + other_batch_consumers) + + self.has_batch_consumers = ( + self.passthrough_batch_consumers or self.other_batch_consumers) + self._batched_elements: List[Any] = [] + + def receive(self, windowed_value): + # type: (WindowedValue) -> None + self.update_counters_start(windowed_value) + + for consumer in self.element_consumers: + cython.cast(Operation, consumer).process(windowed_value) + + # TODO: Do this branching when contstructing ConsumerSet + if self.has_batch_consumers: + self._batched_elements.append(windowed_value) Review Comment: We should put some limit here on the batch size, even if it's a fixed constant. ########## sdks/python/apache_beam/utils/windowed_value.pxd: ########## @@ -43,6 +43,23 @@ cdef class WindowedValue(object): cpdef WindowedValue with_value(self, new_value) +cdef class WindowedBatch(object): + cpdef WindowedBatch with_values(self, object new_values) + +cdef class HomogeneousWindowedBatch(WindowedBatch): + cdef public WindowedValue _wv Review Comment: Why is this public? ########## sdks/python/apache_beam/runners/worker/operations.py: ########## @@ -223,6 +235,120 @@ def current_element_progress(self): return self.consumer.current_element_progress() +class GeneralPurposeConsumerSet(ConsumerSet): + """ConsumerSet implementation that handles all combinations of possible edges. 
+ """ + def __init__(self, + counter_factory, + step_name, # type: str + output_index, + coder, + producer_type_hints, + consumers, # type: List[Operation] + producer_batch_converter): + super().__init__( + counter_factory, + step_name, + output_index, + consumers, + coder, + producer_type_hints) + + self.producer_batch_converter = producer_batch_converter + + # Partition consumers into three groups: + # - consumers that will be passed elements + # - consumers that will be passed batches (where their input batch type + # matches the output of the producer) + # - consumers that will be passed converted batches + self.element_consumers: List[Operation] = [] + self.passthrough_batch_consumers: List[Operation] = [] + other_batch_consumers: DefaultDict[ + BatchConverter, List[Operation]] = collections.defaultdict(lambda: []) + + for consumer in consumers: + if not consumer.get_batching_preference().supports_batches: + self.element_consumers.append(consumer) + elif (consumer.get_input_batch_converter() == + self.producer_batch_converter): + self.passthrough_batch_consumers.append(consumer) + else: + # Batch consumer with a mismatched batch type + if consumer.get_batching_preference().supports_elements: + # Pass it elements if we can + self.element_consumers.append(consumer) + else: + # As a last resort, explode and rebatch + consumer_batch_converter = consumer.get_input_batch_converter() + # This consumer supports batches, it must have a batch converter + assert consumer_batch_converter is not None + other_batch_consumers[consumer_batch_converter].append(consumer) + + self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict( + other_batch_consumers) + + self.has_batch_consumers = ( + self.passthrough_batch_consumers or self.other_batch_consumers) + self._batched_elements: List[Any] = [] + + def receive(self, windowed_value): + # type: (WindowedValue) -> None + self.update_counters_start(windowed_value) + + for consumer in self.element_consumers: + 
cython.cast(Operation, consumer).process(windowed_value) + + # TODO: Do this branching when contstructing ConsumerSet + if self.has_batch_consumers: + self._batched_elements.append(windowed_value) + + self.update_counters_finish() + + def receive_batch(self, windowed_batch): + #self.update_counters_start(windowed_value) Review Comment: At least increment the element counter by the batch size and drop a TODO about getting the bytes (from the batch converter(?)). ########## sdks/python/apache_beam/runners/worker/operations.py: ########## @@ -223,6 +235,120 @@ def current_element_progress(self): return self.consumer.current_element_progress() +class GeneralPurposeConsumerSet(ConsumerSet): + """ConsumerSet implementation that handles all combinations of possible edges. + """ + def __init__(self, + counter_factory, + step_name, # type: str + output_index, + coder, + producer_type_hints, + consumers, # type: List[Operation] + producer_batch_converter): + super().__init__( + counter_factory, + step_name, + output_index, + consumers, + coder, + producer_type_hints) + + self.producer_batch_converter = producer_batch_converter + + # Partition consumers into three groups: + # - consumers that will be passed elements + # - consumers that will be passed batches (where their input batch type + # matches the output of the producer) + # - consumers that will be passed converted batches + self.element_consumers: List[Operation] = [] + self.passthrough_batch_consumers: List[Operation] = [] + other_batch_consumers: DefaultDict[ + BatchConverter, List[Operation]] = collections.defaultdict(lambda: []) + + for consumer in consumers: + if not consumer.get_batching_preference().supports_batches: + self.element_consumers.append(consumer) + elif (consumer.get_input_batch_converter() == + self.producer_batch_converter): + self.passthrough_batch_consumers.append(consumer) + else: + # Batch consumer with a mismatched batch type + if consumer.get_batching_preference().supports_elements: + # 
Pass it elements if we can + self.element_consumers.append(consumer) + else: + # As a last resort, explode and rebatch + consumer_batch_converter = consumer.get_input_batch_converter() + # This consumer supports batches, it must have a batch converter + assert consumer_batch_converter is not None + other_batch_consumers[consumer_batch_converter].append(consumer) + + self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict( + other_batch_consumers) + + self.has_batch_consumers = ( + self.passthrough_batch_consumers or self.other_batch_consumers) + self._batched_elements: List[Any] = [] + + def receive(self, windowed_value): + # type: (WindowedValue) -> None + self.update_counters_start(windowed_value) + + for consumer in self.element_consumers: + cython.cast(Operation, consumer).process(windowed_value) + + # TODO: Do this branching when contstructing ConsumerSet + if self.has_batch_consumers: + self._batched_elements.append(windowed_value) + + self.update_counters_finish() + + def receive_batch(self, windowed_batch): + #self.update_counters_start(windowed_value) + if self.element_consumers: + for wv in windowed_batch.as_windowed_values( + self.producer_batch_converter.explode_batch): + for consumer in self.element_consumers: + cython.cast(Operation, consumer).process(wv) + + for consumer in self.passthrough_batch_consumers: + cython.cast(Operation, consumer).process_batch(windowed_batch) + + for (consumer_batch_converter, + consumers) in self.other_batch_consumers.items(): + # Explode and rebatch into the new batch type (ouch!) Review Comment: Maybe worth logging a warning at least once? 
########## sdks/python/apache_beam/utils/windowed_value.pxd: ########## @@ -43,6 +43,23 @@ cdef class WindowedValue(object): cpdef WindowedValue with_value(self, new_value) +cdef class WindowedBatch(object): + cpdef WindowedBatch with_values(self, object new_values) + +cdef class HomogeneousWindowedBatch(WindowedBatch): + cdef public WindowedValue _wv + + cpdef WindowedBatch with_values(self, object new_values) + +cdef class ConcreteWindowedBatch(WindowedBatch): + cdef public object values Review Comment: Why not store this as a list of WindowedValues? ########## sdks/python/apache_beam/utils/windowed_value.pxd: ########## @@ -43,6 +43,23 @@ cdef class WindowedValue(object): cpdef WindowedValue with_value(self, new_value) +cdef class WindowedBatch(object): + cpdef WindowedBatch with_values(self, object new_values) + +cdef class HomogeneousWindowedBatch(WindowedBatch): + cdef public WindowedValue _wv + + cpdef WindowedBatch with_values(self, object new_values) + +cdef class ConcreteWindowedBatch(WindowedBatch): Review Comment: Heterogeneous? ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter + def windows(self, value): + self._wv.windows = value + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + return HomogeneousWindowedBatch(self._wv.with_value(new_values)) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value in explode_fn(self._wv.value): + yield self._wv.with_value(value) + + def __eq__(self, other): + if isinstance(other, HomogeneousWindowedBatch): + return self._wv == other._wv + return NotImplemented + + def __hash__(self): + return hash(self._wv) + + +class ConcreteWindowedBatch(WindowedBatch): + """A concrete WindowedBatch where all event-time information is stored + independently for each element. + + Attributes: + values: The underlying values of the windowed batch. + timestamp: An iterable of timestamps associated with the value as seconds + since Unix epoch. + windows: An iterable with a set (iterable) of window objects for each value. + The window objects are descendants of the BoundedWindow class. + pane_info: An iterable of PaneInfo descriptors describing the triggering + information for the pane that contained each value. Alternatively, a + single PaneInfo may be specified to use for every value. If None, will be + set to PANE_INFO_UNKNOWN. 
+ """ + def __init__( + self, + values, + timestamps, # type: Sequence[TimestampTypes] + windows, # type: Iterable[Tuple[BoundedWindow, ...]] + pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo] + ): + self.values = values + + def convert_timestamp(timestamp: TimestampTypes) -> int: + if isinstance(timestamp, int): + return timestamp * 1000000 + else: + # TODO: Cache Timestamp object as in WindowedValue? + timestamp_object = ( + timestamp + if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp)) + return timestamp_object.micros + + self.timestamp_objects: Optional[List[Timestamp]] = None + self.timestamps_micros = [convert_timestamp(t) for t in timestamps] + self.windows = windows + #TODO: Should we store length? + #self.length = length + self.pane_infos = pane_infos + + @property + def timestamps(self) -> Sequence[Timestamp]: Review Comment: Is this ever needed/used? ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. + """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): Review Comment: Is this (and pane_info and windows) actually used anywhere? 
########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), Review Comment: Pulling timestamp etc. out as Python objects could be expensive. Could use wv.with_value(None) as the key. ########## sdks/python/apache_beam/utils/windowed_value.pxd: ########## @@ -43,6 +43,23 @@ cdef class WindowedValue(object): cpdef WindowedValue with_value(self, new_value) +cdef class WindowedBatch(object): + cpdef WindowedBatch with_values(self, object new_values) + +cdef class HomogeneousWindowedBatch(WindowedBatch): + cdef public WindowedValue _wv + + cpdef WindowedBatch with_values(self, object new_values) + +cdef class ConcreteWindowedBatch(WindowedBatch): + cdef public object values Review Comment: Or, if the Values need to be arbitrary batches, let's store this as (values, List[WindowedValue]) where the latter all have value None.
########## sdks/python/apache_beam/runners/worker/operations.py: ########## @@ -329,7 +432,9 @@ def finish(self): # type: () -> None """Finish operation.""" - pass + # TODO: Do we need an output_index here Review Comment: I don't think so; we should be able to finish everything here. ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: Review Comment: Do we need an enum here, or should we just implement this on the subclasses, e.g. HomogeneousWindowedBatch.from_windowed_values to get a set of homogeneous windowed values? ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. 
+ + This is the fasted way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: Review Comment: Maybe we could want to hide the concrete subclasses, but it seems like this always gets overridden. Perhaps we don't need the concrete implementation until we solve the questions of batching + windowing in the API. ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter + def windows(self, value): + self._wv.windows = value + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + return HomogeneousWindowedBatch(self._wv.with_value(new_values)) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value in explode_fn(self._wv.value): + yield self._wv.with_value(value) + + def __eq__(self, other): + if isinstance(other, HomogeneousWindowedBatch): + return self._wv == other._wv + return NotImplemented + + def __hash__(self): + return hash(self._wv) + + +class ConcreteWindowedBatch(WindowedBatch): + """A concrete WindowedBatch where all event-time information is stored + independently for each element. + + Attributes: + values: The underlying values of the windowed batch. + timestamp: An iterable of timestamps associated with the value as seconds + since Unix epoch. + windows: An iterable with a set (iterable) of window objects for each value. + The window objects are descendants of the BoundedWindow class. + pane_info: An iterable of PaneInfo descriptors describing the triggering + information for the pane that contained each value. Alternatively, a + single PaneInfo may be specified to use for every value. If None, will be + set to PANE_INFO_UNKNOWN. 
+ """ + def __init__( + self, + values, + timestamps, # type: Sequence[TimestampTypes] + windows, # type: Iterable[Tuple[BoundedWindow, ...]] + pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo] + ): + self.values = values + + def convert_timestamp(timestamp: TimestampTypes) -> int: + if isinstance(timestamp, int): + return timestamp * 1000000 + else: + # TODO: Cache Timestamp object as in WindowedValue? + timestamp_object = ( + timestamp + if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp)) + return timestamp_object.micros + + self.timestamp_objects: Optional[List[Timestamp]] = None + self.timestamps_micros = [convert_timestamp(t) for t in timestamps] + self.windows = windows + #TODO: Should we store length? + #self.length = length + self.pane_infos = pane_infos + + @property + def timestamps(self) -> Sequence[Timestamp]: + if self.timestamp_objects is None: + self.timestamp_objects = [ + Timestamp(0, micros) for micros in self.timestamps_micros + ] + + return self.timestamp_objects + + def with_values(self, new_values): Review Comment: Actually, does this ever get used? ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter + def windows(self, value): + self._wv.windows = value + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + return HomogeneousWindowedBatch(self._wv.with_value(new_values)) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value in explode_fn(self._wv.value): + yield self._wv.with_value(value) + + def __eq__(self, other): + if isinstance(other, HomogeneousWindowedBatch): + return self._wv == other._wv + return NotImplemented + + def __hash__(self): + return hash(self._wv) + + +class ConcreteWindowedBatch(WindowedBatch): + """A concrete WindowedBatch where all event-time information is stored + independently for each element. + + Attributes: + values: The underlying values of the windowed batch. + timestamp: An iterable of timestamps associated with the value as seconds + since Unix epoch. + windows: An iterable with a set (iterable) of window objects for each value. + The window objects are descendants of the BoundedWindow class. + pane_info: An iterable of PaneInfo descriptors describing the triggering + information for the pane that contained each value. Alternatively, a + single PaneInfo may be specified to use for every value. If None, will be + set to PANE_INFO_UNKNOWN. 
+ """ + def __init__( + self, + values, + timestamps, # type: Sequence[TimestampTypes] + windows, # type: Iterable[Tuple[BoundedWindow, ...]] + pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo] + ): + self.values = values + + def convert_timestamp(timestamp: TimestampTypes) -> int: + if isinstance(timestamp, int): + return timestamp * 1000000 + else: + # TODO: Cache Timestamp object as in WindowedValue? + timestamp_object = ( + timestamp + if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp)) + return timestamp_object.micros + + self.timestamp_objects: Optional[List[Timestamp]] = None + self.timestamps_micros = [convert_timestamp(t) for t in timestamps] + self.windows = windows + #TODO: Should we store length? + #self.length = length + self.pane_infos = pane_infos + + @property + def timestamps(self) -> Sequence[Timestamp]: + if self.timestamp_objects is None: + self.timestamp_objects = [ + Timestamp(0, micros) for micros in self.timestamps_micros + ] + + return self.timestamp_objects + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + return create_batch( + new_values, self.timestamps_micros, self.windows, self.pane_infos) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value, timestamp, windows, pane_info in zip(explode_fn(self.values), + self.timestamps_micros, + self.windows, + self._pane_infos_iter()): + yield create(value, timestamp, windows, pane_info) + + def _pane_infos_iter(self): + if isinstance(self.pane_infos, PaneInfo): + return itertools.repeat(self.pane_infos, len(self.timestamps_micros)) + else: + return self.pane_infos + + def __eq__(self, other): + if isinstance(other, ConcreteWindowedBatch): + return ( + type(self) == type(other) and + self.timestamps_micros == other.timestamps_micros and + self.values == other.values and self.windows == other.windows and + self.pane_infos == other.pane_infos) + return NotImplemented + + def __hash__(self): + if isinstance(self.pane_infos, PaneInfo): + pane_infos_hash = hash(self.pane_infos) + else: + pane_infos_hash = sum(hash(p) for p in self.pane_infos) + + return ((hash(self.values) & 0xFFFFFFFFFFFFFFF) + 3 * + (sum(self.timestamps_micros) & 0xFFFFFFFFFFFFFF) + 7 * + (sum(hash(w) for w in self.windows) & 0xFFFFFFFFFFFFF) + 11 * + (pane_infos_hash & 0xFFFFFFFFFFFFF)) + + +def create_batch( Review Comment: This indirection was for performance reasons; just inline it above. ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter + def windows(self, value): + self._wv.windows = value + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + return HomogeneousWindowedBatch(self._wv.with_value(new_values)) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value in explode_fn(self._wv.value): + yield self._wv.with_value(value) + + def __eq__(self, other): + if isinstance(other, HomogeneousWindowedBatch): + return self._wv == other._wv + return NotImplemented + + def __hash__(self): + return hash(self._wv) + + +class ConcreteWindowedBatch(WindowedBatch): + """A concrete WindowedBatch where all event-time information is stored + independently for each element. + + Attributes: + values: The underlying values of the windowed batch. + timestamp: An iterable of timestamps associated with the value as seconds + since Unix epoch. + windows: An iterable with a set (iterable) of window objects for each value. + The window objects are descendants of the BoundedWindow class. + pane_info: An iterable of PaneInfo descriptors describing the triggering + information for the pane that contained each value. Alternatively, a + single PaneInfo may be specified to use for every value. If None, will be + set to PANE_INFO_UNKNOWN. 
+ """ + def __init__( + self, + values, + timestamps, # type: Sequence[TimestampTypes] + windows, # type: Iterable[Tuple[BoundedWindow, ...]] + pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo] Review Comment: Let's avoid making this optional, for WindowedValue that was only for backwards compatibility. (Per the suggestion above, perhaps this signature should be different anyway, and we'd avoid things like convert_timestamp...) ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. 
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. 
+ """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter + def windows(self, value): + self._wv.windows = value + + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + return HomogeneousWindowedBatch(self._wv.with_value(new_values)) + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + for value in explode_fn(self._wv.value): + yield self._wv.with_value(value) + + def __eq__(self, other): + if isinstance(other, HomogeneousWindowedBatch): + return self._wv == other._wv + return NotImplemented + + def __hash__(self): + return hash(self._wv) + + +class ConcreteWindowedBatch(WindowedBatch): + """A concrete WindowedBatch where all event-time information is stored + independently for each element. + + Attributes: + values: The underlying values of the windowed batch. + timestamp: An iterable of timestamps associated with the value as seconds + since Unix epoch. + windows: An iterable with a set (iterable) of window objects for each value. + The window objects are descendants of the BoundedWindow class. + pane_info: An iterable of PaneInfo descriptors describing the triggering + information for the pane that contained each value. Alternatively, a + single PaneInfo may be specified to use for every value. If None, will be + set to PANE_INFO_UNKNOWN. 
+ """ + def __init__( + self, + values, + timestamps, # type: Sequence[TimestampTypes] + windows, # type: Iterable[Tuple[BoundedWindow, ...]] + pane_infos=PANE_INFO_UNKNOWN # type: Union[Iterable[PaneInfo],PaneInfo] + ): + self.values = values + + def convert_timestamp(timestamp: TimestampTypes) -> int: + if isinstance(timestamp, int): + return timestamp * 1000000 + else: + # TODO: Cache Timestamp object as in WindowedValue? + timestamp_object = ( + timestamp + if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp)) + return timestamp_object.micros + + self.timestamp_objects: Optional[List[Timestamp]] = None + self.timestamps_micros = [convert_timestamp(t) for t in timestamps] + self.windows = windows + #TODO: Should we store length? + #self.length = length + self.pane_infos = pane_infos + + @property + def timestamps(self) -> Sequence[Timestamp]: + if self.timestamp_objects is None: + self.timestamp_objects = [ + Timestamp(0, micros) for micros in self.timestamps_micros + ] + + return self.timestamp_objects + + def with_values(self, new_values): Review Comment: Presumably there are some constraints on the size and ordering of new_values? ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue.
+ """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. + """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): + return self._wv.timestamp + + @property + def pane_info(self): + return self._wv.pane_info + + @property + def windows(self): + return self._wv.windows + + @windows.setter Review Comment: Where is this used? I'd prefer this be immutable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org