This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ff8475 [BEAM-7088] Implement switch to using MonitoringInfo labels
for Name and Namespace. (#8316)
5ff8475 is described below
commit 5ff8475e370c5a746ddc6e6cfd7b63004d8c40e3
Author: Mikhail Gryzykhin <[email protected]>
AuthorDate: Fri Apr 19 16:08:28 2019 -0700
[BEAM-7088] Implement switch to using MonitoringInfo labels for Name and
Namespace. (#8316)
* Implement switch to using MonitoringInfo labels for Name and Namespace
in Java.
* Add backwards compatibility to MetricName and patch flink tests
---
model/pipeline/src/main/proto/metrics.proto | 16 +++-
.../beam/runners/core/metrics/MetricUrns.java | 41 --------
.../runners/core/metrics/MetricsContainerImpl.java | 87 +++++++++--------
.../core/metrics/MonitoringInfoConstants.java | 7 +-
.../core/metrics/MonitoringInfoMetricName.java | 43 ++++-----
.../core/metrics/SimpleMonitoringInfoBuilder.java | 50 +---------
.../core/metrics/MetricsContainerImplTest.java | 27 +++---
.../core/metrics/MetricsContainerStepMapTest.java | 9 +-
.../core/metrics/MonitoringInfoMetricNameTest.java | 29 +++---
.../core/metrics/MonitoringInfoTestUtil.java | 2 +-
.../metrics/SimpleMonitoringInfoBuilderTest.java | 40 ++------
.../core/metrics/SimpleStateRegistryTest.java | 6 +-
.../metrics/SpecMonitoringInfoValidatorTest.java | 6 +-
.../flink/metrics/FlinkMetricContainerTest.java | 68 +++++++------
...onMonitoringInfoToCounterUpdateTransformer.java | 29 ++----
...erMonitoringInfoToCounterUpdateTransformer.java | 25 ++---
.../fn/control/BeamFnMapTaskExecutorTest.java | 6 +-
...nitoringInfoToCounterUpdateTransformerTest.java | 8 +-
...nitoringInfoToCounterUpdateTransformerTest.java | 8 +-
.../fnexecution/control/RemoteExecutionTest.java | 47 ++++++---
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 20 ++--
.../data/ElementCountFnDataReceiverTest.java | 4 +-
sdks/python/apache_beam/metrics/execution.py | 14 ++-
.../python/apache_beam/metrics/monitoring_infos.py | 106 +++++++++++----------
.../apache_beam/metrics/monitoring_infos_test.py | 18 ++--
sdks/python/apache_beam/portability/common_urns.py | 4 +
26 files changed, 335 insertions(+), 385 deletions(-)
diff --git a/model/pipeline/src/main/proto/metrics.proto
b/model/pipeline/src/main/proto/metrics.proto
index 43ec994..3b45efb 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -62,8 +62,13 @@ message MonitoringInfoSpecs {
// TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
// upgrading the python SDK.
USER_COUNTER = 0 [(monitoring_info_spec) = {
- urn: "beam:metric:user:",
+ urn: "beam:metric:user",
type_urn: "beam:metrics:sum_int_64",
+ required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+ annotations: [{
+ key: "description",
+ value: "URN utilized to report user numeric counters."
+ }]
}];
ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
@@ -122,8 +127,13 @@ message MonitoringInfoSpecs {
// TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
// upgrading the python SDK.
USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = {
- urn: "beam:metric:user_distribution:",
+ urn: "beam:metric:user_distribution",
type_urn: "beam:metrics:distribution_int_64",
+ required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+ annotations: [{
+ key: "description",
+ value: "URN utilized to report user distribution counters."
+ }]
}];
}
}
@@ -171,6 +181,8 @@ message MonitoringInfo {
WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
CODER = 3 [(label_props) = { name: "CODER" }];
ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
+ NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
+ NAME = 6 [(label_props) = { name: "NAME" }];
}
// A set of key+value labels which define the scope of the metric.
// Either a well defined entity id for matching the enum names in
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
deleted file mode 100644
index 4ed5fc4..0000000
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import org.apache.beam.sdk.metrics.MetricName;
-
-/** Utility for parsing a URN to a {@link MetricName}. */
-public class MetricUrns {
- /**
- * Parse a {@link MetricName} from a {@link
- * org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs.Enum}.
- *
- * <p>Should be consistent with {@code parse_namespace_and_name} in
monitoring_infos.py.
- */
- public static MetricName parseUrn(String urn) {
- if (urn.startsWith(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX)) {
- urn =
urn.substring(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX.length());
- }
- // If it is not a user counter, just use the first part of the URN, i.e.
'beam'
- String[] pieces = urn.split(":", 2);
- if (pieces.length != 2) {
- throw new IllegalArgumentException("Invalid metric URN: " + urn);
- }
- return MetricName.named(pieces[0], pieces[1]);
- }
-}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index bf5acfe..827c9bf 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.core.metrics;
-import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
import static
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
@@ -26,6 +25,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
@@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
*/
@Experimental(Kind.METRICS)
public class MetricsContainerImpl implements Serializable, MetricsContainer {
+
private static final Logger LOG =
LoggerFactory.getLogger(MetricsContainerImpl.class);
@Nullable private final String stepName;
@@ -150,13 +151,11 @@ public class MetricsContainerImpl implements
Serializable, MetricsContainer {
extractUpdates(counters), extractUpdates(distributions),
extractUpdates(gauges));
}
- /**
- * @param metricUpdate
- * @return The MonitoringInfo generated from the metricUpdate.
- */
+ /** @return The MonitoringInfo generated from the metricUpdate. */
@Nullable
private MonitoringInfo counterUpdateToMonitoringInfo(MetricUpdate<Long>
metricUpdate) {
SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder(true);
+
MetricName metricName = metricUpdate.getKey().metricName();
if (metricName instanceof MonitoringInfoMetricName) {
MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName)
metricName;
@@ -165,20 +164,26 @@ public class MetricsContainerImpl implements
Serializable, MetricsContainer {
for (Entry<String, String> e :
monitoringInfoName.getLabels().entrySet()) {
builder.setLabel(e.getKey(), e.getValue());
}
- } else { // Note: (metricName instanceof MetricName) is always True.
- // Represents a user counter.
- builder.setUrnForUserMetric(
- metricUpdate.getKey().metricName().getNamespace(),
- metricUpdate.getKey().metricName().getName());
+ } else { // Represents a user counter.
// Drop if the stepname is not set. All user counters must be
// defined for a PTransform. They must be defined on a container bound
to a step.
if (this.stepName == null) {
return null;
}
- builder.setPTransformLabel(metricUpdate.getKey().stepName());
+
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
+ metricUpdate.getKey().metricName().getNamespace())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
metricUpdate.getKey().metricName().getName())
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
metricUpdate.getKey().stepName());
}
+
builder.setInt64Value(metricUpdate.getUpdate());
builder.setTimestampToNow();
+
return builder.build();
}
@@ -191,7 +196,7 @@ public class MetricsContainerImpl implements Serializable,
MetricsContainer {
for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) {
MonitoringInfo mi = counterUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
- monitoringInfos.add(counterUpdateToMonitoringInfo(metricUpdate));
+ monitoringInfos.add(mi);
}
}
return monitoringInfos;
@@ -245,35 +250,37 @@ public class MetricsContainerImpl implements
Serializable, MetricsContainer {
public void update(Iterable<MonitoringInfo> monitoringInfos) {
monitoringInfos.forEach(
monitoringInfo -> {
- if (monitoringInfo.hasMetric()) {
- String urn = monitoringInfo.getUrn();
- MetricName metricName = parseUrn(urn);
- org.apache.beam.model.pipeline.v1.MetricsApi.Metric metric =
monitoringInfo.getMetric();
- if (metric.hasCounterData()) {
- CounterData counterData = metric.getCounterData();
- if (counterData.getValueCase() ==
CounterData.ValueCase.INT64_VALUE) {
- Counter counter = getCounter(metricName);
- counter.inc(counterData.getInt64Value());
- } else {
- LOG.warn("Unsupported CounterData type: {}", counterData);
- }
- } else if (metric.hasDistributionData()) {
- DistributionData distributionData = metric.getDistributionData();
- if (distributionData.hasIntDistributionData()) {
- Distribution distribution = getDistribution(metricName);
- IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
- distribution.update(
- intDistributionData.getSum(),
- intDistributionData.getCount(),
- intDistributionData.getMin(),
- intDistributionData.getMax());
- } else {
- LOG.warn("Unsupported DistributionData type: {}",
distributionData);
- }
- } else if (metric.hasExtremaData()) {
- ExtremaData extremaData = metric.getExtremaData();
- LOG.warn("Extrema metric unsupported: {}", extremaData);
+ if (!monitoringInfo.hasMetric()) {
+ return;
+ }
+
+ MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
+
+ MetricsApi.Metric metric = monitoringInfo.getMetric();
+ if (metric.hasCounterData()) {
+ CounterData counterData = metric.getCounterData();
+ if (counterData.getValueCase() ==
CounterData.ValueCase.INT64_VALUE) {
+ Counter counter = getCounter(metricName);
+ counter.inc(counterData.getInt64Value());
+ } else {
+ LOG.warn("Unsupported CounterData type: {}", counterData);
+ }
+ } else if (metric.hasDistributionData()) {
+ DistributionData distributionData = metric.getDistributionData();
+ if (distributionData.hasIntDistributionData()) {
+ Distribution distribution = getDistribution(metricName);
+ IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
+ distribution.update(
+ intDistributionData.getSum(),
+ intDistributionData.getCount(),
+ intDistributionData.getMin(),
+ intDistributionData.getMax());
+ } else {
+ LOG.warn("Unsupported DistributionData type: {}",
distributionData);
}
+ } else if (metric.hasExtremaData()) {
+ ExtremaData extremaData = metric.getExtremaData();
+ LOG.warn("Extrema metric unsupported: {}", extremaData);
}
});
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 66695e9..3716709 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -39,9 +39,8 @@ public final class MonitoringInfoConstants {
public static final String FINISH_BUNDLE_MSECS =
extractUrn(MonitoringInfoSpecs.Enum.FINISH_BUNDLE_MSECS);
public static final String TOTAL_MSECS =
extractUrn(MonitoringInfoSpecs.Enum.TOTAL_MSECS);
- public static final String USER_COUNTER_PREFIX =
- extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
- public static final String USER_DISTRIBUTION_COUNTER_PREFIX =
+ public static final String USER_COUNTER =
extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
+ public static final String USER_DISTRIBUTION_COUNTER =
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_COUNTER);
}
@@ -53,6 +52,8 @@ public final class MonitoringInfoConstants {
extractLabel(MonitoringInfoLabels.WINDOWING_STRATEGY);
public static final String CODER =
extractLabel(MonitoringInfoLabels.CODER);
public static final String ENVIRONMENT =
extractLabel(MonitoringInfoLabels.ENVIRONMENT);
+ public static final String NAMESPACE =
extractLabel(MonitoringInfoLabels.NAMESPACE);
+ public static final String NAME = extractLabel(MonitoringInfoLabels.NAME);
}
/** MonitoringInfo type Urns. */
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
index f0a20e1..9143eff 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
@@ -19,13 +19,11 @@ package org.apache.beam.runners.core.metrics;
import static
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
@@ -37,11 +35,9 @@ import
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
public class MonitoringInfoMetricName extends MetricName {
private String urn;
- @Nullable private String name;
- @Nullable private String namespace;
- private HashMap<String, String> labels = new HashMap<String, String>();
+ private Map<String, String> labels = new HashMap<String, String>();
- private MonitoringInfoMetricName(String urn, HashMap<String, String> labels)
{
+ private MonitoringInfoMetricName(String urn, Map<String, String> labels) {
checkArgument(!Strings.isNullOrEmpty(urn), "MonitoringInfoMetricName urn
must be non-empty");
checkArgument(labels != null, "MonitoringInfoMetricName labels must be
non-null");
// TODO(ajamato): Move SimpleMonitoringInfoBuilder to
beam-runner-core-construction-java
@@ -52,31 +48,22 @@ public class MonitoringInfoMetricName extends MetricName {
}
}
- /** Parse the urn field into a name and namespace field. */
- private void parseUrn() {
- if (this.urn.startsWith(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX))
{
- List<String> split = new
ArrayList<String>(Arrays.asList(this.getUrn().split(":")));
- this.name = split.get(split.size() - 1);
- this.namespace = split.get(split.size() - 2);
- }
- }
-
- /** @return the parsed namespace from the user metric URN, otherwise null. */
@Override
public String getNamespace() {
- if (this.namespace == null) {
- parseUrn();
+ if (labels.containsKey(MonitoringInfoConstants.Labels.NAMESPACE)) {
+ return labels.getOrDefault(MonitoringInfoConstants.Labels.NAMESPACE,
null);
+ } else {
+ return urn.split(":", 2)[0];
}
- return this.namespace;
}
- /** @return the parsed name from the user metric URN, otherwise null. */
@Override
public String getName() {
- if (this.name == null) {
- parseUrn();
+ if (labels.containsKey(MonitoringInfoConstants.Labels.NAME)) {
+ return labels.getOrDefault(MonitoringInfoConstants.Labels.NAME, null);
+ } else {
+ return urn.split(":", 2)[1];
}
- return this.name;
}
/** @return the urn of this MonitoringInfo metric. */
@@ -85,10 +72,14 @@ public class MonitoringInfoMetricName extends MetricName {
}
/** @return The labels associated with this MonitoringInfo. */
- public HashMap<String, String> getLabels() {
+ public Map<String, String> getLabels() {
return this.labels;
}
+ public static MonitoringInfoMetricName of(MetricsApi.MonitoringInfo mi) {
+ return new MonitoringInfoMetricName(mi.getUrn(), mi.getLabelsMap());
+ }
+
public static MonitoringInfoMetricName named(String urn, HashMap<String,
String> labels) {
return new MonitoringInfoMetricName(urn, labels);
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index 8ab8403..23fa440 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -44,12 +44,6 @@ import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
* builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN);
builder.setInt64Value(1);
* builder.setPTransformLabel("myTransform");
builder.setPCollectionLabel("myPcollection");
* MonitoringInfo mi = builder.build();
- *
- * <p>Example Usage (ElementCount counter):
- *
- * <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
- *
builder.setUrn(SimpleMonitoringInfoBuilder.setUrnForUserMetric("myNamespace",
"myName"));
- * builder.setInt64Value(1); MonitoringInfo mi = builder.build();
*/
public class SimpleMonitoringInfoBuilder {
private final boolean validateAndDropInvalid;
@@ -81,18 +75,6 @@ public class SimpleMonitoringInfoBuilder {
this.validateAndDropInvalid = validateAndDropInvalid;
}
- /** @return The metric URN for a user metric, with a proper URN prefix. */
- public static String userMetricUrn(String metricNamespace, String
metricName) {
- String fixedMetricNamespace = metricNamespace.replace(':', '_');
- String fixedMetricName = metricName.replace(':', '_');
- StringBuilder sb = new StringBuilder();
- sb.append(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX);
- sb.append(fixedMetricNamespace);
- sb.append(':');
- sb.append(fixedMetricName);
- return sb.toString();
- }
-
/**
* Sets the urn of the MonitoringInfo.
*
@@ -103,17 +85,6 @@ public class SimpleMonitoringInfoBuilder {
return this;
}
- /**
- * Sets the urn of the MonitoringInfo to a proper user metric URN for the
given params.
- *
- * @param namespace
- * @param name
- */
- public SimpleMonitoringInfoBuilder setUrnForUserMetric(String namespace,
String name) {
- this.builder.setUrn(userMetricUrn(namespace, name));
- return this;
- }
-
/** Sets the timestamp of the MonitoringInfo to the current time. */
public SimpleMonitoringInfoBuilder setTimestampToNow() {
Instant time = Instant.now();
@@ -134,28 +105,17 @@ public class SimpleMonitoringInfoBuilder {
return this;
}
- /** Sets the PTRANSFORM MonitoringInfo label to the given param. */
- public SimpleMonitoringInfoBuilder setPTransformLabel(String pTransform) {
- // TODO(ajamato): Add validation that it is a valid pTransform name in the
bundle descriptor.
- setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransform);
- return this;
- }
-
- /** Sets the PCOLLECTION MonitoringInfo label to the given param. */
- public SimpleMonitoringInfoBuilder setPCollectionLabel(String pCollection) {
- setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollection);
- return this;
- }
-
/** Sets the MonitoringInfo label to the given name and value. */
public SimpleMonitoringInfoBuilder setLabel(String labelName, String
labelValue) {
this.builder.putLabels(labelName, labelValue);
return this;
}
- /** Clear the builder and merge from the provided monitoringInfo. */
- public void clearAndMerge(MonitoringInfo monitoringInfo) {
+ public void clear() {
this.builder = MonitoringInfo.newBuilder();
+ }
+ /** Merge the provided monitoringInfo into the builder without clearing it first. */
+ public void merge(MonitoringInfo monitoringInfo) {
this.builder.mergeFrom(monitoringInfo);
}
@@ -164,7 +124,7 @@ public class SimpleMonitoringInfoBuilder {
* MonitoringInfos.
*/
@VisibleForTesting
- public static MonitoringInfo clearTimestamp(MonitoringInfo input) {
+ public static MonitoringInfo copyAndClearTimestamp(MonitoringInfo input) {
MonitoringInfo.Builder builder = MonitoringInfo.newBuilder();
builder.mergeFrom(input);
builder.clearTimestamp();
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 28a846d..61e4077 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -159,20 +159,24 @@ public class MetricsContainerImplTest {
c1.inc(3L);
SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
- builder1.setUrnForUserMetric("ns", "name1");
- builder1.setInt64Value(5);
- builder1.setPTransformLabel("step1");
- builder1.build();
+ builder1
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+ .setInt64Value(5)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
- builder2.setUrnForUserMetric("ns", "name2");
- builder2.setInt64Value(4);
- builder2.setPTransformLabel("step1");
- builder2.build();
+ builder2
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+ .setInt64Value(4)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
ArrayList<MonitoringInfo> actualMonitoringInfos = new
ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
-
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
}
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(),
builder2.build()));
@@ -190,13 +194,12 @@ public class MetricsContainerImplTest {
SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
builder1.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder1.setPCollectionLabel("pcollection");
+ builder1.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION,
"pcollection");
builder1.setInt64Value(2);
- builder1.build();
ArrayList<MonitoringInfo> actualMonitoringInfos = new
ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
-
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
}
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build()));
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
index a761bdd..9be564a 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -208,8 +208,11 @@ public class MetricsContainerStepMapTest {
List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric("ns", "name1");
- builder.setPTransformLabel(STEP1);
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "name1");
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, STEP1);
builder.setInt64Value(7);
expected.add(builder.build());
@@ -218,7 +221,7 @@ public class MetricsContainerStepMapTest {
ArrayList<MonitoringInfo> actual = new ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
- actual.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+ actual.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
}
assertThat(actual, containsInAnyOrder(expected.toArray()));
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
index fa1b989..33ba9cd 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
@@ -36,8 +36,6 @@ public class MonitoringInfoMetricNameTest implements
Serializable {
HashMap<String, String> labels = new HashMap<String, String>();
String urn = MonitoringInfoConstants.Urns.ELEMENT_COUNT;
MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn,
labels);
- assertEquals(null, name.getName());
- assertEquals(null, name.getNamespace());
assertEquals(labels, name.getLabels());
assertEquals(urn, name.getUrn());
@@ -53,24 +51,19 @@ public class MonitoringInfoMetricNameTest implements
Serializable {
}
@Test
- public void testUserCounterUrnConstruction() {
- String urn = SimpleMonitoringInfoBuilder.userMetricUrn("namespace",
"name");
+ public void testGetNameReturnsNameIfLabelIsPresent() {
HashMap<String, String> labels = new HashMap<String, String>();
- MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn,
labels);
- assertEquals("name", name.getName());
- assertEquals("namespace", name.getNamespace());
- assertEquals(labels, name.getLabels());
- assertEquals(urn, name.getUrn());
-
- assertEquals(name, name); // test self equals;
-
- // Reconstruct and test equality and hash code equivalence
- urn = SimpleMonitoringInfoBuilder.userMetricUrn("namespace", "name");
- labels = new HashMap<String, String>();
- MonitoringInfoMetricName name2 = MonitoringInfoMetricName.named(urn,
labels);
+ labels.put(MonitoringInfoConstants.Labels.NAME, "anyName");
+ MonitoringInfoMetricName name = MonitoringInfoMetricName.named("anyUrn",
labels);
+ assertEquals("anyName", name.getName());
+ }
- assertEquals(name, name2);
- assertEquals(name.hashCode(), name2.hashCode());
+ @Test
+ public void testGetNamespaceReturnsNamespaceIfLabelIsPresent() {
+ HashMap<String, String> labels = new HashMap<String, String>();
+ labels.put(MonitoringInfoConstants.Labels.NAMESPACE, "anyNamespace");
+ MonitoringInfoMetricName name = MonitoringInfoMetricName.named("anyUrn",
labels);
+ assertEquals("anyNamespace", name.getNamespace());
}
@Test
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
index 3b33d42..832c07d 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
@@ -37,7 +37,7 @@ public class MonitoringInfoTestUtil {
public static MonitoringInfo testElementCountMonitoringInfo(long value) {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel("testPCollection");
+ builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION,
"testPCollection");
builder.setInt64Value(value);
return builder.build();
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index 34f5fb4..ee42da4 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -38,48 +38,26 @@ public class SimpleMonitoringInfoBuilderTest {
builder.setInt64Value(1);
assertNull(builder.build());
-
- builder.setPCollectionLabel("myPcollection");
- // Pass now that the spec is fully met.
- MonitoringInfo monitoringInfo = builder.build();
- assertTrue(monitoringInfo != null);
- assertEquals(
- "myPcollection",
-
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION,
null));
- assertEquals(MonitoringInfoConstants.Urns.ELEMENT_COUNT,
monitoringInfo.getUrn());
- assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64,
monitoringInfo.getType());
- assertEquals(1,
monitoringInfo.getMetric().getCounterData().getInt64Value());
}
@Test
- public void testUserCounter() {
+ public void testReturnsExpectedMonitoringInfo() {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric("myNamespace", "myName");
- assertNull(builder.build());
-
+ builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
builder.setInt64Value(1);
- // Pass now that the spec is fully met.
- MonitoringInfo monitoringInfo = builder.build();
- assertTrue(monitoringInfo != null);
- assertEquals(
- MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX +
"myNamespace:myName",
- monitoringInfo.getUrn());
- assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64,
monitoringInfo.getType());
- assertEquals(1,
monitoringInfo.getMetric().getCounterData().getInt64Value());
- }
+ builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION,
"myPcollection");
- @Test
- public void testUserMetricWithInvalidDelimiterCharacterIsReplaced() {
- SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric("myNamespace:withInvalidChar", "myName");
- builder.setInt64Value(1);
// Pass now that the spec is fully met.
MonitoringInfo monitoringInfo = builder.build();
assertTrue(monitoringInfo != null);
assertEquals(
- MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX +
"myNamespace_withInvalidChar:myName",
- monitoringInfo.getUrn());
+ "myPcollection",
+
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION,
null));
+ assertEquals(MonitoringInfoConstants.Urns.ELEMENT_COUNT,
monitoringInfo.getUrn());
assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64,
monitoringInfo.getType());
assertEquals(1,
monitoringInfo.getMetric().getCounterData().getInt64Value());
+ assertEquals(
+ "myPcollection",
+
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PCOLLECTION));
}
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
index 2caba26..205fde0 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
@@ -61,20 +61,20 @@ public class SimpleStateRegistryTest {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
builder.setInt64Value(0);
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
// Check for execution time metrics for the testPTransformId
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
builder.setInt64Value(0);
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS);
builder.setInt64Value(0);
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
for (Matcher<MonitoringInfo> matcher : matchers) {
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index b3abbc0..a993545 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -50,9 +50,11 @@ public class SpecMonitoringInfoValidatorTest {
public void validateReturnsNoErrorOnValidMonitoringInfo() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
- .setUrn(Urns.USER_COUNTER_PREFIX + "someCounter")
+ .setUrn(Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyCounter")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyString")
.setType(TypeUrns.SUM_INT64)
- .putLabels("dummy", "value")
.build();
assertFalse(testObject.validate(testInput).isPresent());
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
index a73d897..237d176b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.flink.metrics;
-import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
import static
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.getFlinkMetricNameString;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
@@ -35,13 +34,13 @@ import
org.apache.beam.model.pipeline.v1.MetricsApi.DoubleDistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
-import
org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.DistributionCell;
import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge;
import org.apache.beam.sdk.metrics.Counter;
@@ -68,13 +67,6 @@ public class FlinkMetricContainerTest {
@Mock private RuntimeContext runtimeContext;
@Mock private MetricGroup metricGroup;
- static final String PTRANSFORM_LABEL =
- MonitoringInfoLabels.forNumber(MonitoringInfoLabels.TRANSFORM_VALUE)
- .getValueDescriptor()
- .getOptions()
- .getExtension(labelProps)
- .getName();
-
@Before
public void beforeTest() {
MockitoAnnotations.initMocks(this);
@@ -139,18 +131,24 @@ public class FlinkMetricContainerTest {
SimpleCounter elemCounter = new SimpleCounter();
when(metricGroup.counter("beam.metric:element_count:v1")).thenReturn(elemCounter);
- SimpleMonitoringInfoBuilder userCountBuilder = new
SimpleMonitoringInfoBuilder();
- userCountBuilder.setUrnForUserMetric("ns1", "metric1");
- userCountBuilder.setInt64Value(111);
- MonitoringInfo userCountMonitoringInfo = userCountBuilder.build();
+ MonitoringInfo userCountMonitoringInfo =
+ new SimpleMonitoringInfoBuilder()
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns1")
+ .setLabel(MonitoringInfoConstants.Labels.NAME, "metric1")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
"anyPTransform")
+ .setInt64Value(111)
+ .build();
assertNotNull(userCountMonitoringInfo);
- SimpleMonitoringInfoBuilder elemCountBuilder = new
SimpleMonitoringInfoBuilder();
- elemCountBuilder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- elemCountBuilder.setInt64Value(222);
- elemCountBuilder.setPTransformLabel("step");
- elemCountBuilder.setPCollectionLabel("pcoll");
- MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build();
+ MonitoringInfo elemCountMonitoringInfo =
+ new SimpleMonitoringInfoBuilder()
+ .setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT)
+ .setInt64Value(222)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
+ .setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll")
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
"anyPTransform")
+ .build();
assertNotNull(elemCountMonitoringInfo);
assertThat(userCounter.getCount(), is(0L));
@@ -164,28 +162,34 @@ public class FlinkMetricContainerTest {
@Test
public void testDropUnexpectedMonitoringInfoTypes() {
FlinkMetricContainer flinkContainer = new
FlinkMetricContainer(runtimeContext);
- MetricsContainer step = flinkContainer.getMetricsContainer("step");
+ MetricsContainerImpl step = flinkContainer.getMetricsContainer("step");
MonitoringInfo intCounter =
MonitoringInfo.newBuilder()
- .setUrn(Urns.USER_COUNTER_PREFIX + "ns1:int_counter")
- .putLabels(PTRANSFORM_LABEL, "step")
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns1")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "int_counter")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setInt64Value(111)))
.build();
MonitoringInfo doubleCounter =
MonitoringInfo.newBuilder()
- .setUrn(Urns.USER_COUNTER_PREFIX + "ns2:double_counter")
- .putLabels(PTRANSFORM_LABEL, "step")
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns2")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "double_counter")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setDoubleValue(222)))
.build();
MonitoringInfo intDistribution =
MonitoringInfo.newBuilder()
- .setUrn(Urns.USER_COUNTER_PREFIX + "ns3:int_distribution")
- .putLabels(PTRANSFORM_LABEL, "step")
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns3")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "int_distribution")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
@@ -200,8 +204,10 @@ public class FlinkMetricContainerTest {
MonitoringInfo doubleDistribution =
MonitoringInfo.newBuilder()
- .setUrn(Urns.USER_COUNTER_PREFIX + "ns4:double_distribution")
- .putLabels(PTRANSFORM_LABEL, "step")
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns4")
+ .putLabels(MonitoringInfoConstants.Labels.NAME,
"double_distribution")
+ .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
@@ -231,7 +237,7 @@ public class FlinkMetricContainerTest {
// Verify the counter in the java SDK MetricsContainer
long count =
- ((CounterCell) step.getCounter(MetricName.named("ns1",
"int_counter"))).getCumulative();
+ ((CounterCell)
step.tryGetCounter(MonitoringInfoMetricName.of(intCounter))).getCumulative();
assertThat(count, is(111L));
// The one Flink distribution that gets created is a
FlinkDistributionGauge; here we verify its
@@ -251,7 +257,7 @@ public class FlinkMetricContainerTest {
// Verify that the Java SDK MetricsContainer holds the same information
DistributionData distributionData =
- ((DistributionCell) step.getDistribution(MetricName.named("ns3",
"int_distribution")))
+ ((DistributionCell)
step.getDistribution(MonitoringInfoMetricName.of(intDistribution)))
.getCumulative();
assertThat(distributionData, is(DistributionData.create(30, 10, 1, 5)));
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
index ab0c29f..6fa5530 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
@@ -55,8 +55,8 @@ class UserDistributionMonitoringInfoToCounterUpdateTransformer
this.specValidator = specMonitoringInfoValidator;
}
- static final String BEAM_METRICS_USER_DISTRIBUTION_PREFIX =
- MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER_PREFIX;
+ static final String BEAM_METRICS_USER_DISTRIBUTION_URN =
+ MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER;
private Optional<String> validate(MonitoringInfo monitoringInfo) {
Optional<String> validatorResult = specValidator.validate(monitoringInfo);
@@ -65,11 +65,11 @@ class
UserDistributionMonitoringInfoToCounterUpdateTransformer
}
String urn = monitoringInfo.getUrn();
- if (!urn.startsWith(BEAM_METRICS_USER_DISTRIBUTION_PREFIX)) {
+ if (!urn.equals(BEAM_METRICS_USER_DISTRIBUTION_URN)) {
throw new RuntimeException(
String.format(
- "Received unexpected counter urn. Expected urn starting with:
%s, received: %s",
- BEAM_METRICS_USER_DISTRIBUTION_PREFIX, urn));
+ "Received unexpected counter urn. Expected urn: %s, received:
%s",
+ BEAM_METRICS_USER_DISTRIBUTION_URN, urn));
}
final String ptransform =
@@ -99,22 +99,13 @@ class
UserDistributionMonitoringInfoToCounterUpdateTransformer
IntDistributionData value =
monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
- String urn = monitoringInfo.getUrn();
- final String ptransform =
-
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM);
+ Map<String, String> miLabels = monitoringInfo.getLabelsMap();
+ final String ptransform =
miLabels.get(MonitoringInfoConstants.Labels.PTRANSFORM);
+ final String counterName =
miLabels.get(MonitoringInfoConstants.Labels.NAME);
+ final String counterNamespace =
miLabels.get(MonitoringInfoConstants.Labels.NAMESPACE);
CounterStructuredNameAndMetadata name = new
CounterStructuredNameAndMetadata();
-
- String nameWithNamespace =
-
urn.substring(BEAM_METRICS_USER_DISTRIBUTION_PREFIX.length()).replace("^:", "");
-
- // TODO(BEAM-6925) Extract common logic to separate class.
- final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
- String counterName = nameWithNamespace.substring(lastColonIndex + 1);
- String counterNamespace =
- lastColonIndex == -1 ? "" : nameWithNamespace.substring(0,
lastColonIndex);
-
DataflowStepContext stepContext = transformIdMapping.get(ptransform);
name.setName(
new CounterStructuredName()
@@ -137,6 +128,6 @@ class
UserDistributionMonitoringInfoToCounterUpdateTransformer
/** @return MonitoringInfo urns prefix that this transformer can convert to
CounterUpdates. */
public String getSupportedUrnPrefix() {
- return BEAM_METRICS_USER_DISTRIBUTION_PREFIX;
+ return BEAM_METRICS_USER_DISTRIBUTION_URN;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
index 0dfaee0..0d3ed42 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
@@ -53,7 +53,7 @@ class UserMonitoringInfoToCounterUpdateTransformer
this.specValidator = specMonitoringInfoValidator;
}
- static final String BEAM_METRICS_USER_PREFIX =
MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX;
+ static final String BEAM_METRICS_USER_URN =
MonitoringInfoConstants.Urns.USER_COUNTER;
private Optional<String> validate(MonitoringInfo monitoringInfo) {
Optional<String> validatorResult = specValidator.validate(monitoringInfo);
@@ -62,11 +62,11 @@ class UserMonitoringInfoToCounterUpdateTransformer
}
String urn = monitoringInfo.getUrn();
- if (!urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+ if (!urn.equals(BEAM_METRICS_USER_URN)) {
throw new RuntimeException(
String.format(
- "Received unexpected counter urn. Expected urn starting with:
%s, received: %s",
- BEAM_METRICS_USER_PREFIX, urn));
+ "Received unexpected counter urn. Expected urn: %s, received:
%s",
+ BEAM_METRICS_USER_URN, urn));
}
final String ptransform =
@@ -95,20 +95,13 @@ class UserMonitoringInfoToCounterUpdateTransformer
}
long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
- String urn = monitoringInfo.getUrn();
- final String ptransform =
-
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM);
+ Map<String, String> miLabels = monitoringInfo.getLabelsMap();
+ final String ptransform =
miLabels.get(MonitoringInfoConstants.Labels.PTRANSFORM);
+ final String counterName =
miLabels.get(MonitoringInfoConstants.Labels.NAME);
+ final String counterNamespace =
miLabels.get(MonitoringInfoConstants.Labels.NAMESPACE);
CounterStructuredNameAndMetadata name = new
CounterStructuredNameAndMetadata();
-
- String nameWithNamespace =
urn.substring(BEAM_METRICS_USER_PREFIX.length()).replace("^:", "");
-
- final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
- String counterName = nameWithNamespace.substring(lastColonIndex + 1);
- String counterNamespace =
- lastColonIndex == -1 ? "" : nameWithNamespace.substring(0,
lastColonIndex);
-
DataflowStepContext stepContext = transformIdMapping.get(ptransform);
name.setName(
new CounterStructuredName()
@@ -126,6 +119,6 @@ class UserMonitoringInfoToCounterUpdateTransformer
/** @return MonitoringInfo urns prefix that this transformer can convert to
CounterUpdates. */
public String getSupportedUrnPrefix() {
- return BEAM_METRICS_USER_PREFIX;
+ return BEAM_METRICS_USER_URN;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 248b9e6..6263610 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -462,8 +462,10 @@ public class BeamFnMapTaskExecutorTest {
final int expectedCounterValue = 5;
final MonitoringInfo expectedMonitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user:ExpectedCounter")
- .setType("beam:metrics:sum_int_64")
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "ExpectedCounter")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "anyString")
+ .setType(MonitoringInfoConstants.TypeUrns.SUM_INT64)
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM,
"ExpectedPTransform")
.setMetric(
Metric.newBuilder()
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
index 203ae8b..1d9c9f5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
@@ -80,7 +80,9 @@ public class
UserDistributionMonitoringInfoToCounterUpdateTransformerTest {
Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user_distribution:anyNamespace:anyName")
+ .setUrn("beam:metric:user_distribution")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
"anyNamespace")
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
UserDistributionMonitoringInfoToCounterUpdateTransformer testObject =
@@ -101,7 +103,9 @@ public class
UserDistributionMonitoringInfoToCounterUpdateTransformerTest {
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user_distribution:anyNamespace:anyName")
+ .setUrn("beam:metric:user_distribution")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
"anyNamespace")
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
UserDistributionMonitoringInfoToCounterUpdateTransformer testObject =
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
index 6d65e1b..e2992f5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
@@ -78,7 +78,9 @@ public class UserMonitoringInfoToCounterUpdateTransformerTest
{
Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user:anyNamespace:anyName")
+ .setUrn("beam:metric:user")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
"anyNamespace")
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
UserMonitoringInfoToCounterUpdateTransformer testObject =
@@ -98,7 +100,9 @@ public class
UserMonitoringInfoToCounterUpdateTransformerTest {
MonitoringInfo monitoringInfo =
MonitoringInfo.newBuilder()
- .setUrn("beam:metric:user:anyNamespace:anyName")
+ .setUrn("beam:metric:user")
+ .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+ .putLabels(MonitoringInfoConstants.Labels.NAMESPACE,
"anyNamespace")
.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
.build();
UserMonitoringInfoToCounterUpdateTransformer testObject =
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 4839fef..108f88a 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -637,21 +637,35 @@ public class RemoteExecutionTest implements Serializable {
List<Matcher<MonitoringInfo>> matchers = new
ArrayList<Matcher<MonitoringInfo>>();
SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric(
- RemoteExecutionTest.class.getName(), processUserCounterName);
- builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME,
processUserCounterName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM,
"create/ParMultiDo(Anonymous)");
builder.setInt64Value(1);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(),
startUserCounterName);
- builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME,
startUserCounterName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM,
"create/ParMultiDo(Anonymous)");
builder.setInt64Value(10);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(),
finishUserCounterName);
- builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME,
finishUserCounterName);
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PTRANSFORM,
"create/ParMultiDo(Anonymous)");
builder.setInt64Value(100);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
@@ -659,26 +673,31 @@ public class RemoteExecutionTest implements Serializable {
// So there should be only two elements.
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel("impulse.out");
+ builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION,
"impulse.out");
builder.setInt64Value(2);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel("create/ParMultiDo(Anonymous).output");
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PCOLLECTION,
"create/ParMultiDo(Anonymous).output");
builder.setInt64Value(3);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
// Verify that the element count is not double counted if two
PCollections consume it.
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-
builder.setPCollectionLabel("processA/ParMultiDo(Anonymous).output");
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PCOLLECTION,
+ "processA/ParMultiDo(Anonymous).output");
builder.setInt64Value(6);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-
builder.setPCollectionLabel("processB/ParMultiDo(Anonymous).output");
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PCOLLECTION,
+ "processB/ParMultiDo(Anonymous).output");
builder.setInt64Value(6);
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
@@ -686,7 +705,7 @@ public class RemoteExecutionTest implements Serializable {
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
builder.setInt64TypeUrn();
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(
allOf(
MonitoringInfoMatchers.matchSetFields(builder.build()),
@@ -696,7 +715,7 @@ public class RemoteExecutionTest implements Serializable {
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(Urns.PROCESS_BUNDLE_MSECS);
builder.setInt64TypeUrn();
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(
allOf(
MonitoringInfoMatchers.matchSetFields(builder.build()),
@@ -705,7 +724,7 @@ public class RemoteExecutionTest implements Serializable {
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(Urns.FINISH_BUNDLE_MSECS);
builder.setInt64TypeUrn();
- builder.setPTransformLabel(testPTransformId);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
matchers.add(
allOf(
MonitoringInfoMatchers.matchSetFields(builder.build()),
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 2e8b33c..db4751f 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -665,29 +665,35 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel("Window.Into()/Window.Assign.out");
+ builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION,
"Window.Into()/Window.Assign.out");
builder.setInt64Value(2);
expected.add(builder.build());
builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel(
+ builder.setLabel(
+ MonitoringInfoConstants.Labels.PCOLLECTION,
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
builder.setInt64Value(2);
expected.add(builder.build());
builder = new SimpleMonitoringInfoBuilder();
- builder.setUrnForUserMetric(
- TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName(),
- TestSideInputIsAccessibleForDownstreamCallersDoFn.USER_COUNTER_NAME);
- builder.setPTransformLabel(TEST_PTRANSFORM_ID);
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
+ TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
+
TestSideInputIsAccessibleForDownstreamCallersDoFn.USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
TEST_PTRANSFORM_ID);
builder.setInt64Value(2);
expected.add(builder.build());
closeable.close();
List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
- result.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+ result.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
}
assertThat(result, containsInAnyOrder(expected.toArray()));
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
index b6768e1..316caa7 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
@@ -65,13 +65,13 @@ public class ElementCountFnDataReceiverTest {
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
- builder.setPCollectionLabel(pCollectionA);
+ builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
builder.setInt64Value(numElements);
MonitoringInfo expected = builder.build();
// Clear the timestamp before comparison.
MonitoringInfo first =
metricsContainerRegistry.getMonitoringInfos().iterator().next();
- MonitoringInfo result = SimpleMonitoringInfoBuilder.clearTimestamp(first);
+ MonitoringInfo result =
SimpleMonitoringInfoBuilder.copyAndClearTimestamp(first);
assertEquals(expected, result);
}
diff --git a/sdks/python/apache_beam/metrics/execution.py
b/sdks/python/apache_beam/metrics/execution.py
index 12ecf20..420f7ff 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -40,8 +40,6 @@ from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
-from apache_beam.metrics.monitoring_infos import user_distribution_metric_urn
-from apache_beam.metrics.monitoring_infos import user_metric_urn
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker import statesampler
@@ -238,22 +236,22 @@ class MetricsContainer(object):
"""Returns a list of MonitoringInfos for the metrics in this container."""
all_user_metrics = []
for k, v in self.counters.items():
- all_user_metrics.append(monitoring_infos.int64_counter(
- user_metric_urn(k.namespace, k.name),
+ all_user_metrics.append(monitoring_infos.int64_user_counter(
+ k.namespace, k.name,
v.to_runner_api_monitoring_info(),
ptransform=transform_id
))
for k, v in self.distributions.items():
- all_user_metrics.append(monitoring_infos.int64_distribution(
- user_distribution_metric_urn(k.namespace, k.name),
+ all_user_metrics.append(monitoring_infos.int64_user_distribution(
+ k.namespace, k.name,
v.get_cumulative().to_runner_api_monitoring_info(),
ptransform=transform_id
))
for k, v in self.gauges.items():
- all_user_metrics.append(monitoring_infos.int64_gauge(
- user_metric_urn(k.namespace, k.name),
+ all_user_metrics.append(monitoring_infos.int64_user_gauge(
+ k.namespace, k.name,
v.get_cumulative().to_runner_api_monitoring_info(),
ptransform=transform_id
))
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py
b/sdks/python/apache_beam/metrics/monitoring_infos.py
index aa4615d..76f3b45 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -43,9 +43,9 @@ PROCESS_BUNDLE_MSECS_URN = (
FINISH_BUNDLE_MSECS_URN = (
common_urns.monitoring_info_specs.FINISH_BUNDLE_MSECS.spec.urn)
TOTAL_MSECS_URN = common_urns.monitoring_info_specs.TOTAL_MSECS.spec.urn
-USER_COUNTER_URN_PREFIX = (
+USER_COUNTER_URN = (
common_urns.monitoring_info_specs.USER_COUNTER.spec.urn)
-USER_DISTRIBUTION_COUNTER_URN_PREFIX = (
+USER_DISTRIBUTION_COUNTER_URN = (
common_urns.monitoring_info_specs.USER_DISTRIBUTION_COUNTER.spec.urn)
# TODO(ajamato): Implement the remaining types, i.e. Double types
@@ -61,9 +61,14 @@ DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
GAUGE_TYPES = set([LATEST_INT64_TYPE])
# TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels
-PCOLLECTION_LABEL = 'PCOLLECTION'
-PTRANSFORM_LABEL = 'PTRANSFORM'
-TAG_LABEL = 'TAG'
+PCOLLECTION_LABEL = (
+ common_urns.monitoring_info_labels.PCOLLECTION.label_props.name)
+PTRANSFORM_LABEL = (
+ common_urns.monitoring_info_labels.TRANSFORM.label_props.name)
+NAMESPACE_LABEL = (
+ common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
+NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
+TAG_LABEL = "TAG"
def to_timestamp_proto(timestamp_secs):
@@ -104,7 +109,7 @@ def extract_distribution(monitoring_info_proto):
return None
-def create_labels(ptransform='', tag=''):
+def create_labels(ptransform=None, tag=None, namespace=None, name=None):
"""Create the label dictionary based on the provided tags.
Args:
@@ -116,10 +121,36 @@ def create_labels(ptransform='', tag=''):
labels[TAG_LABEL] = tag
if ptransform:
labels[PTRANSFORM_LABEL] = ptransform
+ if namespace:
+ labels[NAMESPACE_LABEL] = namespace
+ if name:
+ labels[NAME_LABEL] = name
return labels
-def int64_counter(urn, metric, ptransform='', tag=''):
+def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
+ """Return the counter monitoring info for the specified user metric and
labels.
+
+ Args:
+ namespace, name: User-defined namespace and name of the counter.
+ metric: The metric proto field to use in the monitoring info.
+ Or an int value.
+ ptransform: The ptransform/step name used as a label.
+ tag: The output tag name, used as a label.
+ """
+ labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+ name=name)
+ if isinstance(metric, int):
+ metric = Metric(
+ counter_data=CounterData(
+ int64_value=metric
+ )
+ )
+ return create_monitoring_info(USER_COUNTER_URN, SUM_INT64_TYPE, metric,
+ labels)
+
+
+def int64_counter(urn, metric, ptransform=None, tag=None):
"""Return the counter monitoring info for the specified URN, metric and
labels.
Args:
@@ -139,7 +170,7 @@ def int64_counter(urn, metric, ptransform='', tag=''):
return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
-def int64_distribution(urn, metric, ptransform='', tag=''):
+def int64_user_distribution(namespace, name, metric, ptransform=None,
tag=None):
"""Return the distribution monitoring info for the URN, metric and labels.
Args:
@@ -149,22 +180,27 @@ def int64_distribution(urn, metric, ptransform='',
tag=''):
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
"""
- labels = create_labels(ptransform=ptransform, tag=tag)
- return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, metric, labels)
+ labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+ name=name)
+ return create_monitoring_info(USER_DISTRIBUTION_COUNTER_URN,
+ DISTRIBUTION_INT64_TYPE, metric, labels)
-def int64_gauge(urn, metric, ptransform='', tag=''):
+def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
"""Return the gauge monitoring info for the URN, metric and labels.
Args:
- urn: The URN of the monitoring info/metric.
+ namespace: User-defined namespace of counter.
+ name: Name of counter.
metric: The metric proto field to use in the monitoring info.
Or an int value.
ptransform: The ptransform/step name used as a label.
tag: The output tag name, used as a label.
"""
- labels = create_labels(ptransform=ptransform, tag=tag)
- return create_monitoring_info(urn, LATEST_INT64_TYPE, metric, labels)
+ labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+ name=name)
+ return create_monitoring_info(USER_COUNTER_URN, LATEST_INT64_TYPE, metric,
+ labels)
def create_monitoring_info(urn, type_urn, metric_proto, labels=None):
@@ -187,27 +223,6 @@ def create_monitoring_info(urn, type_urn, metric_proto,
labels=None):
)
-def user_metric_urn(namespace, name):
- """Returns the metric URN for a user metric, with a proper URN prefix.
-
- Args:
- namespace: The namespace of the metric.
- name: The name of the metric.
- """
- return '%s%s:%s' % (USER_COUNTER_URN_PREFIX, namespace, name)
-
-
-def user_distribution_metric_urn(namespace, name):
- """Returns the metric URN for a user distribution metric,
- with a proper URN prefix.
-
- Args:
- namespace: The namespace of the metric.
- name: The name of the metric.
- """
- return '%s%s:%s' % (USER_DISTRIBUTION_COUNTER_URN_PREFIX, namespace, name)
-
-
def is_counter(monitoring_info_proto):
"""Returns true if the monitoring info is a counter metric."""
return monitoring_info_proto.type in COUNTER_TYPES
@@ -224,13 +239,11 @@ def is_gauge(monitoring_info_proto):
def _is_user_monitoring_info(monitoring_info_proto):
- return monitoring_info_proto.urn.startswith(
- USER_COUNTER_URN_PREFIX)
+ return monitoring_info_proto.urn == USER_COUNTER_URN
def _is_user_distribution_monitoring_info(monitoring_info_proto):
- return monitoring_info_proto.urn.startswith(
- USER_DISTRIBUTION_COUNTER_URN_PREFIX)
+ return monitoring_info_proto.urn == USER_DISTRIBUTION_COUNTER_URN
def is_user_monitoring_info(monitoring_info_proto):
@@ -263,19 +276,14 @@ def
extract_metric_result_map_value(monitoring_info_proto):
def parse_namespace_and_name(monitoring_info_proto):
"""Returns the (namespace, name) tuple of the URN in the monitoring info."""
- to_split = monitoring_info_proto.urn
-
# Remove the URN prefix which indicates that it is a user counter.
- prefix_len = 0
- if _is_user_distribution_monitoring_info(monitoring_info_proto):
- prefix_len = len(USER_DISTRIBUTION_COUNTER_URN_PREFIX)
- elif _is_user_monitoring_info(monitoring_info_proto):
- prefix_len = len(USER_COUNTER_URN_PREFIX)
- to_split = monitoring_info_proto.urn[prefix_len:]
+ if is_user_monitoring_info(monitoring_info_proto):
+ labels = monitoring_info_proto.labels
+ return labels[NAMESPACE_LABEL], labels[NAME_LABEL]
# If it is not a user counter, just use the first part of the URN, i.e.
'beam'
- split = to_split.split(':')
- return split[0], ':'.join(split[1:])
+ split = monitoring_info_proto.urn.split(':', 1)
+ return split[0], split[1]
def to_key(monitoring_info_proto):
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos_test.py
b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
index 67bca09..466969a 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos_test.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
@@ -31,17 +31,23 @@ class MonitoringInfosTest(unittest.TestCase):
self.assertEqual(name, "dummy:metric")
def test_parse_namespace_and_name_for_user_metric(self):
- urn = (monitoring_infos.USER_COUNTER_URN_PREFIX +
- "counternamespace:countername")
- input = monitoring_infos.create_monitoring_info(urn, "typeurn", None)
+ urn = monitoring_infos.USER_COUNTER_URN
+ labels = {}
+ labels[monitoring_infos.NAMESPACE_LABEL] = "counternamespace"
+ labels[monitoring_infos.NAME_LABEL] = "countername"
+ input = monitoring_infos.create_monitoring_info(urn, "typeurn", None,
+ labels)
namespace, name = monitoring_infos.parse_namespace_and_name(input)
self.assertEqual(namespace, "counternamespace")
self.assertEqual(name, "countername")
def test_parse_namespace_and_name_for_user_distribution_metric(self):
- urn = (monitoring_infos.USER_DISTRIBUTION_COUNTER_URN_PREFIX +
- "counternamespace:countername")
- input = monitoring_infos.create_monitoring_info(urn, "typeurn", None)
+ urn = monitoring_infos.USER_DISTRIBUTION_COUNTER_URN
+ labels = {}
+ labels[monitoring_infos.NAMESPACE_LABEL] = "counternamespace"
+ labels[monitoring_infos.NAME_LABEL] = "countername"
+ input = monitoring_infos.create_monitoring_info(urn, "typeurn", None,
+ labels)
namespace, name = monitoring_infos.parse_namespace_and_name(input)
self.assertEqual(namespace, "counternamespace")
self.assertEqual(name, "countername")
diff --git a/sdks/python/apache_beam/portability/common_urns.py
b/sdks/python/apache_beam/portability/common_urns.py
index 693133b..c1164e3 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -34,6 +34,8 @@ class PropertiesFromEnumValue(object):
beam_runner_api_pb2.beam_constant])
self.spec = (value_descriptor.GetOptions().Extensions[
metrics_pb2.monitoring_info_spec])
+ self.label_props = (value_descriptor.GetOptions().Extensions[
+ metrics_pb2.label_props])
class PropertiesFromEnumType(object):
@@ -82,3 +84,5 @@ monitoring_info_specs = PropertiesFromEnumType(
metrics_pb2.MonitoringInfoSpecs.Enum)
monitoring_info_types = PropertiesFromEnumType(
metrics_pb2.MonitoringInfoTypeUrns.Enum)
+monitoring_info_labels = PropertiesFromEnumType(
+ metrics_pb2.MonitoringInfo.MonitoringInfoLabels)