This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02168d3babe Revert "[Python] Honor disableCounterMetrics,
disableStringSetMetrics, and disableBoundedTrieMetrics experiments" (#38901)
02168d3babe is described below
commit 02168d3babe5dbb27c4892bf57c8cdae4d56242c
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jun 10 17:37:16 2026 -0400
Revert "[Python] Honor disableCounterMetrics, disableStringSetMetrics, and
disableBoundedTrieMetrics experiments" (#38901)
* Revert "[Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments"
This reverts commit c01ceeab24c7cae7167dbc08b21529339659094f.
* Apply suggestions from code review
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
CHANGES.md | 1 -
sdks/python/apache_beam/metrics/metric.py | 54 +----------
sdks/python/apache_beam/metrics/metric_test.py | 104 ---------------------
sdks/python/apache_beam/pipeline.py | 2 -
.../apache_beam/runners/worker/sdk_worker_main.py | 2 -
5 files changed, 3 insertions(+), 160 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ac8215f3549..698d88b01fa 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,7 +69,6 @@
## New Features / Improvements
-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming
Engine jobs. It can be disabled by passing
`--experiments=disable_streaming_engine_state_tag_encoding_v2` or
`--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag
encoding version cannot change during a job update. Jobs using tag encoding v2
(enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam
versions prior to 2.73.0, as only versions 2.73.0 and later support tag
encoding [...]
## Breaking Changes
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index a66eed640be..6e6757be11d 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -46,13 +46,11 @@ from apache_beam.metrics.metricbase import Gauge
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet
-from apache_beam.options.pipeline_options import DebugOptions
if TYPE_CHECKING:
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric
- from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.histogram import BucketType
__all__ = ['Metrics', 'MetricsFilter', 'Lineage']
@@ -60,37 +58,6 @@ __all__ = ['Metrics', 'MetricsFilter', 'Lineage']
_LOGGER = logging.getLogger(__name__)
-class MetricsFlag(object):
- """Process-wide flags controlling which user metric kinds are emitted."""
- counter_disabled = False
- string_set_disabled = False
- bounded_trie_disabled = False
- _initialized = False
-
- @classmethod
- def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
- if cls._initialized:
- return
- debug_options = options.view_as(DebugOptions)
- if debug_options.lookup_experiment('disableCounterMetrics'):
- cls.counter_disabled = True
- _LOGGER.info('Counter metrics are disabled.')
- if debug_options.lookup_experiment('disableStringSetMetrics'):
- cls.string_set_disabled = True
- _LOGGER.info('StringSet metrics are disabled.')
- if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
- cls.bounded_trie_disabled = True
- _LOGGER.info('BoundedTrie metrics are disabled.')
- cls._initialized = True
-
- @classmethod
- def reset(cls) -> None:
- cls.counter_disabled = False
- cls.string_set_disabled = False
- cls.bounded_trie_disabled = False
- cls._initialized = False
-
-
class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@staticmethod
@@ -237,17 +204,12 @@ class Metrics(object):
def __init__(
self, metric_name: MetricName, process_wide: bool = False) -> None:
super().__init__(metric_name)
- self._updater = MetricUpdater(
+ self.inc = MetricUpdater( # type: ignore[method-assign]
cells.CounterCell,
metric_name,
default_value=1,
process_wide=process_wide)
- def inc(self, n: int = 1) -> None:
- if MetricsFlag.counter_disabled:
- return
- self._updater(n)
-
class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
def __init__(
@@ -269,23 +231,13 @@ class Metrics(object):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
- self._updater = MetricUpdater(cells.StringSetCell, metric_name)
-
- def add(self, value: str) -> None:
- if MetricsFlag.string_set_disabled:
- return
- self._updater(value)
+ self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign]
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics BoundedTrie that Delegates functionality to
MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
- self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
-
- def add(self, value) -> None:
- if MetricsFlag.bounded_trie_disabled:
- return
- self._updater(value)
+ self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
class MetricResults(object):
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 6937236a8aa..ae66200737b 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -32,9 +32,7 @@ from apache_beam.metrics.metric import Lineage
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.metrics.metric import MetricsFlag
from apache_beam.metrics.metricbase import MetricName
-from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.worker import statesampler
from apache_beam.testing.metric_result_matchers import DistributionMatcher
@@ -123,108 +121,6 @@ class MetricsTest(unittest.TestCase):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())
- def test_metrics_flag(self):
- MetricsFlag.reset()
- try:
- self.assertFalse(MetricsFlag.counter_disabled)
- self.assertFalse(MetricsFlag.string_set_disabled)
- self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
- options = PipelineOptions(['--experiments=disableCounterMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- self.assertTrue(MetricsFlag.counter_disabled)
- self.assertFalse(MetricsFlag.string_set_disabled)
- self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
- MetricsFlag.reset()
- options = PipelineOptions(['--experiments=disableStringSetMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- self.assertFalse(MetricsFlag.counter_disabled)
- self.assertTrue(MetricsFlag.string_set_disabled)
- self.assertFalse(MetricsFlag.bounded_trie_disabled)
-
- MetricsFlag.reset()
- options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- self.assertFalse(MetricsFlag.counter_disabled)
- self.assertFalse(MetricsFlag.string_set_disabled)
- self.assertTrue(MetricsFlag.bounded_trie_disabled)
-
- MetricsFlag.reset()
- options = PipelineOptions([
- '--experiments=disableCounterMetrics',
- '--experiments=disableStringSetMetrics',
- '--experiments=disableBoundedTrieMetrics',
- ])
- MetricsFlag.set_default_pipeline_options(options)
- self.assertTrue(MetricsFlag.counter_disabled)
- self.assertTrue(MetricsFlag.string_set_disabled)
- self.assertTrue(MetricsFlag.bounded_trie_disabled)
- finally:
- MetricsFlag.reset()
-
- def test_disabled_counter_is_noop(self):
- sampler = statesampler.StateSampler('', counters.CounterFactory())
- statesampler.set_current_tracker(sampler)
- state = sampler.scoped_state(
- 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
- MetricsFlag.reset()
- try:
- sampler.start()
- with state:
- container = MetricsEnvironment.current_container()
- Metrics.counter('ns', 'baseline').inc()
- self.assertEqual(len(container.metrics), 1)
- options = PipelineOptions(['--experiments=disableCounterMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- Metrics.counter('ns', 'after_disable').inc()
- Metrics.counter('ns', 'after_disable').inc(5)
- Metrics.counter('ns', 'after_disable').dec()
- self.assertEqual(len(container.metrics), 1)
- finally:
- sampler.stop()
- MetricsFlag.reset()
-
- def test_disabled_string_set_is_noop(self):
- sampler = statesampler.StateSampler('', counters.CounterFactory())
- statesampler.set_current_tracker(sampler)
- state = sampler.scoped_state(
- 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
- MetricsFlag.reset()
- try:
- sampler.start()
- with state:
- container = MetricsEnvironment.current_container()
- Metrics.string_set('ns', 'baseline').add('seed')
- self.assertEqual(len(container.metrics), 1)
- options = PipelineOptions(['--experiments=disableStringSetMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- Metrics.string_set('ns', 'after_disable').add('value')
- self.assertEqual(len(container.metrics), 1)
- finally:
- sampler.stop()
- MetricsFlag.reset()
-
- def test_disabled_bounded_trie_is_noop(self):
- sampler = statesampler.StateSampler('', counters.CounterFactory())
- statesampler.set_current_tracker(sampler)
- state = sampler.scoped_state(
- 'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
- MetricsFlag.reset()
- try:
- sampler.start()
- with state:
- container = MetricsEnvironment.current_container()
- Metrics.bounded_trie('ns', 'baseline').add(['a'])
- self.assertEqual(len(container.metrics), 1)
- options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
- MetricsFlag.set_default_pipeline_options(options)
- Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
- self.assertEqual(len(container.metrics), 1)
- finally:
- sampler.stop()
- MetricsFlag.reset()
-
def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 594660d9bea..750868f7443 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -73,7 +73,6 @@ from apache_beam import pvalue
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
-from apache_beam.metrics.metric import MetricsFlag
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
@@ -193,7 +192,6 @@ class Pipeline(HasDisplayData):
self._options = PipelineOptions([])
FileSystems.set_options(self._options)
- MetricsFlag.set_default_pipeline_options(self._options)
if runner is None:
runner = self._options.view_as(StandardOptions).runner
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 58beda96d63..754a631eaf3 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -33,7 +33,6 @@ from google.protobuf import text_format
from apache_beam.internal import pickler
from apache_beam.io import filesystems
-from apache_beam.metrics import metric
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
@@ -124,7 +123,6 @@ def create_harness(environment, dry_run=False):
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
filesystems.FileSystems.set_options(sdk_pipeline_options)
- metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)