TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864334832


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible 
edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when contstructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   It looks like we could do something like BeamDeprecationWarning to make sure 
the warning is just logged once: 
https://github.com/apache/beam/blob/be7823d0a81b00bb0f036b129176dba858e8f6c0/sdks/python/apache_beam/utils/annotations.py#L91-L96
   
   Unfortunately I realized we don't have the producing operation at hand, I'll 
have to see if there's a lightweight way to get it  when we need it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to