[ 
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765227&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765227
 ]

ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/22 00:15
            Start Date: 03/May/22 00:15
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863272376


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results may be a generator, so len() cannot be called on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   Ah yes, thank you. FWIW this was lifted from: 
https://github.com/apache/beam/blob/0daef62a7bd993b13064de80588e343ee764e004/sdks/python/apache_beam/runners/common.py#L1354-1355
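
For context, here is a minimal sketch of what the lifted guard does, assuming `windows` is a plain Python tuple so that `*=` repeats it once per input window. `FakeBatch` and `replicate_windows` are hypothetical stand-ins for illustration only, not Beam classes or functions.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakeBatch:
    """Hypothetical stand-in for a windowed batch; not a Beam class."""
    values: list
    windows: Tuple[str, ...]


def replicate_windows(
        output: FakeBatch, input_batch: Optional[FakeBatch]) -> FakeBatch:
    # Same shape as the guard in the diff: when the input sits in N != 1
    # windows, the output's window tuple is repeated N times in place.
    if input_batch is not None and len(input_batch.windows) != 1:
        output.windows *= len(input_batch.windows)
    return output


multi_window_input = FakeBatch(values=[1, 2], windows=("w1", "w2", "w3"))
single_window_input = FakeBatch(values=[1, 2], windows=("w1",))

repeated = replicate_windows(
    FakeBatch(values=[10], windows=("g",)), multi_window_input)
unchanged = replicate_windows(
    FakeBatch(values=[10], windows=("g",)), single_window_input)
no_input = replicate_windows(FakeBatch(values=[10], windows=("g",)), None)
```

Under that assumption the output's own windows are repeated rather than replaced by the input's windows, which may be worth keeping in mind when reading the guard.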





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765227)
    Time Spent: 2h 50m  (was: 2h 40m)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in restricted circumstances. For example, 
> @yields_element on process_batch, or batch-to-batch without a 1:1 
> input:output mapping, might not be supported. Unsupported cases should fail early. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
