[ 
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=758894&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758894
 ]

ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/22 23:58
            Start Date: 19/Apr/22 23:58
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r853599231


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1170,6 +1241,9 @@ def __init__(self,
     else:
       per_element_output_counter = None
 
+    # TODO: output processor assumes DoFns are batch-to-batch or
+    # element-to-element, @yields_batches and @yields_elements will break this
+    # assumption.

Review Comment:
   Tag the appropriate jira here



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -329,7 +432,9 @@ def finish(self):
     # type: () -> None
 
     """Finish operation."""
-    pass
+    # TODO: Do we need an output_index here

Review Comment:
   This is still an open question



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1450,78 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        # TODO: Helpful message
+        raise RuntimeError
+      if isinstance(result, WindowedBatch):
+        windowed_batch = result
+        # TODO: Should we do this in batches?
+        #       Would need to require one batch per window
+        #if (windowed_input_element is not None and
+        #    len(windowed_input_element.windows) != 1):
+        #  windowed_value.windows *= len(windowed_input_element.windows)
+      # TODO: TimestampedBatch
+      #elif isinstance(result, TimestampedValue):
+      #  assign_context = WindowFn.AssignContext(result.timestamp, result.value)
+      #  windowed_value = WindowedValue(
+      #      result.value,
+      #      result.timestamp,
+      #      self.window_fn.assign(assign_context))
+      #  if len(windowed_input_element.windows) != 1:
+      #    windowed_value.windows *= len(windowed_input_element.windows)
+      else:
+        # TODO: This should error unless the DoFn was defined with
+        # @DoFn.yields_batches(output_aligned_with_input=True)
+        # We should consider also validating that the length is the same as
+        # windowed_input_batch
+        windowed_batch = windowed_input_batch.with_values(result)
+
+      if watermark_estimator is not None:
+        for timestamp in windowed_batch.timestamps:
+          watermark_estimator.observe_timestamp(timestamp)
+      if tag is None:
+        self.main_receivers.receive_batch(windowed_batch)
+      else:
+        self.tagged_receivers[tag].receive_batch(windowed_batch)

Review Comment:
   OutputProcessor logic needs to be cleaned up
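
One possible direction for that cleanup, sketched under assumptions: the tag unwrapping and the element-type rejection in process_batch_outputs could be pulled into small helpers so the loop body stays short. The classes below are stand-ins defined only for this illustration (they are not Beam's real TaggedOutput, WindowedValue, or TimestampedValue), and the helper names unwrap_tagged and reject_element_output are hypothetical, not part of the PR.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


# Stand-in types for illustration only; these are NOT Beam's real classes.
@dataclass
class TaggedOutput:
    tag: Any
    value: Any


@dataclass
class WindowedValue:
    value: Any


@dataclass
class TimestampedValue:
    value: Any


def unwrap_tagged(result: Any) -> Tuple[Optional[str], Any]:
    """Return (tag, value); tag is None when the result targets the main output."""
    if isinstance(result, TaggedOutput):
        if not isinstance(result.tag, str):
            raise TypeError('tag %r is not a string' % (result.tag,))
        return result.tag, result.value
    return None, result


def reject_element_output(result: Any) -> None:
    """Raise a descriptive error if a batch result is an element-wise value.

    Hypothetical replacement for the bare `raise RuntimeError` in the diff.
    """
    if isinstance(result, (WindowedValue, TimestampedValue)):
        raise RuntimeError(
            'process_batch produced a %s, but a batch was expected' %
            type(result).__name__)
```

With helpers like these, the loop would call unwrap_tagged first and reject_element_output second, leaving only the WindowedBatch handling and receiver dispatch inline.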





Issue Time Tracking
-------------------

    Worklog Id:     (was: 758894)
    Time Spent: 40m  (was: 0.5h)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g. 
> @yields_elements on process_batch, or batch-to-batch without a 1:1 
> input:output mapping might not be supported). These cases should fail early. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
