[ 
https://issues.apache.org/jira/browse/BEAM-4028?focusedWorklogId=95822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95822
 ]

ASF GitHub Bot logged work on BEAM-4028:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Apr/18 03:33
            Start Date: 27/Apr/18 03:33
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5135: [BEAM-4028] 
Transitioning MapTask objects to NameContext
URL: https://github.com/apache/beam/pull/5135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index fbc137cec4f..1e879a844a7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -56,7 +56,7 @@ def __ne__(self, other):
     return not self == other
 
   def __repr__(self):
-    return 'NameContext(%s)' % self.__dict__()
+    return 'NameContext(%s)' % self.__dict__
 
   def __hash__(self):
     return hash(self.step_name)
@@ -101,7 +101,7 @@ def __hash__(self):
     return hash((self.step_name, self.user_name, self.system_name))
 
   def __repr__(self):
-    return 'DataflowNameContext(%s)' % self.__dict__()
+    return 'DataflowNameContext(%s)' % self.__dict__
 
   def logging_name(self):
     """Stackdriver logging relies on user-given step names (e.g. Foo/Bar)."""
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py 
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index bdafbeaf44a..d38bc7788fa 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -24,6 +24,7 @@
 import collections
 
 from apache_beam import coders
+from apache_beam.runners import common
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -359,17 +360,57 @@ class MapTask(object):
     stage_name: The name of this map task execution stage.
     system_names: The system names of the step corresponding to each map task
       operation in the execution graph.
-    step_names: The names of the step corresponding to each map task operation.
+    step_names: The user-given names of the step corresponding to each map task
+      operation (e.g. Foo/Bar/ParDo).
     original_names: The internal name of a step in the original workflow graph.
+    name_contexts: A list of common.NameContext objects, each containing name
+      information about a step.
   """
 
-  def __init__(
-      self, operations, stage_name, system_names, step_names, original_names):
+  def __init__(self, operations, stage_name,
+               system_names=None,
+               step_names=None,
+               original_names=None,
+               name_contexts=None):
     self.operations = operations
     self.stage_name = stage_name
-    self.system_names = system_names
-    self.step_names = step_names
-    self.original_names = original_names
+    # TODO(BEAM-4028): Remove arguments other than name_contexts.
+    self.name_contexts = name_contexts or self._make_name_contexts(
+        original_names, step_names, system_names)
+
+  @staticmethod
+  def _make_name_contexts(original_names, user_names, system_names):
+    # TODO(BEAM-4028): Remove method once map task relies on name contexts.
+    return [common.DataflowNameContext(step_name, user_name, system_name)
+            for step_name, user_name, system_name in zip(original_names,
+                                                         user_names,
+                                                         system_names)]
+
+  @property
+  def system_names(self):
+    """Returns a list containing the system names of steps.
+
+    A System name is the name of a step in the optimized Dataflow graph.
+    """
+    return [nc.system_name for nc in self.name_contexts]
+
+  @property
+  def original_names(self):
+    """Returns a list containing the original names of steps.
+
+    An original name is the internal name of a step in the Dataflow graph
+    (e.g. 's2').
+    """
+    return [nc.step_name for nc in self.name_contexts]
+
+  @property
+  def step_names(self):
+    """Returns a list containing the user names of steps.
+
+    In this context, a step name is the user-given name of a step in the
+    Dataflow graph (e.g. 'Foo/Bar/ParDo').
+    """
+    return [nc.user_name for nc in self.name_contexts]
 
   def __str__(self):
     return '<%s %s steps=%s>' % (self.__class__.__name__, self.stage_name,
diff --git a/sdks/python/apache_beam/runners/worker/operations.py 
b/sdks/python/apache_beam/runners/worker/operations.py
index aac56402db8..b5a75a574f7 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -120,7 +120,6 @@ def __init__(self, name_context, spec, counter_factory, 
state_sampler):
       # sampling.
       self.name_context = name_context
     else:
-      logging.info('Creating namecontext within operation')
       self.name_context = common.NameContext(name_context)
 
     # TODO(BEAM-4028): Remove following two lines. Rely on name context.
@@ -701,13 +700,9 @@ def execute(self):
     # operations is a list of operation_specs.Worker* instances.
     # The order of the elements is important because the inputs use
     # list indexes as references.
-
-    for ix, spec in enumerate(self._map_task.operations):
+    for name_context, spec in zip(self._map_task.name_contexts,
+                                  self._map_task.operations):
       # This is used for logging and assigning names to counters.
-      name_context = common.DataflowNameContext(
-          step_name=self._map_task.original_names[ix],
-          user_name=self._map_task.step_names[ix],
-          system_name=self._map_task.system_names[ix])
       op = create_operation(
           name_context, spec, self._counter_factory, None,
           self._state_sampler,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 95822)
    Time Spent: 5h 10m  (was: 5h)

> Step / Operation naming should rely on a NameContext class
> ----------------------------------------------------------
>
>                 Key: BEAM-4028
>                 URL: https://issues.apache.org/jira/browse/BEAM-4028
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Steps can have different names depending on the runner (stage, step, user, 
> system name...). 
> Depending on the needs of different components (operations, logging, metrics, 
> state sampling), these step names are passed around without a specific order.
> Instead, the SDK should rely on `NameContext` objects that carry all the naming 
> information for a single step.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to