[ 
https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=154064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154064
 ]

ASF GitHub Bot logged work on BEAM-4374:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Oct/18 23:58
            Start Date: 12/Oct/18 23:58
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #6205: [BEAM-4374] 
Implementing a subset of the new metrics framework in python.
URL: https://github.com/apache/beam/pull/6205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index a0795a7c285..915686de6b3 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -40,6 +40,7 @@ option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
 import "endpoints.proto";
+import "google/protobuf/descriptor.proto";
 import "google/protobuf/timestamp.proto";
 import "google/protobuf/wrappers.proto";
 
@@ -250,11 +251,16 @@ message ProcessBundleRequest {
 message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
+  // DEPRECATED
   Metrics metrics = 1;
 
   // (Optional) Specifies that the bundle has been split since the last
   // ProcessBundleProgressResponse was sent.
   BundleSplit split = 2;
+
+  // (Required) The list of metrics or other MonitoredState
+  // collected while processing this bundle.
+  repeated MonitoringInfo monitoring_infos = 3;
 }
 
 // A request to report progress information for a given bundle.
@@ -275,9 +281,9 @@ message MonitoringInfo {
   // Sub types like field formats - int64, double, string.
   // Aggregation methods - SUM, LATEST, TOP-N, BOTTOM-N, DISTRIBUTION
   // valid values are:
-  // beam:metrics:[SumInt64|LatestInt64|Top-NInt64|Bottom-NInt64|
-  //     SumDouble|LatestDouble|Top-NDouble|Bottom-NDouble|DistributionInt64|
-  //     DistributionDouble|MonitoringDataTable]
+  // beam:metrics:[sum_int_64|latest_int_64|top_n_int_64|bottom_n_int_64|
+  //     sum_double|latest_double|top_n_double|bottom_n_double|
+  //     distribution_int_64|distribution_double|monitoring_data_table]
   string type = 2;
 
   // The Metric or monitored state.
@@ -302,6 +308,45 @@ message MonitoringInfo {
   // Some systems such as Stackdriver will be able to aggregate the metrics
   // using a subset of the provided labels
   map<string, string> labels = 5;
+
+  // The walltime of the most recent update.
+  // Useful for aggregation for Latest types such as latest_int_64.
+  google.protobuf.Timestamp timestamp = 6;
+}
+
+message MonitoringInfoUrns {
+  enum Enum {
+    USER_COUNTER_URN_PREFIX = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metric:user"];
+
+    ELEMENT_COUNT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metric:element_count:v1"];
+
+    START_BUNDLE_MSECS = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:metric:pardo_execution_time:start_bundle_msecs:v1"];
+
+    PROCESS_BUNDLE_MSECS = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metric:pardo_execution_time:process_bundle_msecs:v1"];
+
+    FINISH_BUNDLE_MSECS = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"];
+
+    TOTAL_MSECS = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metric:ptransform_execution_time:total_msecs:v1"];
+  }
+}
+
+message MonitoringInfoTypeUrns {
+  enum Enum {
+    SUM_INT64_TYPE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metrics:sum_int_64"];
+
+    DISTRIBUTION_INT64_TYPE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metrics:distribution_int_64"];
+
+    LATEST_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:metrics:latest_int_64"];
+  }
 }
 
 message Metric {
@@ -525,12 +570,16 @@ message Metrics {
 }
 
 message ProcessBundleProgressResponse {
-  // (Required)
+  // DEPRECATED (Required)
   Metrics metrics = 1;
 
   // (Optional) Specifies that the bundle has been split since the last
   // ProcessBundleProgressResponse was sent.
   BundleSplit split = 2;
+
+  // (Required) The list of metrics or other MonitoredState
+  // collected while processing this bundle.
+  repeated MonitoringInfo monitoring_infos = 3;
 }
 
 message ProcessBundleSplitRequest {
@@ -795,7 +844,6 @@ message LogEntry {
     enum Enum {
       // Unspecified level information. Will be logged at the TRACE level.
       UNSPECIFIED = 0;
-      // Trace level information.
       TRACE = 1;
       // Debugging information.
       DEBUG = 2;
@@ -863,4 +911,3 @@ service BeamFnLogging {
     stream LogControl
   ) {}
 }
-
diff --git a/model/fn-execution/src/main/proto/metric_definitions.yaml b/model/fn-execution/src/main/proto/metric_definitions.yaml
new file mode 100644
index 00000000000..f32f7cded2b
--- /dev/null
+++ b/model/fn-execution/src/main/proto/metric_definitions.yaml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Metrics Definitions describing various BEAM metrics.
+# See: https://s.apache.org/beam-fn-api-metrics
+
+- annotations:
+    description: The total estimated execution time of the ptransform
+    unit: msecs
+  labels:
+  - PTRANSFORM
+  type: beam:metrics:SumInt64
+  urn: beam:metric:ptransform_execution_time:total_msecs:v1
+- annotations:
+    description: The total estimated execution time of the start bundle function in
+      a pardo
+    unit: msecs
+  labels:
+  - PTRANSFORM
+  type: beam:metrics:SumInt64
+  urn: beam:metric:pardo_execution_time:start_bundle_msecs:v1
+- annotations:
+    description: The total estimated execution time of the process bundle function
+      in a pardo
+    unit: msecs
+  labels:
+  - PTRANSFORM
+  type: beam:metrics:SumInt64
+  urn: beam:metric:pardo_execution_time:process_bundle_msecs:v1
+- annotations:
+    description: The total estimated execution time of the finish bundle function
+      in a pardo
+    unit: msecs
+  labels:
+  - PTRANSFORM
+  type: beam:metrics:SumInt64
+  urn: beam:metric:pardo_execution_time:finish_bundle_msecs:v1
+- annotations:
+    description: The total elements counted for a metric.
+  labels:
+  - PTRANSFORM
+  type: beam:metrics:SumInt64
+  urn: beam:metric:element_count:v1
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 8f93d7fffe5..b177d2014d7 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -161,6 +161,18 @@ def get_cumulative(self):
     with self._lock:
       return self.value
 
+  def to_runner_api_monitoring_info(self):
+    """Returns a Metric with this counter value for use in a MonitoringInfo."""
+    # TODO(ajamato): Update this code to be consistent with Gauges
+    # and Distributions. Since there is no CounterData class this method
+    # was added to CounterCell. Consider adding a CounterData class or
+    # removing the GaugeData and DistributionData classes.
+    return beam_fn_api_pb2.Metric(
+        counter_data=beam_fn_api_pb2.CounterData(
+            int64_value=self.get_cumulative()
+        )
+    )
+
 
 class DistributionCell(Distribution, MetricCell):
   """For internal use only; no backwards-compatibility guarantees.
@@ -375,6 +387,14 @@ def from_runner_api(proto):
                        float(proto.timestamp.nanos) / 10**9)
     return GaugeData(proto.value, timestamp=gauge_timestamp)
 
+  def to_runner_api_monitoring_info(self):
+    """Returns a Metric with this value for use in a MonitoringInfo."""
+    return beam_fn_api_pb2.Metric(
+        counter_data=beam_fn_api_pb2.CounterData(
+            int64_value=self.value
+        )
+    )
+
 
 class DistributionData(object):
   """For internal use only; no backwards-compatibility guarantees.
@@ -440,6 +460,13 @@ def to_runner_api(self):
   def from_runner_api(proto):
     return DistributionData(proto.sum, proto.count, proto.min, proto.max)
 
+  def to_runner_api_monitoring_info(self):
+    """Returns a Metric with this value for use in a MonitoringInfo."""
+    return beam_fn_api_pb2.Metric(
+        distribution_data=beam_fn_api_pb2.DistributionData(
+            int_distribution_data=beam_fn_api_pb2.IntDistributionData(
+                count=self.count, sum=self.sum, min=self.min, max=self.max)))
+
 
 class MetricAggregator(object):
   """For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 789d85bc34a..2d771394c23 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -36,9 +36,11 @@
 from builtins import object
 from collections import defaultdict
 
+from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.cells import CounterCell
 from apache_beam.metrics.cells import DistributionCell
 from apache_beam.metrics.cells import GaugeCell
+from apache_beam.metrics.monitoring_infos import user_metric_urn
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners.worker import statesampler
 
@@ -211,6 +213,31 @@ def to_runner_api(self):
          for k, v in self.gauges.items()]
     )
 
+  def to_runner_api_monitoring_infos(self, transform_id):
+    """Returns a list of MonitoringInfos for the metrics in this container."""
+    all_user_metrics = []
+    for k, v in self.counters.items():
+      all_user_metrics.append(monitoring_infos.int64_counter(
+          user_metric_urn(k.namespace, k.name),
+          v.to_runner_api_monitoring_info(),
+          ptransform=transform_id
+      ))
+
+    for k, v in self.distributions.items():
+      all_user_metrics.append(monitoring_infos.int64_distribution(
+          user_metric_urn(k.namespace, k.name),
+          v.get_cumulative().to_runner_api_monitoring_info(),
+          ptransform=transform_id
+      ))
+
+    for k, v in self.gauges.items():
+      all_user_metrics.append(monitoring_infos.int64_gauge(
+          user_metric_urn(k.namespace, k.name),
+          v.get_cumulative().to_runner_api_monitoring_info(),
+          ptransform=transform_id
+      ))
+    return {monitoring_infos.to_key(mi) : mi for mi in all_user_metrics}
+
 
 class MetricUpdates(object):
   """Contains updates for several metrics.
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
new file mode 100644
index 00000000000..73a0599e46a
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# cython: language_level=3
+# cython: profile=True
+
+from __future__ import absolute_import
+
+import time
+
+from google.protobuf import timestamp_pb2
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.metrics.cells import GaugeData
+from apache_beam.metrics.cells import GaugeResult
+from apache_beam.portability import common_urns
+from apache_beam.portability.api.beam_fn_api_pb2 import CounterData
+from apache_beam.portability.api.beam_fn_api_pb2 import Metric
+from apache_beam.portability.api.beam_fn_api_pb2 import MonitoringInfo
+
+ELEMENT_COUNT_URN = common_urns.monitoring_infos.ELEMENT_COUNT.urn
+START_BUNDLE_MSECS_URN = common_urns.monitoring_infos.START_BUNDLE_MSECS.urn
+PROCESS_BUNDLE_MSECS_URN = common_urns.monitoring_infos.PROCESS_BUNDLE_MSECS.urn
+FINISH_BUNDLE_MSECS_URN = common_urns.monitoring_infos.FINISH_BUNDLE_MSECS.urn
+TOTAL_MSECS_URN = common_urns.monitoring_infos.TOTAL_MSECS.urn
+USER_COUNTER_URN_PREFIX = (
+    common_urns.monitoring_infos.USER_COUNTER_URN_PREFIX.urn)
+
+# TODO(ajamato): Implement the remaining types, i.e. Double types
+# Extrema types, etc. See:
+# https://s.apache.org/beam-fn-api-metrics
+SUM_INT64_TYPE = common_urns.monitoring_info_types.SUM_INT64_TYPE.urn
+DISTRIBUTION_INT64_TYPE = (
+    common_urns.monitoring_info_types.DISTRIBUTION_INT64_TYPE.urn)
+LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn
+
+COUNTER_TYPES = set([SUM_INT64_TYPE])
+DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
+GAUGE_TYPES = set([LATEST_INT64_TYPE])
+
+
+def to_timestamp_proto(timestamp_secs):
+  """Converts seconds since epoch to a google.protobuf.Timestamp.
+
+  Args:
+    timestamp_secs: The timestamp in seconds since epoch.
+  """
+  seconds = int(timestamp_secs)
+  nanos = int((timestamp_secs - seconds) * 10**9)
+  return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+
+
+def to_timestamp_secs(timestamp_proto):
+  """Converts a google.protobuf.Timestamp to seconds since epoch.
+
+  Args:
+    timestamp_proto: The google.protobuf.Timestamp.
+  """
+  return timestamp_proto.seconds + timestamp_proto.nanos * 10**-9
+
+
+def extract_counter_value(monitoring_info_proto):
+  """Returns the int counter value of the monitoring info."""
+  if is_counter(monitoring_info_proto) or is_gauge(monitoring_info_proto):
+    return monitoring_info_proto.metric.counter_data.int64_value
+  return None
+
+
+def extract_distribution(monitoring_info_proto):
+  """Returns the relevant DistributionInt64 or DistributionDouble.
+
+  Args:
+    monitoring_info_proto: The monitoring info for the distribution.
+  """
+  if is_distribution(monitoring_info_proto):
+    return monitoring_info_proto.metric.distribution_data.int_distribution_data
+  return None
+
+
+def create_labels(ptransform='', tag=''):
+  """Create the label dictionary based on the provided tags.
+
+  Args:
+    ptransform: The ptransform/step name.
+    tag: The output tag name, used as a label.
+  """
+  labels = {}
+  if tag:
+    labels['TAG'] = tag
+  if ptransform:
+    labels['PTRANSFORM'] = ptransform
+  return labels
+
+
+def int64_counter(urn, metric, ptransform='', tag=''):
+  """Return the counter monitoring info for the specified URN, metric and labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    metric: The metric proto field to use in the monitoring info.
+        Or an int value.
+    ptransform: The ptransform/step name used as a label.
+    tag: The output tag name, used as a label.
+  """
+  labels = create_labels(ptransform=ptransform, tag=tag)
+  if isinstance(metric, int):
+    metric = Metric(
+        counter_data=CounterData(
+            int64_value=metric
+        )
+    )
+  return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
+
+
+def int64_distribution(urn, metric, ptransform='', tag=''):
+  """Return the distribution monitoring info for the URN, metric and labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    metric: The metric proto field to use in the monitoring info.
+        Or an int value.
+    ptransform: The ptransform/step name used as a label.
+    tag: The output tag name, used as a label.
+  """
+  labels = create_labels(ptransform=ptransform, tag=tag)
+  return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, metric, labels)
+
+
+def int64_gauge(urn, metric, ptransform='', tag=''):
+  """Return the gauge monitoring info for the URN, metric and labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    metric: The metric proto field to use in the monitoring info.
+        Or an int value.
+    ptransform: The ptransform/step name used as a label.
+    tag: The output tag name, used as a label.
+  """
+  labels = create_labels(ptransform=ptransform, tag=tag)
+  return create_monitoring_info(urn, LATEST_INT64_TYPE, metric, labels)
+
+
+def create_monitoring_info(urn, type_urn, metric_proto, labels=None):
+  """Return the monitoring info for the URN, type, metric and labels.
+
+  Args:
+    urn: The URN of the monitoring info/metric.
+    type_urn: The URN of the type of the monitoring info/metric.
+        i.e. beam:metrics:sum_int_64, beam:metrics:latest_int_64.
+    metric_proto: The metric proto field to use in the monitoring info.
+        Or an int value.
+    labels: The label dictionary to use in the MonitoringInfo.
+  """
+  return MonitoringInfo(
+      urn=urn,
+      type=type_urn,
+      labels=labels or dict(),
+      metric=metric_proto,
+      timestamp=to_timestamp_proto(time.time())
+  )
+
+
+def user_metric_urn(namespace, name):
+  """Returns the metric URN for a user metric, with a proper URN prefix.
+
+  Args:
+    namespace: The namespace of the metric.
+    name: The name of the metric.
+  """
+  return '%s%s:%s' % (USER_COUNTER_URN_PREFIX, namespace, name)
+
+
+def is_counter(monitoring_info_proto):
+  """Returns true if the monitoring info is a counter metric."""
+  return monitoring_info_proto.type in COUNTER_TYPES
+
+
+def is_distribution(monitoring_info_proto):
+  """Returns true if the monitoring info is a distribution metric."""
+  return monitoring_info_proto.type in DISTRIBUTION_TYPES
+
+
+def is_gauge(monitoring_info_proto):
+  """Returns true if the monitoring info is a gauge metric."""
+  return monitoring_info_proto.type in GAUGE_TYPES
+
+
+def is_user_monitoring_info(monitoring_info_proto):
+  """Returns true if the monitoring info is a user metric."""
+  return monitoring_info_proto.urn.startswith(USER_COUNTER_URN_PREFIX)
+
+
+def extract_metric_result_map_value(monitoring_info_proto):
+  """Returns the relevant GaugeResult, DistributionResult or int value.
+
+  These are the proper format for use in the MetricResult.query() result.
+  """
+  # Returns a metric result (AKA the legacy format).
+  # from the MonitoringInfo
+  if is_counter(monitoring_info_proto):
+    return extract_counter_value(monitoring_info_proto)
+  if is_distribution(monitoring_info_proto):
+    distribution_data = extract_distribution(monitoring_info_proto)
+    return DistributionResult(
+        DistributionData(distribution_data.sum, distribution_data.count,
+                         distribution_data.min, distribution_data.max))
+  if is_gauge(monitoring_info_proto):
+    timestamp_secs = to_timestamp_secs(monitoring_info_proto.timestamp)
+    return GaugeResult(GaugeData(
+        extract_counter_value(monitoring_info_proto), timestamp_secs))
+
+
+def parse_namespace_and_name(monitoring_info_proto):
+  """Returns the (namespace, name) tuple of the URN in the monitoring info."""
+  to_split = monitoring_info_proto.urn
+  if is_user_monitoring_info(monitoring_info_proto):
+    # Remove the URN prefix which indicates that it is a user counter.
+    to_split = monitoring_info_proto.urn[len(USER_COUNTER_URN_PREFIX):]
+  # If it is not a user counter, just use the first part of the URN, i.e. 'beam'
+  split = to_split.split(':')
+  return split[0], ':'.join(split[1:])
+
+
+def to_key(monitoring_info_proto):
+  """Returns a key based on the URN and labels.
+
+  This is useful in maps to prevent reporting the same MonitoringInfo twice.
+  """
+  key_items = [i for i in monitoring_info_proto.labels.items()]
+  key_items.append(monitoring_info_proto.urn)
+  return frozenset(key_items)
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 30d499e2bde..793bfcbf5df 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -21,6 +21,7 @@
 
 from builtins import object
 
+from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import standard_window_fns_pb2
 
@@ -67,3 +68,8 @@ def PropertiesFromPayloadType(payload_type):
     standard_window_fns_pb2.SlidingWindowsPayload)
 session_windows = PropertiesFromPayloadType(
     standard_window_fns_pb2.SessionsPayload)
+
+monitoring_infos = PropertiesFromEnumType(
+    beam_fn_api_pb2.MonitoringInfoUrns.Enum)
+monitoring_info_types = PropertiesFromEnumType(
+    beam_fn_api_pb2.MonitoringInfoTypeUrns.Enum)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 692a9c8997b..f1b205a8a29 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -39,6 +39,8 @@
 from apache_beam.coders.coder_impl import create_InputStream
 from apache_beam.coders.coder_impl import create_OutputStream
 from apache_beam.internal import pickler
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
@@ -1000,23 +1002,26 @@ def leaf_transforms(root_ids):
     return pipeline_components, stages, safe_coders
 
   def run_stages(self, pipeline_components, stages, safe_coders):
-
     if self._use_grpc:
       controller = FnApiRunner.GrpcController(self._sdk_harness_factory)
     else:
       controller = FnApiRunner.DirectController()
     metrics_by_stage = {}
+    monitoring_infos_by_stage = {}
 
     try:
       pcoll_buffers = collections.defaultdict(list)
       for stage in stages:
-        metrics_by_stage[stage.name] = self.run_stage(
+        stage_results = self.run_stage(
             controller, pipeline_components, stage,
-            pcoll_buffers, safe_coders).process_bundle.metrics
+            pcoll_buffers, safe_coders)
+        metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
+        monitoring_infos_by_stage[stage.name] = (
+            stage_results.process_bundle.monitoring_infos)
     finally:
       controller.close()
-
-    return RunnerResult(runner.PipelineState.DONE, metrics_by_stage)
+    return RunnerResult(
+        runner.PipelineState.DONE, monitoring_infos_by_stage, metrics_by_stage)
 
   def run_stage(
       self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -1471,28 +1476,43 @@ def get(self, timeout=None):
 
 
 class FnApiMetrics(metrics.metric.MetricResults):
-  def __init__(self, step_metrics):
+  def __init__(self, step_monitoring_infos, user_metrics_only=True):
+    """Used for querying metrics from the PipelineResult object.
+
+      step_monitoring_infos: Per step metrics specified as MonitoringInfos.
+      user_metrics_only: If true, only user metrics are included; other
+          MonitoringInfos are skipped.
+    """
     self._counters = {}
     self._distributions = {}
     self._gauges = {}
-    for step_metric in step_metrics.values():
-      for ptransform_id, ptransform in step_metric.ptransforms.items():
-        for proto in ptransform.user:
-          key = metrics.execution.MetricKey(
-              ptransform_id,
-              metrics.metricbase.MetricName.from_runner_api(proto.metric_name))
-          if proto.HasField('counter_data'):
-            self._counters[key] = proto.counter_data.value
-          elif proto.HasField('distribution_data'):
-            self._distributions[
-                key] = metrics.cells.DistributionResult(
-                    metrics.cells.DistributionData.from_runner_api(
-                        proto.distribution_data))
-          elif proto.HasField('gauge_data'):
-            self._gauges[
-                key] = metrics.cells.GaugeResult(
-                    metrics.cells.GaugeData.from_runner_api(
-                        proto.gauge_data))
+    self._user_metrics_only = user_metrics_only
+    self._init_metrics_from_monitoring_infos(step_monitoring_infos)
+
+  def _init_metrics_from_monitoring_infos(self, step_monitoring_infos):
+    for smi in step_monitoring_infos.values():
+      # Only include user metrics.
+      for mi in smi:
+        if (self._user_metrics_only and
+            not monitoring_infos.is_user_monitoring_info(mi)):
+          continue
+        key = self._to_metric_key(mi)
+        if monitoring_infos.is_counter(mi):
+          self._counters[key] = (
+              monitoring_infos.extract_metric_result_map_value(mi))
+        elif monitoring_infos.is_distribution(mi):
+          self._distributions[key] = (
+              monitoring_infos.extract_metric_result_map_value(mi))
+        elif monitoring_infos.is_gauge(mi):
+          self._gauges[key] = (
+              monitoring_infos.extract_metric_result_map_value(mi))
+
+  def _to_metric_key(self, monitoring_info):
+    # Right now this assumes that all metrics have a PTRANSFORM
+    ptransform_id = monitoring_info.labels['PTRANSFORM']
+    namespace, name = monitoring_infos.parse_namespace_and_name(monitoring_info)
+    return MetricKey(
+        ptransform_id, metrics.metricbase.MetricName(namespace, name))
 
   def query(self, filter=None):
     counters = [metrics.execution.MetricResult(k, v, v)
@@ -1505,24 +1525,35 @@ def query(self, filter=None):
               for k, v in self._gauges.items()
               if self.matches(filter, k)]
 
-    return {'counters': counters,
-            'distributions': distributions,
-            'gauges': gauges}
+    return {self.COUNTERS: counters,
+            self.DISTRIBUTIONS: distributions,
+            self.GAUGES: gauges}
 
 
 class RunnerResult(runner.PipelineResult):
-  def __init__(self, state, metrics_by_stage):
+  def __init__(self, state, monitoring_infos_by_stage, metrics_by_stage):
     super(RunnerResult, self).__init__(state)
+    self._monitoring_infos_by_stage = monitoring_infos_by_stage
     self._metrics_by_stage = metrics_by_stage
-    self._user_metrics = None
+    self._metrics = None
+    self._monitoring_metrics = None
 
   def wait_until_finish(self, duration=None):
     return self._state
 
   def metrics(self):
-    if self._user_metrics is None:
-      self._user_metrics = FnApiMetrics(self._metrics_by_stage)
-    return self._user_metrics
+    """Returns a queryable oject including user metrics only."""
+    if self._metrics is None:
+      self._metrics = FnApiMetrics(
+          self._monitoring_infos_by_stage, user_metrics_only=True)
+    return self._metrics
+
+  def monitoring_metrics(self):
+    """Returns a queryable object including all metrics."""
+    if self._monitoring_metrics is None:
+      self._monitoring_metrics = FnApiMetrics(
+          self._monitoring_infos_by_stage, user_metrics_only=False)
+    return self._monitoring_metrics
 
 
 def only_element(iterable):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index a7dcbf32fe9..657bd11af09 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -28,6 +28,7 @@
 from builtins import range
 
 import apache_beam as beam
+from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.metrics.metricbase import MetricName
@@ -435,7 +436,6 @@ def expand(self, pcolls):
       assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))
 
   def test_metrics(self):
-
     p = self.create_pipeline()
     if not isinstance(p.runner, fn_api_runner.FnApiRunner):
       # This test is inherited by others that may not support the same
@@ -470,6 +470,46 @@ def test_metrics(self):
     self.assertEqual(dist.committed.mean, 2.0)
     self.assertEqual(gaug.committed.value, 3)
 
+  def test_non_user_metrics(self):
+    p = self.create_pipeline()
+    if not isinstance(p.runner, fn_api_runner.FnApiRunner):
+      # This test is inherited by others that may not support the same
+      # internal way of accessing progress metrics.
+      self.skipTest('Metrics not supported.')
+
+    pcoll = p | beam.Create(['a', 'zzz'])
+    # pylint: disable=expression-not-assigned
+    pcoll | 'MyStep' >> beam.FlatMap(lambda x: None)
+    res = p.run()
+    res.wait_until_finish()
+
+    result_metrics = res.monitoring_metrics()
+    all_metrics_via_montoring_infos = result_metrics.query()
+
+    def assert_counter_exists(metrics, namespace, name, step):
+      found = 0
+      metric_key = MetricKey(step, MetricName(namespace, name))
+      for m in metrics['counters']:
+        if m.key == metric_key:
+          found = found + 1
+      self.assertEqual(
+          1, found, "Did not find exactly 1 metric for %s." % metric_key)
+    urns = [
+        monitoring_infos.ELEMENT_COUNT_URN,
+        monitoring_infos.START_BUNDLE_MSECS_URN,
+        monitoring_infos.PROCESS_BUNDLE_MSECS_URN,
+        monitoring_infos.FINISH_BUNDLE_MSECS_URN,
+        monitoring_infos.TOTAL_MSECS_URN,
+    ]
+    for urn in urns:
+      split = urn.split(':')
+      namespace = split[0]
+      name = ':'.join(split[1:])
+      assert_counter_exists(
+          all_metrics_via_montoring_infos, namespace, name, step='Create/Read')
+      assert_counter_exists(
+          all_metrics_via_montoring_infos, namespace, name, step='MyStep')
+
   def test_progress_metrics(self):
     p = self.create_pipeline()
     if not isinstance(p.runner, fn_api_runner.FnApiRunner):
@@ -489,13 +529,21 @@ def test_progress_metrics(self):
              beam.pvalue.TaggedOutput('twice', x)]))
     res = p.run()
     res.wait_until_finish()
+
+    def has_mi_for_ptransform(monitoring_infos, ptransform):
+      for mi in monitoring_infos:
+        if ptransform in mi.labels['PTRANSFORM']:
+          return True
+      return False
+
     try:
-      self.assertEqual(2, len(res._metrics_by_stage))
-      pregbk_metrics, postgbk_metrics = list(res._metrics_by_stage.values())
+      # TODO(ajamato): Delete this block after deleting the legacy metrics code.
+      # Test the DEPRECATED legacy metrics
+      pregbk_metrics, postgbk_metrics = list(
+          res._metrics_by_stage.values())
       if 'Create/Read' not in pregbk_metrics.ptransforms:
         # The metrics above are actually unordered. Swap.
         pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
-
       self.assertEqual(
           4,
           pregbk_metrics.ptransforms['Create/Read']
@@ -527,8 +575,58 @@ def test_progress_metrics(self):
           2,
           m_out.processed_elements.measured.output_element_counts['twice'])
 
+      # Test the new MonitoringInfo monitoring format.
+      self.assertEqual(2, len(res._monitoring_infos_by_stage))
+      pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
+      if not has_mi_for_ptransform(pregbk_mis, 'Create/Read'):
+        # The monitoring infos above are actually unordered. Swap.
+        pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
+
+      def assert_has_monitoring_info(
+          monitoring_infos, urn, labels, value=None, ge_value=None):
+        # TODO(ajamato): Consider adding a matcher framework
+        found = 0
+        for m in monitoring_infos:
+          if m.labels == labels and m.urn == urn:
+            if (ge_value is not None and
+                m.metric.counter_data.int64_value >= ge_value):
+              found = found + 1
+            elif (value is not None and
+                  m.metric.counter_data.int64_value == value):
+              found = found + 1
+        ge_value_str = {'ge_value' : ge_value} if ge_value else ''
+        value_str = {'value' : value} if value else ''
+        self.assertEqual(
+            1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
+            (found, (urn, labels, value_str, ge_value_str),))
+
+      # pregbk monitoring infos
+      labels = {'PTRANSFORM' : 'Create/Read', 'TAG' : 'out'}
+      assert_has_monitoring_info(
+          pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
+      labels = {'PTRANSFORM' : 'Map(sleep)', 'TAG' : 'None'}
+      assert_has_monitoring_info(
+          pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
+      labels = {'PTRANSFORM' : 'Map(sleep)'}
+      assert_has_monitoring_info(
+          pregbk_mis, monitoring_infos.TOTAL_MSECS_URN,
+          labels, ge_value=4 * DEFAULT_SAMPLING_PERIOD_MS)
+
+      # postgbk monitoring infos
+      labels = {'PTRANSFORM' : 'GroupByKey/Read', 'TAG' : 'None'}
+      assert_has_monitoring_info(
+          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
+      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'None'}
+      assert_has_monitoring_info(
+          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
+      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'once'}
+      assert_has_monitoring_info(
+          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
+      labels = {'PTRANSFORM' : 'm_out', 'TAG' : 'twice'}
+      assert_has_monitoring_info(
+          postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=2)
     except:
-      print(res._metrics_by_stage)
+      print(res._monitoring_infos_by_stage)
       raise
 
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 70968206123..e4737b4ad09 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -37,6 +37,7 @@
 from apache_beam.coders import coder_impl
 from apache_beam.internal import pickler
 from apache_beam.io import iobase
+from apache_beam.metrics import monitoring_infos
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_fn_api_pb2
@@ -426,6 +427,7 @@ def process_bundle(self, instruction_id):
       self.state_sampler.stop_if_still_running()
 
   def metrics(self):
+    # DEPRECATED
     return beam_fn_api_pb2.Metrics(
         # TODO(robertwb): Rename to progress?
         ptransforms={
@@ -434,16 +436,18 @@ def metrics(self):
             for transform_id, op in self.ops.items()})
 
   def _fix_output_tags(self, transform_id, metrics):
+    # DEPRECATED
+    actual_output_tags = list(
+        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
     # Outputs are still referred to by index, not by name, in many Operations.
     # However, if there is exactly one output, we can fix up the name here.
+
     def fix_only_output_tag(actual_output_tag, mapping):
       if len(mapping) == 1:
         fake_output_tag, count = only_element(list(mapping.items()))
         if fake_output_tag != actual_output_tag:
           del mapping[fake_output_tag]
           mapping[actual_output_tag] = count
-    actual_output_tags = list(
-        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
     if len(actual_output_tags) == 1:
       fix_only_output_tag(
           actual_output_tags[0],
@@ -453,6 +457,25 @@ def fix_only_output_tag(actual_output_tag, mapping):
           metrics.active_elements.measured.output_element_counts)
     return metrics
 
+  def monitoring_infos(self):
+    """Returns the list of MonitoringInfos collected processing this bundle."""
+    # Construct a new dict first to remove duplicates.
+    all_monitoring_infos_dict = {}
+    for transform_id, op in self.ops.items():
+      for mi in op.monitoring_infos(transform_id).values():
+        fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
+        all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
+    return list(all_monitoring_infos_dict.values())
+
+  def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
+    actual_output_tags = list(
+        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
+    if ('TAG' in monitoring_info.labels and
+        monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
+      if len(actual_output_tags) == 1:
+        monitoring_info.labels['TAG'] = actual_output_tags[0]
+    return monitoring_info
+
 
 class BeamTransformFactory(object):
   """Factory for turning transform_protos into executable operations."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 9cde9da75b2..318ea516810 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -20,7 +20,7 @@ cimport cython
 from apache_beam.runners.common cimport Receiver
 from apache_beam.runners.worker cimport opcounters
 from apache_beam.utils.windowed_value cimport WindowedValue
-
+#from libcpp.string cimport string
 
 cdef WindowedValue _globally_windowed_value
 cdef type _global_window_type
@@ -62,7 +62,10 @@ cdef class Operation(object):
   cpdef process(self, WindowedValue windowed_value)
   cpdef finish(self)
   cpdef output(self, WindowedValue windowed_value, int output_index=*)
-  cpdef progress_metrics(self)
+  cpdef execution_time_monitoring_infos(self, transform_id)
+  cpdef user_monitoring_infos(self, transform_id)
+  cpdef element_count_monitoring_infos(self, transform_id)
+  cpdef monitoring_infos(self, transform_id)
 
 
 cdef class ReadOperation(Operation):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index b16a2c9486d..61dd632aafe 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -31,6 +31,7 @@
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.io import iobase
+from apache_beam.metrics import monitoring_infos
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners import common
@@ -190,6 +191,62 @@ def progress_metrics(self):
                     else None))),
         user=self.metrics_container.to_runner_api())
 
+  def monitoring_infos(self, transform_id):
+    """Returns the list of MonitoringInfos collected by this operation."""
+    all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
+    all_monitoring_infos.update(
+        self.element_count_monitoring_infos(transform_id))
+    all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
+    return all_monitoring_infos
+
+  def element_count_monitoring_infos(self, transform_id):
+    """Returns the element count MonitoringInfo collected by this operation."""
+    if len(self.receivers) == 1:
+      # If there is exactly one output, we can unambiguously
+      # fix its name later, which we do.
+      # TODO(robertwb): Plumb the actual name here.
+      mi = monitoring_infos.int64_counter(
+          monitoring_infos.ELEMENT_COUNT_URN,
+          self.receivers[0].opcounter.element_counter.value(),
+          ptransform=transform_id,
+          tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
+      )
+      return {monitoring_infos.to_key(mi) : mi}
+    return {}
+
+  def user_monitoring_infos(self, transform_id):
+    """Returns the user MonitoringInfos collected by this operation."""
+    return self.metrics_container.to_runner_api_monitoring_infos(transform_id)
+
+  def execution_time_monitoring_infos(self, transform_id):
+    total_time_spent_msecs = (
+        self.scoped_start_state.sampled_msecs_int()
+        + self.scoped_process_state.sampled_msecs_int()
+        + self.scoped_finish_state.sampled_msecs_int())
+    mis = [
+        monitoring_infos.int64_counter(
+            monitoring_infos.START_BUNDLE_MSECS_URN,
+            self.scoped_start_state.sampled_msecs_int(),
+            ptransform=transform_id
+        ),
+        monitoring_infos.int64_counter(
+            monitoring_infos.PROCESS_BUNDLE_MSECS_URN,
+            self.scoped_process_state.sampled_msecs_int(),
+            ptransform=transform_id
+        ),
+        monitoring_infos.int64_counter(
+            monitoring_infos.FINISH_BUNDLE_MSECS_URN,
+            self.scoped_finish_state.sampled_msecs_int(),
+            ptransform=transform_id
+        ),
+        monitoring_infos.int64_counter(
+            monitoring_infos.TOTAL_MSECS_URN,
+            total_time_spent_msecs,
+            ptransform=transform_id
+        ),
+    ]
+    return {monitoring_infos.to_key(mi) : mi for mi in mis}
+
   def __str__(self):
     """Generates a useful string for this object.
 
@@ -442,6 +499,19 @@ def progress_metrics(self):
             str(tag)] = receiver.opcounter.element_counter.value()
     return metrics
 
+  def monitoring_infos(self, transform_id):
+    infos = super(DoOperation, self).monitoring_infos(transform_id)
+    if self.tagged_receivers:
+      for tag, receiver in self.tagged_receivers.items():
+        mi = monitoring_infos.int64_counter(
+            monitoring_infos.ELEMENT_COUNT_URN,
+            receiver.opcounter.element_counter.value(),
+            ptransform=transform_id,
+            tag=str(tag)
+        )
+        infos[monitoring_infos.to_key(mi)] = mi
+    return infos
+
 
 class DoFnRunnerReceiver(Receiver):
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index fb11278437f..b07dd9f0317 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -238,7 +238,8 @@ def process_bundle(self, request, instruction_id):
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
-            metrics=processor.metrics()))
+            metrics=processor.metrics(),
+            monitoring_infos=processor.monitoring_infos()))
 
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
@@ -246,7 +247,8 @@ def process_bundle_progress(self, request, instruction_id):
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
-            metrics=processor.metrics() if processor else None))
+            metrics=processor.metrics() if processor else None,
+            monitoring_infos=processor.monitoring_infos() if processor else []))
 
 
 class StateHandlerFactory(with_metaclass(abc.ABCMeta, object)):
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index 8aa5217d8d1..73fc4be2e79 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -209,6 +209,9 @@ cdef class ScopedState(object):
   def sampled_seconds(self):
     return 1e-9 * self.nsecs
 
+  def sampled_msecs_int(self):
+    return int(1e-6 * self.nsecs)
+
   def __repr__(self):
     return "ScopedState[%s, %s]" % (self.name, self.nsecs)
 
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index 4b1bf830073..196b35d6003 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -80,6 +80,9 @@ def __init__(self, sampler, name, step_name_context,
   def sampled_seconds(self):
     return 1e-9 * self.nsecs
 
+  def sampled_msecs_int(self):
+    return int(1e-6 * self.nsecs)
+
   def __repr__(self):
     return "ScopedState[%s, %s]" % (self.name, self.nsecs)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154064)
    Time Spent: 8.5h  (was: 8h 20m)

> Update existing metrics in the FN API to use new Metric Schema
> --------------------------------------------------------------
>
>                 Key: BEAM-4374
>                 URL: https://issues.apache.org/jira/browse/BEAM-4374
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Alex Amato
>            Priority: Major
>          Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog names+metadata



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to