[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765245&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765245
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 00:41
Start Date: 03/May/22 00:41
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863285586
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
self.main_receivers.receive(windowed_value)
else:
self.tagged_receivers[tag].receive(windowed_value)
+
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(output_element_count)
+
+ def process_batch_outputs(
+ self, windowed_input_batch, results, watermark_estimator=None):
+ # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+ """Dispatch the result of process computation to the appropriate receivers.
+
+ A value wrapped in a TaggedOutput object will be unwrapped and
+ then dispatched to the appropriate indexed output.
+ """
+ if results is None:
+ # TODO(BEAM-3937): Remove if block after output counter released.
+ # Only enable per_element_output_counter when counter cythonized.
+ if self.per_element_output_counter is not None:
+ self.per_element_output_counter.add_input(0)
+ return
+
+ # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+ # if performance_runtime_type_check is active, without harming performance
+
+ output_element_count = 0
+ for result in results:
+ # results here may be a generator, which cannot call len on it.
+ output_element_count += 1
+ tag = None
+ if isinstance(result, TaggedOutput):
+ tag = result.tag
+ if not isinstance(tag, str):
+ raise TypeError('In %s, tag %s is not a string' % (self, tag))
+ result = result.value
+ if isinstance(result, (WindowedValue, TimestampedValue)):
+ raise TypeError(
+ f"Received {type(result).__name__} from DoFn that was "
+ "expected to produce a batch.")
+ if isinstance(result, WindowedBatch):
+ if isinstance(result, ConcreteWindowedBatch):
+ # TODO: Rebatch into homogenous batches (or remove
+ # ConcreteWindowedBatch)
+ raise NotImplementedError
+ elif isinstance(result, HomogeneousWindowedBatch):
+ windowed_batch = result
+ else:
+ raise AssertionError(
+ "Unrecognized WindowedBatch implementation: "
+ f"{type(windowed_batch)}")
+
+ if (windowed_input_batch is not None and
+ len(windowed_input_batch.windows) != 1):
+ windowed_batch.windows *= len(windowed_input_batch.windows)
+ # TODO(BEAM-14292): Add TimestampedBatch, an analogue for TimestampedValue
Review Comment:
Ack, that could be a better solution. We could make it work nicely with
schemas + process_batch.
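For readers without the Beam source at hand, the counting and tag-unwrapping loop in the quoted hunk can be sketched in isolation. This is a minimal illustration, not the PR's implementation: the `TaggedOutput` dataclass is a hypothetical stand-in for Beam's class, and `dispatch` is an invented helper that collects results into per-tag lists instead of calling receivers.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class TaggedOutput:
    """Hypothetical stand-in for Beam's TaggedOutput (a tag plus a value)."""
    tag: Any
    value: Any


def dispatch(
    results: Optional[Iterable[Any]],
) -> Tuple[int, Dict[Optional[str], List[Any]]]:
    """Route each result to a per-tag list and count the outputs.

    The key None stands for the main output (assumption for this sketch).
    """
    routed: Dict[Optional[str], List[Any]] = {}
    if results is None:
        # Nothing was produced: report zero outputs.
        return 0, routed

    count = 0
    for result in results:
        # results may be a generator, so count while iterating
        # instead of calling len() on it.
        count += 1
        tag = None
        if isinstance(result, TaggedOutput):
            tag = result.tag
            if not isinstance(tag, str):
                raise TypeError('tag %r is not a string' % (tag,))
            result = result.value
        routed.setdefault(tag, []).append(result)
    return count, routed


def example_outputs():
    # One main-output batch and one batch tagged 'errors'.
    yield [1, 2]
    yield TaggedOutput('errors', ['bad'])


count, routed = dispatch(example_outputs())
```

The sketch leaves out windowing, watermark estimators and the batch-type checks; it only shows why the count is accumulated inside the loop and where a non-string tag is rejected.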
Issue Time Tracking
-------------------
Worklog Id: (was: 765245)
Time Spent: 3h 50m (was: 3h 40m)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping might not be supported). These cases should fail early.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)