This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f063b15 Logging relies on StateSampler for context
f063b15 is described below
commit f063b157eea480d079c4e966e528eef050a0c192
Author: Pablo <[email protected]>
AuthorDate: Mon May 14 11:29:21 2018 -0700
Logging relies on StateSampler for context
---
sdks/python/apache_beam/runners/common.pxd | 12 -------
sdks/python/apache_beam/runners/common.py | 29 ++--------------
.../apache_beam/runners/worker/bundle_processor.py | 11 +++---
sdks/python/apache_beam/runners/worker/logger.pxd | 25 --------------
sdks/python/apache_beam/runners/worker/logger.py | 17 ++++++----
.../apache_beam/runners/worker/logger_test.py | 39 ++++++++++++++--------
.../apache_beam/runners/worker/operation_specs.py | 2 +-
.../apache_beam/runners/worker/operations.py | 18 +++-------
.../apache_beam/runners/worker/statesampler.py | 31 +++++++++++++++--
.../runners/worker/statesampler_fast.pxd | 4 ++-
.../runners/worker/statesampler_fast.pyx | 21 +++++++++---
.../runners/worker/statesampler_slow.py | 20 +++++++----
12 files changed, 109 insertions(+), 120 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5c5eba2..4bb2264 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
cdef class DoFnRunner(Receiver):
cdef DoFnContext context
- cdef LoggingContext logging_context
cdef object step_name
- cdef ScopedMetricsContainer scoped_metrics_container
cdef list side_inputs
cdef DoFnInvoker do_fn_invoker
@@ -112,15 +110,5 @@ cdef class DoFnContext(object):
cpdef set_element(self, WindowedValue windowed_value)
-cdef class LoggingContext(object):
- # TODO(robertwb): Optimize "with [cdef class]"
- cpdef enter(self)
- cpdef exit(self)
-
-
-cdef class _LoggingContextAdapter(LoggingContext):
- cdef object underlying
-
-
cdef class _ReceiverAdapter(Receiver):
cdef object underlying
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index d5f35de..88745c7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -119,16 +119,6 @@ class DataflowNameContext(NameContext):
return self.user_name
-class LoggingContext(object):
- """For internal use only; no backwards-compatibility guarantees."""
-
- def enter(self):
- pass
-
- def exit(self):
- pass
-
-
class Receiver(object):
"""For internal use only; no backwards-compatibility guarantees.
@@ -551,20 +541,15 @@ class DoFnRunner(Receiver):
windowing: windowing properties of the output PCollection(s)
tagged_receivers: a dict of tag name to Receiver objects
step_name: the name of this step
- logging_context: a LoggingContext object
+ logging_context: DEPRECATED [BEAM-4728]
state: handle for accessing DoFn state
- scoped_metrics_container: Context switcher for metrics container
+ scoped_metrics_container: DEPRECATED
operation_name: The system name assigned by the runner for this
operation.
"""
# Need to support multiple iterations.
side_inputs = list(side_inputs)
- from apache_beam.metrics.execution import ScopedMetricsContainer
-
- self.scoped_metrics_container = (
- scoped_metrics_container or ScopedMetricsContainer())
self.step_name = step_name
- self.logging_context = logging_context or LoggingContext()
self.context = DoFnContext(step_name, state=state)
do_fn_signature = DoFnSignature(fn)
@@ -595,26 +580,16 @@ class DoFnRunner(Receiver):
def process(self, windowed_value):
try:
- self.logging_context.enter()
- self.scoped_metrics_container.enter()
self.do_fn_invoker.invoke_process(windowed_value)
except BaseException as exn:
self._reraise_augmented(exn)
- finally:
- self.scoped_metrics_container.exit()
- self.logging_context.exit()
def _invoke_bundle_method(self, bundle_method):
try:
- self.logging_context.enter()
- self.scoped_metrics_container.enter()
self.context.set_element(None)
bundle_method()
except BaseException as exn:
self._reraise_augmented(exn)
- finally:
- self.scoped_metrics_container.exit()
- self.logging_context.exit()
def start(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4193ea2..958731d 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -63,13 +63,12 @@ OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1'
class RunnerIOOperation(operations.Operation):
"""Common baseclass for runner harness IO operations."""
- def __init__(self, operation_name, step_name, consumers, counter_factory,
+ def __init__(self, name_context, step_name, consumers, counter_factory,
state_sampler, windowed_coder, target, data_channel):
super(RunnerIOOperation, self).__init__(
- operation_name, None, counter_factory, state_sampler)
+ name_context, None, counter_factory, state_sampler)
self.windowed_coder = windowed_coder
self.windowed_coder_impl = windowed_coder.get_impl()
- self.step_name = step_name
# target represents the consumer for the bytes in the data plane for a
# DataInputOperation or a producer of these bytes for a DataOutputOperation.
self.target = target
@@ -106,9 +105,9 @@ class DataInputOperation(RunnerIOOperation):
windowed_coder, target=input_target, data_channel=data_channel)
# We must do this manually as we don't have a spec or spec.output_coders.
self.receivers = [
- operations.ConsumerSet(self.counter_factory, self.step_name, 0,
- next(itervalues(consumers)),
- self.windowed_coder)]
+ operations.ConsumerSet(
+ self.counter_factory, self.name_context.step_name, 0,
+ next(itervalues(consumers)), self.windowed_coder)]
def process(self, windowed_value):
self.output(windowed_value)
diff --git a/sdks/python/apache_beam/runners/worker/logger.pxd b/sdks/python/apache_beam/runners/worker/logger.pxd
deleted file mode 100644
index 201daf4..0000000
--- a/sdks/python/apache_beam/runners/worker/logger.pxd
+++ /dev/null
@@ -1,25 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-cimport cython
-
-from apache_beam.runners.common cimport LoggingContext
-
-
-cdef class PerThreadLoggingContext(LoggingContext):
- cdef kwargs
- cdef list stack
diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py
index 07cd320..ae9cdd3 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -26,7 +26,7 @@ import logging
import threading
import traceback
-from apache_beam.runners.common import LoggingContext
+from apache_beam.runners.worker import statesampler
# This module is experimental. No backwards-compatibility guarantees.
@@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local):
def __init__(self):
super(_PerThreadWorkerData, self).__init__()
- # TODO(robertwb): Consider starting with an initial (ignored) ~20 elements
# in the list, as going up and down all the way to zero incurs several
# reallocations.
self.stack = []
@@ -53,7 +52,7 @@ class _PerThreadWorkerData(threading.local):
per_thread_worker_data = _PerThreadWorkerData()
-class PerThreadLoggingContext(LoggingContext):
+class PerThreadLoggingContext(object):
"""A context manager to add per thread attributes."""
def __init__(self, **kwargs):
@@ -150,10 +149,14 @@ class JsonLogFormatter(logging.Formatter):
data = per_thread_worker_data.get_data()
if 'work_item_id' in data:
output['work'] = data['work_item_id']
- if 'stage_name' in data:
- output['stage'] = data['stage_name']
- if 'step_name' in data:
- output['step'] = data['step_name']
+
+ tracker = statesampler.get_current_tracker()
+ if tracker:
+ output['stage'] = tracker.stage_name
+
+ if tracker.current_state() and tracker.current_state().name_context:
+ output['step'] = tracker.current_state().name_context.logging_name()
+
# All logging happens using the root logger. We will add the basename of the
# file and the function name where the logging happened to make it easier
# to identify who generated the record.
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py
index 73ec1aa..c131775 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -18,6 +18,7 @@
"""Tests for worker logging utilities."""
from __future__ import absolute_import
+from __future__ import unicode_literals
import json
import logging
@@ -27,6 +28,8 @@ import unittest
from builtins import object
from apache_beam.runners.worker import logger
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils.counters import CounterFactory
class PerThreadLoggingContextTest(unittest.TestCase):
@@ -129,30 +132,38 @@ class JsonLogFormatterTest(unittest.TestCase):
self.execute_multiple_cases(test_cases)
def test_record_with_per_thread_info(self):
- with logger.PerThreadLoggingContext(
- work_item_id='workitem', stage_name='stage', step_name='step'):
- formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
- record = self.create_log_record(**self.SAMPLE_RECORD)
- log_output = json.loads(formatter.format(record))
+ self.maxDiff = None
+ tracker = statesampler.StateSampler('stage', CounterFactory())
+ statesampler.set_current_tracker(tracker)
+ formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
+ with logger.PerThreadLoggingContext(work_item_id='workitem'):
+ with tracker.scoped_state('step', 'process'):
+ record = self.create_log_record(**self.SAMPLE_RECORD)
+ log_output = json.loads(formatter.format(record))
expected_output = dict(self.SAMPLE_OUTPUT)
expected_output.update(
{'work': 'workitem', 'stage': 'stage', 'step': 'step'})
self.assertEqual(log_output, expected_output)
+ statesampler.set_current_tracker(None)
def test_nested_with_per_thread_info(self):
+ self.maxDiff = None
+ tracker = statesampler.StateSampler('stage', CounterFactory())
+ statesampler.set_current_tracker(tracker)
formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
- with logger.PerThreadLoggingContext(
- work_item_id='workitem', stage_name='stage', step_name='step1'):
- record = self.create_log_record(**self.SAMPLE_RECORD)
- log_output1 = json.loads(formatter.format(record))
-
- with logger.PerThreadLoggingContext(step_name='step2'):
+ with logger.PerThreadLoggingContext(work_item_id='workitem'):
+ with tracker.scoped_state('step1', 'process'):
record = self.create_log_record(**self.SAMPLE_RECORD)
- log_output2 = json.loads(formatter.format(record))
+ log_output1 = json.loads(formatter.format(record))
- record = self.create_log_record(**self.SAMPLE_RECORD)
- log_output3 = json.loads(formatter.format(record))
+ with tracker.scoped_state('step2', 'process'):
+ record = self.create_log_record(**self.SAMPLE_RECORD)
+ log_output2 = json.loads(formatter.format(record))
+
+ record = self.create_log_record(**self.SAMPLE_RECORD)
+ log_output3 = json.loads(formatter.format(record))
+ statesampler.set_current_tracker(None)
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output4 = json.loads(formatter.format(record))
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index 58ba571..d64920f 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -378,9 +378,9 @@ class MapTask(object):
step_names=None,
original_names=None,
name_contexts=None):
+ # TODO(BEAM-4028): Remove arguments other than name_contexts.
self.operations = operations
self.stage_name = stage_name
- # TODO(BEAM-4028): Remove arguments other than name_contexts.
self.name_contexts = name_contexts or self._make_name_contexts(
original_names, step_names, system_names)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 143974e..78a67bc 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -38,7 +38,6 @@ from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners import common
from apache_beam.runners.common import Receiver
from apache_beam.runners.dataflow.internal.names import PropertyNames
-from apache_beam.runners.worker import logger
from apache_beam.runners.worker import opcounters
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import sideinputs
@@ -127,10 +126,6 @@ class Operation(object):
else:
self.name_context = common.NameContext(name_context)
- # TODO(BEAM-4028): Remove following two lines. Rely on name context.
- self.operation_name = self.name_context.step_name
- self.step_name = self.name_context.logging_name()
-
self.spec = spec
self.counter_factory = counter_factory
self.consumers = collections.defaultdict(list)
@@ -143,14 +138,11 @@ class Operation(object):
self.state_sampler = state_sampler
self.scoped_start_state = self.state_sampler.scoped_state(
- self.name_context.metrics_name(), 'start',
- metrics_container=self.metrics_container)
+ self.name_context, 'start', metrics_container=self.metrics_container)
self.scoped_process_state = self.state_sampler.scoped_state(
- self.name_context.metrics_name(), 'process',
- metrics_container=self.metrics_container)
+ self.name_context, 'process', metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
- self.name_context.metrics_name(), 'finish',
- metrics_container=self.metrics_container)
+ self.name_context, 'finish', metrics_container=self.metrics_container)
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
self.receivers = []
@@ -390,11 +382,9 @@ class DoOperation(Operation):
fn, args, kwargs, self.side_input_maps, window_fn,
tagged_receivers=self.tagged_receivers,
step_name=self.name_context.logging_name(),
- logging_context=logger.PerThreadLoggingContext(
- step_name=self.name_context.logging_name()),
state=state,
- scoped_metrics_container=None,
operation_name=self.name_context.metrics_name())
+
self.dofn_receiver = (self.dofn_runner
if isinstance(self.dofn_runner, Receiver)
else DoFnRunnerReceiver(self.dofn_runner))
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index b0c2b67..b73029c 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
import threading
from collections import namedtuple
+from apache_beam.runners import common
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName
@@ -69,8 +70,18 @@ class StateSampler(statesampler_impl.StateSampler):
self._states_by_name = {}
self.sampling_period_ms = sampling_period_ms
self.tracked_thread = None
+ self.finished = False
+ self.started = False
super(StateSampler, self).__init__(sampling_period_ms)
+ @property
+ def stage_name(self):
+ return self._prefix
+
+ def stop(self):
+ set_current_tracker(None)
+ super(StateSampler, self).stop()
+
def stop_if_still_running(self):
if self.started and not self.finished:
self.stop()
@@ -90,13 +101,28 @@ class StateSampler(statesampler_impl.StateSampler):
self.tracked_thread)
def scoped_state(self,
- step_name,
+ name_context,
state_name,
io_target=None,
metrics_container=None):
+ """Returns a ScopedState object associated to a Step and a State.
+
+ Args:
+ name_context: common.NameContext. It is the step name information.
+ state_name: str. It is the state name (e.g. process / start / finish).
+ io_target:
+ metrics_container: MetricsContainer. The step's metrics container.
+
+ Returns:
+ A ScopedState that keeps the execution context and is able to switch it
+ for the execution thread.
+ """
+ if not isinstance(name_context, common.NameContext):
+ name_context = common.NameContext(name_context)
+
counter_name = CounterName(state_name + '-msecs',
stage_name=self._prefix,
- step_name=step_name,
+ step_name=name_context.metrics_name(),
io_target=io_target)
if counter_name in self._states_by_name:
return self._states_by_name[counter_name]
@@ -105,6 +131,7 @@ class StateSampler(statesampler_impl.StateSampler):
Counter.SUM)
self._states_by_name[counter_name] = super(
StateSampler, self)._scoped_state(counter_name,
+ name_context,
output_counter,
metrics_container)
return self._states_by_name[counter_name]
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
index 76b379b..799bd0d 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
@@ -43,7 +43,8 @@ cdef class StateSampler(object):
cdef int32_t current_state_index
- cpdef _scoped_state(self, counter_name, output_counter, metrics_container)
+ cpdef _scoped_state(
+ self, counter_name, name_context, output_counter, metrics_container)
cdef class ScopedState(object):
"""Context manager class managing transitions for a given sampler state."""
@@ -52,6 +53,7 @@ cdef class ScopedState(object):
cdef readonly int32_t state_index
cdef readonly object counter
cdef readonly object name
+ cdef readonly object name_context
cdef readonly int64_t _nsecs
cdef int32_t old_state_index
cdef readonly MetricsContainer _metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index fdf4969..8aa5217 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -90,8 +90,12 @@ cdef class StateSampler(object):
self.current_state_index = 0
self.time_since_transition = 0
self.state_transition_count = 0
- unknown_state = ScopedState(
- self, CounterName('unknown'), self.current_state_index)
+ unknown_state = ScopedState(self,
+ CounterName('unknown'),
+ None,
+ self.current_state_index,
+ None,
+ None)
pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
self.scoped_states_by_index = [unknown_state]
pythread.PyThread_release_lock(self.lock)
@@ -153,7 +157,7 @@ cdef class StateSampler(object):
def current_state(self):
return self.scoped_states_by_index[self.current_state_index]
- cpdef _scoped_state(self, counter_name, output_counter,
+ cpdef _scoped_state(self, counter_name, name_context, output_counter,
metrics_container):
"""Returns a context manager managing transitions for a given state.
Args:
@@ -168,6 +172,7 @@ cdef class StateSampler(object):
new_state_index = len(self.scoped_states_by_index)
scoped_state = ScopedState(self,
counter_name,
+ name_context,
new_state_index,
output_counter,
metrics_container)
@@ -183,10 +188,16 @@ cdef class StateSampler(object):
cdef class ScopedState(object):
"""Context manager class managing transitions for a given sampler state."""
- def __init__(
- self, sampler, name, state_index, counter=None, metrics_container=None):
+ def __init__(self,
+ sampler,
+ name,
+ step_name_context,
+ state_index,
+ counter,
+ metrics_container):
self.sampler = sampler
self.name = name
+ self.name_context = step_name_context
self.state_index = state_index
self.counter = counter
self._metrics_container = metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index 2f09d0e..4b1bf83 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -21,15 +21,18 @@ from __future__ import absolute_import
from builtins import object
+from apache_beam.runners import common
+from apache_beam.utils import counters
+
class StateSampler(object):
def __init__(self, sampling_period_ms):
- self._state_stack = [ScopedState(None, self, None)]
+ self._state_stack = [ScopedState(self,
+ counters.CounterName('unknown'),
+ None)]
self.state_transition_count = 0
self.time_since_transition = 0
- self.started = False
- self.finished = False
def current_state(self):
"""Returns the current execution state.
@@ -40,9 +43,12 @@ class StateSampler(object):
def _scoped_state(self,
counter_name,
+ name_context,
output_counter,
metrics_container=None):
- return ScopedState(self, counter_name, output_counter, metrics_container)
+ assert isinstance(name_context, common.NameContext)
+ return ScopedState(
+ self, counter_name, name_context, output_counter, metrics_container)
def _enter_state(self, state):
self.state_transition_count += 1
@@ -57,14 +63,16 @@ class StateSampler(object):
pass
def stop(self):
- self.finished = True
+ pass
class ScopedState(object):
- def __init__(self, sampler, name, counter=None, metrics_container=None):
+ def __init__(self, sampler, name, step_name_context,
+ counter=None, metrics_container=None):
self.state_sampler = sampler
self.name = name
+ self.name_context = step_name_context
self.counter = counter
self.nsecs = 0
self.metrics_container = metrics_container