pabloem commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r725197488
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -584,100 +679,164 @@ def
_add_residuals_and_channel_splits_to_deferred_inputs(
channel_split.transform_id] = channel_split.last_primary_element
return pcolls_with_delayed_apps, transforms_with_channel_splits
- def _run_stage(self,
+ def _execute_bundle(self,
runner_execution_context, # type:
execution.FnApiRunnerExecutionContext
bundle_context_manager, # type:
execution.BundleContextManager
- ):
- # type: (...) -> beam_fn_api_pb2.InstructionResponse
-
- """Run an individual stage.
+ bundle_input: DataInput
+ ) -> beam_fn_api_pb2.InstructionResponse:
+ """Execute a bundle end-to-end.
Args:
runner_execution_context (execution.FnApiRunnerExecutionContext): An
object containing execution information for the pipeline.
bundle_context_manager (execution.BundleContextManager): A description of
the stage to execute, and its context.
+ bundle_input: The set of buffers to input into this bundle
"""
- data_input, data_output, expected_timer_output = (
- bundle_context_manager.extract_bundle_inputs_and_outputs())
- input_timers = {
- } # type: Mapping[Tuple[str, str], execution.PartitionableBuffer]
-
worker_handler_manager = runner_execution_context.worker_handler_manager
- _LOGGER.info('Running %s', bundle_context_manager.stage.name)
+
+ # TODO(pabloem): Should move this to be done once per stage
worker_handler_manager.register_process_bundle_descriptor(
bundle_context_manager.process_bundle_descriptor)
- # We create the bundle manager here, as it can be reused for bundles of the
- # same stage, but it may have to be created by-bundle later on.
+ # We create the bundle manager here, as it can be reused for bundles of
+ # the same stage, but it may have to be created by-bundle later on.
+ bundle_manager = self._get_bundle_manager(bundle_context_manager)
+
+ last_result, deferred_inputs, newly_set_timers, watermark_updates = (
+ self._run_bundle(
+ runner_execution_context,
+ bundle_context_manager,
+ bundle_input,
+ bundle_context_manager.stage_data_outputs,
+ bundle_context_manager.stage_timer_outputs,
+ bundle_manager))
+
+ for pc_name, watermark in watermark_updates.items():
+ runner_execution_context.watermark_manager.set_pcoll_watermark(
+ pc_name, watermark)
+
+ if deferred_inputs:
+ assert (runner_execution_context.watermark_manager.get_stage_node(
+ bundle_context_manager.stage.name).output_watermark()
+ < timestamp.MAX_TIMESTAMP), (
+ 'wrong timestamp for %s. '
+ % runner_execution_context.watermark_manager.get_stage_node(
+ bundle_context_manager.stage.name))
+ runner_execution_context.queues.ready_inputs.enque(
+ (bundle_context_manager.stage.name, DataInput(deferred_inputs, {})))
+
+ self._enqueue_set_timers(
+ runner_execution_context,
+ bundle_context_manager,
+ newly_set_timers,
+ bundle_input)
+
+ # Store the required downstream side inputs into state so it is accessible
+ # for the worker when it runs bundles that consume this stage's output.
+ data_side_input = (
+ runner_execution_context.side_input_descriptors_by_stage.get(
+ bundle_context_manager.stage.name, {}))
+ runner_execution_context.commit_side_inputs_to_state(data_side_input)
+
+ buffers_to_clean = set()
+ known_consumers = set()
+ for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+ for (consuming_stage_name, consuming_transform) in \
+ runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+ []):
+ buffer = runner_execution_context.pcoll_buffers.get(
+ buffer_id, ListBuffer(None))
+
+ if (buffer_id in runner_execution_context.pcoll_buffers and
+ buffer_id not in buffers_to_clean):
+ buffers_to_clean.add(buffer_id)
+ elif buffer and buffer_id in buffers_to_clean:
+ # If the buffer_id has already been added to buffers_to_clean, this
+ # means that the buffer is being consumed by two separate stages,
+ # so we create a copy of the buffer for every new stage.
+ runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
+ buffer = runner_execution_context.pcoll_buffers[buffer_id]
Review comment:
right - so the flow is like this:
1. Run stage, and write its outputs to `pcoll_buffers`
2. For each stage output, do:
3. Get the data buffer from `pcoll_buffers`
3. Find its next downstream consumer and enqueue this buffer to be consumed
4. If there are more downstream consumers, make a copy of the buffer, add it
to `pcoll_buffers`, and go back to point 3
5. If there are no more downstream consumers, continue to next step
In the general case, a PCollection will have only one consumer - so the
buffer will not need to be copied, but if there are multiple downstream
consumers, then we create copies starting for the second one, so that each
buffer copy is pushed to one consumer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]