This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff8475  [BEAM-7088] Implement switch to using MonitoringInfo labels 
for Name and Namespace. (#8316)
5ff8475 is described below

commit 5ff8475e370c5a746ddc6e6cfd7b63004d8c40e3
Author: Mikhail Gryzykhin <[email protected]>
AuthorDate: Fri Apr 19 16:08:28 2019 -0700

    [BEAM-7088] Implement switch to using MonitoringInfo labels for Name and 
Namespace. (#8316)
    
    * Implement switch to using MonitoringInfo labels for Name and Namespace
    in Java.
    
    * Add backwards compatibility to MetricName and patch flink tests
---
 model/pipeline/src/main/proto/metrics.proto        |  16 +++-
 .../beam/runners/core/metrics/MetricUrns.java      |  41 --------
 .../runners/core/metrics/MetricsContainerImpl.java |  87 +++++++++--------
 .../core/metrics/MonitoringInfoConstants.java      |   7 +-
 .../core/metrics/MonitoringInfoMetricName.java     |  43 ++++-----
 .../core/metrics/SimpleMonitoringInfoBuilder.java  |  50 +---------
 .../core/metrics/MetricsContainerImplTest.java     |  27 +++---
 .../core/metrics/MetricsContainerStepMapTest.java  |   9 +-
 .../core/metrics/MonitoringInfoMetricNameTest.java |  29 +++---
 .../core/metrics/MonitoringInfoTestUtil.java       |   2 +-
 .../metrics/SimpleMonitoringInfoBuilderTest.java   |  40 ++------
 .../core/metrics/SimpleStateRegistryTest.java      |   6 +-
 .../metrics/SpecMonitoringInfoValidatorTest.java   |   6 +-
 .../flink/metrics/FlinkMetricContainerTest.java    |  68 +++++++------
 ...onMonitoringInfoToCounterUpdateTransformer.java |  29 ++----
 ...erMonitoringInfoToCounterUpdateTransformer.java |  25 ++---
 .../fn/control/BeamFnMapTaskExecutorTest.java      |   6 +-
 ...nitoringInfoToCounterUpdateTransformerTest.java |   8 +-
 ...nitoringInfoToCounterUpdateTransformerTest.java |   8 +-
 .../fnexecution/control/RemoteExecutionTest.java   |  47 ++++++---
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  20 ++--
 .../data/ElementCountFnDataReceiverTest.java       |   4 +-
 sdks/python/apache_beam/metrics/execution.py       |  14 ++-
 .../python/apache_beam/metrics/monitoring_infos.py | 106 +++++++++++----------
 .../apache_beam/metrics/monitoring_infos_test.py   |  18 ++--
 sdks/python/apache_beam/portability/common_urns.py |   4 +
 26 files changed, 335 insertions(+), 385 deletions(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto 
b/model/pipeline/src/main/proto/metrics.proto
index 43ec994..3b45efb 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -62,8 +62,13 @@ message MonitoringInfoSpecs {
     // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
     // upgrading the python SDK.
     USER_COUNTER = 0 [(monitoring_info_spec) = {
-      urn: "beam:metric:user:",
+      urn: "beam:metric:user",
       type_urn: "beam:metrics:sum_int_64",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user numeric counters."
+      }]
     }];
 
     ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
@@ -122,8 +127,13 @@ message MonitoringInfoSpecs {
     // TODO(BEAM-6926): Add the PTRANSFORM name as a required label after
     // upgrading the python SDK.
     USER_DISTRIBUTION_COUNTER = 6 [(monitoring_info_spec) = {
-      urn: "beam:metric:user_distribution:",
+      urn: "beam:metric:user_distribution",
       type_urn: "beam:metrics:distribution_int_64",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user distribution counters."
+      }]
     }];
   }
 }
@@ -171,6 +181,8 @@ message MonitoringInfo {
     WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
     CODER = 3 [(label_props) = { name: "CODER" }];
     ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
+    NAMESPACE = 5 [(label_props) = { name: "NAMESPACE" }];
+    NAME = 6 [(label_props) = { name: "NAME" }];
   }
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
deleted file mode 100644
index 4ed5fc4..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import org.apache.beam.sdk.metrics.MetricName;
-
-/** Utility for parsing a URN to a {@link MetricName}. */
-public class MetricUrns {
-  /**
-   * Parse a {@link MetricName} from a {@link
-   * org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs.Enum}.
-   *
-   * <p>Should be consistent with {@code parse_namespace_and_name} in 
monitoring_infos.py.
-   */
-  public static MetricName parseUrn(String urn) {
-    if (urn.startsWith(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX)) {
-      urn = 
urn.substring(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX.length());
-    }
-    // If it is not a user counter, just use the first part of the URN, i.e. 
'beam'
-    String[] pieces = urn.split(":", 2);
-    if (pieces.length != 2) {
-      throw new IllegalArgumentException("Invalid metric URN: " + urn);
-    }
-    return MetricName.named(pieces[0], pieces[1]);
-  }
-}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index bf5acfe..827c9bf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core.metrics;
 
-import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
@@ -26,6 +25,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
@@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental(Kind.METRICS)
 public class MetricsContainerImpl implements Serializable, MetricsContainer {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MetricsContainerImpl.class);
 
   @Nullable private final String stepName;
@@ -150,13 +151,11 @@ public class MetricsContainerImpl implements 
Serializable, MetricsContainer {
         extractUpdates(counters), extractUpdates(distributions), 
extractUpdates(gauges));
   }
 
-  /**
-   * @param metricUpdate
-   * @return The MonitoringInfo generated from the metricUpdate.
-   */
+  /** @return The MonitoringInfo generated from the metricUpdate. */
   @Nullable
   private MonitoringInfo counterUpdateToMonitoringInfo(MetricUpdate<Long> 
metricUpdate) {
     SimpleMonitoringInfoBuilder builder = new 
SimpleMonitoringInfoBuilder(true);
+
     MetricName metricName = metricUpdate.getKey().metricName();
     if (metricName instanceof MonitoringInfoMetricName) {
       MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) 
metricName;
@@ -165,20 +164,26 @@ public class MetricsContainerImpl implements 
Serializable, MetricsContainer {
       for (Entry<String, String> e : 
monitoringInfoName.getLabels().entrySet()) {
         builder.setLabel(e.getKey(), e.getValue());
       }
-    } else { // Note: (metricName instanceof MetricName) is always True.
-      // Represents a user counter.
-      builder.setUrnForUserMetric(
-          metricUpdate.getKey().metricName().getNamespace(),
-          metricUpdate.getKey().metricName().getName());
+    } else { // Represents a user counter.
       // Drop if the stepname is not set. All user counters must be
       // defined for a PTransform. They must be defined on a container bound 
to a step.
       if (this.stepName == null) {
         return null;
       }
-      builder.setPTransformLabel(metricUpdate.getKey().stepName());
+
+      builder
+          .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+          .setLabel(
+              MonitoringInfoConstants.Labels.NAMESPACE,
+              metricUpdate.getKey().metricName().getNamespace())
+          .setLabel(
+              MonitoringInfoConstants.Labels.NAME, 
metricUpdate.getKey().metricName().getName())
+          .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
metricUpdate.getKey().stepName());
     }
+
     builder.setInt64Value(metricUpdate.getUpdate());
     builder.setTimestampToNow();
+
     return builder.build();
   }
 
@@ -191,7 +196,7 @@ public class MetricsContainerImpl implements Serializable, 
MetricsContainer {
     for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) {
       MonitoringInfo mi = counterUpdateToMonitoringInfo(metricUpdate);
       if (mi != null) {
-        monitoringInfos.add(counterUpdateToMonitoringInfo(metricUpdate));
+        monitoringInfos.add(mi);
       }
     }
     return monitoringInfos;
@@ -245,35 +250,37 @@ public class MetricsContainerImpl implements 
Serializable, MetricsContainer {
   public void update(Iterable<MonitoringInfo> monitoringInfos) {
     monitoringInfos.forEach(
         monitoringInfo -> {
-          if (monitoringInfo.hasMetric()) {
-            String urn = monitoringInfo.getUrn();
-            MetricName metricName = parseUrn(urn);
-            org.apache.beam.model.pipeline.v1.MetricsApi.Metric metric = 
monitoringInfo.getMetric();
-            if (metric.hasCounterData()) {
-              CounterData counterData = metric.getCounterData();
-              if (counterData.getValueCase() == 
CounterData.ValueCase.INT64_VALUE) {
-                Counter counter = getCounter(metricName);
-                counter.inc(counterData.getInt64Value());
-              } else {
-                LOG.warn("Unsupported CounterData type: {}", counterData);
-              }
-            } else if (metric.hasDistributionData()) {
-              DistributionData distributionData = metric.getDistributionData();
-              if (distributionData.hasIntDistributionData()) {
-                Distribution distribution = getDistribution(metricName);
-                IntDistributionData intDistributionData = 
distributionData.getIntDistributionData();
-                distribution.update(
-                    intDistributionData.getSum(),
-                    intDistributionData.getCount(),
-                    intDistributionData.getMin(),
-                    intDistributionData.getMax());
-              } else {
-                LOG.warn("Unsupported DistributionData type: {}", 
distributionData);
-              }
-            } else if (metric.hasExtremaData()) {
-              ExtremaData extremaData = metric.getExtremaData();
-              LOG.warn("Extrema metric unsupported: {}", extremaData);
+          if (!monitoringInfo.hasMetric()) {
+            return;
+          }
+
+          MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
+
+          MetricsApi.Metric metric = monitoringInfo.getMetric();
+          if (metric.hasCounterData()) {
+            CounterData counterData = metric.getCounterData();
+            if (counterData.getValueCase() == 
CounterData.ValueCase.INT64_VALUE) {
+              Counter counter = getCounter(metricName);
+              counter.inc(counterData.getInt64Value());
+            } else {
+              LOG.warn("Unsupported CounterData type: {}", counterData);
+            }
+          } else if (metric.hasDistributionData()) {
+            DistributionData distributionData = metric.getDistributionData();
+            if (distributionData.hasIntDistributionData()) {
+              Distribution distribution = getDistribution(metricName);
+              IntDistributionData intDistributionData = 
distributionData.getIntDistributionData();
+              distribution.update(
+                  intDistributionData.getSum(),
+                  intDistributionData.getCount(),
+                  intDistributionData.getMin(),
+                  intDistributionData.getMax());
+            } else {
+              LOG.warn("Unsupported DistributionData type: {}", 
distributionData);
             }
+          } else if (metric.hasExtremaData()) {
+            ExtremaData extremaData = metric.getExtremaData();
+            LOG.warn("Extrema metric unsupported: {}", extremaData);
           }
         });
   }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 66695e9..3716709 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -39,9 +39,8 @@ public final class MonitoringInfoConstants {
     public static final String FINISH_BUNDLE_MSECS =
         extractUrn(MonitoringInfoSpecs.Enum.FINISH_BUNDLE_MSECS);
     public static final String TOTAL_MSECS = 
extractUrn(MonitoringInfoSpecs.Enum.TOTAL_MSECS);
-    public static final String USER_COUNTER_PREFIX =
-        extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
-    public static final String USER_DISTRIBUTION_COUNTER_PREFIX =
+    public static final String USER_COUNTER = 
extractUrn(MonitoringInfoSpecs.Enum.USER_COUNTER);
+    public static final String USER_DISTRIBUTION_COUNTER =
         extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_COUNTER);
   }
 
@@ -53,6 +52,8 @@ public final class MonitoringInfoConstants {
         extractLabel(MonitoringInfoLabels.WINDOWING_STRATEGY);
     public static final String CODER = 
extractLabel(MonitoringInfoLabels.CODER);
     public static final String ENVIRONMENT = 
extractLabel(MonitoringInfoLabels.ENVIRONMENT);
+    public static final String NAMESPACE = 
extractLabel(MonitoringInfoLabels.NAMESPACE);
+    public static final String NAME = extractLabel(MonitoringInfoLabels.NAME);
   }
 
   /** MonitoringInfo type Urns. */
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
index f0a20e1..9143eff 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java
@@ -19,13 +19,11 @@ package org.apache.beam.runners.core.metrics;
 
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
 
@@ -37,11 +35,9 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
 public class MonitoringInfoMetricName extends MetricName {
 
   private String urn;
-  @Nullable private String name;
-  @Nullable private String namespace;
-  private HashMap<String, String> labels = new HashMap<String, String>();
+  private Map<String, String> labels = new HashMap<String, String>();
 
-  private MonitoringInfoMetricName(String urn, HashMap<String, String> labels) 
{
+  private MonitoringInfoMetricName(String urn, Map<String, String> labels) {
     checkArgument(!Strings.isNullOrEmpty(urn), "MonitoringInfoMetricName urn 
must be non-empty");
     checkArgument(labels != null, "MonitoringInfoMetricName labels must be 
non-null");
     // TODO(ajamato): Move SimpleMonitoringInfoBuilder to 
beam-runner-core-construction-java
@@ -52,31 +48,22 @@ public class MonitoringInfoMetricName extends MetricName {
     }
   }
 
-  /** Parse the urn field into a name and namespace field. */
-  private void parseUrn() {
-    if (this.urn.startsWith(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX)) 
{
-      List<String> split = new 
ArrayList<String>(Arrays.asList(this.getUrn().split(":")));
-      this.name = split.get(split.size() - 1);
-      this.namespace = split.get(split.size() - 2);
-    }
-  }
-
-  /** @return the parsed namespace from the user metric URN, otherwise null. */
   @Override
   public String getNamespace() {
-    if (this.namespace == null) {
-      parseUrn();
+    if (labels.containsKey(MonitoringInfoConstants.Labels.NAMESPACE)) {
+      return labels.getOrDefault(MonitoringInfoConstants.Labels.NAMESPACE, 
null);
+    } else {
+      return urn.split(":", 2)[0];
     }
-    return this.namespace;
   }
 
-  /** @return the parsed name from the user metric URN, otherwise null. */
   @Override
   public String getName() {
-    if (this.name == null) {
-      parseUrn();
+    if (labels.containsKey(MonitoringInfoConstants.Labels.NAME)) {
+      return labels.getOrDefault(MonitoringInfoConstants.Labels.NAME, null);
+    } else {
+      return urn.split(":", 2)[1];
     }
-    return this.name;
   }
 
   /** @return the urn of this MonitoringInfo metric. */
@@ -85,10 +72,14 @@ public class MonitoringInfoMetricName extends MetricName {
   }
 
   /** @return The labels associated with this MonitoringInfo. */
-  public HashMap<String, String> getLabels() {
+  public Map<String, String> getLabels() {
     return this.labels;
   }
 
+  public static MonitoringInfoMetricName of(MetricsApi.MonitoringInfo mi) {
+    return new MonitoringInfoMetricName(mi.getUrn(), mi.getLabelsMap());
+  }
+
   public static MonitoringInfoMetricName named(String urn, HashMap<String, 
String> labels) {
     return new MonitoringInfoMetricName(urn, labels);
   }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
index 8ab8403..23fa440 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -44,12 +44,6 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
  * builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN); 
builder.setInt64Value(1);
  * builder.setPTransformLabel("myTransform"); 
builder.setPCollectionLabel("myPcollection");
  * MonitoringInfo mi = builder.build();
- *
- * <p>Example Usage (ElementCount counter):
- *
- * <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
- * 
builder.setUrn(SimpleMonitoringInfoBuilder.setUrnForUserMetric("myNamespace", 
"myName"));
- * builder.setInt64Value(1); MonitoringInfo mi = builder.build();
  */
 public class SimpleMonitoringInfoBuilder {
   private final boolean validateAndDropInvalid;
@@ -81,18 +75,6 @@ public class SimpleMonitoringInfoBuilder {
     this.validateAndDropInvalid = validateAndDropInvalid;
   }
 
-  /** @return The metric URN for a user metric, with a proper URN prefix. */
-  public static String userMetricUrn(String metricNamespace, String 
metricName) {
-    String fixedMetricNamespace = metricNamespace.replace(':', '_');
-    String fixedMetricName = metricName.replace(':', '_');
-    StringBuilder sb = new StringBuilder();
-    sb.append(MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX);
-    sb.append(fixedMetricNamespace);
-    sb.append(':');
-    sb.append(fixedMetricName);
-    return sb.toString();
-  }
-
   /**
    * Sets the urn of the MonitoringInfo.
    *
@@ -103,17 +85,6 @@ public class SimpleMonitoringInfoBuilder {
     return this;
   }
 
-  /**
-   * Sets the urn of the MonitoringInfo to a proper user metric URN for the 
given params.
-   *
-   * @param namespace
-   * @param name
-   */
-  public SimpleMonitoringInfoBuilder setUrnForUserMetric(String namespace, 
String name) {
-    this.builder.setUrn(userMetricUrn(namespace, name));
-    return this;
-  }
-
   /** Sets the timestamp of the MonitoringInfo to the current time. */
   public SimpleMonitoringInfoBuilder setTimestampToNow() {
     Instant time = Instant.now();
@@ -134,28 +105,17 @@ public class SimpleMonitoringInfoBuilder {
     return this;
   }
 
-  /** Sets the PTRANSFORM MonitoringInfo label to the given param. */
-  public SimpleMonitoringInfoBuilder setPTransformLabel(String pTransform) {
-    // TODO(ajamato): Add validation that it is a valid pTransform name in the 
bundle descriptor.
-    setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransform);
-    return this;
-  }
-
-  /** Sets the PCOLLECTION MonitoringInfo label to the given param. */
-  public SimpleMonitoringInfoBuilder setPCollectionLabel(String pCollection) {
-    setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollection);
-    return this;
-  }
-
   /** Sets the MonitoringInfo label to the given name and value. */
   public SimpleMonitoringInfoBuilder setLabel(String labelName, String 
labelValue) {
     this.builder.putLabels(labelName, labelValue);
     return this;
   }
 
-  /** Clear the builder and merge from the provided monitoringInfo. */
-  public void clearAndMerge(MonitoringInfo monitoringInfo) {
+  public void clear() {
     this.builder = MonitoringInfo.newBuilder();
+  }
+  /** Clear the builder and merge from the provided monitoringInfo. */
+  public void merge(MonitoringInfo monitoringInfo) {
     this.builder.mergeFrom(monitoringInfo);
   }
 
@@ -164,7 +124,7 @@ public class SimpleMonitoringInfoBuilder {
    *     MonitoringInfos.
    */
   @VisibleForTesting
-  public static MonitoringInfo clearTimestamp(MonitoringInfo input) {
+  public static MonitoringInfo copyAndClearTimestamp(MonitoringInfo input) {
     MonitoringInfo.Builder builder = MonitoringInfo.newBuilder();
     builder.mergeFrom(input);
     builder.clearTimestamp();
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 28a846d..61e4077 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -159,20 +159,24 @@ public class MetricsContainerImplTest {
     c1.inc(3L);
 
     SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
-    builder1.setUrnForUserMetric("ns", "name1");
-    builder1.setInt64Value(5);
-    builder1.setPTransformLabel("step1");
-    builder1.build();
+    builder1
+        .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
+        .setInt64Value(5)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
 
     SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder();
-    builder2.setUrnForUserMetric("ns", "name2");
-    builder2.setInt64Value(4);
-    builder2.setPTransformLabel("step1");
-    builder2.build();
+    builder2
+        .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
+        .setInt64Value(4)
+        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");
 
     ArrayList<MonitoringInfo> actualMonitoringInfos = new 
ArrayList<MonitoringInfo>();
     for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
-      
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+      
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
     }
 
     assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), 
builder2.build()));
@@ -190,13 +194,12 @@ public class MetricsContainerImplTest {
 
     SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
     builder1.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    builder1.setPCollectionLabel("pcollection");
+    builder1.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, 
"pcollection");
     builder1.setInt64Value(2);
-    builder1.build();
 
     ArrayList<MonitoringInfo> actualMonitoringInfos = new 
ArrayList<MonitoringInfo>();
     for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
-      
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+      
actualMonitoringInfos.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
     }
     assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build()));
   }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
index a761bdd..9be564a 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -208,8 +208,11 @@ public class MetricsContainerStepMapTest {
     List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
 
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
-    builder.setUrnForUserMetric("ns", "name1");
-    builder.setPTransformLabel(STEP1);
+    builder
+        .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+        .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
+        .setLabel(MonitoringInfoConstants.Labels.NAME, "name1");
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, STEP1);
     builder.setInt64Value(7);
     expected.add(builder.build());
 
@@ -218,7 +221,7 @@ public class MetricsContainerStepMapTest {
     ArrayList<MonitoringInfo> actual = new ArrayList<MonitoringInfo>();
 
     for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
-      actual.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+      actual.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
     }
     assertThat(actual, containsInAnyOrder(expected.toArray()));
   }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
index fa1b989..33ba9cd 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricNameTest.java
@@ -36,8 +36,6 @@ public class MonitoringInfoMetricNameTest implements 
Serializable {
     HashMap<String, String> labels = new HashMap<String, String>();
     String urn = MonitoringInfoConstants.Urns.ELEMENT_COUNT;
     MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn, 
labels);
-    assertEquals(null, name.getName());
-    assertEquals(null, name.getNamespace());
     assertEquals(labels, name.getLabels());
     assertEquals(urn, name.getUrn());
 
@@ -53,24 +51,19 @@ public class MonitoringInfoMetricNameTest implements 
Serializable {
   }
 
   @Test
-  public void testUserCounterUrnConstruction() {
-    String urn = SimpleMonitoringInfoBuilder.userMetricUrn("namespace", 
"name");
+  public void testGetNameReturnsNameIfLabelIsPresent() {
     HashMap<String, String> labels = new HashMap<String, String>();
-    MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn, 
labels);
-    assertEquals("name", name.getName());
-    assertEquals("namespace", name.getNamespace());
-    assertEquals(labels, name.getLabels());
-    assertEquals(urn, name.getUrn());
-
-    assertEquals(name, name); // test self equals;
-
-    // Reconstruct and test equality and hash code equivalence
-    urn = SimpleMonitoringInfoBuilder.userMetricUrn("namespace", "name");
-    labels = new HashMap<String, String>();
-    MonitoringInfoMetricName name2 = MonitoringInfoMetricName.named(urn, 
labels);
+    labels.put(MonitoringInfoConstants.Labels.NAME, "anyName");
+    MonitoringInfoMetricName name = MonitoringInfoMetricName.named("anyUrn", 
labels);
+    assertEquals("anyName", name.getName());
+  }
 
-    assertEquals(name, name2);
-    assertEquals(name.hashCode(), name2.hashCode());
+  @Test
+  public void testGetNamespaceReturnsNamespaceIfLabelIsPresent() {
+    HashMap<String, String> labels = new HashMap<String, String>();
+    labels.put(MonitoringInfoConstants.Labels.NAMESPACE, "anyNamespace");
+    MonitoringInfoMetricName name = MonitoringInfoMetricName.named("anyUrn", 
labels);
+    assertEquals("anyNamespace", name.getNamespace());
   }
 
   @Test
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
index 3b33d42..832c07d 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoTestUtil.java
@@ -37,7 +37,7 @@ public class MonitoringInfoTestUtil {
   public static MonitoringInfo testElementCountMonitoringInfo(long value) {
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    builder.setPCollectionLabel("testPCollection");
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, 
"testPCollection");
     builder.setInt64Value(value);
     return builder.build();
   }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
index 34f5fb4..ee42da4 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -38,48 +38,26 @@ public class SimpleMonitoringInfoBuilderTest {
 
     builder.setInt64Value(1);
     assertNull(builder.build());
-
-    builder.setPCollectionLabel("myPcollection");
-    // Pass now that the spec is fully met.
-    MonitoringInfo monitoringInfo = builder.build();
-    assertTrue(monitoringInfo != null);
-    assertEquals(
-        "myPcollection",
-        
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION, 
null));
-    assertEquals(MonitoringInfoConstants.Urns.ELEMENT_COUNT, 
monitoringInfo.getUrn());
-    assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64, 
monitoringInfo.getType());
-    assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
   }
 
   @Test
-  public void testUserCounter() {
+  public void testReturnsExpectedMonitoringInfo() {
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
-    builder.setUrnForUserMetric("myNamespace", "myName");
-    assertNull(builder.build());
-
+    builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
     builder.setInt64Value(1);
-    // Pass now that the spec is fully met.
-    MonitoringInfo monitoringInfo = builder.build();
-    assertTrue(monitoringInfo != null);
-    assertEquals(
-        MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX + 
"myNamespace:myName",
-        monitoringInfo.getUrn());
-    assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64, 
monitoringInfo.getType());
-    assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
-  }
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, 
"myPcollection");
 
-  @Test
-  public void testUserMetricWithInvalidDelimiterCharacterIsReplaced() {
-    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
-    builder.setUrnForUserMetric("myNamespace:withInvalidChar", "myName");
-    builder.setInt64Value(1);
     // Pass now that the spec is fully met.
     MonitoringInfo monitoringInfo = builder.build();
     assertTrue(monitoringInfo != null);
     assertEquals(
-        MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX + 
"myNamespace_withInvalidChar:myName",
-        monitoringInfo.getUrn());
+        "myPcollection",
+        
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PCOLLECTION, 
null));
+    assertEquals(MonitoringInfoConstants.Urns.ELEMENT_COUNT, 
monitoringInfo.getUrn());
     assertEquals(MonitoringInfoConstants.TypeUrns.SUM_INT64, 
monitoringInfo.getType());
     assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
+    assertEquals(
+        "myPcollection",
+        
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PCOLLECTION));
   }
 }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
index 2caba26..205fde0 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleStateRegistryTest.java
@@ -61,20 +61,20 @@ public class SimpleStateRegistryTest {
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
     builder.setInt64Value(0);
-    builder.setPTransformLabel(testPTransformId);
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
     matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
     // Check for execution time metrics for the testPTransformId
     builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
     builder.setInt64Value(0);
-    builder.setPTransformLabel(testPTransformId);
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
     matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
     builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS);
     builder.setInt64Value(0);
-    builder.setPTransformLabel(testPTransformId);
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
     matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
     for (Matcher<MonitoringInfo> matcher : matchers) {
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
index b3abbc0..a993545 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java
@@ -50,9 +50,11 @@ public class SpecMonitoringInfoValidatorTest {
   public void validateReturnsNoErrorOnValidMonitoringInfo() {
     MonitoringInfo testInput =
         MonitoringInfo.newBuilder()
-            .setUrn(Urns.USER_COUNTER_PREFIX + "someCounter")
+            .setUrn(Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyCounter")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyString")
             .setType(TypeUrns.SUM_INT64)
-            .putLabels("dummy", "value")
             .build();
     assertFalse(testObject.validate(testInput).isPresent());
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
index a73d897..237d176b 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
-import static org.apache.beam.model.pipeline.v1.MetricsApi.labelProps;
 import static 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.getFlinkMetricNameString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertNotNull;
@@ -35,13 +34,13 @@ import 
org.apache.beam.model.pipeline.v1.MetricsApi.DoubleDistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
-import 
org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo.MonitoringInfoLabels;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.runners.core.metrics.DistributionCell;
 import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge;
 import org.apache.beam.sdk.metrics.Counter;
@@ -68,13 +67,6 @@ public class FlinkMetricContainerTest {
   @Mock private RuntimeContext runtimeContext;
   @Mock private MetricGroup metricGroup;
 
-  static final String PTRANSFORM_LABEL =
-      MonitoringInfoLabels.forNumber(MonitoringInfoLabels.TRANSFORM_VALUE)
-          .getValueDescriptor()
-          .getOptions()
-          .getExtension(labelProps)
-          .getName();
-
   @Before
   public void beforeTest() {
     MockitoAnnotations.initMocks(this);
@@ -139,18 +131,24 @@ public class FlinkMetricContainerTest {
     SimpleCounter elemCounter = new SimpleCounter();
     
when(metricGroup.counter("beam.metric:element_count:v1")).thenReturn(elemCounter);
 
-    SimpleMonitoringInfoBuilder userCountBuilder = new 
SimpleMonitoringInfoBuilder();
-    userCountBuilder.setUrnForUserMetric("ns1", "metric1");
-    userCountBuilder.setInt64Value(111);
-    MonitoringInfo userCountMonitoringInfo = userCountBuilder.build();
+    MonitoringInfo userCountMonitoringInfo =
+        new SimpleMonitoringInfoBuilder()
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns1")
+            .setLabel(MonitoringInfoConstants.Labels.NAME, "metric1")
+            .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
"anyPTransform")
+            .setInt64Value(111)
+            .build();
     assertNotNull(userCountMonitoringInfo);
 
-    SimpleMonitoringInfoBuilder elemCountBuilder = new 
SimpleMonitoringInfoBuilder();
-    elemCountBuilder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    elemCountBuilder.setInt64Value(222);
-    elemCountBuilder.setPTransformLabel("step");
-    elemCountBuilder.setPCollectionLabel("pcoll");
-    MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build();
+    MonitoringInfo elemCountMonitoringInfo =
+        new SimpleMonitoringInfoBuilder()
+            .setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT)
+            .setInt64Value(222)
+            .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
+            .setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll")
+            .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
"anyPTransform")
+            .build();
     assertNotNull(elemCountMonitoringInfo);
 
     assertThat(userCounter.getCount(), is(0L));
@@ -164,28 +162,34 @@ public class FlinkMetricContainerTest {
   @Test
   public void testDropUnexpectedMonitoringInfoTypes() {
     FlinkMetricContainer flinkContainer = new 
FlinkMetricContainer(runtimeContext);
-    MetricsContainer step = flinkContainer.getMetricsContainer("step");
+    MetricsContainerImpl step = flinkContainer.getMetricsContainer("step");
 
     MonitoringInfo intCounter =
         MonitoringInfo.newBuilder()
-            .setUrn(Urns.USER_COUNTER_PREFIX + "ns1:int_counter")
-            .putLabels(PTRANSFORM_LABEL, "step")
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns1")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "int_counter")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
             .setMetric(
                 
Metric.newBuilder().setCounterData(CounterData.newBuilder().setInt64Value(111)))
             .build();
 
     MonitoringInfo doubleCounter =
         MonitoringInfo.newBuilder()
-            .setUrn(Urns.USER_COUNTER_PREFIX + "ns2:double_counter")
-            .putLabels(PTRANSFORM_LABEL, "step")
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns2")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "double_counter")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
             .setMetric(
                 
Metric.newBuilder().setCounterData(CounterData.newBuilder().setDoubleValue(222)))
             .build();
 
     MonitoringInfo intDistribution =
         MonitoringInfo.newBuilder()
-            .setUrn(Urns.USER_COUNTER_PREFIX + "ns3:int_distribution")
-            .putLabels(PTRANSFORM_LABEL, "step")
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns3")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "int_distribution")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
             .setMetric(
                 Metric.newBuilder()
                     .setDistributionData(
@@ -200,8 +204,10 @@ public class FlinkMetricContainerTest {
 
     MonitoringInfo doubleDistribution =
         MonitoringInfo.newBuilder()
-            .setUrn(Urns.USER_COUNTER_PREFIX + "ns4:double_distribution")
-            .putLabels(PTRANSFORM_LABEL, "step")
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns4")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, 
"double_distribution")
+            .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step")
             .setMetric(
                 Metric.newBuilder()
                     .setDistributionData(
@@ -231,7 +237,7 @@ public class FlinkMetricContainerTest {
 
     // Verify the counter in the java SDK MetricsContainer
     long count =
-        ((CounterCell) step.getCounter(MetricName.named("ns1", 
"int_counter"))).getCumulative();
+        ((CounterCell) 
step.tryGetCounter(MonitoringInfoMetricName.of(intCounter))).getCumulative();
     assertThat(count, is(111L));
 
     // The one Flink distribution that gets created is a 
FlinkDistributionGauge; here we verify its
@@ -251,7 +257,7 @@ public class FlinkMetricContainerTest {
 
     // Verify that the Java SDK MetricsContainer holds the same information
     DistributionData distributionData =
-        ((DistributionCell) step.getDistribution(MetricName.named("ns3", 
"int_distribution")))
+        ((DistributionCell) 
step.getDistribution(MonitoringInfoMetricName.of(intDistribution)))
             .getCumulative();
     assertThat(distributionData, is(DistributionData.create(30, 10, 1, 5)));
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
index ab0c29f..6fa5530 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformer.java
@@ -55,8 +55,8 @@ class UserDistributionMonitoringInfoToCounterUpdateTransformer
     this.specValidator = specMonitoringInfoValidator;
   }
 
-  static final String BEAM_METRICS_USER_DISTRIBUTION_PREFIX =
-      MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER_PREFIX;
+  static final String BEAM_METRICS_USER_DISTRIBUTION_URN =
+      MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER;
 
   private Optional<String> validate(MonitoringInfo monitoringInfo) {
     Optional<String> validatorResult = specValidator.validate(monitoringInfo);
@@ -65,11 +65,11 @@ class 
UserDistributionMonitoringInfoToCounterUpdateTransformer
     }
 
     String urn = monitoringInfo.getUrn();
-    if (!urn.startsWith(BEAM_METRICS_USER_DISTRIBUTION_PREFIX)) {
+    if (!urn.equals(BEAM_METRICS_USER_DISTRIBUTION_URN)) {
       throw new RuntimeException(
           String.format(
-              "Received unexpected counter urn. Expected urn starting with: 
%s, received: %s",
-              BEAM_METRICS_USER_DISTRIBUTION_PREFIX, urn));
+              "Received unexpected counter urn. Expected urn: %s, received: 
%s",
+              BEAM_METRICS_USER_DISTRIBUTION_URN, urn));
     }
 
     final String ptransform =
@@ -99,22 +99,13 @@ class 
UserDistributionMonitoringInfoToCounterUpdateTransformer
 
     IntDistributionData value =
         
monitoringInfo.getMetric().getDistributionData().getIntDistributionData();
-    String urn = monitoringInfo.getUrn();
 
-    final String ptransform =
-        
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM);
+    Map<String, String> miLabels = monitoringInfo.getLabelsMap();
+    final String ptransform = 
miLabels.get(MonitoringInfoConstants.Labels.PTRANSFORM);
+    final String counterName = 
miLabels.get(MonitoringInfoConstants.Labels.NAME);
+    final String counterNamespace = 
miLabels.get(MonitoringInfoConstants.Labels.NAMESPACE);
 
     CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
-
-    String nameWithNamespace =
-        
urn.substring(BEAM_METRICS_USER_DISTRIBUTION_PREFIX.length()).replace("^:", "");
-
-    // TODO(BEAM-6925) Extract common logic to separate class.
-    final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
-    String counterName = nameWithNamespace.substring(lastColonIndex + 1);
-    String counterNamespace =
-        lastColonIndex == -1 ? "" : nameWithNamespace.substring(0, 
lastColonIndex);
-
     DataflowStepContext stepContext = transformIdMapping.get(ptransform);
     name.setName(
             new CounterStructuredName()
@@ -137,6 +128,6 @@ class 
UserDistributionMonitoringInfoToCounterUpdateTransformer
 
   /** @return MonitoringInfo urns prefix that this transformer can convert to 
CounterUpdates. */
   public String getSupportedUrnPrefix() {
-    return BEAM_METRICS_USER_DISTRIBUTION_PREFIX;
+    return BEAM_METRICS_USER_DISTRIBUTION_URN;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
index 0dfaee0..0d3ed42 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
@@ -53,7 +53,7 @@ class UserMonitoringInfoToCounterUpdateTransformer
     this.specValidator = specMonitoringInfoValidator;
   }
 
-  static final String BEAM_METRICS_USER_PREFIX = 
MonitoringInfoConstants.Urns.USER_COUNTER_PREFIX;
+  static final String BEAM_METRICS_USER_URN = 
MonitoringInfoConstants.Urns.USER_COUNTER;
 
   private Optional<String> validate(MonitoringInfo monitoringInfo) {
     Optional<String> validatorResult = specValidator.validate(monitoringInfo);
@@ -62,11 +62,11 @@ class UserMonitoringInfoToCounterUpdateTransformer
     }
 
     String urn = monitoringInfo.getUrn();
-    if (!urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+    if (!urn.equals(BEAM_METRICS_USER_URN)) {
       throw new RuntimeException(
           String.format(
-              "Received unexpected counter urn. Expected urn starting with: 
%s, received: %s",
-              BEAM_METRICS_USER_PREFIX, urn));
+              "Received unexpected counter urn. Expected urn: %s, received: 
%s",
+              BEAM_METRICS_USER_URN, urn));
     }
 
     final String ptransform =
@@ -95,20 +95,13 @@ class UserMonitoringInfoToCounterUpdateTransformer
     }
 
     long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
-    String urn = monitoringInfo.getUrn();
 
-    final String ptransform =
-        
monitoringInfo.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM);
+    Map<String, String> miLabels = monitoringInfo.getLabelsMap();
+    final String ptransform = 
miLabels.get(MonitoringInfoConstants.Labels.PTRANSFORM);
+    final String counterName = 
miLabels.get(MonitoringInfoConstants.Labels.NAME);
+    final String counterNamespace = 
miLabels.get(MonitoringInfoConstants.Labels.NAMESPACE);
 
     CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
-
-    String nameWithNamespace = 
urn.substring(BEAM_METRICS_USER_PREFIX.length()).replace("^:", "");
-
-    final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
-    String counterName = nameWithNamespace.substring(lastColonIndex + 1);
-    String counterNamespace =
-        lastColonIndex == -1 ? "" : nameWithNamespace.substring(0, 
lastColonIndex);
-
     DataflowStepContext stepContext = transformIdMapping.get(ptransform);
     name.setName(
             new CounterStructuredName()
@@ -126,6 +119,6 @@ class UserMonitoringInfoToCounterUpdateTransformer
 
   /** @return MonitoringInfo urns prefix that this transformer can convert to 
CounterUpdates. */
   public String getSupportedUrnPrefix() {
-    return BEAM_METRICS_USER_PREFIX;
+    return BEAM_METRICS_USER_URN;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index 248b9e6..6263610 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -462,8 +462,10 @@ public class BeamFnMapTaskExecutorTest {
     final int expectedCounterValue = 5;
     final MonitoringInfo expectedMonitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user:ExpectedCounter")
-            .setType("beam:metrics:sum_int_64")
+            .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "ExpectedCounter")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "anyString")
+            .setType(MonitoringInfoConstants.TypeUrns.SUM_INT64)
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, 
"ExpectedPTransform")
             .setMetric(
                 Metric.newBuilder()
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
index 203ae8b..1d9c9f5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserDistributionMonitoringInfoToCounterUpdateTransformerTest.java
@@ -80,7 +80,9 @@ public class 
UserDistributionMonitoringInfoToCounterUpdateTransformerTest {
     Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user_distribution:anyNamespace:anyName")
+            .setUrn("beam:metric:user_distribution")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
"anyNamespace")
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
     UserDistributionMonitoringInfoToCounterUpdateTransformer testObject =
@@ -101,7 +103,9 @@ public class 
UserDistributionMonitoringInfoToCounterUpdateTransformerTest {
 
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user_distribution:anyNamespace:anyName")
+            .setUrn("beam:metric:user_distribution")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
"anyNamespace")
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
     UserDistributionMonitoringInfoToCounterUpdateTransformer testObject =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
index 6d65e1b..e2992f5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
@@ -78,7 +78,9 @@ public class UserMonitoringInfoToCounterUpdateTransformerTest 
{
     Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user:anyNamespace:anyName")
+            .setUrn("beam:metric:user")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
"anyNamespace")
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
     UserMonitoringInfoToCounterUpdateTransformer testObject =
@@ -98,7 +100,9 @@ public class 
UserMonitoringInfoToCounterUpdateTransformerTest {
 
     MonitoringInfo monitoringInfo =
         MonitoringInfo.newBuilder()
-            .setUrn("beam:metric:user:anyNamespace:anyName")
+            .setUrn("beam:metric:user")
+            .putLabels(MonitoringInfoConstants.Labels.NAME, "anyName")
+            .putLabels(MonitoringInfoConstants.Labels.NAMESPACE, 
"anyNamespace")
             .putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "anyValue")
             .build();
     UserMonitoringInfoToCounterUpdateTransformer testObject =
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 4839fef..108f88a 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -637,21 +637,35 @@ public class RemoteExecutionTest implements Serializable {
             List<Matcher<MonitoringInfo>> matchers = new 
ArrayList<Matcher<MonitoringInfo>>();
 
             SimpleMonitoringInfoBuilder builder = new 
SimpleMonitoringInfoBuilder();
-            builder.setUrnForUserMetric(
-                RemoteExecutionTest.class.getName(), processUserCounterName);
-            builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, 
processUserCounterName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, 
"create/ParMultiDo(Anonymous)");
             builder.setInt64Value(1);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
             builder = new SimpleMonitoringInfoBuilder();
-            builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(), 
startUserCounterName);
-            builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, 
startUserCounterName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, 
"create/ParMultiDo(Anonymous)");
             builder.setInt64Value(10);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
             builder = new SimpleMonitoringInfoBuilder();
-            builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(), 
finishUserCounterName);
-            builder.setPTransformLabel("create/ParMultiDo(Anonymous)");
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, 
finishUserCounterName);
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PTRANSFORM, 
"create/ParMultiDo(Anonymous)");
             builder.setInt64Value(100);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
@@ -659,26 +673,31 @@ public class RemoteExecutionTest implements Serializable {
             // So there should be only two elements.
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-            builder.setPCollectionLabel("impulse.out");
+            builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, 
"impulse.out");
             builder.setInt64Value(2);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-            builder.setPCollectionLabel("create/ParMultiDo(Anonymous).output");
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PCOLLECTION, 
"create/ParMultiDo(Anonymous).output");
             builder.setInt64Value(3);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
             // Verify that the element count is not double counted if two 
PCollections consume it.
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-            
builder.setPCollectionLabel("processA/ParMultiDo(Anonymous).output");
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PCOLLECTION,
+                "processA/ParMultiDo(Anonymous).output");
             builder.setInt64Value(6);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-            
builder.setPCollectionLabel("processB/ParMultiDo(Anonymous).output");
+            builder.setLabel(
+                MonitoringInfoConstants.Labels.PCOLLECTION,
+                "processB/ParMultiDo(Anonymous).output");
             builder.setInt64Value(6);
             
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
@@ -686,7 +705,7 @@ public class RemoteExecutionTest implements Serializable {
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
             builder.setInt64TypeUrn();
-            builder.setPTransformLabel(testPTransformId);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
             matchers.add(
                 allOf(
                     MonitoringInfoMatchers.matchSetFields(builder.build()),
@@ -696,7 +715,7 @@ public class RemoteExecutionTest implements Serializable {
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(Urns.PROCESS_BUNDLE_MSECS);
             builder.setInt64TypeUrn();
-            builder.setPTransformLabel(testPTransformId);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
             matchers.add(
                 allOf(
                     MonitoringInfoMatchers.matchSetFields(builder.build()),
@@ -705,7 +724,7 @@ public class RemoteExecutionTest implements Serializable {
             builder = new SimpleMonitoringInfoBuilder();
             builder.setUrn(Urns.FINISH_BUNDLE_MSECS);
             builder.setInt64TypeUrn();
-            builder.setPTransformLabel(testPTransformId);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
             matchers.add(
                 allOf(
                     MonitoringInfoMatchers.matchSetFields(builder.build()),
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 2e8b33c..db4751f 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -665,29 +665,35 @@ public class FnApiDoFnRunnerTest implements Serializable {
     List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    builder.setPCollectionLabel("Window.Into()/Window.Assign.out");
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, 
"Window.Into()/Window.Assign.out");
     builder.setInt64Value(2);
     expected.add(builder.build());
 
     builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    builder.setPCollectionLabel(
+    builder.setLabel(
+        MonitoringInfoConstants.Labels.PCOLLECTION,
         
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
     builder.setInt64Value(2);
     expected.add(builder.build());
 
     builder = new SimpleMonitoringInfoBuilder();
-    builder.setUrnForUserMetric(
-        TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName(),
-        TestSideInputIsAccessibleForDownstreamCallersDoFn.USER_COUNTER_NAME);
-    builder.setPTransformLabel(TEST_PTRANSFORM_ID);
+    builder
+        .setUrn(MonitoringInfoConstants.Urns.USER_COUNTER)
+        .setLabel(
+            MonitoringInfoConstants.Labels.NAMESPACE,
+            TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName())
+        .setLabel(
+            MonitoringInfoConstants.Labels.NAME,
+            
TestSideInputIsAccessibleForDownstreamCallersDoFn.USER_COUNTER_NAME);
+    builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
TEST_PTRANSFORM_ID);
     builder.setInt64Value(2);
     expected.add(builder.build());
 
     closeable.close();
     List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
     for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
-      result.add(SimpleMonitoringInfoBuilder.clearTimestamp(mi));
+      result.add(SimpleMonitoringInfoBuilder.copyAndClearTimestamp(mi));
     }
     assertThat(result, containsInAnyOrder(expected.toArray()));
   }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
index b6768e1..316caa7 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiverTest.java
@@ -65,13 +65,13 @@ public class ElementCountFnDataReceiverTest {
 
     SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
     builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
-    builder.setPCollectionLabel(pCollectionA);
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
     builder.setInt64Value(numElements);
     MonitoringInfo expected = builder.build();
 
     // Clear the timestamp before comparison.
     MonitoringInfo first = 
metricsContainerRegistry.getMonitoringInfos().iterator().next();
-    MonitoringInfo result = SimpleMonitoringInfoBuilder.clearTimestamp(first);
+    MonitoringInfo result = 
SimpleMonitoringInfoBuilder.copyAndClearTimestamp(first);
     assertEquals(expected, result);
   }
 
diff --git a/sdks/python/apache_beam/metrics/execution.py 
b/sdks/python/apache_beam/metrics/execution.py
index 12ecf20..420f7ff 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -40,8 +40,6 @@ from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.cells import CounterCell
 from apache_beam.metrics.cells import DistributionCell
 from apache_beam.metrics.cells import GaugeCell
-from apache_beam.metrics.monitoring_infos import user_distribution_metric_urn
-from apache_beam.metrics.monitoring_infos import user_metric_urn
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners.worker import statesampler
 
@@ -238,22 +236,22 @@ class MetricsContainer(object):
     """Returns a list of MonitoringInfos for the metrics in this container."""
     all_user_metrics = []
     for k, v in self.counters.items():
-      all_user_metrics.append(monitoring_infos.int64_counter(
-          user_metric_urn(k.namespace, k.name),
+      all_user_metrics.append(monitoring_infos.int64_user_counter(
+          k.namespace, k.name,
           v.to_runner_api_monitoring_info(),
           ptransform=transform_id
       ))
 
     for k, v in self.distributions.items():
-      all_user_metrics.append(monitoring_infos.int64_distribution(
-          user_distribution_metric_urn(k.namespace, k.name),
+      all_user_metrics.append(monitoring_infos.int64_user_distribution(
+          k.namespace, k.name,
           v.get_cumulative().to_runner_api_monitoring_info(),
           ptransform=transform_id
       ))
 
     for k, v in self.gauges.items():
-      all_user_metrics.append(monitoring_infos.int64_gauge(
-          user_metric_urn(k.namespace, k.name),
+      all_user_metrics.append(monitoring_infos.int64_user_gauge(
+          k.namespace, k.name,
           v.get_cumulative().to_runner_api_monitoring_info(),
           ptransform=transform_id
       ))
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py 
b/sdks/python/apache_beam/metrics/monitoring_infos.py
index aa4615d..76f3b45 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -43,9 +43,9 @@ PROCESS_BUNDLE_MSECS_URN = (
 FINISH_BUNDLE_MSECS_URN = (
     common_urns.monitoring_info_specs.FINISH_BUNDLE_MSECS.spec.urn)
 TOTAL_MSECS_URN = common_urns.monitoring_info_specs.TOTAL_MSECS.spec.urn
-USER_COUNTER_URN_PREFIX = (
+USER_COUNTER_URN = (
     common_urns.monitoring_info_specs.USER_COUNTER.spec.urn)
-USER_DISTRIBUTION_COUNTER_URN_PREFIX = (
+USER_DISTRIBUTION_COUNTER_URN = (
     common_urns.monitoring_info_specs.USER_DISTRIBUTION_COUNTER.spec.urn)
 
 # TODO(ajamato): Implement the remaining types, i.e. Double types
@@ -61,9 +61,14 @@ DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
 GAUGE_TYPES = set([LATEST_INT64_TYPE])
 
 # TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels
-PCOLLECTION_LABEL = 'PCOLLECTION'
-PTRANSFORM_LABEL = 'PTRANSFORM'
-TAG_LABEL = 'TAG'
+PCOLLECTION_LABEL = (
+    common_urns.monitoring_info_labels.PCOLLECTION.label_props.name)
+PTRANSFORM_LABEL = (
+    common_urns.monitoring_info_labels.TRANSFORM.label_props.name)
+NAMESPACE_LABEL = (
+    common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
+NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
+TAG_LABEL = "TAG"
 
 
 def to_timestamp_proto(timestamp_secs):
@@ -104,7 +109,7 @@ def extract_distribution(monitoring_info_proto):
   return None
 
 
-def create_labels(ptransform='', tag=''):
+def create_labels(ptransform=None, tag=None, namespace=None, name=None):
   """Create the label dictionary based on the provided tags.
 
   Args:
@@ -116,10 +121,36 @@ def create_labels(ptransform='', tag=''):
     labels[TAG_LABEL] = tag
   if ptransform:
     labels[PTRANSFORM_LABEL] = ptransform
+  if namespace:
+    labels[NAMESPACE_LABEL] = namespace
+  if name:
+    labels[NAME_LABEL] = name
   return labels
 
 
-def int64_counter(urn, metric, ptransform='', tag=''):
+def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
+  """Return the counter monitoring info for the specifed URN, metric and 
labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    metric: The metric proto field to use in the monitoring info.
+        Or an int value.
+    ptransform: The ptransform/step name used as a label.
+    tag: The output tag name, used as a label.
+  """
+  labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+                         name=name)
+  if isinstance(metric, int):
+    metric = Metric(
+        counter_data=CounterData(
+            int64_value=metric
+        )
+    )
+  return create_monitoring_info(USER_COUNTER_URN, SUM_INT64_TYPE, metric,
+                                labels)
+
+
+def int64_counter(urn, metric, ptransform=None, tag=None):
   """Return the counter monitoring info for the specifed URN, metric and 
labels.
 
   Args:
@@ -139,7 +170,7 @@ def int64_counter(urn, metric, ptransform='', tag=''):
   return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
 
 
-def int64_distribution(urn, metric, ptransform='', tag=''):
+def int64_user_distribution(namespace, name, metric, ptransform=None, 
tag=None):
   """Return the distribution monitoring info for the URN, metric and labels.
 
   Args:
@@ -149,22 +180,27 @@ def int64_distribution(urn, metric, ptransform='', 
tag=''):
     ptransform: The ptransform/step name used as a label.
     tag: The output tag name, used as a label.
   """
-  labels = create_labels(ptransform=ptransform, tag=tag)
-  return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, metric, labels)
+  labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+                         name=name)
+  return create_monitoring_info(USER_DISTRIBUTION_COUNTER_URN,
+                                DISTRIBUTION_INT64_TYPE, metric, labels)
 
 
-def int64_gauge(urn, metric, ptransform='', tag=''):
+def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
   """Return the gauge monitoring info for the URN, metric and labels.
 
   Args:
-    urn: The URN of the monitoring info/metric.
+    namespace: User-defined namespace of counter.
+    name: Name of counter.
     metric: The metric proto field to use in the monitoring info.
         Or an int value.
     ptransform: The ptransform/step name used as a label.
     tag: The output tag name, used as a label.
   """
-  labels = create_labels(ptransform=ptransform, tag=tag)
-  return create_monitoring_info(urn, LATEST_INT64_TYPE, metric, labels)
+  labels = create_labels(ptransform=ptransform, tag=tag, namespace=namespace,
+                         name=name)
+  return create_monitoring_info(USER_COUNTER_URN, LATEST_INT64_TYPE, metric,
+                                labels)
 
 
 def create_monitoring_info(urn, type_urn, metric_proto, labels=None):
@@ -187,27 +223,6 @@ def create_monitoring_info(urn, type_urn, metric_proto, 
labels=None):
   )
 
 
-def user_metric_urn(namespace, name):
-  """Returns the metric URN for a user metric, with a proper URN prefix.
-
-  Args:
-    namespace: The namespace of the metric.
-    name: The name of the metric.
-  """
-  return '%s%s:%s' % (USER_COUNTER_URN_PREFIX, namespace, name)
-
-
-def user_distribution_metric_urn(namespace, name):
-  """Returns the metric URN for a user distribution metric,
-  with a proper URN prefix.
-
-  Args:
-    namespace: The namespace of the metric.
-    name: The name of the metric.
-  """
-  return '%s%s:%s' % (USER_DISTRIBUTION_COUNTER_URN_PREFIX, namespace, name)
-
-
 def is_counter(monitoring_info_proto):
   """Returns true if the monitoring info is a coutner metric."""
   return monitoring_info_proto.type in COUNTER_TYPES
@@ -224,13 +239,11 @@ def is_gauge(monitoring_info_proto):
 
 
 def _is_user_monitoring_info(monitoring_info_proto):
-  return monitoring_info_proto.urn.startswith(
-      USER_COUNTER_URN_PREFIX)
+  return monitoring_info_proto.urn == USER_COUNTER_URN
 
 
 def _is_user_distribution_monitoring_info(monitoring_info_proto):
-  return monitoring_info_proto.urn.startswith(
-      USER_DISTRIBUTION_COUNTER_URN_PREFIX)
+  return monitoring_info_proto.urn == USER_DISTRIBUTION_COUNTER_URN
 
 
 def is_user_monitoring_info(monitoring_info_proto):
@@ -263,19 +276,14 @@ def 
extract_metric_result_map_value(monitoring_info_proto):
 
 def parse_namespace_and_name(monitoring_info_proto):
   """Returns the (namespace, name) tuple of the URN in the monitoring info."""
-  to_split = monitoring_info_proto.urn
-
   # Remove the URN prefix which indicates that it is a user counter.
-  prefix_len = 0
-  if _is_user_distribution_monitoring_info(monitoring_info_proto):
-    prefix_len = len(USER_DISTRIBUTION_COUNTER_URN_PREFIX)
-  elif _is_user_monitoring_info(monitoring_info_proto):
-    prefix_len = len(USER_COUNTER_URN_PREFIX)
-  to_split = monitoring_info_proto.urn[prefix_len:]
+  if is_user_monitoring_info(monitoring_info_proto):
+    labels = monitoring_info_proto.labels
+    return labels[NAMESPACE_LABEL], labels[NAME_LABEL]
 
   # If it is not a user counter, just use the first part of the URN, i.e. 
'beam'
-  split = to_split.split(':')
-  return split[0], ':'.join(split[1:])
+  split = monitoring_info_proto.urn.split(':', 1)
+  return split[0], split[1]
 
 
 def to_key(monitoring_info_proto):
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos_test.py 
b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
index 67bca09..466969a 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos_test.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos_test.py
@@ -31,17 +31,23 @@ class MonitoringInfosTest(unittest.TestCase):
     self.assertEqual(name, "dummy:metric")
 
   def test_parse_namespace_and_name_for_user_metric(self):
-    urn = (monitoring_infos.USER_COUNTER_URN_PREFIX +
-           "counternamespace:countername")
-    input = monitoring_infos.create_monitoring_info(urn, "typeurn", None)
+    urn = monitoring_infos.USER_COUNTER_URN
+    labels = {}
+    labels[monitoring_infos.NAMESPACE_LABEL] = "counternamespace"
+    labels[monitoring_infos.NAME_LABEL] = "countername"
+    input = monitoring_infos.create_monitoring_info(urn, "typeurn", None,
+                                                    labels)
     namespace, name = monitoring_infos.parse_namespace_and_name(input)
     self.assertEqual(namespace, "counternamespace")
     self.assertEqual(name, "countername")
 
   def test_parse_namespace_and_name_for_user_distribution_metric(self):
-    urn = (monitoring_infos.USER_DISTRIBUTION_COUNTER_URN_PREFIX +
-           "counternamespace:countername")
-    input = monitoring_infos.create_monitoring_info(urn, "typeurn", None)
+    urn = monitoring_infos.USER_DISTRIBUTION_COUNTER_URN
+    labels = {}
+    labels[monitoring_infos.NAMESPACE_LABEL] = "counternamespace"
+    labels[monitoring_infos.NAME_LABEL] = "countername"
+    input = monitoring_infos.create_monitoring_info(urn, "typeurn", None,
+                                                    labels)
     namespace, name = monitoring_infos.parse_namespace_and_name(input)
     self.assertEqual(namespace, "counternamespace")
     self.assertEqual(name, "countername")
diff --git a/sdks/python/apache_beam/portability/common_urns.py 
b/sdks/python/apache_beam/portability/common_urns.py
index 693133b..c1164e3 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -34,6 +34,8 @@ class PropertiesFromEnumValue(object):
         beam_runner_api_pb2.beam_constant])
     self.spec = (value_descriptor.GetOptions().Extensions[
         metrics_pb2.monitoring_info_spec])
+    self.label_props = (value_descriptor.GetOptions().Extensions[
+        metrics_pb2.label_props])
 
 
 class PropertiesFromEnumType(object):
@@ -82,3 +84,5 @@ monitoring_info_specs = PropertiesFromEnumType(
     metrics_pb2.MonitoringInfoSpecs.Enum)
 monitoring_info_types = PropertiesFromEnumType(
     metrics_pb2.MonitoringInfoTypeUrns.Enum)
+monitoring_info_labels = PropertiesFromEnumType(
+    metrics_pb2.MonitoringInfo.MonitoringInfoLabels)

Reply via email to