This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch revert-38749-py-disable-user-metrics-38746
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 56fd97f4f9a778e4e7bf4a9ec452190ab8d87352
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jun 10 15:47:25 2026 -0400

    Revert "[Python] Honor disableCounterMetrics, disableStringSetMetrics, and di…"
    
    This reverts commit c01ceeab24c7cae7167dbc08b21529339659094f.
---
 CHANGES.md                                         |   1 -
 sdks/python/apache_beam/metrics/metric.py          |  56 +----------
 sdks/python/apache_beam/metrics/metric_test.py     | 104 ---------------------
 sdks/python/apache_beam/pipeline.py                |   2 -
 .../apache_beam/runners/worker/sdk_worker_main.py  |   2 -
 5 files changed, 4 insertions(+), 161 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ac8215f3549..698d88b01fa 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,7 +69,6 @@
 
 ## New Features / Improvements
 
-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding [...]
 
 ## Breaking Changes
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index a66eed640be..b15237cae02 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -46,13 +46,11 @@ from apache_beam.metrics.metricbase import Gauge
 from apache_beam.metrics.metricbase import Histogram
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.metrics.metricbase import StringSet
-from apache_beam.options.pipeline_options import DebugOptions
 
 if TYPE_CHECKING:
   from apache_beam.internal.metrics.metric import MetricLogger
   from apache_beam.metrics.execution import MetricKey
   from apache_beam.metrics.metricbase import Metric
-  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.utils.histogram import BucketType
 
 __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
@@ -60,37 +58,6 @@ __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
 _LOGGER = logging.getLogger(__name__)
 
 
-class MetricsFlag(object):
-  """Process-wide flags controlling which user metric kinds are emitted."""
-  counter_disabled = False
-  string_set_disabled = False
-  bounded_trie_disabled = False
-  _initialized = False
-
-  @classmethod
-  def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
-    if cls._initialized:
-      return
-    debug_options = options.view_as(DebugOptions)
-    if debug_options.lookup_experiment('disableCounterMetrics'):
-      cls.counter_disabled = True
-      _LOGGER.info('Counter metrics are disabled.')
-    if debug_options.lookup_experiment('disableStringSetMetrics'):
-      cls.string_set_disabled = True
-      _LOGGER.info('StringSet metrics are disabled.')
-    if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
-      cls.bounded_trie_disabled = True
-      _LOGGER.info('BoundedTrie metrics are disabled.')
-    cls._initialized = True
-
-  @classmethod
-  def reset(cls) -> None:
-    cls.counter_disabled = False
-    cls.string_set_disabled = False
-    cls.bounded_trie_disabled = False
-    cls._initialized = False
-
-
 class Metrics(object):
   """Lets users create/access metric objects during pipeline execution."""
   @staticmethod
@@ -237,17 +204,12 @@ class Metrics(object):
     def __init__(
         self, metric_name: MetricName, process_wide: bool = False) -> None:
       super().__init__(metric_name)
-      self._updater = MetricUpdater(
+      self.inc = MetricUpdater(  # type: ignore[method-assign]
           cells.CounterCell,
           metric_name,
           default_value=1,
           process_wide=process_wide)
 
-    def inc(self, n: int = 1) -> None:
-      if MetricsFlag.counter_disabled:
-        return
-      self._updater(n)
-
   class DelegatingDistribution(Distribution):
     """Metrics Distribution Delegates functionality to MetricsEnvironment."""
     def __init__(
@@ -269,23 +231,13 @@ class Metrics(object):
     """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
     def __init__(self, metric_name: MetricName) -> None:
       super().__init__(metric_name)
-      self._updater = MetricUpdater(cells.StringSetCell, metric_name)
-
-    def add(self, value: str) -> None:
-      if MetricsFlag.string_set_disabled:
-        return
-      self._updater(value)
+      self.add = MetricUpdater(cells.StringSetCell, metric_name)  # type: ignore[method-assign]
 
   class DelegatingBoundedTrie(BoundedTrie):
-    """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
+    """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
     def __init__(self, metric_name: MetricName) -> None:
       super().__init__(metric_name)
-      self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
-
-    def add(self, value) -> None:
-      if MetricsFlag.bounded_trie_disabled:
-        return
-      self._updater(value)
+      self.add = MetricUpdater(cells.BoundedTrieCell, metric_name)  # type: ignore[method-assign]
 
 
 class MetricResults(object):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 6937236a8aa..ae66200737b 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -32,9 +32,7 @@ from apache_beam.metrics.metric import Lineage
 from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metric import Metrics
 from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.metrics.metric import MetricsFlag
 from apache_beam.metrics.metricbase import MetricName
-from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
 from apache_beam.runners.worker import statesampler
 from apache_beam.testing.metric_result_matchers import DistributionMatcher
@@ -123,108 +121,6 @@ class MetricsTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       Metrics.get_namespace(object())
 
-  def test_metrics_flag(self):
-    MetricsFlag.reset()
-    try:
-      self.assertFalse(MetricsFlag.counter_disabled)
-      self.assertFalse(MetricsFlag.string_set_disabled)
-      self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
-      options = PipelineOptions(['--experiments=disableCounterMetrics'])
-      MetricsFlag.set_default_pipeline_options(options)
-      self.assertTrue(MetricsFlag.counter_disabled)
-      self.assertFalse(MetricsFlag.string_set_disabled)
-      self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
-      MetricsFlag.reset()
-      options = PipelineOptions(['--experiments=disableStringSetMetrics'])
-      MetricsFlag.set_default_pipeline_options(options)
-      self.assertFalse(MetricsFlag.counter_disabled)
-      self.assertTrue(MetricsFlag.string_set_disabled)
-      self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
-      MetricsFlag.reset()
-      options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
-      MetricsFlag.set_default_pipeline_options(options)
-      self.assertFalse(MetricsFlag.counter_disabled)
-      self.assertFalse(MetricsFlag.string_set_disabled)
-      self.assertTrue(MetricsFlag.bounded_trie_disabled)
-
-      MetricsFlag.reset()
-      options = PipelineOptions([
-          '--experiments=disableCounterMetrics',
-          '--experiments=disableStringSetMetrics',
-          '--experiments=disableBoundedTrieMetrics',
-      ])
-      MetricsFlag.set_default_pipeline_options(options)
-      self.assertTrue(MetricsFlag.counter_disabled)
-      self.assertTrue(MetricsFlag.string_set_disabled)
-      self.assertTrue(MetricsFlag.bounded_trie_disabled)
-    finally:
-      MetricsFlag.reset()
-
-  def test_disabled_counter_is_noop(self):
-    sampler = statesampler.StateSampler('', counters.CounterFactory())
-    statesampler.set_current_tracker(sampler)
-    state = sampler.scoped_state(
-        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
-    MetricsFlag.reset()
-    try:
-      sampler.start()
-      with state:
-        container = MetricsEnvironment.current_container()
-        Metrics.counter('ns', 'baseline').inc()
-        self.assertEqual(len(container.metrics), 1)
-        options = PipelineOptions(['--experiments=disableCounterMetrics'])
-        MetricsFlag.set_default_pipeline_options(options)
-        Metrics.counter('ns', 'after_disable').inc()
-        Metrics.counter('ns', 'after_disable').inc(5)
-        Metrics.counter('ns', 'after_disable').dec()
-        self.assertEqual(len(container.metrics), 1)
-    finally:
-      sampler.stop()
-      MetricsFlag.reset()
-
-  def test_disabled_string_set_is_noop(self):
-    sampler = statesampler.StateSampler('', counters.CounterFactory())
-    statesampler.set_current_tracker(sampler)
-    state = sampler.scoped_state(
-        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
-    MetricsFlag.reset()
-    try:
-      sampler.start()
-      with state:
-        container = MetricsEnvironment.current_container()
-        Metrics.string_set('ns', 'baseline').add('seed')
-        self.assertEqual(len(container.metrics), 1)
-        options = PipelineOptions(['--experiments=disableStringSetMetrics'])
-        MetricsFlag.set_default_pipeline_options(options)
-        Metrics.string_set('ns', 'after_disable').add('value')
-        self.assertEqual(len(container.metrics), 1)
-    finally:
-      sampler.stop()
-      MetricsFlag.reset()
-
-  def test_disabled_bounded_trie_is_noop(self):
-    sampler = statesampler.StateSampler('', counters.CounterFactory())
-    statesampler.set_current_tracker(sampler)
-    state = sampler.scoped_state(
-        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
-    MetricsFlag.reset()
-    try:
-      sampler.start()
-      with state:
-        container = MetricsEnvironment.current_container()
-        Metrics.bounded_trie('ns', 'baseline').add(['a'])
-        self.assertEqual(len(container.metrics), 1)
-        options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
-        MetricsFlag.set_default_pipeline_options(options)
-        Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
-        self.assertEqual(len(container.metrics), 1)
-    finally:
-      sampler.stop()
-      MetricsFlag.reset()
-
   def test_counter_empty_name(self):
     with self.assertRaises(ValueError):
       Metrics.counter("namespace", "")
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 594660d9bea..750868f7443 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -73,7 +73,6 @@ from apache_beam import pvalue
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.metrics.metric import MetricsFlag
 from apache_beam.options.pipeline_options import CrossLanguageOptions
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -193,7 +192,6 @@ class Pipeline(HasDisplayData):
       self._options = PipelineOptions([])
 
     FileSystems.set_options(self._options)
-    MetricsFlag.set_default_pipeline_options(self._options)
 
     if runner is None:
       runner = self._options.view_as(StandardOptions).runner
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 58beda96d63..754a631eaf3 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -33,7 +33,6 @@ from google.protobuf import text_format
 
 from apache_beam.internal import pickler
 from apache_beam.io import filesystems
-from apache_beam.metrics import metric
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -124,7 +123,6 @@ def create_harness(environment, dry_run=False):
   RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
   sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
   filesystems.FileSystems.set_options(sdk_pipeline_options)
-  metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
   pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
   pickler.set_library(pickle_library)
 

Reply via email to