This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4d2ea  [BEAM-4268] Improving the separation between Metrics API and 
Execution (#5323)
2a4d2ea is described below

commit 2a4d2ea3b01802c2462c3f4afa9f004f6a7f57c8
Author: Pablo <[email protected]>
AuthorDate: Thu May 10 22:05:43 2018 -0700

    [BEAM-4268] Improving the separation between Metrics API and Execution 
(#5323)
    
    * Improving metrics api - execution separation.
---
 sdks/python/apache_beam/metrics/execution.py                | 7 +------
 sdks/python/apache_beam/metrics/metric.py                   | 9 +++++++++
 sdks/python/apache_beam/metrics/metric_test.py              | 2 ++
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 ++-
 sdks/python/apache_beam/runners/direct/direct_runner.py     | 3 ++-
 sdks/python/apache_beam/runners/worker/statesampler.py      | 2 --
 6 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/execution.py 
b/sdks/python/apache_beam/metrics/execution.py
index 310faf6..cb0f071 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -36,6 +36,7 @@ from apache_beam.metrics.cells import CounterCell
 from apache_beam.metrics.cells import DistributionCell
 from apache_beam.metrics.cells import GaugeCell
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.worker import statesampler
 
 
 class MetricKey(object):
@@ -149,12 +150,6 @@ class _MetricsEnvironment(object):
 MetricsEnvironment = _MetricsEnvironment()
 
 
-def metrics_startup():
-  """Initialize metrics context to run."""
-  global statesampler  # pylint: disable=global-variable-not-assigned
-  from apache_beam.runners.worker import statesampler
-
-
 class MetricsContainer(object):
   """Holds the metrics of a single step and a single bundle."""
   def __init__(self, step_name):
diff --git a/sdks/python/apache_beam/metrics/metric.py 
b/sdks/python/apache_beam/metrics/metric.py
index 99c435c..8b6c50f 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -93,7 +93,10 @@ class Metrics(object):
     return Metrics.DelegatingGauge(MetricName(namespace, name))
 
   class DelegatingCounter(Counter):
+    """Metrics Counter that Delegates functionality to MetricsEnvironment."""
+
     def __init__(self, metric_name):
+      super(Metrics.DelegatingCounter, self).__init__()
       self.metric_name = metric_name
 
     def inc(self, n=1):
@@ -102,7 +105,10 @@ class Metrics(object):
         container.get_counter(self.metric_name).inc(n)
 
   class DelegatingDistribution(Distribution):
+    """Metrics Distribution Delegates functionality to MetricsEnvironment."""
+
     def __init__(self, metric_name):
+      super(Metrics.DelegatingDistribution, self).__init__()
       self.metric_name = metric_name
 
     def update(self, value):
@@ -111,7 +117,10 @@ class Metrics(object):
         container.get_distribution(self.metric_name).update(value)
 
   class DelegatingGauge(Gauge):
+    """Metrics Gauge that Delegates functionality to MetricsEnvironment."""
+
     def __init__(self, metric_name):
+      super(Metrics.DelegatingGauge, self).__init__()
       self.metric_name = metric_name
 
     def set(self, value):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py 
b/sdks/python/apache_beam/metrics/metric_test.py
index 385d270..84f6ae9 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -121,6 +121,7 @@ class MetricsTest(unittest.TestCase):
     statesampler.set_current_tracker(sampler)
     state1 = sampler.scoped_state('mystep', 'myState',
                                   metrics_container=MetricsContainer('mystep'))
+    sampler.start()
     with state1:
       counter_ns = 'aCounterNamespace'
       distro_ns = 'aDistributionNamespace'
@@ -144,6 +145,7 @@ class MetricsTest(unittest.TestCase):
       self.assertEqual(
           container.distributions[MetricName(distro_ns, 
name)].get_cumulative(),
           DistributionData(12, 2, 2, 10))
+    sampler.stop()
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 6159410..66fe46a 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -39,7 +39,6 @@ from apache_beam.options.pipeline_options import 
StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.portability import common_urns
 from apache_beam.pvalue import AsSideInput
-from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as 
dataflow_api
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -362,6 +361,8 @@ class DataflowRunner(PipelineRunner):
     result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job), self)
 
+    # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
+    from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
     self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
     result.metric_results = self._metrics
     return result
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py 
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 510a4e6..bea54ff 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -32,7 +32,6 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam import typehints
 from apache_beam.internal.util import ArgumentPlaceholder
-from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.pipeline_options import DirectOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
@@ -357,6 +356,8 @@ class BundleBasedDirectRunner(PipelineRunner):
     pipeline.visit(visitor)
     clock = TestClock() if visitor.uses_test_stream else RealClock()
 
+    # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
+    from apache_beam.metrics.execution import MetricsEnvironment
     MetricsEnvironment.set_metrics_supported(True)
     logging.info('Running pipeline with DirectRunner.')
     self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py 
b/sdks/python/apache_beam/runners/worker/statesampler.py
index 8a00079..f3916a2 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -19,7 +19,6 @@
 import threading
 from collections import namedtuple
 
-from apache_beam.metrics import execution
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 
@@ -76,7 +75,6 @@ class StateSampler(statesampler_impl.StateSampler):
   def start(self):
     self.tracked_thread = threading.current_thread()
     set_current_tracker(self)
-    execution.metrics_startup()
     super(StateSampler, self).start()
     self.started = True
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to