[ 
https://issues.apache.org/jira/browse/BEAM-4775?focusedWorklogId=205996&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-205996
 ]

ASF GitHub Bot logged work on BEAM-4775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Feb/19 19:54
            Start Date: 28/Feb/19 19:54
    Worklog Time Spent: 10m 
      Work Description: ryan-williams commented on pull request #7934: 
[BEAM-4775] add GetJobMetrics RPC
URL: https://github.com/apache/beam/pull/7934#discussion_r261357015
 
 

 ##########
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricResultsProtos.java
 ##########
 @@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfos.keyFromMonitoringInfo;
+import static 
org.apache.beam.runners.core.metrics.MonitoringInfos.processMetric;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.runners.core.construction.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Timestamp;
+
+/**
+ * Convert between Java SDK {@link MetricResults} and corresponding protobuf 
{@link
+ * BeamFnApi.MetricResults}.
+ *
+ * <p>Their structures are similar, but the former distinguishes "attempted" 
and "committed" values
+ * at the lowest level (a {@link MetricResult} contains both), while the 
latter separates them into
+ * two lists of {@link MonitoringInfo}s at the top level ({@link
+ * BeamFnApi.MetricResults#getAttemptedList attempted}, {@link
+ * BeamFnApi.MetricResults#getCommittedList committed}).
+ *
+ * <p>The proto form also holds more kinds of values, so some {@link 
MonitoringInfo}s may be dropped
+ * when converting to the Java SDK form.
+ */
+public class MetricResultsProtos {
+
+  /** Convert metric results from proto to Java SDK form. */
+  public static MetricResults fromProto(BeamFnApi.MetricResults metricResults) 
{
+    return new MetricResultsBuilder(metricResults).build();
+  }
+
+  /** Convert metric results from Java SDK to proto form. */
+  public static BeamFnApi.MetricResults toProto(MetricResults metricResults) {
+    BeamFnApi.MetricResults.Builder builder = 
BeamFnApi.MetricResults.newBuilder();
+    MetricQueryResults results = metricResults.queryMetrics(null);
+    results
+        .getCounters()
+        .forEach(counter -> process(builder, counter, 
SimpleMonitoringInfoBuilder::setInt64Value));
+    results
+        .getDistributions()
+        .forEach(
+            distribution ->
+                process(
+                    builder, distribution, 
SimpleMonitoringInfoBuilder::setIntDistributionValue));
+    results
+        .getGauges()
+        .forEach(gauge -> process(builder, gauge, 
SimpleMonitoringInfoBuilder::setGaugeValue));
+    return builder.build();
+  }
+
+  // `toProto` helpers
+
+  /**
+   * Add this {@link MetricResult}'s "attempted" and "committed" values to the 
corresponding lists
+   * of {@code builder}.
+   */
+  private static <T> void process(
+      BeamFnApi.MetricResults.Builder builder,
+      MetricResult<T> metricResult,
+      BiConsumer<SimpleMonitoringInfoBuilder, T> set) {
+    MetricKey metricKey = MetricKey.create(metricResult.getStep(), 
metricResult.getName());
+    ;
+    process(
+        builder,
+        metricKey,
+        metricResult.getAttempted(),
+        set,
+        BeamFnApi.MetricResults.Builder::addAttempted);
+    try {
+      process(
+          builder,
+          metricKey,
+          metricResult.getCommitted(),
+          set,
+          BeamFnApi.MetricResults.Builder::addCommitted);
+    } catch (UnsupportedOperationException ignored) {
+    }
+  }
+
+  /**
+   * Add a metric key and value (from a {@link MetricResult}) to a {@link
+   * BeamFnApi.MetricResults.Builder}.
+   *
+   * @param builder proto {@link BeamFnApi.MetricResults.Builder} to add to
+   * @param set set this metric's value on a {@link 
SimpleMonitoringInfoBuilder} (via an API that's
+   *     specific to this metric's type, as provided by the caller)
+   * @param add add the {@link MonitoringInfo} created from this key/value to 
either the "attempted"
+   *     or "committed" list of the {@link BeamFnApi.MetricResults.Builder 
builder}
+   */
+  private static <T> void process(
+      BeamFnApi.MetricResults.Builder builder,
+      MetricKey metricKey,
+      T value,
+      BiConsumer<SimpleMonitoringInfoBuilder, T> set,
+      BiConsumer<BeamFnApi.MetricResults.Builder, MonitoringInfo> add) {
+    if (value != null) {
+      SimpleMonitoringInfoBuilder partial =
+          new SimpleMonitoringInfoBuilder().handleMetricKey(metricKey);
+      ;
+      set.accept(partial, value);
+      MonitoringInfo monitoringInfo = partial.build();
+      if (monitoringInfo != null) {
+        add.accept(builder, monitoringInfo);
+      }
+    }
+  }
+
+  // `fromProto` helpers
+
+  /**
+   * Helper for converting {@link BeamFnApi.MetricResults} to {@link 
MetricResults}.
+   *
+   * <p>The former separates "attempted" and "committed" metrics at the 
highest level, while the
+   * latter splits on metric-type (counter, distribution, or gauge) first, so 
converting basically
+   * amounts to pivoting between those two layouts.
+   */
+  private static class MetricResultsBuilder {
+    private final Map<MetricKey, MetricResultBuilder<Long>> counters = new 
ConcurrentHashMap<>();
+    private final Map<MetricKey, MetricResultBuilder<DistributionResult>> 
distributions =
+        new ConcurrentHashMap<>();
+    private final Map<MetricKey, MetricResultBuilder<GaugeResult>> gauges =
+        new ConcurrentHashMap<>();
+
+    /**
+     * Populate metric-type-specific maps with {@link MonitoringInfo}s from a 
{@link
+     * BeamFnApi.MetricResults}.
+     */
+    public MetricResultsBuilder(BeamFnApi.MetricResults metrics) {
+      add(metrics.getAttemptedList(), false);
+      add(metrics.getCommittedList(), true);
+    }
+
+    private void add(Iterable<MonitoringInfo> monitoringInfos, Boolean 
committed) {
+      for (MonitoringInfo monitoringInfo : monitoringInfos) {
+        add(monitoringInfo, committed);
+      }
+    }
+
+    public MetricResults build() {
+      return new DefaultMetricResults(build(counters), build(distributions), 
build(gauges));
+    }
+
+    /** Helper for turning a map of result-builders into a sequence of 
results. */
+    private static <T> List<MetricResult<T>> build(Map<MetricKey, 
MetricResultBuilder<T>> map) {
+      return 
map.values().stream().map(MetricResultBuilder::build).collect(toList());
+    }
+
+    private void add(MonitoringInfo monitoringInfo, Boolean committed) {
+      add(
+          keyFromMonitoringInfo(monitoringInfo),
+          monitoringInfo.getType(),
+          monitoringInfo.getTimestamp(),
+          monitoringInfo.getMetric(),
+          committed);
+    }
+
+    private void add(
+        MetricKey metricKey, String type, Timestamp timestamp, Metric metric, 
Boolean committed) {
+      processMetric(
+          metric,
+          type,
+          timestamp,
+          counter -> add(metricKey, counter, committed, counters),
+          distribution -> add(metricKey, distribution, committed, 
distributions),
+          gauge -> add(metricKey, gauge, committed, gauges));
+    }
+
+    private <T> void add(
+        MetricKey key, T value, Boolean committed, Map<MetricKey, 
MetricResultBuilder<T>> map) {
+      if (committed) {
+        MetricResultBuilder builder = map.get(key);
+        if (builder == null) {
+          throw new IllegalStateException(
+              String.format("No attempted value found for committed metric %s: 
%s", key, value));
+        }
+        builder.setCommitted(value);
+      } else {
+        map.put(key, new MetricResultBuilder(key, value));
+      }
+    }
+
+    /**
+     * Helper class for storing and combining {@link MetricResult#getAttempted 
attempted} and {@link
+     * MetricResult#getCommitted committed} values, to obtain complete {@link 
MetricResult}s.
+     *
+     * <p>Used while traversing {@link MonitoringInfo} lists where they are 
stored separately.
+     */
+    private static class MetricResultBuilder<T> {
+      private final MetricKey key;
+      private final T attempted;
+      @Nullable private T committed;
+
+      public MetricResultBuilder(MetricKey key, T attempted) {
+        this.key = key;
+        this.attempted = attempted;
 
 Review comment:
   There are a couple of meanings of {"attempted", "committed"} floating around. 
   
   You've implied they correspond to MIs that come in via 
{`ProcessBundleProgressResponse`, `ProcessBundleResponse`}, resp., but there's 
a pre-fn-api {"physical", "logical"} meaning that most existing code uses.
   
   I'm not trying to resolve these two meanings in this PR. I wrote things to 
conform to exactly what was already happening, while still supporting the 
changes I'm attempting to make (which require converting between [new proto] 
and [existing SDK] representations of metrics).
   
   What was already happening, that I'm preserving, is:
   - "attempted" ("physical") is always set
   - "committed" ("logical") is sometimes set
     - some runners don't support the latter, so they don't set it (by going 
through 
[`MCSM.asAttemptedOnlyMetricResults`](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L114)):
       - 
[flink](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L146)
       - spark 
[1](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java#L47)
 
[2](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java#L109)
       - samza 
[1](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java#L62)
 
[2](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java#L86)
     - other runners do set both:
       - DataflowMetrics 
[1](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java#L143),
 
[2](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java#L152):
 "attempted" and "committed" are set [to the same 
value](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java#L93)
       - 
[DirectMetrics](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java#L259-L262):
 both values set, representing "physical" and "logical" values
       - [portable 
DirectMetrics](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java#L263-L266)
         - most portable DirectRunner metrics code seems like copy-pasta from 
the legacy version. I've consolidated / deleted some of it, but haven't gotten 
to this yet.
   - accordingly, ["committed" is 
`@Nullable`](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java#L65-L66),
 and ["attempted" is 
not](https://github.com/apache/beam/blob/3040767e0312fa73cd08d8f592ce958c484d9fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java#L68-L69).
 
   
   In this `private static` helper, I've written the simplest conversion I can 
manage from [an attempted metric value] plus [an optional corresponding 
committed value that comes in via a different proto field] to a MetricResult 
that joins them.
   
   lmk if that doesn't make sense!
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 205996)
    Time Spent: 30.5h  (was: 30h 20m)

> JobService should support returning metrics
> -------------------------------------------
>
>                 Key: BEAM-4775
>                 URL: https://issues.apache.org/jira/browse/BEAM-4775
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>              Labels: triaged
>          Time Spent: 30.5h
>  Remaining Estimate: 0h
>
> Design doc: [https://s.apache.org/get-metrics-api].
> h1. Relevant PRs in flight:
> h2. Approved / Ready to merge:
> None atm.
> h2. Ready for Review:
>  * [#7934|https://github.com/apache/beam/pull/7934]: add GetJobMetrics RPC, 
> Java+Python SDK support for it
>  * [#7915|https://github.com/apache/beam/pull/7915]: use MonitoringInfo data 
> model in Java SDK metrics
>  ** Depends on [#7867|https://github.com/apache/beam/pull/7867]
>  ** will need to be rebased on top of 
> [#7938|https://github.com/apache/beam/pull/7938]
>  ** Both of these require adding a {{sdks/java/core}} dependency on the 
> {{model/fn-execution}} protos module.
>  *** I want to discuss whether that's ok.
>  *** It may not be totally necessary; see discussion on 
> [#7915|https://github.com/apache/beam/pull/7915].
> h2. Iterating / Discussing:
>  * [#7868|https://github.com/apache/beam/pull/7868]: MonitoringInfo URN tweaks
> h2. Merged
>  * [#7867|https://github.com/apache/beam/pull/7867]: key MetricResult by a 
> MetricKey
>  * [#7938|https://github.com/apache/beam/pull/7938]: move MonitoringInfo 
> protos to model/pipeline module
>  * [#7883|https://github.com/apache/beam/pull/7883]: Add 
> MetricQueryResults.allMetrics() helper
>  * [#7866|https://github.com/apache/beam/pull/7866]: move function helpers 
> from fn-harness to sdks/java/core
>  * [#7890|https://github.com/apache/beam/pull/7890]: consolidate MetricResult 
> implementations
> h2. Closed
>  * [#7876|https://github.com/apache/beam/pull/7876]: Clean up metric protos; 
> support integer distributions, gauges
> h1. Likely pieces still to come:
> Per recent discussion with [~robertwb], I'm going to move the MonitoringInfo 
> protos into the {{model/pipeline}} module, which the Job API, Fn API, and 
> sdks/java/core all depend on already, as it's a good/central place for them.
> h1. Previous Description:
> [https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto]
>  currently doesn't appear to have a way for JobService to return metrics to a 
> user, even though 
> [https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto]
>  includes support for reporting SDK metrics to the runner harness.
> Metrics are apparently necessary to run any ValidatesRunner tests because 
> PAssert needs to validate that the assertions succeeded. However, this 
> statement should be double-checked: perhaps it's possible to somehow work 
> with PAssert without metrics support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to