[ 
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765775
 ]

ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/May/22 00:32
            Start Date: 04/May/22 00:32
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864371637


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> 
None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1

Review Comment:
   Done





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765775)
    Time Spent: 6h 40m  (was: 6.5h)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g. 
> @yields_element on process_batch, or batch-to-batch without a 1:1 
> input:output mapping might not be supported). These cases should fail early. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to