This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9abf706  Add some pydoc for runner classes in Python
     new 516cdb6  Merge pull request #8415 from pabloem/doc-fn
9abf706 is described below

commit 9abf7067d41511d687ba197995d1f94a99e9c708
Author: pabloem <[email protected]>
AuthorDate: Fri Apr 26 11:15:15 2019 -0700

    Add some pydoc for runner classes in Python
---
 .../runners/portability/fn_api_runner.py           | 26 +++++++++++++------
 .../portability/fn_api_runner_transforms.py        | 23 +++++++++++++----
 .../apache_beam/runners/worker/bundle_processor.py | 29 +++++++++++++++++++---
 .../apache_beam/runners/worker/sdk_worker.py       | 28 +++++++++++++++++++++
 4 files changed, 90 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index bdbbda2..bc330b1 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -267,6 +267,7 @@ class FnApiRunner(runner.PipelineRunner):
     # to enforce that the inputs (and outputs) of GroupByKey operations
     # are known to be KVs.
     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+    # TODO: Move group_by_key_input_visitor() to a non-dataflow specific file.
     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
     self._bundle_repeat = self._bundle_repeat or options.view_as(
         pipeline_options.DirectOptions).direct_runner_bundle_repeat
@@ -362,13 +363,22 @@ class FnApiRunner(runner.PipelineRunner):
     return RunnerResult(
         runner.PipelineState.DONE, monitoring_infos_by_stage, metrics_by_stage)
 
-  def run_stage(
-      self,
-      worker_handler_factory,
-      pipeline_components,
-      stage,
-      pcoll_buffers,
-      safe_coders):
+  def run_stage(self,
+                worker_handler_factory,
+                pipeline_components,
+                stage,
+                pcoll_buffers,
+                safe_coders):
+    """Run an individual stage.
+
+    Args:
+      worker_handler_factory: A ``callable`` that takes in an environment, and
+        returns a ``WorkerHandler`` class.
+      pipeline_components: TODO
+      stage: TODO
+      pcoll_buffers: TODO
+      safe_coders: TODO
+    """
 
     def iterable_state_write(values, element_coder_impl):
       token = unique_name(None, 'iter').encode('ascii')
@@ -1444,7 +1454,7 @@ class RunnerResult(runner.PipelineResult):
     return self._state
 
   def metrics(self):
-    """Returns a queryable oject including user metrics only."""
+    """Returns a queryable object including user metrics only."""
     if self._metrics is None:
       self._metrics = FnApiMetrics(
           self._monitoring_infos_by_stage, user_metrics_only=True)
diff --git 
a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index f1aef40..fdced4b 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -449,11 +449,24 @@ def pipeline_from_stages(
   return new_proto
 
 
-def create_and_optimize_stages(
-    pipeline_proto,
-    phases,
-    known_runner_urns,
-    use_state_iterables=False):
+def create_and_optimize_stages(pipeline_proto,
+                               phases,
+                               known_runner_urns,
+                               use_state_iterables=False):
+  """Create a set of stages given a pipeline proto, and set of optimizations.
+
+  Args:
+    pipeline_proto (beam_runner_api_pb2.Pipeline): A pipeline defined by a 
user.
+    phases (callable): Each phase identifies a specific transformation to be
+      applied to the pipeline graph. Existing phases are defined in this file,
+      and receive a list of stages, and a pipeline context. Some available
+      transformations are ``lift_combiners``, ``expand_sdf``, ``expand_gbk``,
+      etc.
+
+  Returns:
+    A tuple with a pipeline context, and a list of stages (i.e. an optimized
+    graph).
+  """
   pipeline_context = TransformContext(
       pipeline_proto.components,
       known_runner_urns,
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index dc060bb..1437508 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -104,8 +104,7 @@ class DataOutputOperation(RunnerIOOperation):
 
 
 class DataInputOperation(RunnerIOOperation):
-  """A source-like operation that gathers input from the runner.
-  """
+  """A source-like operation that gathers input from the runner."""
 
   def __init__(self, operation_name, step_name, consumers, counter_factory,
                state_sampler, windowed_coder, input_target, data_channel):
@@ -390,8 +389,20 @@ class OutputTimer(object):
 
 
 class FnApiUserStateContext(userstate.UserStateContext):
+  """Interface for state and timers from SDK to Fn API servicer of state."""
+
   def __init__(
       self, state_handler, transform_id, key_coder, window_coder, timer_specs):
+    """Initialize a ``FnApiUserStateContext``.
+
+    Args:
+      state_handler: A StateServicer object.
+      transform_id: The name of the PTransform that this context is associated
+        with.
+      key_coder:
+      window_coder:
+      timer_specs: A list of ``userstate.TimerSpec`` objects specifying the
+        timers associated with this operation.
+    """
     self._state_handler = state_handler
     self._transform_id = transform_id
     self._key_coder = key_coder
@@ -401,6 +412,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
     self._all_states = {}
 
   def update_timer_receivers(self, receivers):
+    """TODO"""
     self._timer_receivers = {}
     for tag in self._timer_specs:
       self._timer_receivers[tag] = receivers.pop(tag)
@@ -461,9 +473,20 @@ def only_element(iterable):
 
 
 class BundleProcessor(object):
-  """A class for processing bundles of elements."""
+  """A class for processing bundles of elements.
+
+  """
+
   def __init__(
       self, process_bundle_descriptor, state_handler, data_channel_factory):
+    """Initialize a bundle processor.
+
+    Args:
+      process_bundle_descriptor (``beam_fn_api_pb2.ProcessBundleDescriptor``):
+        a description of the stage that this ``BundleProcessor`` is to execute.
+      state_handler (beam_fn_api_pb2_grpc.BeamFnStateServicer).
+      data_channel_factory (``data_plane.DataChannelFactory``).
+    """
     self.process_bundle_descriptor = process_bundle_descriptor
     self.state_handler = state_handler
     self.data_channel_factory = data_channel_factory
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 7be8cf5..663f224 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -262,6 +262,26 @@ class SdkHarness(object):
 
 
 class BundleProcessorCache(object):
+  """A cache for ``BundleProcessor``s.
+
+  ``BundleProcessor`` objects are cached by the id of their
+  ``beam_fn_api_pb2.ProcessBundleDescriptor``.
+
+  Attributes:
+    fns (dict): A dictionary that maps bundle descriptor IDs to instances of
+      ``beam_fn_api_pb2.ProcessBundleDescriptor``.
+    state_handler_factory (``StateHandlerFactory``): Used to create state
+      handlers to be used by a ``bundle_processor.BundleProcessor`` during
+      processing.
+    data_channel_factory (``data_plane.DataChannelFactory``)
+    active_bundle_processors (dict): A dictionary, indexed by instruction IDs,
+      containing ``bundle_processor.BundleProcessor`` objects that are 
currently
+      actively processing the corresponding instruction.
+    cached_bundle_processors (dict): A dictionary, indexed by bundle processor
+      id, of cached ``bundle_processor.BundleProcessor`` that are not currently
+      performing processing.
+  """
+
   def __init__(self, state_handler_factory, data_channel_factory, fns):
     self.fns = fns
     self.state_handler_factory = state_handler_factory
@@ -270,6 +290,7 @@ class BundleProcessorCache(object):
     self.cached_bundle_processors = collections.defaultdict(list)
 
   def register(self, bundle_descriptor):
+    """Register a ``beam_fn_api_pb2.ProcessBundleDescriptor`` by its id."""
     self.fns[bundle_descriptor.id] = bundle_descriptor
 
   def get(self, instruction_id, bundle_descriptor_id):
@@ -314,6 +335,13 @@ class SdkWorker(object):
       raise NotImplementedError
 
   def register(self, request, instruction_id):
+    """Registers a set of ``beam_fn_api_pb2.ProcessBundleDescriptor``s.
+
+    This set of ``beam_fn_api_pb2.ProcessBundleDescriptor`` comes as part of a
+    ``beam_fn_api_pb2.RegisterRequest``, which the runner sends to the SDK
+    worker before starting processing to register stages.
+    """
+
     for process_bundle_descriptor in request.process_bundle_descriptor:
       self.bundle_processor_cache.register(process_bundle_descriptor)
     return beam_fn_api_pb2.InstructionResponse(

Reply via email to