[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=758894&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758894
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/22 23:58
Start Date: 19/Apr/22 23:58
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r853599231
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1170,6 +1241,9 @@ def __init__(self,
else:
per_element_output_counter = None
+ # TODO: output processor assumes DoFns are batch-to-batch or
+ # element-to-element, @yields_batches and @yields_elements will break this
+ # assumption.
Review Comment:
Tag the appropriate jira here
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -329,7 +432,9 @@ def finish(self):
# type: () -> None
"""Finish operation."""
-    pass
+    # TODO: Do we need an output_index here?
Review Comment:
This is still an open question
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1450,78 @@ def process_outputs(
self.main_receivers.receive(windowed_value)
else:
self.tagged_receivers[tag].receive(windowed_value)
+
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(output_element_count)
+
+ def process_batch_outputs(
+ self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+ """Dispatch the result of process computation to the appropriate receivers.
+
+ A value wrapped in a TaggedOutput object will be unwrapped and
+ then dispatched to the appropriate indexed output.
+ """
+ if results is None:
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized.
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(0)
+ return
+
+ # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+ # if performance_runtime_type_check is active, without harming performance
+
+ output_element_count = 0
+ for result in results:
+      # results may be a generator, so we cannot call len() on it.
+ output_element_count += 1
+ tag = None
+ if isinstance(result, TaggedOutput):
+ tag = result.tag
+ if not isinstance(tag, str):
+ raise TypeError('In %s, tag %s is not a string' % (self, tag))
+ result = result.value
+ if isinstance(result, (WindowedValue, TimestampedValue)):
+ # TODO: Helpful message
+ raise RuntimeError
+ if isinstance(result, WindowedBatch):
+ windowed_batch = result
+ # TODO: Should we do this in batches?
+ # Would need to require one batch per window
+ #if (windowed_input_element is not None and
+ # len(windowed_input_element.windows) != 1):
+ # windowed_value.windows *= len(windowed_input_element.windows)
+ # TODO: TimestampedBatch
+ #elif isinstance(result, TimestampedValue):
+      #  assign_context = WindowFn.AssignContext(result.timestamp, result.value)
+ # windowed_value = WindowedValue(
+ # result.value,
+ # result.timestamp,
+ # self.window_fn.assign(assign_context))
+ # if len(windowed_input_element.windows) != 1:
+ # windowed_value.windows *= len(windowed_input_element.windows)
+ else:
+ # TODO: This should error unless the DoFn was defined with
+ # @DoFn.yields_batches(output_aligned_with_input=True)
+ # We should consider also validating that the length is the same as
+ # windowed_input_batch
+ windowed_batch = windowed_input_batch.with_values(result)
+
+ if watermark_estimator is not None:
+ for timestamp in windowed_batch.timestamps:
+ watermark_estimator.observe_timestamp(timestamp)
+ if tag is None:
+ self.main_receivers.receive_batch(windowed_batch)
+ else:
+ self.tagged_receivers[tag].receive_batch(windowed_batch)
Review Comment:
OutputProcessor logic needs to be cleaned up
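For reference while cleaning this up: the core routing in the hunk above — unwrap TaggedOutput, then send each result batch to the main receiver or to a tagged receiver — can be sketched in isolation. This is a minimal illustration with hypothetical stub classes (`TaggedOutput`, `Receiver`, `dispatch_batches` are simplified stand-ins, not the actual Beam types):

```python
class TaggedOutput:
    """Stand-in for apache_beam's TaggedOutput: wraps a value for a named output."""
    def __init__(self, tag, value):
        self.tag = tag
        self.value = value


class Receiver:
    """Stand-in for an operations receiver; just records the batches it gets."""
    def __init__(self):
        self.batches = []

    def receive_batch(self, batch):
        self.batches.append(batch)


def dispatch_batches(results, main_receiver, tagged_receivers):
    """Route each result batch to main_receiver, or to tagged_receivers[tag]
    when the result is wrapped in a TaggedOutput. Returns the output count."""
    output_count = 0
    for result in results:
        # results may be a generator, so len() is unavailable; count as we go.
        output_count += 1
        tag = None
        if isinstance(result, TaggedOutput):
            tag = result.tag
            if not isinstance(tag, str):
                raise TypeError('tag %r is not a string' % (tag,))
            result = result.value
        if tag is None:
            main_receiver.receive_batch(result)
        else:
            tagged_receivers[tag].receive_batch(result)
    return output_count


# Usage: one batch to the main output, one to a tagged side output.
main = Receiver()
side = {'errors': Receiver()}
count = dispatch_batches(iter([[1, 2], TaggedOutput('errors', [3])]), main, side)
```

The real method additionally handles windowing (`WindowedBatch` vs. raw values) and watermark observation; this sketch only covers the tag dispatch that the cleanup would reorganize.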
Issue Time Tracking
-------------------
Worklog Id: (was: 758894)
Time Spent: 40m (was: 0.5h)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> The initial MVP may only work in restricted circumstances (e.g.
> @yields_elements on process_batch, or batch-to-batch DoFns without a 1:1
> input:output mapping, might not be supported). These cases should fail early.
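One way to picture the "fail early" requirement: when a batch-to-batch DoFn's output is not explicitly windowed, the worker assumes it aligns 1:1 with the input batch (so the input's windowing can be reused), and anything else should raise immediately. A hypothetical sketch of such a check — `validate_aligned_output` is an illustrative name, not the actual Beam API:

```python
def validate_aligned_output(input_batch, output_batch):
    """Raise if an output batch cannot reuse the input batch's windowing,
    i.e. the lengths differ and there is no 1:1 input:output mapping."""
    if len(output_batch) != len(input_batch):
        raise ValueError(
            'process_batch produced %d elements for %d inputs; '
            'outputs must align 1:1 with the input batch'
            % (len(output_batch), len(input_batch)))
    return output_batch


# Supported case: a map-like DoFn preserves the element count.
validate_aligned_output([1, 2, 3], [2, 4, 6])

# Unsupported case: a filter-like DoFn changes the batch length,
# so the MVP should reject it up front rather than mis-window results.
try:
    validate_aligned_output([1, 2, 3], [2, 4])
except ValueError:
    pass
```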
--
This message was sent by Atlassian Jira
(v8.20.7#820007)