This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 70a6d18  [BEAM-4028] Adding NameContext to Python SDK. (#5043)
70a6d18 is described below

commit 70a6d18345e3794bd757206f5c3c2a42fa016ed4
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Fri Apr 13 09:29:24 2018 -0700

    [BEAM-4028] Adding NameContext to Python SDK. (#5043)
    
    Adding NameContext to Python SDK.
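
A minimal usage sketch of the NameContext and DataflowNameContext classes
added to common.py by this commit (the step names below are illustrative
values borrowed from the docstrings, not from a real pipeline):

    from apache_beam.runners.common import DataflowNameContext, NameContext

    # The base NameContext only carries the step name.
    ctx = NameContext('s2')
    print(ctx.metrics_name())   # 's2' -- name used for metrics reporting
    print(ctx.logging_name())   # 's2' -- name used for logging

    # DataflowNameContext adds the user-given and optimized-graph names.
    dfx = DataflowNameContext(step_name='s2',
                              user_name='Foo/Bar/ParDo(Fab)',
                              system_name='s2-shuffle-read34')
    print(dfx.metrics_name())   # 's2' -- inherited: metrics keep the step name
    print(dfx.logging_name())   # 'Foo/Bar/ParDo(Fab)' -- used for Stackdriver logging
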
---
 sdks/python/apache_beam/runners/common.py          |  70 +++++++++++++
 .../apache_beam/runners/worker/operations.pxd      |   1 +
 .../apache_beam/runners/worker/operations.py       | 113 ++++++++++++---------
 3 files changed, 137 insertions(+), 47 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e3c768b..44f9083 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -39,6 +39,76 @@ from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+class NameContext(object):
+  """Holds the name information for a step."""
+
+  def __init__(self, step_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The name of the step.
+    """
+    self.step_name = step_name
+
+  def __eq__(self, other):
+    return self.step_name == other.step_name
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __repr__(self):
+    return 'NameContext(%s)' % self.__dict__
+
+  def __hash__(self):
+    return hash(self.step_name)
+
+  def metrics_name(self):
+    """Returns the step name used for metrics reporting."""
+    return self.step_name
+
+  def logging_name(self):
+    """Returns the step name used for logging."""
+    return self.step_name
+
+
+# TODO(BEAM-4028): Move DataflowNameContext to Dataflow internal code.
+class DataflowNameContext(NameContext):
+  """Holds the name information for a step in Dataflow.
+
+  This includes a step_name (e.g. s2), a user_name (e.g. Foo/Bar/ParDo(Fab)),
+  and a system_name (e.g. s2-shuffle-read34)."""
+
+  def __init__(self, step_name, user_name, system_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The internal name of the step (e.g. s2).
+      user_name: The full user-given name of the step (e.g. Foo/Bar/ParDo(Far)).
+      system_name: The step name in the optimized graph (e.g. s2-1).
+    """
+    super(DataflowNameContext, self).__init__(step_name)
+    self.user_name = user_name
+    self.system_name = system_name
+
+  def __eq__(self, other):
+    return (self.step_name == other.step_name and
+            self.user_name == other.user_name and
+            self.system_name == other.system_name)
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.step_name, self.user_name, self.system_name))
+
+  def __repr__(self):
+    return 'DataflowNameContext(%s)' % self.__dict__
+
+  def logging_name(self):
+    """Stackdriver logging relies on user-given step names (e.g. Foo/Bar)."""
+    return self.user_name
+
+
 class LoggingContext(object):
   """For internal use only; no backwards-compatibility guarantees."""
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index cb05c90..0aee337 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -39,6 +39,7 @@ cdef class ConsumerSet(Receiver):
 
 
 cdef class Operation(object):
+  cdef readonly name_context
   cdef readonly operation_name
   cdef readonly spec
   cdef object consumers
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0fa32e3..977d4bb 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -104,34 +104,45 @@ class Operation(object):
   one or more receiver operations that will take that as input.
   """
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     """Initializes a worker operation instance.
 
     Args:
-      operation_name: The system name assigned by the runner for this
-        operation.
+      name_context: A NameContext instance or a string (deprecated), with the
+        name information for this operation.
       spec: A operation_specs.Worker* instance.
       counter_factory: The CounterFactory to use for our counters.
       state_sampler: The StateSampler for the current operation.
     """
-    self.operation_name = operation_name
+    if isinstance(name_context, common.NameContext):
+      # TODO(BEAM-4028): Clean this up once it's completely migrated.
+      # We use the specific operation name that is used for metrics and state
+      # sampling.
+      self.name_context = name_context
+    else:
+      logging.info('Creating NameContext within operation')
+      self.name_context = common.NameContext(name_context)
+
+    # TODO(BEAM-4028): Remove the following two lines; rely on name_context.
+    self.operation_name = self.name_context.step_name
+    self.step_name = self.name_context.logging_name()
+
     self.spec = spec
     self.counter_factory = counter_factory
     self.consumers = collections.defaultdict(list)
 
     # These are overwritten in the legacy harness.
-    self.step_name = operation_name
-    self.metrics_container = MetricsContainer(self.step_name)
+    self.metrics_container = MetricsContainer(self.name_context.metrics_name())
     self.scoped_metrics_container = ScopedMetricsContainer(
         self.metrics_container)
 
     self.state_sampler = state_sampler
     self.scoped_start_state = self.state_sampler.scoped_state(
-        self.operation_name, 'start')
+        self.name_context.metrics_name(), 'start')
     self.scoped_process_state = self.state_sampler.scoped_state(
-        self.operation_name, 'process')
+        self.name_context.metrics_name(), 'process')
     self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.operation_name, 'finish')
+        self.name_context.metrics_name(), 'finish')
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.receivers = []
@@ -142,9 +153,12 @@ class Operation(object):
         logging.DEBUG)
     # Everything except WorkerSideInputSource, which is not a
     # top-level operation, should have output_coders
+    # TODO(pabloem): Clarify which step name should be used here.
     if getattr(self.spec, 'output_coders', None):
-      self.receivers = [ConsumerSet(self.counter_factory, self.step_name,
-                                    i, self.consumers[i], coder)
+      self.receivers = [ConsumerSet(self.counter_factory,
+                                    self.name_context.logging_name(),
+                                    i,
+                                    self.consumers[i], coder)
                         for i, coder in enumerate(self.spec.output_coders)]
 
   def finish(self):
@@ -205,7 +219,7 @@ class Operation(object):
     """
     printable_name = self.__class__.__name__
     if hasattr(self, 'step_name'):
-      printable_name += ' %s' % self.step_name
+      printable_name += ' %s' % self.name_context.logging_name()
       if is_recursive:
         # If we have a step name, stop here, no more detail needed.
         return '<%s>' % printable_name
@@ -317,7 +331,7 @@ class DoOperation(Operation):
           si_counter = opcounters.SideInputReadCounter(
               self.counter_factory,
               self.state_sampler,
-              declaring_step=self.operation_name,
+              declaring_step=self.name_context.step_name,
               # Inputs are 1-indexed, so we add 1 to i in the side input id
               input_index=i + 1)
         else:
@@ -345,13 +359,13 @@ class DoOperation(Operation):
           pickler.loads(self.spec.serialized_fn))
 
       state = common.DoFnState(self.counter_factory)
-      state.step_name = self.step_name
+      state.step_name = self.name_context.logging_name()
 
       # Tag to output index map used to dispatch the side output values emitted
       # by the DoFn function to the appropriate receivers. The main output is
       # tagged with None and is associated with its corresponding index.
       self.tagged_receivers = _TaggedReceivers(
-          self.counter_factory, self.step_name)
+          self.counter_factory, self.name_context.logging_name())
 
       output_tag_prefix = PropertyNames.OUT + '_'
       for index, tag in enumerate(self.spec.output_tags):
@@ -372,9 +386,9 @@ class DoOperation(Operation):
       self.dofn_runner = common.DoFnRunner(
           fn, args, kwargs, self.side_input_maps, window_fn,
           tagged_receivers=self.tagged_receivers,
-          step_name=self.step_name,
+          step_name=self.name_context.logging_name(),
           logging_context=logger.PerThreadLoggingContext(
-              step_name=self.step_name),
+              step_name=self.name_context.logging_name()),
           state=state,
           scoped_metrics_container=self.scoped_metrics_container)
       self.dofn_receiver = (self.dofn_runner
@@ -413,9 +427,9 @@ class DoFnRunnerReceiver(Receiver):
 class CombineOperation(Operation):
   """A Combine operation executing a CombineFn for each input element."""
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(CombineOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     # Combiners do not accept deferred side-inputs (the ignored fourth argument)
     # and therefore the code to handle the extra args/kwargs is simpler than for
     # the DoFn's of ParDo.
     # the DoFn's of ParDo.
@@ -450,9 +464,9 @@ class PGBKOperation(Operation):
   values in this bundle, memory permitting.
   """
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(PGBKOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     assert not self.spec.combine_fn
     self.table = collections.defaultdict(list)
     self.size = 0
@@ -486,9 +500,9 @@ class PGBKOperation(Operation):
 
 class PGBKCVOperation(Operation):
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(PGBKCVOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     # Combiners do not accept deferred side-inputs (the ignored fourth
     # argument) and therefore the code to handle the extra args/kwargs is
     # simpler than for the DoFn's of ParDo.
@@ -569,69 +583,72 @@ class FlattenOperation(Operation):
     self.output(o)
 
 
-def create_operation(operation_name, spec, counter_factory, step_name,
+def create_operation(name_context, spec, counter_factory, step_name,
                      state_sampler, test_shuffle_source=None,
                      test_shuffle_sink=None, is_streaming=False):
   """Create Operation object for given operation specification."""
+  if not isinstance(name_context, common.NameContext):
+    # TODO(BEAM-4028): Remove ad-hoc NameContext once everything has been migrated.
+    name_context = common.DataflowNameContext(step_name=name_context,
+                                              user_name=step_name,
+                                              system_name=None)
+
   if isinstance(spec, operation_specs.WorkerRead):
     if isinstance(spec.source, iobase.SourceBundle):
       op = ReadOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
     else:
       from dataflow_worker.native_operations import NativeReadOperation
       op = NativeReadOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerWrite):
     from dataflow_worker.native_operations import NativeWriteOperation
     op = NativeWriteOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerCombineFn):
     op = CombineOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerPartialGroupByKey):
-    op = create_pgbk_op(operation_name, spec, counter_factory, state_sampler)
+    op = create_pgbk_op(name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerDoFn):
-    op = DoOperation(operation_name, spec, counter_factory, state_sampler)
+    op = DoOperation(name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerGroupingShuffleRead):
     from dataflow_worker.shuffle_operations import GroupedShuffleReadOperation
     op = GroupedShuffleReadOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_source=test_shuffle_source)
   elif isinstance(spec, operation_specs.WorkerUngroupedShuffleRead):
     from dataflow_worker.shuffle_operations import UngroupedShuffleReadOperation
     op = UngroupedShuffleReadOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_source=test_shuffle_source)
   elif isinstance(spec, operation_specs.WorkerInMemoryWrite):
     op = InMemoryWriteOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerShuffleWrite):
     from dataflow_worker.shuffle_operations import ShuffleWriteOperation
     op = ShuffleWriteOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_sink=test_shuffle_sink)
   elif isinstance(spec, operation_specs.WorkerFlatten):
     op = FlattenOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerMergeWindows):
     from dataflow_worker.shuffle_operations import BatchGroupAlsoByWindowsOperation
     from dataflow_worker.shuffle_operations import StreamingGroupAlsoByWindowsOperation
     if is_streaming:
       op = StreamingGroupAlsoByWindowsOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
     else:
       op = BatchGroupAlsoByWindowsOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerReifyTimestampAndWindows):
     from dataflow_worker.shuffle_operations import ReifyTimestampAndWindowsOperation
     op = ReifyTimestampAndWindowsOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   else:
     raise TypeError('Expected an instance of operation_specs.Worker* class '
                     'instead of %s' % (spec,))
-  op.step_name = step_name
-  op.metrics_container = MetricsContainer(step_name)
-  op.scoped_metrics_container = ScopedMetricsContainer(op.metrics_container)
   return op
 
 
@@ -648,7 +665,9 @@ class SimpleMapTaskExecutor(object):
     """Initializes SimpleMapTaskExecutor.
 
     Args:
-      map_task: The map task we are to run.
+      map_task: The map task we are to run. The map task contains a list of
+        operations, and aligned lists of step_names, original_names, and
+        system_names for the pipeline steps.
       counter_factory: The CounterFactory instance for the work item.
       state_sampler: The StateSampler tracking the execution step.
       test_shuffle_source: Used during tests for dependency injection into
@@ -682,14 +701,14 @@ class SimpleMapTaskExecutor(object):
     # The order of the elements is important because the inputs use
     # list indexes as references.
 
-    step_names = (
-        self._map_task.step_names or [None] * len(self._map_task.operations))
     for ix, spec in enumerate(self._map_task.operations):
       # This is used for logging and assigning names to counters.
-      operation_name = self._map_task.system_names[ix]
-      step_name = step_names[ix]
+      name_context = common.DataflowNameContext(
+          step_name=self._map_task.original_names[ix],
+          user_name=self._map_task.step_names[ix],
+          system_name=self._map_task.system_names[ix])
       op = create_operation(
-          operation_name, spec, self._counter_factory, step_name,
+          name_context, spec, self._counter_factory, None,
           self._state_sampler,
           test_shuffle_source=self._test_shuffle_source,
           test_shuffle_sink=self._test_shuffle_sink)
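
For context on the transitional API in operations.py: Operation.__init__ and
create_operation still accept a plain string where a NameContext is expected.
The sketch below mirrors that isinstance branch with a hypothetical standalone
helper (resolve_name_context and the name 's5' are illustrative, not part of
this commit):

    from apache_beam.runners import common

    def resolve_name_context(name_context):
      # Hypothetical helper mirroring the check in Operation.__init__:
      # NameContext instances pass through; bare strings (deprecated)
      # are wrapped in a plain NameContext.
      if isinstance(name_context, common.NameContext):
        return name_context
      return common.NameContext(name_context)

    print(resolve_name_context('s5').step_name)                      # 's5'
    print(resolve_name_context(common.NameContext('s5')).step_name)  # 's5'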
