[
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765247&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765247
]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 00:49
Start Date: 03/May/22 00:49
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863290541
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
return self.consumer.current_element_progress()
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible
+  edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)
Review Comment:
> No, it will be logged once per instance (which could be a lot, hundreds of times in streaming).

Is there any way to reduce the number of times it's logged?

> Ideally we could just name the two operations involved.

My concern about it not being actionable was just that it might be difficult to translate from the Operation names back to the code that produced them. After looking at it more closely, it does look like Operation has a user-interpretable `__str__` implementation.
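
A minimal sketch of one way to reduce the repeated logging (a hypothetical helper, not code from the PR): deduplicate the warning on the pair of operation names involved, so it is emitted at most once per process rather than once per ConsumerSet instance.

```python
import logging
import threading

_LOGGER = logging.getLogger(__name__)

# Remember which (producer, consumer) pairs have already been warned about.
_warned_pairs = set()
_warned_pairs_lock = threading.Lock()


def warn_rebatch_once(producer_name, consumer_name):
  """Log the explode-and-rebatch warning at most once per pair per process."""
  key = (producer_name, consumer_name)
  with _warned_pairs_lock:
    if key in _warned_pairs:
      return
    _warned_pairs.add(key)
  _LOGGER.warning(
      'Exploding and rebatching between %s and %s; this may be inefficient.',
      producer_name,
      consumer_name)
```

Keying on the operations' `__str__` output would also give the user-interpretable names mentioned above.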
Issue Time Tracking
-------------------
Worklog Id: (was: 765247)
Time Spent: 4h (was: 3h 50m)
> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
> Key: BEAM-14294
> URL: https://issues.apache.org/jira/browse/BEAM-14294
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 4h
> Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping might not be supported). These cases should fail early.
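
For context, a minimal sketch of the kind of batched DoFn this MVP targets, assuming Beam's batched DoFn API where typed `process` and `process_batch` signatures tell the runner which paths a DoFn supports (the class name and numpy types here are illustrative only):

```python
from typing import Iterator

import numpy as np

import apache_beam as beam


class MultiplyByTwo(beam.DoFn):
  # Element-wise path: receives one np.int64 at a time.
  def process(self, element: np.int64) -> Iterator[np.int64]:
    yield element * 2

  # Batched path: receives a whole np.ndarray of int64 values, letting the
  # worker amortize per-element overhead.
  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
    yield batch * 2
```

A DoFn like this can consume and produce batches directly, while the element-wise `process` path remains available for downstream operations that only accept elements.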
--
This message was sent by Atlassian Jira
(v8.20.7#820007)