TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863244420


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible 
edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when contstructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   If we log this when populating other_batch_consumers in `__init__` will it 
only be logged once (per worker)?
   
   I'm also a little worried that the error will be a little opaque to the 
user, and un-actionable. Any thoughts on how to make it more helpful? I suppose 
we can recognize the DoOperation to DoOperation case and make that nice at 
least.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to