[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765600&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765600
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 18:09
Start Date: 03/May/22 18:09
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864050411
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
return self.consumer.current_element_progress()
+class GeneralPurposeConsumerSet(ConsumerSet):
+ """ConsumerSet implementation that handles all combinations of possible
+ edges.
+ """
+ def __init__(self,
+ counter_factory,
+ step_name, # type: str
+ output_index,
+ coder,
+ producer_type_hints,
+ consumers, # type: List[Operation]
+ producer_batch_converter):
+ super().__init__(
+ counter_factory,
+ step_name,
+ output_index,
+ consumers,
+ coder,
+ producer_type_hints)
+
+ self.producer_batch_converter = producer_batch_converter
+
+ # Partition consumers into three groups:
+ # - consumers that will be passed elements
+ # - consumers that will be passed batches (where their input batch type
+ # matches the output of the producer)
+ # - consumers that will be passed converted batches
+ self.element_consumers: List[Operation] = []
+ self.passthrough_batch_consumers: List[Operation] = []
+ other_batch_consumers: DefaultDict[
+ BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+ for consumer in consumers:
+ if not consumer.get_batching_preference().supports_batches:
+ self.element_consumers.append(consumer)
+ elif (consumer.get_input_batch_converter() ==
+ self.producer_batch_converter):
+ self.passthrough_batch_consumers.append(consumer)
+ else:
+ # Batch consumer with a mismatched batch type
+ if consumer.get_batching_preference().supports_elements:
+ # Pass it elements if we can
+ self.element_consumers.append(consumer)
+ else:
+ # As a last resort, explode and rebatch
+ consumer_batch_converter = consumer.get_input_batch_converter()
+ # This consumer supports batches, it must have a batch converter
+ assert consumer_batch_converter is not None
+ other_batch_consumers[consumer_batch_converter].append(consumer)
+
+ self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+ other_batch_consumers)
+
+ self.has_batch_consumers = (
+ self.passthrough_batch_consumers or self.other_batch_consumers)
+ self._batched_elements: List[Any] = []
+
+ def receive(self, windowed_value):
+ # type: (WindowedValue) -> None
+ self.update_counters_start(windowed_value)
+
+ for consumer in self.element_consumers:
+ cython.cast(Operation, consumer).process(windowed_value)
+
+ # TODO: Do this branching when constructing ConsumerSet
+ if self.has_batch_consumers:
+ self._batched_elements.append(windowed_value)
+
+ self.update_counters_finish()
Review Comment:
I'm not sure how to resolve this
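The three-way partition in GeneralPurposeConsumerSet.__init__ can be exercised outside Beam with a small standalone sketch. Preference, FakeConsumer and partition_consumers are hypothetical stand-ins for the object returned by get_batching_preference(), an Operation, and the constructor logic. Batch converters are modeled as plain string labels, so label equality stands in for BatchConverter equality.

```python
import collections
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Preference:
    # Hypothetical stand-in for get_batching_preference()'s result.
    supports_batches: bool
    supports_elements: bool


@dataclass
class FakeConsumer:
    # Hypothetical stand-in for an Operation; input_converter plays the role
    # of get_input_batch_converter() and is just a string label here.
    name: str
    preference: Preference
    input_converter: Optional[str] = None


def partition_consumers(
    consumers: List[FakeConsumer], producer_converter: Optional[str]
) -> Tuple[List[FakeConsumer], List[FakeConsumer], Dict[str, List[FakeConsumer]]]:
    element_consumers: List[FakeConsumer] = []
    passthrough: List[FakeConsumer] = []
    rebatch: Dict[str, List[FakeConsumer]] = collections.defaultdict(list)
    for c in consumers:
        if not c.preference.supports_batches:
            element_consumers.append(c)  # element-only consumer
        elif c.input_converter == producer_converter:
            passthrough.append(c)  # batch types match: hand batches straight through
        elif c.preference.supports_elements:
            element_consumers.append(c)  # mismatched batch type, but elements are fine
        else:
            # Last resort: explode and rebatch into the consumer's batch type.
            assert c.input_converter is not None
            rebatch[c.input_converter].append(c)
    return element_consumers, passthrough, dict(rebatch)


ELEMENTS = Preference(supports_batches=False, supports_elements=True)
BATCHES = Preference(supports_batches=True, supports_elements=False)
BOTH = Preference(supports_batches=True, supports_elements=True)

consumers = [
    FakeConsumer("a", ELEMENTS),
    FakeConsumer("b", BATCHES, "numpy"),
    FakeConsumer("c", BOTH, "pandas"),
    FakeConsumer("d", BATCHES, "pandas"),
]
elems, passthrough, rebatch = partition_consumers(consumers, producer_converter="numpy")
has_batch_consumers = bool(passthrough or rebatch)

print([c.name for c in elems])  # ['a', 'c']
print([c.name for c in passthrough])  # ['b']
print({k: [c.name for c in v] for k, v in rebatch.items()})  # {'pandas': ['d']}
```

Consumer "c" has a mismatched batch type but accepts elements, so it is grouped with the element consumers; only "d", which accepts nothing but its own batch type, needs the explode-and-rebatch path. Because this partition is fixed once the consumer set is built, the has_batch_consumers check in receive() is a candidate for moving into construction, as the TODO in the diff suggests.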
Issue Time Tracking
-------------------
Worklog Id: (was: 765600)
Time Spent: 5h 50m (was: 5h 40m)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping, might not be supported). These cases should fail early.
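One way to "fail early" on the unsupported cases is sketched below. This is not Beam's implementation: yields_element, check_mvp_supported and check_one_to_one are hypothetical names, and the decorator simply tags the function with a made-up _yields_element attribute. The decorator case can be rejected before any data flows; the 1:1 input:output mapping can only be checked once a batch is produced, so that check raises on the first mismatched batch instead.

```python
from typing import Callable, Sized


def yields_element(fn: Callable) -> Callable:
    # Hypothetical marker decorator: records that process_batch yields
    # individual elements rather than batches.
    fn._yields_element = True
    return fn


def check_mvp_supported(dofn: object) -> None:
    """Raise before execution if dofn uses a feature the MVP does not support."""
    process_batch = getattr(dofn, "process_batch", None)
    if process_batch is None:
        return  # element-only DoFn: nothing to check
    # Bound methods forward attribute lookups to the underlying function.
    if getattr(process_batch, "_yields_element", False):
        raise NotImplementedError(
            f"{type(dofn).__name__}: @yields_element on process_batch is not "
            "supported yet")


def check_one_to_one(input_batch: Sized, output_batch: Sized) -> None:
    """Raise on the first batch whose output size differs from its input size."""
    if len(output_batch) != len(input_batch):
        raise NotImplementedError(
            "batch-to-batch without a 1:1 input:output mapping is not supported "
            f"yet ({len(input_batch)} inputs -> {len(output_batch)} outputs)")


class ElementYielder:
    @yields_element
    def process_batch(self, batch):
        yield from batch


try:
    check_mvp_supported(ElementYielder())
except NotImplementedError as e:
    print(e)  # ElementYielder: @yields_element on process_batch is not supported yet
```

Raising NotImplementedError (rather than silently falling back to element-wise processing) keeps the unsupported paths visible while the MVP is narrowed down.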
--
This message was sent by Atlassian Jira
(v8.20.7#820007)