This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 68be16a [BEAM-6100] Collect metrics properly in Load tests (#7087)
68be16a is described below
commit 68be16a34823b7d322f3cc0c6d3a3aa53bdecccb
Author: Łukasz Gajowy <[email protected]>
AuthorDate: Wed Nov 28 18:07:57 2018 +0100
[BEAM-6100] Collect metrics properly in Load tests (#7087)
---
.../beam/sdk/loadtests/CoGroupByKeyLoadTest.java | 20 ++---
.../apache/beam/sdk/loadtests/CombineLoadTest.java | 22 +++--
.../beam/sdk/loadtests/GroupByKeyLoadTest.java | 13 ++-
.../org/apache/beam/sdk/loadtests/LoadTest.java | 16 ++--
.../apache/beam/sdk/loadtests/ParDoLoadTest.java | 9 +-
.../{MetricsMonitor.java => ByteMonitor.java} | 19 +++--
.../sdk/loadtests/metrics/MetricsPublisher.java | 12 ++-
.../{MetricsMonitor.java => TimeMonitor.java} | 19 +++--
.../apache/beam/sdk/nexmark/NexmarkLauncher.java | 27 +++---
.../beam/sdk/testutils/metrics/MetricsReader.java | 97 ++++++++++++----------
.../sdk/testutils/metrics/MetricsReaderTest.java | 52 ++++++++----
11 files changed, 181 insertions(+), 125 deletions(-)
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
index 85d27ea..0cb8f0e 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
@@ -22,7 +22,7 @@ import java.util.Optional;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
@@ -92,22 +92,22 @@ public class CoGroupByKeyLoadTest extends
LoadTest<CoGroupByKeyLoadTest.Options>
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
- applyStepIfPresent(
- pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions)),
- "Synthetic step for input",
- syntheticStep);
+ pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions));
+ input = input.apply("Collect start time metrics (input)",
ParDo.of(runtimeMonitor));
+ applyStepIfPresent(input, "Synthetic step for input", syntheticStep);
PCollection<KV<byte[], byte[]>> coInput =
- applyStepIfPresent(
- pipeline.apply("Read co-input",
SyntheticBoundedIO.readFrom(coSourceOptions)),
- "Synthetic step for co-input",
- syntheticStep);
+ pipeline.apply("Read co-input",
SyntheticBoundedIO.readFrom(coSourceOptions));
+ coInput = coInput.apply("Collect start time metrics (co-input)",
ParDo.of(runtimeMonitor));
+ applyStepIfPresent(coInput, "Synthetic step for co-input", syntheticStep);
KeyedPCollectionTuple.of(INPUT_TAG, input)
.and(CO_INPUT_TAG, coInput)
.apply("CoGroupByKey", CoGroupByKey.create())
.apply("Ungroup and reiterate", ParDo.of(new
UngroupAndReiterate(options.getIterations())))
- .apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ .apply(
+ "Collect total bytes", ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")))
+ .apply("Collect end time metrics", ParDo.of(runtimeMonitor));
}
private static class UngroupAndReiterate
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
index a85f23b..316d626 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
@@ -25,7 +25,8 @@ import java.math.BigInteger;
import java.util.Optional;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.transforms.Combine;
@@ -107,23 +108,30 @@ public class CombineLoadTest extends
LoadTest<CombineLoadTest.Options> {
@Override
protected void loadTest() throws IOException {
- PTransform combiner =
createPerKeyCombiner(options.getPerKeyCombinerType());
-
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
- .apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ .apply(
+ "Collect start time metric",
+ ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")))
+ .apply(
+ "Collect metrics",
+ ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int i = 0; i < options.getFanout(); i++) {
applyStepIfPresent(input, format("Step: %d", i), syntheticStep)
- .apply(format("Convert to BigInteger: %d", i), MapElements.via(new
ByteValueToLong()))
- .apply(format("Combine: %d", i), combiner);
+ .apply(format("Convert to Long: %d", i), MapElements.via(new
ByteValueToLong()))
+ .apply(format("Combine: %d", i),
getPerKeyCombiner(options.getPerKeyCombinerType()))
+ .apply(
+ "Collect end time metric",
+ ParDo.of(new TimeMonitor<byte[], Object>(METRICS_NAMESPACE,
"runtime")));
}
}
- private PTransform createPerKeyCombiner(CombinerType combinerType) {
+ private PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection>
getPerKeyCombiner(
+ CombinerType combinerType) {
switch (combinerType) {
case MEAN:
return Mean.perKey();
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
index fb0bf84..557cca7 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.transforms.DoFn;
@@ -84,15 +84,20 @@ public class GroupByKeyLoadTest extends
LoadTest<GroupByKeyLoadTest.Options> {
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
- pipeline.apply(SyntheticBoundedIO.readFrom(sourceOptions));
+ pipeline
+ .apply(SyntheticBoundedIO.readFrom(sourceOptions))
+ .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
+ .apply(
+ "Total bytes monitor",
+ ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int branch = 0; branch < options.getFanout(); branch++) {
applyStepIfPresent(input, format("Synthetic step (%s)", branch),
syntheticStep)
- .apply(ParDo.of(new MetricsMonitor(METRICS_NAMESPACE)))
.apply(format("Group by key (%s)", branch), GroupByKey.create())
.apply(
format("Ungroup and reiterate (%s)", branch),
- ParDo.of(new UngroupAndReiterate(options.getIterations())));
+ ParDo.of(new UngroupAndReiterate(options.getIterations())))
+ .apply(format("Collect end time metrics (%s)", branch),
ParDo.of(runtimeMonitor));
}
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index b3f2d1f..0f936d1 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
import org.apache.beam.sdk.loadtests.metrics.MetricsPublisher;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -38,24 +39,23 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
private String metricsNamespace;
- OptionsT options;
+ protected TimeMonitor<byte[], byte[]> runtimeMonitor;
- SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
+ protected OptionsT options;
- SyntheticStep.Options stepOptions;
+ protected SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
- Pipeline pipeline;
+ protected SyntheticStep.Options stepOptions;
+
+ protected Pipeline pipeline;
LoadTest(String[] args, Class<OptionsT> testOptions, String
metricsNamespace) throws IOException {
this.metricsNamespace = metricsNamespace;
-
+ this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
this.options = LoadTestOptions.readFromArgs(args, testOptions);
-
this.sourceOptions =
fromJsonString(options.getSourceOptions(),
SyntheticBoundedIO.SyntheticSourceOptions.class);
-
this.stepOptions = fromJsonString(options.getStepOptions(),
SyntheticStep.Options.class);
-
this.pipeline = Pipeline.create(options);
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
index 3916ad6..3e51f2a 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.loadtests;
import java.io.IOException;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
@@ -73,13 +73,16 @@ public class ParDoLoadTest extends
LoadTest<ParDoLoadTest.Options> {
@Override
protected void loadTest() {
PCollection<KV<byte[], byte[]>> input =
- pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions));
+ pipeline
+ .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
+ .apply(ParDo.of(runtimeMonitor))
+ .apply(ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int i = 0; i < options.getNumberOfCounterOperations(); i++) {
input = input.apply(String.format("Step: %d", i), ParDo.of(new
SyntheticStep(stepOptions)));
}
- input.apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ input.apply(ParDo.of(runtimeMonitor));
}
public static void main(String[] args) throws IOException {
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
similarity index 72%
copy from
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
copy to
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
index b1785f2..26a3ef5 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
@@ -18,26 +18,27 @@
package org.apache.beam.sdk.loadtests.metrics;
import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
-/** Monitors various metrics from within a pipeline. */
-public class MetricsMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[],
byte[]>> {
-
- private Distribution timeDistribution;
+/**
+ * Monitor that records the number of bytes flowing through a PCollection.
+ *
+ * <p>To use: apply a monitor in a desired place in the pipeline. This will
capture how many bytes
+ * flew through this DoFn which then can be collected and written out using
{@link
+ * MetricsPublisher}.
+ */
+public class ByteMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
private Counter totalBytes;
- public MetricsMonitor(String namespace) {
- this.timeDistribution = Metrics.distribution(namespace, "runtime");
- this.totalBytes = Metrics.counter(namespace, "totalBytes.count");
+ public ByteMonitor(String namespace, String name) {
+ this.totalBytes = Metrics.counter(namespace, name);
}
@ProcessElement
public void processElement(ProcessContext c) {
- timeDistribution.update(System.currentTimeMillis());
totalBytes.inc(c.element().getKey().length +
c.element().getValue().length);
c.output(c.element());
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
index a6c05ab..365b95b 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
@@ -23,12 +23,18 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
/** Provides ways to publish metrics gathered during test invocation. */
public class MetricsPublisher {
+ /**
+ * This prints out metrics results to console. It will work only if metrics
with appropriate
+ * (conventional) names are present to be collected in {@link PipelineResult}
+ *
+ * <p>See {@link org.apache.beam.sdk.loadtests.GroupByKeyLoadTest} for hints
on how to use it.
+ */
public static void toConsole(PipelineResult result, String namespace) {
MetricsReader resultMetrics = new MetricsReader(result, namespace);
- long totalBytes = resultMetrics.getCounterMetric("totalBytes.count", -1);
- long startTime =
resultMetrics.getStartTimeMetric(System.currentTimeMillis(), "runtime");
- long endTime = resultMetrics.getEndTimeMetric(System.currentTimeMillis(),
"runtime");
+ long totalBytes = resultMetrics.getCounterMetric("totalBytes.count");
+ long startTime = resultMetrics.getStartTimeMetric("runtime");
+ long endTime = resultMetrics.getEndTimeMetric("runtime");
System.out.println(String.format("Total bytes: %s", totalBytes));
System.out.println(String.format("Total time (millis): %s", endTime -
startTime));
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
similarity index 71%
rename from
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
rename to
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
index b1785f2..128162d 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
@@ -17,28 +17,29 @@
*/
package org.apache.beam.sdk.loadtests.metrics;
-import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
-/** Monitors various metrics from within a pipeline. */
-public class MetricsMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[],
byte[]>> {
+/**
+ * Monitor that records processing time distribution in the pipeline.
+ *
+ * <p>To use: apply a monitor directly after each source and sink transform.
This will capture a
+ * distribution of element processing timestamps, which can be collected and
written out using
+ * {@link MetricsPublisher}.
+ */
+public class TimeMonitor<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
private Distribution timeDistribution;
- private Counter totalBytes;
-
- public MetricsMonitor(String namespace) {
- this.timeDistribution = Metrics.distribution(namespace, "runtime");
- this.totalBytes = Metrics.counter(namespace, "totalBytes.count");
+ public TimeMonitor(String namespace, String name) {
+ this.timeDistribution = Metrics.distribution(namespace, name);
}
@ProcessElement
public void processElement(ProcessContext c) {
timeDistribution.update(System.currentTimeMillis());
- totalBytes.inc(c.element().getKey().length +
c.element().getValue().length);
c.output(c.element());
}
}
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index cab4b83..c797064 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -252,20 +252,20 @@ public class NexmarkLauncher<OptionT extends
NexmarkOptions> {
MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
- long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix +
".elements", -1);
- long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix +
".bytes", -1);
- long eventStart = eventMetrics.getStartTimeMetric(now, eventMonitor.prefix
+ ".startTime");
- long eventEnd = eventMetrics.getEndTimeMetric(now, eventMonitor.prefix +
".endTime");
+ long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix +
".elements");
+ long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix +
".bytes");
+ long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix +
".startTime");
+ long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix +
".endTime");
MetricsReader resultMetrics = new MetricsReader(result,
resultMonitor.name);
- long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix +
".elements", -1);
- long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix
+ ".bytes", -1);
- long resultStart = resultMetrics.getStartTimeMetric(now,
resultMonitor.prefix + ".startTime");
- long resultEnd = resultMetrics.getEndTimeMetric(now, resultMonitor.prefix
+ ".endTime");
+ long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix +
".elements");
+ long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix
+ ".bytes");
+ long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix +
".startTime");
+ long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix +
".endTime");
long timestampStart =
- resultMetrics.getStartTimeMetric(now, resultMonitor.prefix +
".startTimestamp");
- long timestampEnd = resultMetrics.getEndTimeMetric(now,
resultMonitor.prefix + ".endTimestamp");
+ resultMetrics.getStartTimeMetric(resultMonitor.prefix +
".startTimestamp");
+ long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix +
".endTimestamp");
long effectiveEnd = -1;
if (eventEnd >= 0 && resultEnd >= 0) {
@@ -449,7 +449,12 @@ public class NexmarkLauncher<OptionT extends
NexmarkOptions> {
if (options.isStreaming() && !waitingForShutdown) {
Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
- long fatalCount = new MetricsReader(job,
query.getName()).getCounterMetric("fatal", 0);
+ long fatalCount = new MetricsReader(job,
query.getName()).getCounterMetric("fatal");
+
+ if (fatalCount == -1) {
+ fatalCount = 0;
+ }
+
if (fatalCount > 0) {
NexmarkUtils.console("job has fatal errors, cancelling.");
errors.add(String.format("Pipeline reported %s fatal errors",
fatalCount));
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
index 7dd8bcb..e5d1854 100644
---
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.testutils.metrics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -34,25 +37,30 @@ public class MetricsReader {
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(MetricsReader.class);
- private enum DistributionType {
- MIN,
- MAX
- }
+ private static final long ERRONEOUS_METRIC_VALUE = -1;
private final PipelineResult result;
private final String namespace;
- public MetricsReader(PipelineResult result, String namespace) {
+ private final long now;
+
+ @VisibleForTesting
+ MetricsReader(PipelineResult result, String namespace, long now) {
this.result = result;
this.namespace = namespace;
+ this.now = now;
+ }
+
+ public MetricsReader(PipelineResult result, String namespace) {
+ this(result, namespace, System.currentTimeMillis());
}
/**
- * Return the current value for a long counter, or a default value if can't
be retrieved. Note
- * this uses only attempted metrics because some runners don't support
committed metrics.
+ * Return the current value for a long counter, or -1 if can't be retrieved.
Note this uses only
+ * attempted metrics because some runners don't support committed metrics.
*/
- public long getCounterMetric(String name, long defaultValue) {
+ public long getCounterMetric(String name) {
MetricQueryResults metrics =
result
.metrics()
@@ -70,30 +78,48 @@ public class MetricsReader {
} catch (NoSuchElementException e) {
LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
}
- return defaultValue;
+ return ERRONEOUS_METRIC_VALUE;
}
/**
* Return start time metric by counting the difference between "now" and min
value from a
* distribution metric.
*/
- public long getStartTimeMetric(long now, String name) {
- return this.getTimestampMetric(now, this.getDistributionMetric(name,
DistributionType.MIN, -1));
+ public long getStartTimeMetric(String name) {
+ Iterable<MetricResult<DistributionResult>> timeDistributions =
getDistributions(name);
+ return getLowestMin(timeDistributions);
+ }
+
+ private Long getLowestMin(Iterable<MetricResult<DistributionResult>>
distributions) {
+ Optional<Long> lowestMin =
+ StreamSupport.stream(distributions.spliterator(), true)
+ .map(element -> element.getAttempted().getMin())
+ .filter(this::isCredible)
+ .min(Long::compareTo);
+
+ return lowestMin.orElse(ERRONEOUS_METRIC_VALUE);
}
/**
* Return end time metric by counting the difference between "now" and MAX
value from a
* distribution metric.
*/
- public long getEndTimeMetric(long now, String name) {
- return this.getTimestampMetric(now, this.getDistributionMetric(name,
DistributionType.MAX, -1));
+ public long getEndTimeMetric(String name) {
+ Iterable<MetricResult<DistributionResult>> timeDistributions =
getDistributions(name);
+ return getGreatestMax(timeDistributions);
}
- /**
- * Return the current value for a long counter, or a default value if can't
be retrieved. Note
- * this uses only attempted metrics because some runners don't support
committed metrics.
- */
- private long getDistributionMetric(String name, DistributionType distType,
long defaultValue) {
+ private Long getGreatestMax(Iterable<MetricResult<DistributionResult>>
distributions) {
+ Optional<Long> greatestMax =
+ StreamSupport.stream(distributions.spliterator(), true)
+ .map(element -> element.getAttempted().getMax())
+ .filter(this::isCredible)
+ .max(Long::compareTo);
+
+ return greatestMax.orElse(ERRONEOUS_METRIC_VALUE);
+ }
+
+ private Iterable<MetricResult<DistributionResult>> getDistributions(String
name) {
MetricQueryResults metrics =
result
.metrics()
@@ -101,24 +127,7 @@ public class MetricsReader {
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(namespace, name))
.build());
- Iterable<MetricResult<DistributionResult>> distributions =
metrics.getDistributions();
-
- checkIfMetricResultIsUnique(name, distributions);
-
- try {
- MetricResult<DistributionResult> distributionResult =
distributions.iterator().next();
- switch (distType) {
- case MIN:
- return distributionResult.getAttempted().getMin();
- case MAX:
- return distributionResult.getAttempted().getMax();
- default:
- return defaultValue;
- }
- } catch (NoSuchElementException e) {
- LOG.error("Failed to get distribution metric {} for namespace {}", name,
namespace);
- }
- return defaultValue;
+ return metrics.getDistributions();
}
private <T> void checkIfMetricResultIsUnique(String name,
Iterable<MetricResult<T>> metricResult)
@@ -133,14 +142,12 @@ public class MetricsReader {
resultCount);
}
- /** Return the current value for a time counter, or -1 if can't be
retrieved. */
- private long getTimestampMetric(long now, long value) {
- // timestamp metrics are used to monitor time of execution of transforms.
- // If result timestamp metric is too far from now, consider that metric is
erroneous
-
- if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
- return -1;
- }
- return value;
+ /**
+ * timestamp metrics are used to monitor time of execution of transforms. If
result timestamp
+ * metric is too far from now, consider that metric is erroneous private
boolean isCredible(long
+ * value) {
+ */
+ private boolean isCredible(long value) {
+ return (Math.abs(value - now) <= Duration.standardDays(10000).getMillis());
}
}
diff --git
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
index d0ad4a7..e2fef25 100644
---
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
+++
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
@@ -28,7 +28,10 @@ import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,39 +54,49 @@ public class MetricsReaderTest {
MetricsReader reader = new MetricsReader(result, NAMESPACE);
- assertEquals(5, reader.getCounterMetric("counter", -1));
+ assertEquals(5, reader.getCounterMetric("counter"));
}
@Test
- public void testStartTimeIsTheMinimumOfTheDistribution() {
+ public void testStartTimeIsTheMinimumFromAllCollectedDistributions() {
List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
- createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
- PipelineResult result = testPipeline.run();
-
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
+ createTestPipelineWithBranches(sampleInputData);
- assertEquals(1, reader.getStartTimeMetric(0, "timeDist"));
+ PipelineResult result = testPipeline.run();
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+ assertEquals(1, reader.getStartTimeMetric("timeDist"));
}
@Test
- public void testEndTimeIsTheMaximumOfTheDistribution() {
+ public void testEndTimeIsTheMaximumOfAllCollectedDistributions() {
List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
- createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+ createTestPipelineWithBranches(sampleInputData);
PipelineResult result = testPipeline.run();
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+ assertEquals(10, reader.getEndTimeMetric("timeDist"));
+ }
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
-
- assertEquals(5, reader.getEndTimeMetric(0, "timeDist"));
+ /**
+ * Branching pipelines ensure that multiple metric results of the same name
are created. Thanks to
+ * that it is possible to test if MetricsReader can collect metrics in such
case.
+ */
+ private void createTestPipelineWithBranches(List<Integer> sampleInputData) {
+ PCollection<Integer> inputData =
testPipeline.apply(Create.of(sampleInputData));
+ inputData.apply("Monitor #1", ParDo.of(new MonitorWithTimeDistribution()));
+
+ inputData
+ .apply("Multiply input", MapElements.via(new MultiplyElements()))
+ .apply("Monitor #2", ParDo.of(new MonitorWithTimeDistribution()));
}
@Test
public void doesntThrowIllegalStateExceptionWhenThereIsNoMetricFound() {
PipelineResult result = testPipeline.run();
MetricsReader reader = new MetricsReader(result, NAMESPACE);
- reader.getCounterMetric("nonexistent", -1);
+ reader.getCounterMetric("nonexistent");
}
@Test
@@ -93,10 +106,10 @@ public class MetricsReaderTest {
createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
PipelineResult result = testPipeline.run();
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 900000000001L);
- assertEquals(-1, reader.getStartTimeMetric(900000000001L, "timeDist"));
- assertEquals(-1, reader.getEndTimeMetric(900000000001L, "timeDist"));
+ assertEquals(-1, reader.getStartTimeMetric("timeDist"));
+ assertEquals(-1, reader.getEndTimeMetric("timeDist"));
}
private void createTestPipeline(List<Integer> sampleInputData, DoFn<Integer,
Integer> monitor) {
@@ -122,4 +135,11 @@ public class MetricsReaderTest {
timeDistribution.update(c.element().longValue());
}
}
+
+ private static class MultiplyElements extends SimpleFunction<Integer,
Integer> {
+ @Override
+ public Integer apply(Integer input) {
+ return input * 2;
+ }
+ }
}