This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d3af58 Make num-stages counter into an internal counter.
new c7a7eec Merge pull request #16638 from robertwb/internal-counters
7d3af58 is described below
commit 7d3af58d08c2b4bbe46b76ed76621fc7ea3303f6
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Jan 27 14:52:05 2022 -0800
Make num-stages counter into an internal counter.
This unbreaks tests that (arguably to brittly) reject extra counters.
---
sdks/python/apache_beam/dataframe/transforms_test.py | 2 +-
.../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 7 +++++--
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py
b/sdks/python/apache_beam/dataframe/transforms_test.py
index c4aa6869..cd76f07 100644
--- a/sdks/python/apache_beam/dataframe/transforms_test.py
+++ b/sdks/python/apache_beam/dataframe/transforms_test.py
@@ -353,7 +353,7 @@ class TransformTest(unittest.TestCase):
class FusionTest(unittest.TestCase):
@staticmethod
def fused_stages(p):
- return p.result.metrics().query(
+ return p.result.monitoring_metrics().query(
metrics.MetricsFilter().with_name(
fn_runner.FnApiRunner.NUM_FUSED_STAGES_COUNTER)
)['counters'][0].result
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 9b08427..9a3e100 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -358,8 +358,11 @@ class FnApiRunner(runner.PipelineRunner):
stage_context.components.environments, self._provision_info)
pipeline_metrics = MetricsContainer('')
pipeline_metrics.get_counter(
- MetricName(str(type(self)),
- self.NUM_FUSED_STAGES_COUNTER)).update(len(stages))
+ MetricName(
+ str(type(self)),
+ self.NUM_FUSED_STAGES_COUNTER,
+ urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update(
+ len(stages))
monitoring_infos_by_stage = {}
runner_execution_context = execution.FnApiRunnerExecutionContext(