This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2a4d2ea [BEAM-4268] Improving the separation between Metrics API and
Execution (#5323)
2a4d2ea is described below
commit 2a4d2ea3b01802c2462c3f4afa9f004f6a7f57c8
Author: Pablo <[email protected]>
AuthorDate: Thu May 10 22:05:43 2018 -0700
[BEAM-4268] Improving the separation between Metrics API and Execution
(#5323)
* Improving metrics api - execution separation.
---
sdks/python/apache_beam/metrics/execution.py | 7 +------
sdks/python/apache_beam/metrics/metric.py | 9 +++++++++
sdks/python/apache_beam/metrics/metric_test.py | 2 ++
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 ++-
sdks/python/apache_beam/runners/direct/direct_runner.py | 3 ++-
sdks/python/apache_beam/runners/worker/statesampler.py | 2 --
6 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/metrics/execution.py
b/sdks/python/apache_beam/metrics/execution.py
index 310faf6..cb0f071 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -36,6 +36,7 @@ from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.worker import statesampler
class MetricKey(object):
@@ -149,12 +150,6 @@ class _MetricsEnvironment(object):
MetricsEnvironment = _MetricsEnvironment()
-def metrics_startup():
- """Initialize metrics context to run."""
- global statesampler # pylint: disable=global-variable-not-assigned
- from apache_beam.runners.worker import statesampler
-
-
class MetricsContainer(object):
"""Holds the metrics of a single step and a single bundle."""
def __init__(self, step_name):
diff --git a/sdks/python/apache_beam/metrics/metric.py
b/sdks/python/apache_beam/metrics/metric.py
index 99c435c..8b6c50f 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -93,7 +93,10 @@ class Metrics(object):
return Metrics.DelegatingGauge(MetricName(namespace, name))
class DelegatingCounter(Counter):
+ """Metrics Counter that Delegates functionality to MetricsEnvironment."""
+
def __init__(self, metric_name):
+ super(Metrics.DelegatingCounter, self).__init__()
self.metric_name = metric_name
def inc(self, n=1):
@@ -102,7 +105,10 @@ class Metrics(object):
container.get_counter(self.metric_name).inc(n)
class DelegatingDistribution(Distribution):
+ """Metrics Distribution Delegates functionality to MetricsEnvironment."""
+
def __init__(self, metric_name):
+ super(Metrics.DelegatingDistribution, self).__init__()
self.metric_name = metric_name
def update(self, value):
@@ -111,7 +117,10 @@ class Metrics(object):
container.get_distribution(self.metric_name).update(value)
class DelegatingGauge(Gauge):
+ """Metrics Gauge that Delegates functionality to MetricsEnvironment."""
+
def __init__(self, metric_name):
+ super(Metrics.DelegatingGauge, self).__init__()
self.metric_name = metric_name
def set(self, value):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py
b/sdks/python/apache_beam/metrics/metric_test.py
index 385d270..84f6ae9 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -121,6 +121,7 @@ class MetricsTest(unittest.TestCase):
statesampler.set_current_tracker(sampler)
state1 = sampler.scoped_state('mystep', 'myState',
metrics_container=MetricsContainer('mystep'))
+ sampler.start()
with state1:
counter_ns = 'aCounterNamespace'
distro_ns = 'aDistributionNamespace'
@@ -144,6 +145,7 @@ class MetricsTest(unittest.TestCase):
self.assertEqual(
container.distributions[MetricName(distro_ns,
name)].get_cumulative(),
DistributionData(12, 2, 2, 10))
+ sampler.stop()
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 6159410..66fe46a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -39,7 +39,6 @@ from apache_beam.options.pipeline_options import
StandardOptions
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSideInput
-from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as
dataflow_api
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -362,6 +361,8 @@ class DataflowRunner(PipelineRunner):
result = DataflowPipelineResult(
self.dataflow_client.create_job(self.job), self)
+ # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
+ from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
result.metric_results = self._metrics
return result
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 510a4e6..bea54ff 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -32,7 +32,6 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam import typehints
from apache_beam.internal.util import ArgumentPlaceholder
-from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
@@ -357,6 +356,8 @@ class BundleBasedDirectRunner(PipelineRunner):
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
+ # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
+ from apache_beam.metrics.execution import MetricsEnvironment
MetricsEnvironment.set_metrics_supported(True)
logging.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py
b/sdks/python/apache_beam/runners/worker/statesampler.py
index 8a00079..f3916a2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -19,7 +19,6 @@
import threading
from collections import namedtuple
-from apache_beam.metrics import execution
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName
@@ -76,7 +75,6 @@ class StateSampler(statesampler_impl.StateSampler):
def start(self):
self.tracked_thread = threading.current_thread()
set_current_tracker(self)
- execution.metrics_startup()
super(StateSampler, self).start()
self.started = True
--
To stop receiving notification emails like this one, please contact
[email protected].