[ 
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=174633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174633
 ]

ASF GitHub Bot logged work on BEAM-6138:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/18 18:44
            Start Date: 12/Dec/18 18:44
    Worklog Time Spent: 10m 
      Work Description: swegner closed pull request #7244: [BEAM-6138] Add a 
MonitoringInfoSpec proto and SimpleMonitoringInfoBuilder to pro…
URL: https://github.com/apache/beam/pull/7244
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto 
b/model/fn-execution/src/main/proto/beam_fn_api.proto
index a8d9f709a557..948d07805d83 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -77,17 +77,20 @@ message Target {
 message RemoteGrpcPort {
   // (Required) An API descriptor which describes where to
   // connect to including any authentication that is required.
-  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor 
api_service_descriptor = 1;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
+      api_service_descriptor = 1;
 
-  // (Required) The ID of the Coder that will be used to encode and decode 
data sent over this port.
+  // (Required) The ID of the Coder that will be used to encode and decode data
+  // sent over this port.
   string coder_id = 2;
 }
 
 /*
  * Control Plane API
  *
- * Progress reporting and splitting still need further vetting. Also, this may 
change
- * with the addition of new types of instructions/responses related to metrics.
+ * Progress reporting and splitting still need further vetting. Also, this may
+ * change with the addition of new types of instructions/responses related to
+ * metrics.
  */
 
 // An API that describes the work that a SDK harness is meant to do.
@@ -96,12 +99,12 @@ service BeamFnControl {
   // Instructions sent by the runner to the SDK requesting different types
   // of work.
   rpc Control(
-    // A stream of responses to instructions the SDK was asked to be performed.
-    stream InstructionResponse
-  ) returns (
-    // A stream of instructions requested of the SDK to be performed.
-    stream InstructionRequest
-  ) {}
+      // A stream of responses to instructions the SDK was asked to be
+      // performed.
+      stream InstructionResponse)
+      returns (
+          // A stream of instructions requested of the SDK to be performed.
+          stream InstructionRequest) {}
 }
 
 // A request sent by a runner which the SDK is asked to fulfill.
@@ -156,8 +159,7 @@ message RegisterRequest {
 }
 
 // Stable
-message RegisterResponse {
-}
+message RegisterResponse {}
 
 // Definitions that should be used to construct the bundle processing graph.
 message ProcessBundleDescriptor {
@@ -172,7 +174,8 @@ message ProcessBundleDescriptor {
   map<string, org.apache.beam.model.pipeline.v1.PCollection> pcollections = 3;
 
   // (Required) A map from pipeline-scoped id to WindowingStrategy.
-  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy> 
windowing_strategies = 4;
+  map<string, org.apache.beam.model.pipeline.v1.WindowingStrategy>
+      windowing_strategies = 4;
 
   // (Required) A map from pipeline-scoped id to Coder.
   map<string, org.apache.beam.model.pipeline.v1.Coder> coders = 5;
@@ -183,20 +186,24 @@ message ProcessBundleDescriptor {
   // A descriptor describing the end point to use for State API
   // calls. Required if the Runner intends to send remote references over the
   // data plane or if any of the transforms rely on user state or side inputs.
-  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor 
state_api_service_descriptor = 7;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor
+      state_api_service_descriptor = 7;
 }
 
-
-// Represents a non-negative decimal number: unscaled_value * 10^(scale) 
(scientific notation)
+// Represents a non-negative decimal number: unscaled_value * 10^(scale)
+// (scientific notation)
 message Decimal {
-  // Represents the unscaled value as a big endian unlimited precision 
non-negative integer.
+  // Represents the unscaled value as a big endian unlimited precision
+  // non-negative integer.
   bytes unscaled_value = 1;
   // Represents the scale
   int32 scale = 2;
 }
 
 // One of the applications specifying the scope of work for a bundle.
-// See 
https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
 for further details.
+// See
+// 
https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
+// for further details.
 message BundleApplication {
   // (Required) The primitive transform to which to pass the element
   string ptransform_id = 1;
@@ -223,7 +230,8 @@ message BundleApplication {
     //  * a globally shared resource such as a Pubsub queue should set this
     //    to “”.
     //  * a shared partitioned resource should use the partition identifier.
-    //  * a uniquely partitioned resource such as a file range should set this 
to
+    //  * a uniquely partitioned resource such as a file range should set this
+    //    to
     //    file name + start offset.
     bytes partition = 1;
 
@@ -310,6 +318,97 @@ message ProcessBundleProgressRequest {
   string instruction_reference = 1;
 }
 
+// A specification containing required set of fields and labels required
+// to be set on a MonitoringInfo for the specific URN for SDK->RunnerHarness
+// ProcessBundleResponse reporting.
+message MonitoringInfoSpec {
+  string urn = 1;
+  string type_urn = 2;
+  // The list of label names required to be set on the MonitoringInfo.
+  repeated string required_labels = 3;
+  // Extra non functional parts of the spec for descriptive purposes.
+  // i.e. description, units, etc.
+  repeated Annotation annotations = 4;
+}
+
+extend google.protobuf.EnumValueOptions {
+  // Enum extension to store the MonitoringInfoSpecs.
+  MonitoringInfoSpec monitoring_info_spec = 207174266;
+}
+
+// The key name and value string of MonitoringInfo annotations.
+message Annotation {
+  string key = 1;
+  string value = 2;
+}
+
+// Populated MonitoringInfoSpecs for specific URNs, indicating the type URN
+// and the labels that are required to be set for each of them.
+// SDKs and RunnerHarnesses can load these instances into memory and write a
+// validator or code generator to assist with populating and validating
+// MonitoringInfo protos.
+message MonitoringInfoSpecs {
+  enum Enum {
+    USER_COUNTER = 0 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:",
+      type_urn: "beam:metrics:sum_int_64",
+    }];
+
+    ELEMENT_COUNT = 1 [(monitoring_info_spec) = {
+      urn: "beam:metric:element_count:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM", "PCOLLECTION" ],
+      annotations: [ {
+        key: "description",
+        value: "The total elements output to a Pcollection by a PTransform."
+      } ]
+    }];
+
+    START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the start bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the process bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {
+      urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the finish bundle "
+               "function in a pardo"
+      } ]
+    }];
+
+    TOTAL_MSECS = 5 [(monitoring_info_spec) = {
+      urn: "beam:metric:ptransform_execution_time:total_msecs:v1",
+      type_urn: "beam:metrics:sum_int_64",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [ {
+        key: "description",
+        value: "The total estimated execution time of the ptransform"
+      } ]
+    }];
+  }
+}
+
 // A set of properties for the MonitoringInfoLabel, this is useful to obtain
 // the proper label string for the MonitoringInfoLabel.
 message MonitoringInfoLabelProps {
@@ -320,7 +419,7 @@ message MonitoringInfoLabelProps {
 // Enum extension to store MonitoringInfo related
 // specifications, constants, etc.
 extend google.protobuf.EnumValueOptions {
-  MonitoringInfoLabelProps label_props = 127337796; // From: commit 0x7970544.
+  MonitoringInfoLabelProps label_props = 127337796;  // From: commit 0x7970544.
 }
 
 message MonitoringInfo {
@@ -345,30 +444,20 @@ message MonitoringInfo {
 
   enum MonitoringInfoLabels {
     // TODO(ajamato): Rename all references to TRANSFORM to PTRANSFORM
-    TRANSFORM = 0 [(label_props) = {
-      name: "PTRANSFORM"
-    }];
-    PCOLLECTION = 1 [(label_props) = {
-      name: "PCOLLECTION"
-    }];
-    WINDOWING_STRATEGY = 2 [(label_props) = {
-      name: "WINDOWING_STRATEGY"
-    }];
-    CODER = 3 [(label_props) = {
-      name: "CODER"
-    }];
-    ENVIRONMENT = 4 [(label_props) = {
-      name: "ENVIRONMENT"
-    }];
+    TRANSFORM = 0 [(label_props) = { name: "PTRANSFORM" }];
+    PCOLLECTION = 1 [(label_props) = { name: "PCOLLECTION" }];
+    WINDOWING_STRATEGY = 2 [(label_props) = { name: "WINDOWING_STRATEGY" }];
+    CODER = 3 [(label_props) = { name: "CODER" }];
+    ENVIRONMENT = 4 [(label_props) = { name: "ENVIRONMENT" }];
   }
   // A set of key+value labels which define the scope of the metric.
   // Either a well defined entity id for matching the enum names in
   // the MonitoringInfoLabels enum or any arbitrary label
   // set by a custom metric or user metric.
-  // A monitoring system is expected to be able to aggregate the metrics 
together
-  // for all updates having the same URN and labels.
-  // Some systems such as Stackdriver will be able to aggregate the metrics
-  // using a subset of the provided labels
+  // A monitoring system is expected to be able to aggregate the metrics
+  // together for all updates having the same URN and labels. Some systems such
+  // as Stackdriver will be able to aggregate the metrics using a subset of the
+  // provided labels
   map<string, string> labels = 5;
 
   // The walltime of the most recent update.
@@ -379,36 +468,40 @@ message MonitoringInfo {
 message MonitoringInfoUrns {
   enum Enum {
     // User counter have this format: 'beam:metric:user:<namespace>:<name>'.
-    USER_COUNTER_URN_PREFIX = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metric:user:"];
+    USER_COUNTER_URN_PREFIX = 0
+        [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metric:user:"];
 
     ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metric:element_count:v1"];
+                           "beam:metric:element_count:v1"];
 
-    START_BUNDLE_MSECS = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-      "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
+    START_BUNDLE_MSECS = 2
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
 
-    PROCESS_BUNDLE_MSECS = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
+    PROCESS_BUNDLE_MSECS = 3
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
 
-    FINISH_BUNDLE_MSECS = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
+    FINISH_BUNDLE_MSECS = 4
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
 
-    TOTAL_MSECS = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metric:ptransform_execution_time:total_msecs:v1"];
+    TOTAL_MSECS = 5
+        [(org.apache.beam.model.pipeline.v1.beam_urn) =
+             "beam:metric:ptransform_execution_time:total_msecs:v1"];
   }
 }
 
 message MonitoringInfoTypeUrns {
   enum Enum {
     SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metrics:sum_int_64"];
+                            "beam:metrics:sum_int_64"];
 
     DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-        "beam:metrics:distribution_int_64"];
+                                     "beam:metrics:distribution_int_64"];
 
     LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
-      "beam:metrics:latest_int_64"];
+                               "beam:metrics:latest_int_64"];
   }
 }
 
@@ -425,11 +518,11 @@ message Metric {
 // This is designed to be compatible with metric collection
 // systems such as DropWizard.
 message CounterData {
-   oneof value {
-     int64 int64_value = 1;
-     double double_value = 2;
-     string string_value = 3;
-   }
+  oneof value {
+    int64 int64_value = 1;
+    double double_value = 2;
+    string string_value = 3;
+  }
 }
 
 // Extrema messages are used for calculating
@@ -508,10 +601,10 @@ message MonitoringTableData {
     repeated MonitoringColumnValue values = 1;
   }
 
- // The number of column names must match the
- // number of values in each MonitoringRow.
- repeated string column_names = 1;
- repeated MonitoringRow row_data = 2;
+  // The number of column names must match the
+  // number of values in each MonitoringRow.
+  repeated string column_names = 1;
+  repeated MonitoringRow row_data = 2;
 }
 
 // DEPRECATED
@@ -589,7 +682,6 @@ message Metrics {
 
   // User defined metrics
   message User {
-
     // A key for identifying a metric at the most granular level.
     message MetricName {
       // (Required): The namespace of this metric.
@@ -736,12 +828,11 @@ message Elements {
 service BeamFnData {
   // Used to send data between harnesses.
   rpc Data(
-    // A stream of data representing input.
-    stream Elements
-  ) returns (
-    // A stream of data representing output.
-    stream Elements
-  ) {}
+      // A stream of data representing input.
+      stream Elements)
+      returns (
+          // A stream of data representing output.
+          stream Elements) {}
 }
 
 /*
@@ -806,13 +897,12 @@ message StateResponse {
 service BeamFnState {
   // Used to get/append/clear state stored by the runner on behalf of the SDK.
   rpc State(
-    // A stream of state instructions requested of the runner.
-    stream StateRequest
-  ) returns (
-    // A stream of responses to state instructions the runner was asked to be
-    // performed.
-    stream StateResponse
-  ) {}
+      // A stream of state instructions requested of the runner.
+      stream StateRequest)
+      returns (
+          // A stream of responses to state instructions the runner was asked 
to
+          // be performed.
+          stream StateResponse) {}
 }
 
 message StateKey {
@@ -888,16 +978,13 @@ message StateAppendRequest {
 }
 
 // A response to append state.
-message StateAppendResponse {
-}
+message StateAppendResponse {}
 
 // A request to clear state.
-message StateClearRequest {
-}
+message StateClearRequest {}
 
 // A response to clear state.
-message StateClearResponse {
-}
+message StateClearResponse {}
 
 /*
  * Logging API
@@ -985,23 +1072,20 @@ message LogEntry {
   string thread = 8;
 }
 
-message LogControl {
-}
+message LogControl {}
 
 // Stable
 service BeamFnLogging {
   // Allows for the SDK to emit log entries which the runner can
   // associate with the active job.
   rpc Logging(
-    // A stream of log entries batched into lists emitted by the SDK harness.
-    stream LogEntry.List
-  ) returns (
-    // A stream of log control messages used to configure the SDK.
-    stream LogControl
-  ) {}
+      // A stream of log entries batched into lists emitted by the SDK harness.
+      stream LogEntry.List)
+      returns (
+          // A stream of log control messages used to configure the SDK.
+          stream LogControl) {}
 }
 
-
 message NotifyRunnerAvailableRequest {
   string worker_id = 1;
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor control_endpoint = 2;
@@ -1017,6 +1101,5 @@ message NotifyRunnerAvailableResponse {
 
 service BeamFnExternalWorkerPool {
   rpc NotifyRunnerAvailable(NotifyRunnerAvailableRequest)
-    returns (NotifyRunnerAvailableResponse) {
-  }
+      returns (NotifyRunnerAvailableResponse) {}
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
new file mode 100644
index 000000000000..c8e7390ea20a
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import com.google.common.base.Splitter;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoTypeUrns;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simplified building of MonitoringInfo fields, allows setting one field at a 
time with simpler
+ * method calls, without needing to dive into the details of the nested protos.
+ *
+ * <p>There is no need to set the type field, by setting the appropriate value 
field: (i.e.
+ * setInt64Value), the typeUrn field is automatically set.
+ *
+ * <p>Additionally, if validateAndDropInvalid is set to true in the ctor, then 
MonitoringInfos will
+ * be returned as null when build() is called if any fields are not properly 
set. This is based on
+ * comparing the fields which are set to the MonitoringInfoSpec in 
beam_fn_api.proto.
+ *
+ * <p>Example Usage (ElementCount counter):
+ *
+ * <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+ * builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN); 
builder.setInt64Value(1);
+ * builder.setPTransformLabel("myTransform"); 
builder.setPCollectionLabel("myPcollection");
+ * MonitoringInfo mi = builder.build();
+ *
+ * <p>Example Usage (user counter):
+ *
+ * <p>SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+ * builder.setUrnForUserMetric("myNamespace", "myName");
+ * builder.setInt64Value(1); MonitoringInfo mi = builder.build();
+ */
+public class SimpleMonitoringInfoBuilder {
+  public static final String ELEMENT_COUNT_URN =
+      BeamUrns.getUrn(MonitoringInfoUrns.Enum.ELEMENT_COUNT);
+  public static final String USER_COUNTER_URN_PREFIX =
+      BeamUrns.getUrn(MonitoringInfoUrns.Enum.USER_COUNTER_URN_PREFIX);
+  public static final String SUM_INT64_TYPE_URN =
+      BeamUrns.getUrn(MonitoringInfoTypeUrns.Enum.SUM_INT64_TYPE);
+
+  private static final HashMap<String, MonitoringInfoSpec> specs =
+      new HashMap<String, MonitoringInfoSpec>();
+
+  private final boolean validateAndDropInvalid;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleMonitoringInfoBuilder.class);
+
+  private MonitoringInfo.Builder builder;
+
+  static {
+    for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) {
+      // The enum iterator inserts an UNRECOGNIZED = -1 value which isn't 
explicitly added in
+      // the proto files.
+      if (!((Enum) val).name().equals("UNRECOGNIZED")) {
+        MonitoringInfoSpec spec =
+            
val.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec);
+        SimpleMonitoringInfoBuilder.specs.put(spec.getUrn(), spec);
+      }
+    }
+  }
+
+  public SimpleMonitoringInfoBuilder() {
+    this(true);
+  }
+
+  public SimpleMonitoringInfoBuilder(boolean validateAndDropInvalid) {
+    this.builder = MonitoringInfo.newBuilder();
+    this.validateAndDropInvalid = validateAndDropInvalid;
+  }
+
+  /** @return True if the MonitoringInfo has valid fields set, matching the 
spec */
+  private boolean validate() {
+    String urn = this.builder.getUrn();
+    if (urn == null || urn.isEmpty()) {
+      LOG.warn("Dropping MonitoringInfo since no URN was specified.");
+      return false;
+    }
+
+    MonitoringInfoSpec spec;
+    // If it's a user counter, and it has this prefix.
+    if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
+      spec = SimpleMonitoringInfoBuilder.specs.get(USER_COUNTER_URN_PREFIX);
+      List<String> split = Splitter.on(':').splitToList(urn);
+      if (split.size() != 5) {
+        LOG.warn(
+            "Dropping MonitoringInfo for URN {}, UserMetric namespaces and "
+                + "name cannot contain ':' characters.",
+            urn);
+        return false;
+      }
+    } else if (!SimpleMonitoringInfoBuilder.specs.containsKey(urn)) {
+      // Succeed for unknown URNs, this is an extensible metric.
+      return true;
+    } else {
+      spec = SimpleMonitoringInfoBuilder.specs.get(urn);
+    }
+
+    if (!this.builder.getType().equals(spec.getTypeUrn())) {
+      LOG.warn(
+          "Dropping MonitoringInfo since for URN {} with invalid type field. 
Expected: {}"
+              + " Actual: {}",
+          this.builder.getUrn(),
+          spec.getTypeUrn(),
+          this.builder.getType());
+      return false;
+    }
+
+    Set<String> requiredLabels = new 
HashSet<String>(spec.getRequiredLabelsList());
+    if (!this.builder.getLabels().keySet().equals(requiredLabels)) {
+      LOG.warn(
+          "Dropping MonitoringInfo since for URN {} with invalid labels. 
Expected: {}"
+              + " Actual: {}",
+          this.builder.getUrn(),
+          requiredLabels,
+          this.builder.getLabels().keySet());
+      return false;
+    }
+    return true;
+  }
+
+  /** @return The metric URN for a user metric, with a proper URN prefix. */
+  private static String userMetricUrn(String metricNamespace, String 
metricName) {
+    String fixedMetricNamespace = metricNamespace.replace(':', '_');
+    String fixedMetricName = metricName.replace(':', '_');
+    StringBuilder sb = new StringBuilder();
+    sb.append(USER_COUNTER_URN_PREFIX);
+    sb.append(fixedMetricNamespace);
+    sb.append(':');
+    sb.append(fixedMetricName);
+    return sb.toString();
+  }
+
+  /**
+   * Sets the urn of the MonitoringInfo.
+   *
+   * @param urn The urn of the MonitoringInfo
+   */
+  public void setUrn(String urn) {
+    this.builder.setUrn(urn);
+  }
+
+  /**
+   * Sets the urn of the MonitoringInfo to a proper user metric URN for the 
given params.
+   *
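+   * @param namespace The user metric namespace; ':' characters are replaced with '_'.
+   * @param name The user metric name; ':' characters are replaced with '_'.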
+   */
+  public void setUrnForUserMetric(String namespace, String name) {
+    this.builder.setUrn(userMetricUrn(namespace, name));
+  }
+
+  /** Sets the timestamp of the MonitoringInfo to the current time. */
+  public void setTimestampToNow() {
+    Instant time = Instant.now();
+    
this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano());
+  }
+
+  /** Sets the int64Value of the CounterData in the MonitoringInfo, and the 
appropriate type URN. */
+  public void setInt64Value(long value) {
+    
this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
+    this.builder.setType(SUM_INT64_TYPE_URN);
+  }
+
+  /** Sets the PTRANSFORM MonitoringInfo label to the given param. */
+  public void setPTransformLabel(String pTransform) {
+    // TODO(ajamato): Add validation that it is a valid pTransform name in the 
bundle descriptor.
+    setLabel("PTRANSFORM", pTransform);
+  }
+
+  /** Sets the PCOLLECTION MonitoringInfo label to the given param. */
+  public void setPCollectionLabel(String pCollection) {
+    setLabel("PCOLLECTION", pCollection);
+  }
+
+  /** Sets the MonitoringInfo label to the given name and value. */
+  public void setLabel(String labelName, String labelValue) {
+    this.builder.putLabels(labelName, labelValue);
+  }
+
+  /**
+   * Builds the provided MonitoringInfo. Returns null if 
validateAndDropInvalid set and fields do
+   * not match respecting MonitoringInfoSpec based on urn.
+   */
+  @Nullable
+  public MonitoringInfo build() {
+    if (validateAndDropInvalid && !validate()) {
+      return null;
+    }
+    return this.builder.build();
+  }
+}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
new file mode 100644
index 000000000000..2697ef8df497
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SimpleMonitoringInfoBuilder}. */
+@RunWith(JUnit4.class)
+public class SimpleMonitoringInfoBuilderTest {
+
+  @Test
+  public void testReturnsNullIfSpecRequirementsNotMet() {
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN);
+    assertNull(builder.build());
+
+    builder.setInt64Value(1);
+    assertNull(builder.build());
+
+    builder.setPTransformLabel("myTransform");
+    assertNull(builder.build());
+
+    builder.setPCollectionLabel("myPcollection");
+    // Pass now that the spec is fully met.
+    MonitoringInfo monitoringInfo = builder.build();
+    assertTrue(monitoringInfo != null);
+    assertEquals("myTransform", 
monitoringInfo.getLabelsOrDefault("PTRANSFORM", null));
+    assertEquals("myPcollection", 
monitoringInfo.getLabelsOrDefault("PCOLLECTION", null));
+    assertEquals(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN, 
monitoringInfo.getUrn());
+    assertEquals(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN, 
monitoringInfo.getType());
+    assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
+  }
+
+  @Test
+  public void testUserCounter() {
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrnForUserMetric("myNamespace", "myName");
+    assertNull(builder.build());
+
+    builder.setInt64Value(1);
+    // Pass now that the spec is fully met.
+    MonitoringInfo monitoringInfo = builder.build();
+    assertTrue(monitoringInfo != null);
+    assertEquals(
+        SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + 
"myNamespace:myName",
+        monitoringInfo.getUrn());
+    assertEquals(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN, 
monitoringInfo.getType());
+    assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
+  }
+
+  @Test
+  public void testUserMetricWithInvalidDelimiterCharacterIsReplaced() {
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrnForUserMetric("myNamespace:withInvalidChar", "myName");
+    builder.setInt64Value(1);
+    // Pass now that the spec is fully met.
+    MonitoringInfo monitoringInfo = builder.build();
+    assertTrue(monitoringInfo != null);
+    assertEquals(
+        SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + 
"myNamespace_withInvalidChar:myName",
+        monitoringInfo.getUrn());
+    assertEquals(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN, 
monitoringInfo.getType());
+    assertEquals(1, 
monitoringInfo.getMetric().getCounterData().getInt64Value());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174633)
    Time Spent: 4h 50m  (was: 4h 40m)

> Add User Metric Support to Java SDK
> -----------------------------------
>
>                 Key: BEAM-6138
>                 URL: https://issues.apache.org/jira/browse/BEAM-6138
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Alex Amato
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to