This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 68be16a  [BEAM-6100] Collect metrics properly in Load tests (#7087)
68be16a is described below

commit 68be16a34823b7d322f3cc0c6d3a3aa53bdecccb
Author: Łukasz Gajowy <[email protected]>
AuthorDate: Wed Nov 28 18:07:57 2018 +0100

    [BEAM-6100] Collect metrics properly in Load tests (#7087)
---
 .../beam/sdk/loadtests/CoGroupByKeyLoadTest.java   | 20 ++---
 .../apache/beam/sdk/loadtests/CombineLoadTest.java | 22 +++--
 .../beam/sdk/loadtests/GroupByKeyLoadTest.java     | 13 ++-
 .../org/apache/beam/sdk/loadtests/LoadTest.java    | 16 ++--
 .../apache/beam/sdk/loadtests/ParDoLoadTest.java   |  9 +-
 .../{MetricsMonitor.java => ByteMonitor.java}      | 19 +++--
 .../sdk/loadtests/metrics/MetricsPublisher.java    | 12 ++-
 .../{MetricsMonitor.java => TimeMonitor.java}      | 19 +++--
 .../apache/beam/sdk/nexmark/NexmarkLauncher.java   | 27 +++---
 .../beam/sdk/testutils/metrics/MetricsReader.java  | 97 ++++++++++++----------
 .../sdk/testutils/metrics/MetricsReaderTest.java   | 52 ++++++++----
 11 files changed, 181 insertions(+), 125 deletions(-)

diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
index 85d27ea..0cb8f0e 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
@@ -22,7 +22,7 @@ import java.util.Optional;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import 
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Validation;
@@ -92,22 +92,22 @@ public class CoGroupByKeyLoadTest extends 
LoadTest<CoGroupByKeyLoadTest.Options>
     Optional<SyntheticStep> syntheticStep = 
createStep(options.getStepOptions());
 
     PCollection<KV<byte[], byte[]>> input =
-        applyStepIfPresent(
-            pipeline.apply("Read input", 
SyntheticBoundedIO.readFrom(sourceOptions)),
-            "Synthetic step for input",
-            syntheticStep);
+        pipeline.apply("Read input", 
SyntheticBoundedIO.readFrom(sourceOptions));
+    input = input.apply("Collect start time metrics (input)", 
ParDo.of(runtimeMonitor));
+    applyStepIfPresent(input, "Synthetic step for input", syntheticStep);
 
     PCollection<KV<byte[], byte[]>> coInput =
-        applyStepIfPresent(
-            pipeline.apply("Read co-input", 
SyntheticBoundedIO.readFrom(coSourceOptions)),
-            "Synthetic step for co-input",
-            syntheticStep);
+        pipeline.apply("Read co-input", 
SyntheticBoundedIO.readFrom(coSourceOptions));
+    coInput = coInput.apply("Collect start time metrics (co-input)", 
ParDo.of(runtimeMonitor));
+    applyStepIfPresent(coInput, "Synthetic step for co-input", syntheticStep);
 
     KeyedPCollectionTuple.of(INPUT_TAG, input)
         .and(CO_INPUT_TAG, coInput)
         .apply("CoGroupByKey", CoGroupByKey.create())
         .apply("Ungroup and reiterate", ParDo.of(new 
UngroupAndReiterate(options.getIterations())))
-        .apply("Collect metrics", ParDo.of(new 
MetricsMonitor(METRICS_NAMESPACE)));
+        .apply(
+            "Collect total bytes", ParDo.of(new ByteMonitor(METRICS_NAMESPACE, 
"totalBytes.count")))
+        .apply("Collect end time metrics", ParDo.of(runtimeMonitor));
   }
 
   private static class UngroupAndReiterate
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
index a85f23b..316d626 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
@@ -25,7 +25,8 @@ import java.math.BigInteger;
 import java.util.Optional;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.transforms.Combine;
@@ -107,23 +108,30 @@ public class CombineLoadTest extends 
LoadTest<CombineLoadTest.Options> {
 
   @Override
   protected void loadTest() throws IOException {
-    PTransform combiner = 
createPerKeyCombiner(options.getPerKeyCombinerType());
-
     Optional<SyntheticStep> syntheticStep = 
createStep(options.getStepOptions());
 
     PCollection<KV<byte[], byte[]>> input =
         pipeline
             .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
-            .apply("Collect metrics", ParDo.of(new 
MetricsMonitor(METRICS_NAMESPACE)));
+            .apply(
+                "Collect start time metric",
+                ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")))
+            .apply(
+                "Collect metrics",
+                ParDo.of(new ByteMonitor(METRICS_NAMESPACE, 
"totalBytes.count")));
 
     for (int i = 0; i < options.getFanout(); i++) {
       applyStepIfPresent(input, format("Step: %d", i), syntheticStep)
-          .apply(format("Convert to BigInteger: %d", i), MapElements.via(new 
ByteValueToLong()))
-          .apply(format("Combine: %d", i), combiner);
+          .apply(format("Convert to Long: %d", i), MapElements.via(new 
ByteValueToLong()))
+          .apply(format("Combine: %d", i), 
getPerKeyCombiner(options.getPerKeyCombinerType()))
+          .apply(
+              "Collect end time metric",
+              ParDo.of(new TimeMonitor<byte[], Object>(METRICS_NAMESPACE, 
"runtime")));
     }
   }
 
-  private PTransform createPerKeyCombiner(CombinerType combinerType) {
+  private PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection> 
getPerKeyCombiner(
+      CombinerType combinerType) {
     switch (combinerType) {
       case MEAN:
         return Mean.perKey();
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
index fb0bf84..557cca7 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.Optional;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -84,15 +84,20 @@ public class GroupByKeyLoadTest extends 
LoadTest<GroupByKeyLoadTest.Options> {
     Optional<SyntheticStep> syntheticStep = 
createStep(options.getStepOptions());
 
     PCollection<KV<byte[], byte[]>> input =
-        pipeline.apply(SyntheticBoundedIO.readFrom(sourceOptions));
+        pipeline
+            .apply(SyntheticBoundedIO.readFrom(sourceOptions))
+            .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
+            .apply(
+                "Total bytes monitor",
+                ParDo.of(new ByteMonitor(METRICS_NAMESPACE, 
"totalBytes.count")));
 
     for (int branch = 0; branch < options.getFanout(); branch++) {
       applyStepIfPresent(input, format("Synthetic step (%s)", branch), 
syntheticStep)
-          .apply(ParDo.of(new MetricsMonitor(METRICS_NAMESPACE)))
           .apply(format("Group by key (%s)", branch), GroupByKey.create())
           .apply(
               format("Ungroup and reiterate (%s)", branch),
-              ParDo.of(new UngroupAndReiterate(options.getIterations())));
+              ParDo.of(new UngroupAndReiterate(options.getIterations())))
+          .apply(format("Collect end time metrics (%s)", branch), 
ParDo.of(runtimeMonitor));
     }
   }
 
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index b3f2d1f..0f936d1 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
 import org.apache.beam.sdk.loadtests.metrics.MetricsPublisher;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,24 +39,23 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
 
   private String metricsNamespace;
 
-  OptionsT options;
+  protected TimeMonitor<byte[], byte[]> runtimeMonitor;
 
-  SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
+  protected OptionsT options;
 
-  SyntheticStep.Options stepOptions;
+  protected SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
 
-  Pipeline pipeline;
+  protected SyntheticStep.Options stepOptions;
+
+  protected Pipeline pipeline;
 
   LoadTest(String[] args, Class<OptionsT> testOptions, String 
metricsNamespace) throws IOException {
     this.metricsNamespace = metricsNamespace;
-
+    this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
     this.options = LoadTestOptions.readFromArgs(args, testOptions);
-
     this.sourceOptions =
         fromJsonString(options.getSourceOptions(), 
SyntheticBoundedIO.SyntheticSourceOptions.class);
-
     this.stepOptions = fromJsonString(options.getStepOptions(), 
SyntheticStep.Options.class);
-
     this.pipeline = Pipeline.create(options);
   }
 
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
index 3916ad6..3e51f2a 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.loadtests;
 import java.io.IOException;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Validation;
@@ -73,13 +73,16 @@ public class ParDoLoadTest extends 
LoadTest<ParDoLoadTest.Options> {
   @Override
   protected void loadTest() {
     PCollection<KV<byte[], byte[]>> input =
-        pipeline.apply("Read input", 
SyntheticBoundedIO.readFrom(sourceOptions));
+        pipeline
+            .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
+            .apply(ParDo.of(runtimeMonitor))
+            .apply(ParDo.of(new ByteMonitor(METRICS_NAMESPACE, 
"totalBytes.count")));
 
     for (int i = 0; i < options.getNumberOfCounterOperations(); i++) {
       input = input.apply(String.format("Step: %d", i), ParDo.of(new 
SyntheticStep(stepOptions)));
     }
 
-    input.apply("Collect metrics", ParDo.of(new 
MetricsMonitor(METRICS_NAMESPACE)));
+    input.apply(ParDo.of(runtimeMonitor));
   }
 
   public static void main(String[] args) throws IOException {
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
similarity index 72%
copy from 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
copy to 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
index b1785f2..26a3ef5 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
@@ -18,26 +18,27 @@
 package org.apache.beam.sdk.loadtests.metrics;
 
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 
-/** Monitors various metrics from within a pipeline. */
-public class MetricsMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], 
byte[]>> {
-
-  private Distribution timeDistribution;
+/**
+ * Monitor that records the number of bytes flowing through a PCollection.
+ *
+ * <p>To use: apply a monitor in a desired place in the pipeline. This will 
capture how many bytes
+ * flew through this DoFn which then can be collected and written out using 
{@link
+ * MetricsPublisher}.
+ */
+public class ByteMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
 
   private Counter totalBytes;
 
-  public MetricsMonitor(String namespace) {
-    this.timeDistribution = Metrics.distribution(namespace, "runtime");
-    this.totalBytes = Metrics.counter(namespace, "totalBytes.count");
+  public ByteMonitor(String namespace, String name) {
+    this.totalBytes = Metrics.counter(namespace, name);
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-    timeDistribution.update(System.currentTimeMillis());
     totalBytes.inc(c.element().getKey().length + 
c.element().getValue().length);
     c.output(c.element());
   }
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
index a6c05ab..365b95b 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
@@ -23,12 +23,18 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 /** Provides ways to publish metrics gathered during test invocation. */
 public class MetricsPublisher {
 
+  /**
+   * This prints out metrics results to console. It will work only if metrics 
with appropriate
+   * (conventional) names are present to be collected in {@link PipelineResult}
+   *
+   * <p>See {@link org.apache.beam.sdk.loadtests.GroupByKeyLoadTest} for hints 
on how to use it.
+   */
   public static void toConsole(PipelineResult result, String namespace) {
     MetricsReader resultMetrics = new MetricsReader(result, namespace);
 
-    long totalBytes = resultMetrics.getCounterMetric("totalBytes.count", -1);
-    long startTime = 
resultMetrics.getStartTimeMetric(System.currentTimeMillis(), "runtime");
-    long endTime = resultMetrics.getEndTimeMetric(System.currentTimeMillis(), 
"runtime");
+    long totalBytes = resultMetrics.getCounterMetric("totalBytes.count");
+    long startTime = resultMetrics.getStartTimeMetric("runtime");
+    long endTime = resultMetrics.getEndTimeMetric("runtime");
 
     System.out.println(String.format("Total bytes: %s", totalBytes));
     System.out.println(String.format("Total time (millis): %s", endTime - 
startTime));
diff --git 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
similarity index 71%
rename from 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
rename to 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
index b1785f2..128162d 100644
--- 
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
@@ -17,28 +17,29 @@
  */
 package org.apache.beam.sdk.loadtests.metrics;
 
-import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 
-/** Monitors various metrics from within a pipeline. */
-public class MetricsMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], 
byte[]>> {
+/**
+ * Monitor that records processing time distribution in the pipeline.
+ *
+ * <p>To use: apply a monitor directly after each source and sink transform. 
This will capture a
+ * distribution of element processing timestamps, which can be collected and 
written out using
+ * {@link MetricsPublisher}.
+ */
+public class TimeMonitor<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
 
   private Distribution timeDistribution;
 
-  private Counter totalBytes;
-
-  public MetricsMonitor(String namespace) {
-    this.timeDistribution = Metrics.distribution(namespace, "runtime");
-    this.totalBytes = Metrics.counter(namespace, "totalBytes.count");
+  public TimeMonitor(String namespace, String name) {
+    this.timeDistribution = Metrics.distribution(namespace, name);
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) {
     timeDistribution.update(System.currentTimeMillis());
-    totalBytes.inc(c.element().getKey().length + 
c.element().getValue().length);
     c.output(c.element());
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index cab4b83..c797064 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -252,20 +252,20 @@ public class NexmarkLauncher<OptionT extends 
NexmarkOptions> {
 
     MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
 
-    long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".elements", -1);
-    long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".bytes", -1);
-    long eventStart = eventMetrics.getStartTimeMetric(now, eventMonitor.prefix 
+ ".startTime");
-    long eventEnd = eventMetrics.getEndTimeMetric(now, eventMonitor.prefix + 
".endTime");
+    long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".elements");
+    long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".bytes");
+    long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix + 
".startTime");
+    long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix + 
".endTime");
 
     MetricsReader resultMetrics = new MetricsReader(result, 
resultMonitor.name);
 
-    long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + 
".elements", -1);
-    long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix 
+ ".bytes", -1);
-    long resultStart = resultMetrics.getStartTimeMetric(now, 
resultMonitor.prefix + ".startTime");
-    long resultEnd = resultMetrics.getEndTimeMetric(now, resultMonitor.prefix 
+ ".endTime");
+    long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + 
".elements");
+    long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix 
+ ".bytes");
+    long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix + 
".startTime");
+    long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + 
".endTime");
     long timestampStart =
-        resultMetrics.getStartTimeMetric(now, resultMonitor.prefix + 
".startTimestamp");
-    long timestampEnd = resultMetrics.getEndTimeMetric(now, 
resultMonitor.prefix + ".endTimestamp");
+        resultMetrics.getStartTimeMetric(resultMonitor.prefix + 
".startTimestamp");
+    long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + 
".endTimestamp");
 
     long effectiveEnd = -1;
     if (eventEnd >= 0 && resultEnd >= 0) {
@@ -449,7 +449,12 @@ public class NexmarkLauncher<OptionT extends 
NexmarkOptions> {
 
       if (options.isStreaming() && !waitingForShutdown) {
         Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
-        long fatalCount = new MetricsReader(job, 
query.getName()).getCounterMetric("fatal", 0);
+        long fatalCount = new MetricsReader(job, 
query.getName()).getCounterMetric("fatal");
+
+        if (fatalCount == -1) {
+          fatalCount = 0;
+        }
+
         if (fatalCount > 0) {
           NexmarkUtils.console("job has fatal errors, cancelling.");
           errors.add(String.format("Pipeline reported %s fatal errors", 
fatalCount));
diff --git 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
index 7dd8bcb..e5d1854 100644
--- 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
+++ 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.testutils.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -34,25 +37,30 @@ public class MetricsReader {
 
   private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(MetricsReader.class);
 
-  private enum DistributionType {
-    MIN,
-    MAX
-  }
+  private static final long ERRONEOUS_METRIC_VALUE = -1;
 
   private final PipelineResult result;
 
   private final String namespace;
 
-  public MetricsReader(PipelineResult result, String namespace) {
+  private final long now;
+
+  @VisibleForTesting
+  MetricsReader(PipelineResult result, String namespace, long now) {
     this.result = result;
     this.namespace = namespace;
+    this.now = now;
+  }
+
+  public MetricsReader(PipelineResult result, String namespace) {
+    this(result, namespace, System.currentTimeMillis());
   }
 
   /**
-   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
-   * this uses only attempted metrics because some runners don't support 
committed metrics.
+   * Return the current value for a long counter, or -1 if can't be retrieved. 
Note this uses only
+   * attempted metrics because some runners don't support committed metrics.
    */
-  public long getCounterMetric(String name, long defaultValue) {
+  public long getCounterMetric(String name) {
     MetricQueryResults metrics =
         result
             .metrics()
@@ -70,30 +78,48 @@ public class MetricsReader {
     } catch (NoSuchElementException e) {
       LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
     }
-    return defaultValue;
+    return ERRONEOUS_METRIC_VALUE;
   }
 
   /**
    * Return start time metric by counting the difference between "now" and min 
value from a
    * distribution metric.
    */
-  public long getStartTimeMetric(long now, String name) {
-    return this.getTimestampMetric(now, this.getDistributionMetric(name, 
DistributionType.MIN, -1));
+  public long getStartTimeMetric(String name) {
+    Iterable<MetricResult<DistributionResult>> timeDistributions = 
getDistributions(name);
+    return getLowestMin(timeDistributions);
+  }
+
+  private Long getLowestMin(Iterable<MetricResult<DistributionResult>> 
distributions) {
+    Optional<Long> lowestMin =
+        StreamSupport.stream(distributions.spliterator(), true)
+            .map(element -> element.getAttempted().getMin())
+            .filter(this::isCredible)
+            .min(Long::compareTo);
+
+    return lowestMin.orElse(ERRONEOUS_METRIC_VALUE);
   }
 
   /**
    * Return end time metric by counting the difference between "now" and MAX 
value from a
    * distribution metric.
    */
-  public long getEndTimeMetric(long now, String name) {
-    return this.getTimestampMetric(now, this.getDistributionMetric(name, 
DistributionType.MAX, -1));
+  public long getEndTimeMetric(String name) {
+    Iterable<MetricResult<DistributionResult>> timeDistributions = 
getDistributions(name);
+    return getGreatestMax(timeDistributions);
   }
 
-  /**
-   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
-   * this uses only attempted metrics because some runners don't support 
committed metrics.
-   */
-  private long getDistributionMetric(String name, DistributionType distType, 
long defaultValue) {
+  private Long getGreatestMax(Iterable<MetricResult<DistributionResult>> 
distributions) {
+    Optional<Long> greatestMax =
+        StreamSupport.stream(distributions.spliterator(), true)
+            .map(element -> element.getAttempted().getMax())
+            .filter(this::isCredible)
+            .max(Long::compareTo);
+
+    return greatestMax.orElse(ERRONEOUS_METRIC_VALUE);
+  }
+
+  private Iterable<MetricResult<DistributionResult>> getDistributions(String 
name) {
     MetricQueryResults metrics =
         result
             .metrics()
@@ -101,24 +127,7 @@ public class MetricsReader {
                 MetricsFilter.builder()
                     .addNameFilter(MetricNameFilter.named(namespace, name))
                     .build());
-    Iterable<MetricResult<DistributionResult>> distributions = 
metrics.getDistributions();
-
-    checkIfMetricResultIsUnique(name, distributions);
-
-    try {
-      MetricResult<DistributionResult> distributionResult = 
distributions.iterator().next();
-      switch (distType) {
-        case MIN:
-          return distributionResult.getAttempted().getMin();
-        case MAX:
-          return distributionResult.getAttempted().getMax();
-        default:
-          return defaultValue;
-      }
-    } catch (NoSuchElementException e) {
-      LOG.error("Failed to get distribution metric {} for namespace {}", name, 
namespace);
-    }
-    return defaultValue;
+    return metrics.getDistributions();
   }
 
   private <T> void checkIfMetricResultIsUnique(String name, 
Iterable<MetricResult<T>> metricResult)
@@ -133,14 +142,12 @@ public class MetricsReader {
         resultCount);
   }
 
-  /** Return the current value for a time counter, or -1 if can't be 
retrieved. */
-  private long getTimestampMetric(long now, long value) {
-    // timestamp metrics are used to monitor time of execution of transforms.
-    // If result timestamp metric is too far from now, consider that metric is 
erroneous
-
-    if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
-      return -1;
-    }
-    return value;
+  /**
+   * timestamp metrics are used to monitor time of execution of transforms. If 
result timestamp
+   * metric is too far from now, consider that metric is erroneous private 
boolean isCredible(long
+   * value) {
+   */
+  private boolean isCredible(long value) {
+    return (Math.abs(value - now) <= Duration.standardDays(10000).getMillis());
   }
 }
diff --git 
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
index d0ad4a7..e2fef25 100644
--- 
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
+++ 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
@@ -28,7 +28,10 @@ import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,39 +54,49 @@ public class MetricsReaderTest {
 
     MetricsReader reader = new MetricsReader(result, NAMESPACE);
 
-    assertEquals(5, reader.getCounterMetric("counter", -1));
+    assertEquals(5, reader.getCounterMetric("counter"));
   }
 
   @Test
-  public void testStartTimeIsTheMinimumOfTheDistribution() {
+  public void testStartTimeIsTheMinimumFromAllCollectedDistributions() {
     List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
 
-    createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
-    PipelineResult result = testPipeline.run();
-
-    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+    createTestPipelineWithBranches(sampleInputData);
 
-    assertEquals(1, reader.getStartTimeMetric(0, "timeDist"));
+    PipelineResult result = testPipeline.run();
+    MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+    assertEquals(1, reader.getStartTimeMetric("timeDist"));
   }
 
   @Test
-  public void testEndTimeIsTheMaximumOfTheDistribution() {
+  public void testEndTimeIsTheMaximumOfAllCollectedDistributions() {
     List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
 
-    createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+    createTestPipelineWithBranches(sampleInputData);
 
     PipelineResult result = testPipeline.run();
+    MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+    assertEquals(10, reader.getEndTimeMetric("timeDist"));
+  }
 
-    MetricsReader reader = new MetricsReader(result, NAMESPACE);
-
-    assertEquals(5, reader.getEndTimeMetric(0, "timeDist"));
+  /**
+   * Branching pipelines ensure that multiple metric results of the same name 
are created. Thanks to
+   * that it is possible to test if MetricsReader can collect metrics in such 
case.
+   */
+  private void createTestPipelineWithBranches(List<Integer> sampleInputData) {
+    PCollection<Integer> inputData = 
testPipeline.apply(Create.of(sampleInputData));
+    inputData.apply("Monitor #1", ParDo.of(new MonitorWithTimeDistribution()));
+
+    inputData
+        .apply("Multiply input", MapElements.via(new MultiplyElements()))
+        .apply("Monitor #2", ParDo.of(new MonitorWithTimeDistribution()));
   }
 
   @Test
   public void doesntThrowIllegalStateExceptionWhenThereIsNoMetricFound() {
     PipelineResult result = testPipeline.run();
     MetricsReader reader = new MetricsReader(result, NAMESPACE);
-    reader.getCounterMetric("nonexistent", -1);
+    reader.getCounterMetric("nonexistent");
   }
 
   @Test
@@ -93,10 +106,10 @@ public class MetricsReaderTest {
     createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
     PipelineResult result = testPipeline.run();
 
-    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+    MetricsReader reader = new MetricsReader(result, NAMESPACE, 900000000001L);
 
-    assertEquals(-1, reader.getStartTimeMetric(900000000001L, "timeDist"));
-    assertEquals(-1, reader.getEndTimeMetric(900000000001L, "timeDist"));
+    assertEquals(-1, reader.getStartTimeMetric("timeDist"));
+    assertEquals(-1, reader.getEndTimeMetric("timeDist"));
   }
 
   private void createTestPipeline(List<Integer> sampleInputData, DoFn<Integer, 
Integer> monitor) {
@@ -122,4 +135,11 @@ public class MetricsReaderTest {
       timeDistribution.update(c.element().longValue());
     }
   }
+
+  private static class MultiplyElements extends SimpleFunction<Integer, 
Integer> {
+    @Override
+    public Integer apply(Integer input) {
+      return input * 2;
+    }
+  }
 }

Reply via email to