This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 70a6d18  [BEAM-4028] Adding NameContext to Python SDK. (#5043)
70a6d18 is described below

commit 70a6d18345e3794bd757206f5c3c2a42fa016ed4
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Fri Apr 13 09:29:24 2018 -0700

    [BEAM-4028] Adding NameContext to Python SDK. (#5043)

    Adding NameContext to Python SDK.
---
 sdks/python/apache_beam/runners/common.py          |  70 +++++++++++++
 .../apache_beam/runners/worker/operations.pxd      |   1 +
 .../apache_beam/runners/worker/operations.py       | 113 ++++++++++++---------
 3 files changed, 137 insertions(+), 47 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e3c768b..44f9083 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -39,6 +39,76 @@ from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
 
 
+class NameContext(object):
+  """Holds the name information for a step."""
+
+  def __init__(self, step_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The name of the step.
+    """
+    self.step_name = step_name
+
+  def __eq__(self, other):
+    return self.step_name == other.step_name
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __repr__(self):
+    return 'NameContext(%s)' % self.__dict__
+
+  def __hash__(self):
+    return hash(self.step_name)
+
+  def metrics_name(self):
+    """Returns the step name used for metrics reporting."""
+    return self.step_name
+
+  def logging_name(self):
+    """Returns the step name used for logging."""
+    return self.step_name
+
+
+# TODO(BEAM-4028): Move DataflowNameContext to Dataflow internal code.
+class DataflowNameContext(NameContext):
+  """Holds the name information for a step in Dataflow.
+
+  This includes a step_name (e.g. s2), a user_name (e.g. Foo/Bar/ParDo(Fab)),
+  and a system_name (e.g. s2-shuffle-read34)."""
+
+  def __init__(self, step_name, user_name, system_name):
+    """Creates a new step NameContext.
+
+    Args:
+      step_name: The internal name of the step (e.g. s2).
+      user_name: The full user-given name of the step (e.g. Foo/Bar/ParDo(Fab)).
+      system_name: The step name in the optimized graph (e.g. s2-1).
+    """
+    super(DataflowNameContext, self).__init__(step_name)
+    self.user_name = user_name
+    self.system_name = system_name
+
+  def __eq__(self, other):
+    return (self.step_name == other.step_name and
+            self.user_name == other.user_name and
+            self.system_name == other.system_name)
+
+  def __ne__(self, other):
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.step_name, self.user_name, self.system_name))
+
+  def __repr__(self):
+    return 'DataflowNameContext(%s)' % self.__dict__
+
+  def logging_name(self):
+    """Stackdriver logging relies on user-given step names (e.g. Foo/Bar)."""
+    return self.user_name
+
+
 class LoggingContext(object):
   """For internal use only; no backwards-compatibility guarantees."""
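The heart of the change is the split between metrics_name() and logging_name(). A minimal sketch of how the two classes behave once this commit is in, assuming only what the diff above defines (the step names are made up):

```python
from apache_beam.runners.common import NameContext, DataflowNameContext

simple = NameContext('s2')
# The base class answers with the same name everywhere.
assert simple.metrics_name() == simple.logging_name() == 's2'

dataflow = DataflowNameContext(
    step_name='s2',                  # internal name
    user_name='Foo/Bar/ParDo(Fab)',  # full user-given name
    system_name='s2-1')              # name in the optimized graph
# Metrics stay keyed on the internal step name (inherited behavior), while
# logging switches to the user-given name that Stackdriver displays.
assert dataflow.metrics_name() == 's2'
assert dataflow.logging_name() == 'Foo/Bar/ParDo(Fab)'
```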
Foo/Bar).""" + return self.user_name + + class LoggingContext(object): """For internal use only; no backwards-compatibility guarantees.""" diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index cb05c90..0aee337 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -39,6 +39,7 @@ cdef class ConsumerSet(Receiver): cdef class Operation(object): + cdef readonly name_context cdef readonly operation_name cdef readonly spec cdef object consumers diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 0fa32e3..977d4bb 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -104,34 +104,45 @@ class Operation(object): one or more receiver operations that will take that as input. """ - def __init__(self, operation_name, spec, counter_factory, state_sampler): + def __init__(self, name_context, spec, counter_factory, state_sampler): """Initializes a worker operation instance. Args: - operation_name: The system name assigned by the runner for this - operation. + name_context: A NameContext instance or string(deprecated), with the + name information for this operation. spec: A operation_specs.Worker* instance. counter_factory: The CounterFactory to use for our counters. state_sampler: The StateSampler for the current operation. """ - self.operation_name = operation_name + if isinstance(name_context, common.NameContext): + # TODO(BEAM-4028): Clean this up once it's completely migrated. + # We use the specific operation name that is used for metrics and state + # sampling. + self.name_context = name_context + else: + logging.info('Creating namecontext within operation') + self.name_context = common.NameContext(name_context) + + # TODO(BEAM-4028): Remove following two lines. Rely on name context. + self.operation_name = self.name_context.step_name + self.step_name = self.name_context.logging_name() + self.spec = spec self.counter_factory = counter_factory self.consumers = collections.defaultdict(list) # These are overwritten in the legacy harness. - self.step_name = operation_name - self.metrics_container = MetricsContainer(self.step_name) + self.metrics_container = MetricsContainer(self.name_context.metrics_name()) self.scoped_metrics_container = ScopedMetricsContainer( self.metrics_container) self.state_sampler = state_sampler self.scoped_start_state = self.state_sampler.scoped_state( - self.operation_name, 'start') + self.name_context.metrics_name(), 'start') self.scoped_process_state = self.state_sampler.scoped_state( - self.operation_name, 'process') + self.name_context.metrics_name(), 'process') self.scoped_finish_state = self.state_sampler.scoped_state( - self.operation_name, 'finish') + self.name_context.metrics_name(), 'finish') # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.receivers = [] @@ -142,9 +153,12 @@ class Operation(object): logging.DEBUG) # Everything except WorkerSideInputSource, which is not a # top-level operation, should have output_coders + #TODO(pabloem): Define better what step name is used here. 
@@ -205,7 +219,7 @@ class Operation(object):
     """
     printable_name = self.__class__.__name__
     if hasattr(self, 'step_name'):
-      printable_name += ' %s' % self.step_name
+      printable_name += ' %s' % self.name_context.logging_name()
     if is_recursive:
       # If we have a step name, stop here, no more detail needed.
       return '<%s>' % printable_name
@@ -317,7 +331,7 @@ class DoOperation(Operation):
           si_counter = opcounters.SideInputReadCounter(
               self.counter_factory,
               self.state_sampler,
-              declaring_step=self.operation_name,
+              declaring_step=self.name_context.step_name,
               # Inputs are 1-indexed, so we add 1 to i in the side input id
               input_index=i + 1)
         else:
@@ -345,13 +359,13 @@ class DoOperation(Operation):
           pickler.loads(self.spec.serialized_fn))
 
       state = common.DoFnState(self.counter_factory)
-      state.step_name = self.step_name
+      state.step_name = self.name_context.logging_name()
 
       # Tag to output index map used to dispatch the side output values emitted
       # by the DoFn function to the appropriate receivers. The main output is
       # tagged with None and is associated with its corresponding index.
       self.tagged_receivers = _TaggedReceivers(
-          self.counter_factory, self.step_name)
+          self.counter_factory, self.name_context.logging_name())
 
       output_tag_prefix = PropertyNames.OUT + '_'
       for index, tag in enumerate(self.spec.output_tags):
@@ -372,9 +386,9 @@ class DoOperation(Operation):
       self.dofn_runner = common.DoFnRunner(
           fn, args, kwargs, self.side_input_maps, window_fn,
           tagged_receivers=self.tagged_receivers,
-          step_name=self.step_name,
+          step_name=self.name_context.logging_name(),
           logging_context=logger.PerThreadLoggingContext(
-              step_name=self.step_name),
+              step_name=self.name_context.logging_name()),
           state=state,
           scoped_metrics_container=self.scoped_metrics_container)
       self.dofn_receiver = (self.dofn_runner
@@ -413,9 +427,9 @@ class DoFnRunnerReceiver(Receiver):
 class CombineOperation(Operation):
   """A Combine operation executing a CombineFn for each input element."""
 
-  def __init__(self, operation_name, spec, counter_factory, state_sampler):
+  def __init__(self, name_context, spec, counter_factory, state_sampler):
     super(CombineOperation, self).__init__(
-        operation_name, spec, counter_factory, state_sampler)
+        name_context, spec, counter_factory, state_sampler)
     # Combiners do not accept deferred side-inputs (the ignored fourth argument)
     # and therefore the code to handle the extra args/kwargs is simpler than for
     # the DoFn's of ParDo.
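Read together, these hunks settle which accessor each subsystem consumes. A short sketch of that mapping, with illustrative names and the call sites from this diff cited in comments:

```python
from apache_beam.runners.common import DataflowNameContext

ctx = DataflowNameContext(step_name='s2',
                          user_name='Foo/Bar/ParDo(Fab)',
                          system_name='s2-1')

ctx.metrics_name()   # MetricsContainer and the StateSampler scoped states
ctx.logging_name()   # DoFnRunner, PerThreadLoggingContext, _TaggedReceivers,
                     # ConsumerSet labels, and Operation.__str__
ctx.step_name        # SideInputReadCounter's declaring_step
```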
""" - def __init__(self, operation_name, spec, counter_factory, state_sampler): + def __init__(self, name_context, spec, counter_factory, state_sampler): super(PGBKOperation, self).__init__( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) assert not self.spec.combine_fn self.table = collections.defaultdict(list) self.size = 0 @@ -486,9 +500,9 @@ class PGBKOperation(Operation): class PGBKCVOperation(Operation): - def __init__(self, operation_name, spec, counter_factory, state_sampler): + def __init__(self, name_context, spec, counter_factory, state_sampler): super(PGBKCVOperation, self).__init__( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) # Combiners do not accept deferred side-inputs (the ignored fourth # argument) and therefore the code to handle the extra args/kwargs is # simpler than for the DoFn's of ParDo. @@ -569,69 +583,72 @@ class FlattenOperation(Operation): self.output(o) -def create_operation(operation_name, spec, counter_factory, step_name, +def create_operation(name_context, spec, counter_factory, step_name, state_sampler, test_shuffle_source=None, test_shuffle_sink=None, is_streaming=False): """Create Operation object for given operation specification.""" + if not isinstance(name_context, common.NameContext): + # TODO(BEAM-4028): Remove ad-hoc NameContext once all has been migrated. + name_context = common.DataflowNameContext(step_name=name_context, + user_name=step_name, + system_name=None) + if isinstance(spec, operation_specs.WorkerRead): if isinstance(spec.source, iobase.SourceBundle): op = ReadOperation( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) else: from dataflow_worker.native_operations import NativeReadOperation op = NativeReadOperation( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) elif isinstance(spec, operation_specs.WorkerWrite): from dataflow_worker.native_operations import NativeWriteOperation op = NativeWriteOperation( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) elif isinstance(spec, operation_specs.WorkerCombineFn): op = CombineOperation( - operation_name, spec, counter_factory, state_sampler) + name_context, spec, counter_factory, state_sampler) elif isinstance(spec, operation_specs.WorkerPartialGroupByKey): - op = create_pgbk_op(operation_name, spec, counter_factory, state_sampler) + op = create_pgbk_op(name_context, spec, counter_factory, state_sampler) elif isinstance(spec, operation_specs.WorkerDoFn): - op = DoOperation(operation_name, spec, counter_factory, state_sampler) + op = DoOperation(name_context, spec, counter_factory, state_sampler) elif isinstance(spec, operation_specs.WorkerGroupingShuffleRead): from dataflow_worker.shuffle_operations import GroupedShuffleReadOperation op = GroupedShuffleReadOperation( - operation_name, spec, counter_factory, state_sampler, + name_context, spec, counter_factory, state_sampler, shuffle_source=test_shuffle_source) elif isinstance(spec, operation_specs.WorkerUngroupedShuffleRead): from dataflow_worker.shuffle_operations import UngroupedShuffleReadOperation op = UngroupedShuffleReadOperation( - operation_name, spec, counter_factory, state_sampler, + name_context, spec, counter_factory, state_sampler, shuffle_source=test_shuffle_source) elif isinstance(spec, 
@@ -648,7 +665,9 @@ class SimpleMapTaskExecutor(object):
     """Initializes SimpleMapTaskExecutor.
 
     Args:
-      map_task: The map task we are to run.
+      map_task: The map task we are to run. The map task contains a list of
+        operations, and aligned lists for the step_names, original_names, and
+        system_names of pipeline steps.
       counter_factory: The CounterFactory instance for the work item.
       state_sampler: The StateSampler tracking the execution step.
       test_shuffle_source: Used during tests for dependency injection into
@@ -682,14 +701,14 @@ class SimpleMapTaskExecutor(object):
 
     # The order of the elements is important because the inputs use
     # list indexes as references.
-    step_names = (
-        self._map_task.step_names or [None] * len(self._map_task.operations))
     for ix, spec in enumerate(self._map_task.operations):
       # This is used for logging and assigning names to counters.
-      operation_name = self._map_task.system_names[ix]
-      step_name = step_names[ix]
+      name_context = common.DataflowNameContext(
+          step_name=self._map_task.original_names[ix],
+          user_name=self._map_task.step_names[ix],
+          system_name=self._map_task.system_names[ix])
       op = create_operation(
-          operation_name, spec, self._counter_factory, step_name,
+          name_context, spec, self._counter_factory, None,
           self._state_sampler,
           test_shuffle_source=self._test_shuffle_source,
           test_shuffle_sink=self._test_shuffle_sink)

-- 
To stop receiving notification emails like this one, please contact
al...@apache.org.