This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c01ceeab24c [Python] Honor disableCounterMetrics,
disableStringSetMetrics, and disableBoundedTrieMetrics experiments (#38749)
c01ceeab24c is described below
commit c01ceeab24c7cae7167dbc08b21529339659094f
Author: Anurag Pappula <[email protected]>
AuthorDate: Thu Jun 4 22:53:44 2026 +0530
[Python] Honor disableCounterMetrics, disableStringSetMetrics, and
disableBoundedTrieMetrics experiments (#38749)
* [Python] Honor disableCounterMetrics / disableStringSetMetrics /
disableBoundedTrieMetrics experiments
Mirrors the Java SDK Metrics.MetricsFlag behavior so high-throughput Python
pipelines can opt out of user metric kinds that add pressure to metric
backends.
Adds a process-wide MetricsFlag singleton initialized once at worker harness
startup from pipeline experiments. When a flag is set, the corresponding
Delegating* class short-circuits its update path so no MetricCell is touched
and no monitoring info is emitted.
For #38746.
* [Python] Add unit tests for MetricsFlag
Ports Java MetricsTest.testMetricsFlag and adds three smoke tests confirming
that disabled Counter / StringSet / BoundedTrie metrics short-circuit without
raising when called on a no-current-tracker path.
For #38746.
* [Python] Note disable*Metrics support in CHANGES.md
For #38746.
* [Python] Use public class attrs on MetricsFlag for hot-path reads
Drops the classmethod getter wrapper around each disabled flag and exposes
counter_disabled / string_set_disabled / bounded_trie_disabled directly as
public class attributes. The gate runs on every metric update, so swapping
a descriptor lookup + function call for a single attribute load matters in
exactly the high-throughput pipelines these experiments are designed for.
This is idiomatic Python; Java parity is about behavior, not internal API shape.
Addresses review feedback.
For #38746.
* [Python] Address PR review: simplify MetricsFlag pydoc and reset flags in
a finally block
* [Python] Address PR review: assert no MetricCell created when disabled
* [Python] Initialize MetricsFlag from Pipeline so in-process runners honor
disable* experiments
The previous init only ran in the gRPC SDK harness (sdk_worker_main),
so DirectRunner and other in-process runners silently ignored the
disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics
experiments. Initializing in Pipeline.__init__ matches the pattern of
FileSystems.set_options at the same call site and mirrors the Java
init point (PipelineRunner.run).
---------
Co-authored-by: Yi Hu <[email protected]>
---
CHANGES.md | 1 +
sdks/python/apache_beam/metrics/metric.py | 56 ++++++++++-
sdks/python/apache_beam/metrics/metric_test.py | 104 +++++++++++++++++++++
sdks/python/apache_beam/pipeline.py | 2 +
.../apache_beam/runners/worker/sdk_worker_main.py | 2 +
5 files changed, 161 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 698d88b01fa..ac8215f3549 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
## New Features / Improvements
+* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming
Engine jobs. It can be disabled by passing
`--experiments=disable_streaming_engine_state_tag_encoding_v2` or
`--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag
encoding version cannot change during a job update. Jobs using tag encoding v2
(enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam
versions prior to 2.73.0, as only versions 2.73.0 and later support tag
encoding [...]
## Breaking Changes
diff --git a/sdks/python/apache_beam/metrics/metric.py
b/sdks/python/apache_beam/metrics/metric.py
index b15237cae02..a66eed640be 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -46,11 +46,13 @@ from apache_beam.metrics.metricbase import Gauge
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet
+from apache_beam.options.pipeline_options import DebugOptions
if TYPE_CHECKING:
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric
+ from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.histogram import BucketType
__all__ = ['Metrics', 'MetricsFilter', 'Lineage']
@@ -58,6 +60,37 @@ __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
_LOGGER = logging.getLogger(__name__)
+class MetricsFlag(object):
+ """Process-wide flags controlling which user metric kinds are emitted."""
+ # Plain class attributes (no getter methods) because the Delegating* metric
+ # wrappers in Metrics check them on every counter inc() / add() call.
+ counter_disabled = False
+ string_set_disabled = False
+ bounded_trie_disabled = False
+ # True once set_default_pipeline_options has run; later calls become no-ops.
+ _initialized = False
+
+ @classmethod
+ def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
+ """Initializes the flags from the experiments in `options`, at most once.
+
+ Each of the experiments disableCounterMetrics, disableStringSetMetrics
+ and disableBoundedTrieMetrics, when present, sets the matching flag to
+ True and logs an INFO message. This method only ever sets flags; it
+ never clears them.
+
+ Only the first call (since import or the last reset()) has any effect.
+ Later calls return immediately, even when `options` carries different
+ experiments. Both Pipeline.__init__ and the SDK worker harness startup
+ call this, so within one process the first pipeline's options win.
+
+ Args:
+ options: pipeline options whose DebugOptions experiments are read.
+ """
+ if cls._initialized:
+ return
+ debug_options = options.view_as(DebugOptions)
+ if debug_options.lookup_experiment('disableCounterMetrics'):
+ cls.counter_disabled = True
+ _LOGGER.info('Counter metrics are disabled.')
+ if debug_options.lookup_experiment('disableStringSetMetrics'):
+ cls.string_set_disabled = True
+ _LOGGER.info('StringSet metrics are disabled.')
+ if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
+ cls.bounded_trie_disabled = True
+ _LOGGER.info('BoundedTrie metrics are disabled.')
+ # Mark as initialized even when no experiment matched, so the first
+ # caller's options are final.
+ cls._initialized = True
+
+ @classmethod
+ def reset(cls) -> None:
+ """Clears all flags and the initialized marker (e.g. between tests)."""
+ cls.counter_disabled = False
+ cls.string_set_disabled = False
+ cls.bounded_trie_disabled = False
+ cls._initialized = False
+
+
class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@staticmethod
@@ -204,12 +237,17 @@ class Metrics(object):
def __init__(
self, metric_name: MetricName, process_wide: bool = False) -> None:
super().__init__(metric_name)
- self.inc = MetricUpdater( # type: ignore[method-assign]
+ # Keep the updater private instead of binding it to self.inc, so the inc()
+ # method can consult MetricsFlag.counter_disabled before delegating.
+ self._updater = MetricUpdater(
cells.CounterCell,
metric_name,
default_value=1,
process_wide=process_wide)
+ def inc(self, n: int = 1) -> None:
+ """Increments the counter by `n`; a no-op while counter metrics are
+ disabled."""
+ if MetricsFlag.counter_disabled:
+ # Return before the MetricUpdater is invoked, so nothing is recorded
+ # for this update.
+ return
+ self._updater(n)
+
class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
def __init__(
@@ -231,13 +269,23 @@ class Metrics(object):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
- self.add = MetricUpdater(cells.StringSetCell, metric_name) # type:
ignore[method-assign]
+ # Private updater; the add() method checks
+ # MetricsFlag.string_set_disabled before delegating to it.
+ self._updater = MetricUpdater(cells.StringSetCell, metric_name)
+
+ def add(self, value: str) -> None:
+ """Adds `value` to the string set; a no-op while StringSet metrics are
+ disabled."""
+ if MetricsFlag.string_set_disabled:
+ # Return before the MetricUpdater is invoked.
+ return
+ self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
- """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
+ """Metrics BoundedTrie that Delegates functionality to
MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
- self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type:
ignore[method-assign]
+ # Private updater; the add() method checks
+ # MetricsFlag.bounded_trie_disabled before delegating to it.
+ self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
+
+ def add(self, value) -> None:
+ """Records `value` in the bounded trie; a no-op while BoundedTrie
+ metrics are disabled.
+
+ NOTE(review): `value` is unannotated; the tests pass lists of str
+ (e.g. ['a', 'b']). Confirm the intended type against BoundedTrie.add.
+ """
+ if MetricsFlag.bounded_trie_disabled:
+ # Return before the MetricUpdater is invoked.
+ return
+ self._updater(value)
class MetricResults(object):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py
b/sdks/python/apache_beam/metrics/metric_test.py
index ae66200737b..6937236a8aa 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -32,7 +32,9 @@ from apache_beam.metrics.metric import Lineage
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.metrics.metric import MetricsFlag
from apache_beam.metrics.metricbase import MetricName
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.worker import statesampler
from apache_beam.testing.metric_result_matchers import DistributionMatcher
@@ -121,6 +123,108 @@ class MetricsTest(unittest.TestCase):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())
+ def test_metrics_flag(self):
+ """Each disable* experiment sets only its own flag; all three set all."""
+ # Start from a clean slate: an earlier Pipeline construction in this
+ # process may already have initialized the process-wide flags.
+ MetricsFlag.reset()
+ try:
+ self.assertFalse(MetricsFlag.counter_disabled)
+ self.assertFalse(MetricsFlag.string_set_disabled)
+ self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+ options = PipelineOptions(['--experiments=disableCounterMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ self.assertTrue(MetricsFlag.counter_disabled)
+ self.assertFalse(MetricsFlag.string_set_disabled)
+ self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+ # reset() is required before applying each new options object, because
+ # set_default_pipeline_options ignores every call after the first.
+ MetricsFlag.reset()
+ options = PipelineOptions(['--experiments=disableStringSetMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ self.assertFalse(MetricsFlag.counter_disabled)
+ self.assertTrue(MetricsFlag.string_set_disabled)
+ self.assertFalse(MetricsFlag.bounded_trie_disabled)
+
+ MetricsFlag.reset()
+ options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ self.assertFalse(MetricsFlag.counter_disabled)
+ self.assertFalse(MetricsFlag.string_set_disabled)
+ self.assertTrue(MetricsFlag.bounded_trie_disabled)
+
+ MetricsFlag.reset()
+ options = PipelineOptions([
+ '--experiments=disableCounterMetrics',
+ '--experiments=disableStringSetMetrics',
+ '--experiments=disableBoundedTrieMetrics',
+ ])
+ MetricsFlag.set_default_pipeline_options(options)
+ self.assertTrue(MetricsFlag.counter_disabled)
+ self.assertTrue(MetricsFlag.string_set_disabled)
+ self.assertTrue(MetricsFlag.bounded_trie_disabled)
+ finally:
+ # The flags are process-wide; do not leak them into other tests.
+ MetricsFlag.reset()
+
+ def test_disabled_counter_is_noop(self):
+ """Counter inc()/dec() after disableCounterMetrics add no container
+ entry."""
+ sampler = statesampler.StateSampler('', counters.CounterFactory())
+ statesampler.set_current_tracker(sampler)
+ state = sampler.scoped_state(
+ 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+ MetricsFlag.reset()
+ try:
+ sampler.start()
+ with state:
+ container = MetricsEnvironment.current_container()
+ # Baseline: with the flags cleared, one update adds one entry, which
+ # shows that updates really reach this container.
+ Metrics.counter('ns', 'baseline').inc()
+ self.assertEqual(len(container.metrics), 1)
+ # Applying the flag mid-test works only because reset() above cleared
+ # the initialized marker.
+ options = PipelineOptions(['--experiments=disableCounterMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ # A new metric name: any update that got through would add a second
+ # entry. dec() is exercised as well as inc().
+ Metrics.counter('ns', 'after_disable').inc()
+ Metrics.counter('ns', 'after_disable').inc(5)
+ Metrics.counter('ns', 'after_disable').dec()
+ self.assertEqual(len(container.metrics), 1)
+ finally:
+ sampler.stop()
+ MetricsFlag.reset()
+
+ def test_disabled_string_set_is_noop(self):
+ """StringSet add() after disableStringSetMetrics adds no container
+ entry."""
+ sampler = statesampler.StateSampler('', counters.CounterFactory())
+ statesampler.set_current_tracker(sampler)
+ state = sampler.scoped_state(
+ 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+ MetricsFlag.reset()
+ try:
+ sampler.start()
+ with state:
+ container = MetricsEnvironment.current_container()
+ # Baseline: with the flags cleared, one add() creates one entry.
+ Metrics.string_set('ns', 'baseline').add('seed')
+ self.assertEqual(len(container.metrics), 1)
+ options = PipelineOptions(['--experiments=disableStringSetMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ # A new metric name: an add() that got through would add a second
+ # entry.
+ Metrics.string_set('ns', 'after_disable').add('value')
+ self.assertEqual(len(container.metrics), 1)
+ finally:
+ sampler.stop()
+ MetricsFlag.reset()
+
+ def test_disabled_bounded_trie_is_noop(self):
+ """BoundedTrie add() after disableBoundedTrieMetrics adds no container
+ entry."""
+ sampler = statesampler.StateSampler('', counters.CounterFactory())
+ statesampler.set_current_tracker(sampler)
+ state = sampler.scoped_state(
+ 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
+ MetricsFlag.reset()
+ try:
+ sampler.start()
+ with state:
+ container = MetricsEnvironment.current_container()
+ # Baseline: with the flags cleared, one add() creates one entry.
+ Metrics.bounded_trie('ns', 'baseline').add(['a'])
+ self.assertEqual(len(container.metrics), 1)
+ options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
+ MetricsFlag.set_default_pipeline_options(options)
+ # A new metric name: an add() that got through would add a second
+ # entry.
+ Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
+ self.assertEqual(len(container.metrics), 1)
+ finally:
+ sampler.stop()
+ MetricsFlag.reset()
+
def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
diff --git a/sdks/python/apache_beam/pipeline.py
b/sdks/python/apache_beam/pipeline.py
index 750868f7443..594660d9bea 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -73,6 +73,7 @@ from apache_beam import pvalue
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
+from apache_beam.metrics.metric import MetricsFlag
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
@@ -192,6 +193,7 @@ class Pipeline(HasDisplayData):
self._options = PipelineOptions([])
FileSystems.set_options(self._options)
+ MetricsFlag.set_default_pipeline_options(self._options)
if runner is None:
runner = self._options.view_as(StandardOptions).runner
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 754a631eaf3..58beda96d63 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -33,6 +33,7 @@ from google.protobuf import text_format
from apache_beam.internal import pickler
from apache_beam.io import filesystems
+from apache_beam.metrics import metric
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
@@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
filesystems.FileSystems.set_options(sdk_pipeline_options)
+ metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)