TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863272376


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogenous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(windowed_batch)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   Ah yes, thank you. FWIW this was lifted from: 
https://github.com/apache/beam/blob/0daef62a7bd993b13064de80588e343ee764e004/sdks/python/apache_beam/runners/common.py#L1354-1355


