[ 
https://issues.apache.org/jira/browse/BEAM-4028?focusedWorklogId=90871&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90871
 ]

ASF GitHub Bot logged work on BEAM-4028:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/18 16:29
            Start Date: 13/Apr/18 16:29
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5043: [BEAM-4028] Adding NameContext to Python SDK.
URL: https://github.com/apache/beam/pull/5043
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 0bf5bac88b5..117ad3b6ef6 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -39,6 +39,76 @@
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+class NameContext(object):
+  """Holds the name information for a step."""
+
+  def __init__(self, step_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The name of the step.
+    """
+    self.step_name = step_name
+
+  def __eq__(self, other):
+    return self.step_name == other.step_name
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __repr__(self):
+    return 'NameContext(%s)' % self.__dict__()
+
+  def __hash__(self):
+    return hash(self.step_name)
+
+  def metrics_name(self):
+    """Returns the step name used for metrics reporting."""
+    return self.step_name
+
+  def logging_name(self):
+    """Returns the step name used for logging."""
+    return self.step_name
+
+
+# TODO(BEAM-4028): Move DataflowNameContext to Dataflow internal code.
+class DataflowNameContext(NameContext):
+  """Holds the name information for a step in Dataflow.
+
+  This includes a step_name (e.g. s2), a user_name (e.g. Foo/Bar/ParDo(Fab)),
+  and a system_name (e.g. s2-shuffle-read34)."""
+
+  def __init__(self, step_name, user_name, system_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The internal name of the step (e.g. s2).
+      user_name: The full user-given name of the step (e.g. Foo/Bar/ParDo(Far)).
+      system_name: The step name in the optimized graph (e.g. s2-1).
+    """
+    super(DataflowNameContext, self).__init__(step_name)
+    self.user_name = user_name
+    self.system_name = system_name
+
+  def __eq__(self, other):
+    return (self.step_name == other.step_name and
+            self.user_name == other.user_name and
+            self.system_name == other.system_name)
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.step_name, self.user_name, self.system_name))
+
+  def __repr__(self):
+    return 'DataflowNameContext(%s)' % self.__dict__()
+
+  def logging_name(self):
+    """Stackdriver logging relies on user-given step names (e.g. Foo/Bar)."""
+    return self.user_name
+
+
 class LoggingContext(object):
   """For internal use only; no backwards-compatibility guarantees."""
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index cb05c90d242..0aee3371ff0 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -39,6 +39,7 @@ cdef class ConsumerSet(Receiver):
 
 
 cdef class Operation(object):
+  cdef readonly name_context
   cdef readonly operation_name
   cdef readonly spec
   cdef object consumers
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0fa32e3c997..977d4bba095 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -104,34 +104,45 @@ class Operation(object):
   one or more receiver operations that will take that as input.
   """
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     """Initializes a worker operation instance.
 
     Args:
-      operation_name: The system name assigned by the runner for this
-        operation.
+      name_context: A NameContext instance or string(deprecated), with the
+        name information for this operation.
       spec: A operation_specs.Worker* instance.
       counter_factory: The CounterFactory to use for our counters.
       state_sampler: The StateSampler for the current operation.
     """
-    self.operation_name = operation_name
+    if isinstance(name_context, common.NameContext):
+      # TODO(BEAM-4028): Clean this up once it's completely migrated.
+      # We use the specific operation name that is used for metrics and state
+      # sampling.
+      self.name_context = name_context
+    else:
+      logging.info('Creating namecontext within operation')
+      self.name_context = common.NameContext(name_context)
+
+    # TODO(BEAM-4028): Remove following two lines. Rely on name context.
+    self.operation_name = self.name_context.step_name
+    self.step_name = self.name_context.logging_name()
+
     self.spec = spec
     self.counter_factory = counter_factory
     self.consumers = collections.defaultdict(list)
 
     # These are overwritten in the legacy harness.
-    self.step_name = operation_name
-    self.metrics_container = MetricsContainer(self.step_name)
+    self.metrics_container = MetricsContainer(self.name_context.metrics_name())
     self.scoped_metrics_container = ScopedMetricsContainer(
         self.metrics_container)
 
     self.state_sampler = state_sampler
     self.scoped_start_state = self.state_sampler.scoped_state(
-        self.operation_name, 'start')
+        self.name_context.metrics_name(), 'start')
     self.scoped_process_state = self.state_sampler.scoped_state(
-        self.operation_name, 'process')
+        self.name_context.metrics_name(), 'process')
     self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.operation_name, 'finish')
+        self.name_context.metrics_name(), 'finish')
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.receivers = []
@@ -142,9 +153,12 @@ def start(self):
         logging.DEBUG)
     # Everything except WorkerSideInputSource, which is not a
     # top-level operation, should have output_coders
+    #TODO(pabloem): Define better what step name is used here.
     if getattr(self.spec, 'output_coders', None):
-      self.receivers = [ConsumerSet(self.counter_factory, self.step_name,
-                                    i, self.consumers[i], coder)
+      self.receivers = [ConsumerSet(self.counter_factory,
+                                    self.name_context.logging_name(),
+                                    i,
+                                    self.consumers[i], coder)
                         for i, coder in enumerate(self.spec.output_coders)]
 
   def finish(self):
@@ -205,7 +219,7 @@ def str_internal(self, is_recursive=False):
     """
     printable_name = self.__class__.__name__
     if hasattr(self, 'step_name'):
-      printable_name += ' %s' % self.step_name
+      printable_name += ' %s' % self.name_context.logging_name()
       if is_recursive:
         # If we have a step name, stop here, no more detail needed.
         return '<%s>' % printable_name
@@ -317,7 +331,7 @@ def _read_side_inputs(self, tags_and_types):
           si_counter = opcounters.SideInputReadCounter(
               self.counter_factory,
               self.state_sampler,
-              declaring_step=self.operation_name,
+              declaring_step=self.name_context.step_name,
               # Inputs are 1-indexed, so we add 1 to i in the side input id
               input_index=i + 1)
         else:
@@ -345,13 +359,13 @@ def start(self):
           pickler.loads(self.spec.serialized_fn))
 
       state = common.DoFnState(self.counter_factory)
-      state.step_name = self.step_name
+      state.step_name = self.name_context.logging_name()
 
       # Tag to output index map used to dispatch the side output values emitted
       # by the DoFn function to the appropriate receivers. The main output is
       # tagged with None and is associated with its corresponding index.
       self.tagged_receivers = _TaggedReceivers(
-          self.counter_factory, self.step_name)
+          self.counter_factory, self.name_context.logging_name())
 
       output_tag_prefix = PropertyNames.OUT + '_'
       for index, tag in enumerate(self.spec.output_tags):
@@ -372,9 +386,9 @@ def start(self):
       self.dofn_runner = common.DoFnRunner(
           fn, args, kwargs, self.side_input_maps, window_fn,
           tagged_receivers=self.tagged_receivers,
-          step_name=self.step_name,
+          step_name=self.name_context.logging_name(),
           logging_context=logger.PerThreadLoggingContext(
-              step_name=self.step_name),
+              step_name=self.name_context.logging_name()),
           state=state,
           scoped_metrics_container=self.scoped_metrics_container)
       self.dofn_receiver = (self.dofn_runner
@@ -413,9 +427,9 @@ def receive(self, windowed_value):
 class CombineOperation(Operation):
   """A Combine operation executing a CombineFn for each input element."""
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(CombineOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     # Combiners do not accept deferred side-inputs (the ignored fourth argument)
     # and therefore the code to handle the extra args/kwargs is simpler than for
     # the DoFn's of ParDo.
@@ -450,9 +464,9 @@ class PGBKOperation(Operation):
   values in this bundle, memory permitting.
   """
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(PGBKOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     assert not self.spec.combine_fn
     self.table = collections.defaultdict(list)
     self.size = 0
@@ -486,9 +500,9 @@ def flush(self, target):
 
 class PGBKCVOperation(Operation):
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(PGBKCVOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     # Combiners do not accept deferred side-inputs (the ignored fourth
     # argument) and therefore the code to handle the extra args/kwargs is
     # simpler than for the DoFn's of ParDo.
@@ -569,69 +583,72 @@ def process(self, o):
     self.output(o)
 
 
-def create_operation(operation_name, spec, counter_factory, step_name,
+def create_operation(name_context, spec, counter_factory, step_name,
                      state_sampler, test_shuffle_source=None,
                      test_shuffle_sink=None, is_streaming=False):
   """Create Operation object for given operation specification."""
+  if not isinstance(name_context, common.NameContext):
+    # TODO(BEAM-4028): Remove ad-hoc NameContext once all has been migrated.
+    name_context = common.DataflowNameContext(step_name=name_context,
+                                              user_name=step_name,
+                                              system_name=None)
+
   if isinstance(spec, operation_specs.WorkerRead):
     if isinstance(spec.source, iobase.SourceBundle):
       op = ReadOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
     else:
       from dataflow_worker.native_operations import NativeReadOperation
       op = NativeReadOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerWrite):
     from dataflow_worker.native_operations import NativeWriteOperation
     op = NativeWriteOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerCombineFn):
     op = CombineOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerPartialGroupByKey):
-    op = create_pgbk_op(operation_name, spec, counter_factory, state_sampler)
+    op = create_pgbk_op(name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerDoFn):
-    op = DoOperation(operation_name, spec, counter_factory, state_sampler)
+    op = DoOperation(name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerGroupingShuffleRead):
     from dataflow_worker.shuffle_operations import GroupedShuffleReadOperation
     op = GroupedShuffleReadOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_source=test_shuffle_source)
   elif isinstance(spec, operation_specs.WorkerUngroupedShuffleRead):
     from dataflow_worker.shuffle_operations import UngroupedShuffleReadOperation
     op = UngroupedShuffleReadOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_source=test_shuffle_source)
   elif isinstance(spec, operation_specs.WorkerInMemoryWrite):
     op = InMemoryWriteOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerShuffleWrite):
     from dataflow_worker.shuffle_operations import ShuffleWriteOperation
     op = ShuffleWriteOperation(
-        operation_name, spec, counter_factory, state_sampler,
+        name_context, spec, counter_factory, state_sampler,
         shuffle_sink=test_shuffle_sink)
   elif isinstance(spec, operation_specs.WorkerFlatten):
     op = FlattenOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerMergeWindows):
     from dataflow_worker.shuffle_operations import BatchGroupAlsoByWindowsOperation
     from dataflow_worker.shuffle_operations import StreamingGroupAlsoByWindowsOperation
     if is_streaming:
       op = StreamingGroupAlsoByWindowsOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
     else:
       op = BatchGroupAlsoByWindowsOperation(
-          operation_name, spec, counter_factory, state_sampler)
+          name_context, spec, counter_factory, state_sampler)
   elif isinstance(spec, operation_specs.WorkerReifyTimestampAndWindows):
     from dataflow_worker.shuffle_operations import ReifyTimestampAndWindowsOperation
     op = ReifyTimestampAndWindowsOperation(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
   else:
     raise TypeError('Expected an instance of operation_specs.Worker* class '
                     'instead of %s' % (spec,))
-  op.step_name = step_name
-  op.metrics_container = MetricsContainer(step_name)
-  op.scoped_metrics_container = ScopedMetricsContainer(op.metrics_container)
   return op
 
 
@@ -648,7 +665,9 @@ def __init__(
     """Initializes SimpleMapTaskExecutor.
 
     Args:
-      map_task: The map task we are to run.
+      map_task: The map task we are to run. The maptask contains a list of
+        operations, and aligned lists for step_names, original_names,
+        system_names of pipeline steps.
       counter_factory: The CounterFactory instance for the work item.
       state_sampler: The StateSampler tracking the execution step.
       test_shuffle_source: Used during tests for dependency injection into
@@ -682,14 +701,14 @@ def execute(self):
     # The order of the elements is important because the inputs use
     # list indexes as references.
 
-    step_names = (
-        self._map_task.step_names or [None] * len(self._map_task.operations))
     for ix, spec in enumerate(self._map_task.operations):
       # This is used for logging and assigning names to counters.
-      operation_name = self._map_task.system_names[ix]
-      step_name = step_names[ix]
+      name_context = common.DataflowNameContext(
+          step_name=self._map_task.original_names[ix],
+          user_name=self._map_task.step_names[ix],
+          system_name=self._map_task.system_names[ix])
       op = create_operation(
-          operation_name, spec, self._counter_factory, step_name,
+          name_context, spec, self._counter_factory, None,
           self._state_sampler,
           test_shuffle_source=self._test_shuffle_source,
           test_shuffle_sink=self._test_shuffle_sink)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 90871)
    Time Spent: 3h 10m  (was: 3h)

> Step / Operation naming should rely on a NameContext class
> ----------------------------------------------------------
>
>                 Key: BEAM-4028
>                 URL: https://issues.apache.org/jira/browse/BEAM-4028
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Steps can have different names depending on the runner (stage, step, user, 
> system name...). 
> Depending on the needs of different components (operations, logging, metrics, 
> state sampling) these step names are passed around without a specific order.
> Instead, the SDK should rely on `NameContext` objects that carry all the naming 
> information for a single step.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to