This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9abf706 Add some pydoc for runner classes in Python
new 516cdb6 Merge pull request #8415 from pabloem/doc-fn
9abf706 is described below
commit 9abf7067d41511d687ba197995d1f94a99e9c708
Author: pabloem <[email protected]>
AuthorDate: Fri Apr 26 11:15:15 2019 -0700
Add some pydoc for runner classes in Python
---
.../runners/portability/fn_api_runner.py | 26 +++++++++++++------
.../portability/fn_api_runner_transforms.py | 23 +++++++++++++----
.../apache_beam/runners/worker/bundle_processor.py | 29 +++++++++++++++++++---
.../apache_beam/runners/worker/sdk_worker.py | 28 +++++++++++++++++++++
4 files changed, 90 insertions(+), 16 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index bdbbda2..bc330b1 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -267,6 +267,7 @@ class FnApiRunner(runner.PipelineRunner):
# to enforce that the inputs (and outputs) of GroupByKey operations
# are known to be KVs.
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+ # TODO: Move group_by_key_input_visitor() to a non-dataflow specific file.
pipeline.visit(DataflowRunner.group_by_key_input_visitor())
self._bundle_repeat = self._bundle_repeat or options.view_as(
pipeline_options.DirectOptions).direct_runner_bundle_repeat
@@ -362,13 +363,22 @@ class FnApiRunner(runner.PipelineRunner):
return RunnerResult(
runner.PipelineState.DONE, monitoring_infos_by_stage, metrics_by_stage)
- def run_stage(
- self,
- worker_handler_factory,
- pipeline_components,
- stage,
- pcoll_buffers,
- safe_coders):
+ def run_stage(self,
+ worker_handler_factory,
+ pipeline_components,
+ stage,
+ pcoll_buffers,
+ safe_coders):
+ """Run an individual stage.
+
+ Args:
+ worker_handler_factory: A ``callable`` that takes in an environment, and
+ returns a ``WorkerHandler`` class.
+ pipeline_components: TODO
+ stage: TODO
+ pcoll_buffers: TODO
+ safe_coders: TODO
+ """
def iterable_state_write(values, element_coder_impl):
token = unique_name(None, 'iter').encode('ascii')
@@ -1444,7 +1454,7 @@ class RunnerResult(runner.PipelineResult):
return self._state
def metrics(self):
- """Returns a queryable oject including user metrics only."""
+ """Returns a queryable object including user metrics only."""
if self._metrics is None:
self._metrics = FnApiMetrics(
self._monitoring_infos_by_stage, user_metrics_only=True)
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index f1aef40..fdced4b 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -449,11 +449,24 @@ def pipeline_from_stages(
return new_proto
-def create_and_optimize_stages(
- pipeline_proto,
- phases,
- known_runner_urns,
- use_state_iterables=False):
+def create_and_optimize_stages(pipeline_proto,
+ phases,
+ known_runner_urns,
+ use_state_iterables=False):
+ """Create a set of stages given a pipeline proto, and set of optimizations.
+
+ Args:
+    pipeline_proto (beam_runner_api_pb2.Pipeline): A pipeline defined by a user.
+ phases (callable): Each phase identifies a specific transformation to be
+ applied to the pipeline graph. Existing phases are defined in this file,
+ and receive a list of stages, and a pipeline context. Some available
+ transformations are ``lift_combiners``, ``expand_sdf``, ``expand_gbk``,
+ etc.
+
+ Returns:
+ A tuple with a pipeline context, and a list of stages (i.e. an optimized
+ graph).
+ """
pipeline_context = TransformContext(
pipeline_proto.components,
known_runner_urns,
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index dc060bb..1437508 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -104,8 +104,7 @@ class DataOutputOperation(RunnerIOOperation):
class DataInputOperation(RunnerIOOperation):
- """A source-like operation that gathers input from the runner.
- """
+ """A source-like operation that gathers input from the runner."""
def __init__(self, operation_name, step_name, consumers, counter_factory,
state_sampler, windowed_coder, input_target, data_channel):
@@ -390,8 +389,20 @@ class OutputTimer(object):
class FnApiUserStateContext(userstate.UserStateContext):
+  """Interface for state and timers from SDK to Fn API servicer of state."""
+
def __init__(
self, state_handler, transform_id, key_coder, window_coder, timer_specs):
+ """Initialize a ``FnApiUserStateContext``.
+
+ Args:
+ state_handler: A StateServicer object.
+      transform_id: The name of the PTransform that this context is associated with.
+ key_coder:
+ window_coder:
+ timer_specs: A list of ``userstate.TimerSpec`` objects specifying the
+ timers associated with this operation.
+ """
self._state_handler = state_handler
self._transform_id = transform_id
self._key_coder = key_coder
@@ -401,6 +412,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
self._all_states = {}
def update_timer_receivers(self, receivers):
+ """TODO"""
self._timer_receivers = {}
for tag in self._timer_specs:
self._timer_receivers[tag] = receivers.pop(tag)
@@ -461,9 +473,20 @@ def only_element(iterable):
class BundleProcessor(object):
- """A class for processing bundles of elements."""
+  """A class for processing bundles of elements."""
+
def __init__(
self, process_bundle_descriptor, state_handler, data_channel_factory):
+ """Initialize a bundle processor.
+
+ Args:
+ process_bundle_descriptor (``beam_fn_api_pb2.ProcessBundleDescriptor``):
+        a description of the stage that this ``BundleProcessor`` is to execute.
+ state_handler (beam_fn_api_pb2_grpc.BeamFnStateServicer).
+ data_channel_factory (``data_plane.DataChannelFactory``).
+ """
self.process_bundle_descriptor = process_bundle_descriptor
self.state_handler = state_handler
self.data_channel_factory = data_channel_factory
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 7be8cf5..663f224 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -262,6 +262,26 @@ class SdkHarness(object):
class BundleProcessorCache(object):
+ """A cache for ``BundleProcessor``s.
+
+ ``BundleProcessor`` objects are cached by the id of their
+ ``beam_fn_api_pb2.ProcessBundleDescriptor``.
+
+ Attributes:
+ fns (dict): A dictionary that maps bundle descriptor IDs to instances of
+ ``beam_fn_api_pb2.ProcessBundleDescriptor``.
+ state_handler_factory (``StateHandlerFactory``): Used to create state
+ handlers to be used by a ``bundle_processor.BundleProcessor`` during
+ processing.
+ data_channel_factory (``data_plane.DataChannelFactory``)
+ active_bundle_processors (dict): A dictionary, indexed by instruction IDs,
+      containing ``bundle_processor.BundleProcessor`` objects that are currently
+      active processing the corresponding instruction.
+ cached_bundle_processors (dict): A dictionary, indexed by bundle processor
+ id, of cached ``bundle_processor.BundleProcessor`` that are not currently
+ performing processing.
+ """
+
def __init__(self, state_handler_factory, data_channel_factory, fns):
self.fns = fns
self.state_handler_factory = state_handler_factory
@@ -270,6 +290,7 @@ class BundleProcessorCache(object):
self.cached_bundle_processors = collections.defaultdict(list)
def register(self, bundle_descriptor):
+ """Register a ``beam_fn_api_pb2.ProcessBundleDescriptor`` by its id."""
self.fns[bundle_descriptor.id] = bundle_descriptor
def get(self, instruction_id, bundle_descriptor_id):
@@ -314,6 +335,13 @@ class SdkWorker(object):
raise NotImplementedError
def register(self, request, instruction_id):
+ """Registers a set of ``beam_fn_api_pb2.ProcessBundleDescriptor``s.
+
+ This set of ``beam_fn_api_pb2.ProcessBundleDescriptor`` come as part of a
+ ``beam_fn_api_pb2.RegisterRequest``, which the runner sends to the SDK
+ worker before starting processing to register stages.
+ """
+
for process_bundle_descriptor in request.process_bundle_descriptor:
self.bundle_processor_cache.register(process_bundle_descriptor)
return beam_fn_api_pb2.InstructionResponse(