[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765775
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/May/22 00:32
Start Date: 04/May/22 00:32
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864371637
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
self.main_receivers.receive(windowed_value)
else:
self.tagged_receivers[tag].receive(windowed_value)
+
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(output_element_count)
+
+ def process_batch_outputs(
+ self, windowed_input_batch, results, watermark_estimator=None):
+ # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) ->
None
+
+ """Dispatch the result of process computation to the appropriate receivers.
+
+ A value wrapped in a TaggedOutput object will be unwrapped and
+ then dispatched to the appropriate indexed output.
+ """
+ if results is None:
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized.
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(0)
+ return
+
+ # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+ # if performance_runtime_type_check is active, without harming performance
+
+ output_element_count = 0
+ for result in results:
+ # results here may be a generator, which cannot call len on it.
+ output_element_count += 1
Review Comment:
Done
Issue Time Tracking
-------------------
Worklog Id: (was: 765775)
Time Spent: 6h 40m (was: 6.5h)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 6h 40m
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping might not be supported). These cases should fail early.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)