This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c01ceeab24c [Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments (#38749)
c01ceeab24c is described below

commit c01ceeab24c7cae7167dbc08b21529339659094f
Author: Anurag Pappula <[email protected]>
AuthorDate: Thu Jun 4 22:53:44 2026 +0530

    [Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments (#38749)
    
    * [Python] Honor disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments
    
    Mirrors the Java SDK Metrics.MetricsFlag behavior so high-throughput Python
    pipelines can opt out of user metric kinds that add pressure to metric
    backends.
    
    Adds a process-wide MetricsFlag singleton initialized once at worker harness
    startup from pipeline experiments. When a flag is set, the corresponding
    Delegating* class short-circuits its update path so no MetricCell is touched
    and no monitoring info is emitted.
    
    For #38746.
    
    * [Python] Add unit tests for MetricsFlag
    
    Ports Java MetricsTest.testMetricsFlag and adds three smoke tests confirming
    that disabled Counter / StringSet / BoundedTrie metrics short-circuit without
    raising when called on a no-current-tracker path.
    
    For #38746.
    
    * [Python] Note disable*Metrics support in CHANGES.md
    
    For #38746.
    
    * [Python] Use public class attrs on MetricsFlag for hot-path reads
    
    Drops the classmethod getter wrapper around each disabled flag and exposes
    counter_disabled / string_set_disabled / bounded_trie_disabled directly as
    public class attributes. The gate runs on every metric update, so swapping
    a descriptor lookup + function call for a single attribute load matters in
    exactly the high-throughput pipelines these experiments are designed for.
    This is idiomatic Python; Java parity is about behavior, not internal API shape.
    
    Addresses review feedback.
    
    For #38746.
    
    * [Python] Address PR review: simplify MetricsFlag pydoc and reset MetricsFlag in finally blocks
    
    * [Python] Address PR review: assert no MetricCell created when disabled
    
    * [Python] Initialize MetricsFlag from Pipeline so in-process runners honor disable* experiments
    
    The previous init only ran in the gRPC SDK harness (sdk_worker_main),
    so DirectRunner and other in-process runners silently ignored the
    disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics
    experiments. Initializing in Pipeline.__init__ matches the pattern of
    FileSystems.set_options at the same call site and mirrors the Java
    init point (PipelineRunner.run).
    
    ---------
    
    Co-authored-by: Yi Hu <[email protected]>
---
 CHANGES.md                                         |   1 +
 sdks/python/apache_beam/metrics/metric.py          |  56 ++++++++++-
 sdks/python/apache_beam/metrics/metric_test.py     | 104 +++++++++++++++++++++
 sdks/python/apache_beam/pipeline.py                |   2 +
 .../apache_beam/runners/worker/sdk_worker_main.py  |   2 +
 5 files changed, 161 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 698d88b01fa..ac8215f3549 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
 
 ## New Features / Improvements
 
+* X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming 
Engine jobs. It can be disabled by passing 
`--experiments=disable_streaming_engine_state_tag_encoding_v2` or 
`--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag 
encoding version cannot change during a job update. Jobs using tag encoding v2 
(enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam 
versions prior to 2.73.0, as only versions 2.73.0 and later support tag 
encoding [...]
 
 ## Breaking Changes
diff --git a/sdks/python/apache_beam/metrics/metric.py 
b/sdks/python/apache_beam/metrics/metric.py
index b15237cae02..a66eed640be 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -46,11 +46,13 @@ from apache_beam.metrics.metricbase import Gauge
 from apache_beam.metrics.metricbase import Histogram
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.metrics.metricbase import StringSet
+from apache_beam.options.pipeline_options import DebugOptions
 
 if TYPE_CHECKING:
   from apache_beam.internal.metrics.metric import MetricLogger
   from apache_beam.metrics.execution import MetricKey
   from apache_beam.metrics.metricbase import Metric
+  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.utils.histogram import BucketType
 
 __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
@@ -58,6 +60,37 @@ __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
 _LOGGER = logging.getLogger(__name__)
 
 
+class MetricsFlag(object):
+  """Process-wide flags controlling which user metric kinds are emitted."""
+  counter_disabled = False
+  string_set_disabled = False
+  bounded_trie_disabled = False
+  _initialized = False
+
+  @classmethod
+  def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
+    if cls._initialized:
+      return
+    debug_options = options.view_as(DebugOptions)
+    if debug_options.lookup_experiment('disableCounterMetrics'):
+      cls.counter_disabled = True
+      _LOGGER.info('Counter metrics are disabled.')
+    if debug_options.lookup_experiment('disableStringSetMetrics'):
+      cls.string_set_disabled = True
+      _LOGGER.info('StringSet metrics are disabled.')
+    if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
+      cls.bounded_trie_disabled = True
+      _LOGGER.info('BoundedTrie metrics are disabled.')
+    cls._initialized = True
+
+  @classmethod
+  def reset(cls) -> None:
+    cls.counter_disabled = False
+    cls.string_set_disabled = False
+    cls.bounded_trie_disabled = False
+    cls._initialized = False
+
+
 class Metrics(object):
   """Lets users create/access metric objects during pipeline execution."""
   @staticmethod
@@ -204,12 +237,17 @@ class Metrics(object):
     def __init__(
         self, metric_name: MetricName, process_wide: bool = False) -> None:
       super().__init__(metric_name)
-      self.inc = MetricUpdater(  # type: ignore[method-assign]
+      self._updater = MetricUpdater(
           cells.CounterCell,
           metric_name,
           default_value=1,
           process_wide=process_wide)
 
+    def inc(self, n: int = 1) -> None:
+      if MetricsFlag.counter_disabled:
+        return
+      self._updater(n)
+
   class DelegatingDistribution(Distribution):
     """Metrics Distribution Delegates functionality to MetricsEnvironment."""
     def __init__(
@@ -231,13 +269,23 @@ class Metrics(object):
     """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
     def __init__(self, metric_name: MetricName) -> None:
       super().__init__(metric_name)
-      self.add = MetricUpdater(cells.StringSetCell, metric_name)  # type: 
ignore[method-assign]
+      self._updater = MetricUpdater(cells.StringSetCell, metric_name)
+
+    def add(self, value: str) -> None:
+      if MetricsFlag.string_set_disabled:
+        return
+      self._updater(value)
 
   class DelegatingBoundedTrie(BoundedTrie):
-    """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
+    """Metrics BoundedTrie that Delegates functionality to 
MetricsEnvironment."""
     def __init__(self, metric_name: MetricName) -> None:
       super().__init__(metric_name)
-      self.add = MetricUpdater(cells.BoundedTrieCell, metric_name)  # type: 
ignore[method-assign]
+      self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
+
+    def add(self, value) -> None:
+      if MetricsFlag.bounded_trie_disabled:
+        return
+      self._updater(value)
 
 
 class MetricResults(object):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py 
b/sdks/python/apache_beam/metrics/metric_test.py
index ae66200737b..6937236a8aa 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -32,7 +32,9 @@ from apache_beam.metrics.metric import Lineage
 from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metric import Metrics
 from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.metrics.metric import MetricsFlag
 from apache_beam.metrics.metricbase import MetricName
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
 from apache_beam.runners.worker import statesampler
 from apache_beam.testing.metric_result_matchers import DistributionMatcher
@@ -121,6 +123,108 @@ class MetricsTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       Metrics.get_namespace(object())
 
+  def test_metrics_flag(self):
+    MetricsFlag.reset()
+    try:
+      self.assertFalse(MetricsFlag.counter_disabled)
+      self.assertFalse(MetricsFlag.string_set_disabled)
+      self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+      options = PipelineOptions(['--experiments=disableCounterMetrics'])
+      MetricsFlag.set_default_pipeline_options(options)
+      self.assertTrue(MetricsFlag.counter_disabled)
+      self.assertFalse(MetricsFlag.string_set_disabled)
+      self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+      MetricsFlag.reset()
+      options = PipelineOptions(['--experiments=disableStringSetMetrics'])
+      MetricsFlag.set_default_pipeline_options(options)
+      self.assertFalse(MetricsFlag.counter_disabled)
+      self.assertTrue(MetricsFlag.string_set_disabled)
+      self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+      MetricsFlag.reset()
+      options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
+      MetricsFlag.set_default_pipeline_options(options)
+      self.assertFalse(MetricsFlag.counter_disabled)
+      self.assertFalse(MetricsFlag.string_set_disabled)
+      self.assertTrue(MetricsFlag.bounded_trie_disabled)
+
+      MetricsFlag.reset()
+      options = PipelineOptions([
+          '--experiments=disableCounterMetrics',
+          '--experiments=disableStringSetMetrics',
+          '--experiments=disableBoundedTrieMetrics',
+      ])
+      MetricsFlag.set_default_pipeline_options(options)
+      self.assertTrue(MetricsFlag.counter_disabled)
+      self.assertTrue(MetricsFlag.string_set_disabled)
+      self.assertTrue(MetricsFlag.bounded_trie_disabled)
+    finally:
+      MetricsFlag.reset()
+
+  def test_disabled_counter_is_noop(self):
+    sampler = statesampler.StateSampler('', counters.CounterFactory())
+    statesampler.set_current_tracker(sampler)
+    state = sampler.scoped_state(
+        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+    MetricsFlag.reset()
+    try:
+      sampler.start()
+      with state:
+        container = MetricsEnvironment.current_container()
+        Metrics.counter('ns', 'baseline').inc()
+        self.assertEqual(len(container.metrics), 1)
+        options = PipelineOptions(['--experiments=disableCounterMetrics'])
+        MetricsFlag.set_default_pipeline_options(options)
+        Metrics.counter('ns', 'after_disable').inc()
+        Metrics.counter('ns', 'after_disable').inc(5)
+        Metrics.counter('ns', 'after_disable').dec()
+        self.assertEqual(len(container.metrics), 1)
+    finally:
+      sampler.stop()
+      MetricsFlag.reset()
+
+  def test_disabled_string_set_is_noop(self):
+    sampler = statesampler.StateSampler('', counters.CounterFactory())
+    statesampler.set_current_tracker(sampler)
+    state = sampler.scoped_state(
+        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+    MetricsFlag.reset()
+    try:
+      sampler.start()
+      with state:
+        container = MetricsEnvironment.current_container()
+        Metrics.string_set('ns', 'baseline').add('seed')
+        self.assertEqual(len(container.metrics), 1)
+        options = PipelineOptions(['--experiments=disableStringSetMetrics'])
+        MetricsFlag.set_default_pipeline_options(options)
+        Metrics.string_set('ns', 'after_disable').add('value')
+        self.assertEqual(len(container.metrics), 1)
+    finally:
+      sampler.stop()
+      MetricsFlag.reset()
+
+  def test_disabled_bounded_trie_is_noop(self):
+    sampler = statesampler.StateSampler('', counters.CounterFactory())
+    statesampler.set_current_tracker(sampler)
+    state = sampler.scoped_state(
+        'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+    MetricsFlag.reset()
+    try:
+      sampler.start()
+      with state:
+        container = MetricsEnvironment.current_container()
+        Metrics.bounded_trie('ns', 'baseline').add(['a'])
+        self.assertEqual(len(container.metrics), 1)
+        options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
+        MetricsFlag.set_default_pipeline_options(options)
+        Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
+        self.assertEqual(len(container.metrics), 1)
+    finally:
+      sampler.stop()
+      MetricsFlag.reset()
+
   def test_counter_empty_name(self):
     with self.assertRaises(ValueError):
       Metrics.counter("namespace", "")
diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 750868f7443..594660d9bea 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -73,6 +73,7 @@ from apache_beam import pvalue
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.metrics.metric import MetricsFlag
 from apache_beam.options.pipeline_options import CrossLanguageOptions
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -192,6 +193,7 @@ class Pipeline(HasDisplayData):
       self._options = PipelineOptions([])
 
     FileSystems.set_options(self._options)
+    MetricsFlag.set_default_pipeline_options(self._options)
 
     if runner is None:
       runner = self._options.view_as(StandardOptions).runner
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 754a631eaf3..58beda96d63 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -33,6 +33,7 @@ from google.protobuf import text_format
 
 from apache_beam.internal import pickler
 from apache_beam.io import filesystems
+from apache_beam.metrics import metric
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
   RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
   sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
   filesystems.FileSystems.set_options(sdk_pipeline_options)
+  metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
   pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
   pickler.set_library(pickle_library)
 

Reply via email to