[
https://issues.apache.org/jira/browse/BEAM-6100?focusedWorklogId=170320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170320
]
ASF GitHub Bot logged work on BEAM-6100:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Nov/18 17:08
Start Date: 28/Nov/18 17:08
Worklog Time Spent: 10m
Work Description: swegner closed pull request #7087: [BEAM-6100] Collect
metrics properly in Load tests
URL: https://github.com/apache/beam/pull/7087
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
index 85d27ea81239..0cb8f0e50ef5 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CoGroupByKeyLoadTest.java
@@ -22,7 +22,7 @@
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
@@ -92,22 +92,22 @@ void loadTest() throws IOException {
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
- applyStepIfPresent(
- pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions)),
- "Synthetic step for input",
- syntheticStep);
+ pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions));
+ input = input.apply("Collect start time metrics (input)",
ParDo.of(runtimeMonitor));
+ applyStepIfPresent(input, "Synthetic step for input", syntheticStep);
PCollection<KV<byte[], byte[]>> coInput =
- applyStepIfPresent(
- pipeline.apply("Read co-input",
SyntheticBoundedIO.readFrom(coSourceOptions)),
- "Synthetic step for co-input",
- syntheticStep);
+ pipeline.apply("Read co-input",
SyntheticBoundedIO.readFrom(coSourceOptions));
+ coInput = coInput.apply("Collect start time metrics (co-input)",
ParDo.of(runtimeMonitor));
+ applyStepIfPresent(coInput, "Synthetic step for co-input", syntheticStep);
KeyedPCollectionTuple.of(INPUT_TAG, input)
.and(CO_INPUT_TAG, coInput)
.apply("CoGroupByKey", CoGroupByKey.create())
.apply("Ungroup and reiterate", ParDo.of(new
UngroupAndReiterate(options.getIterations())))
- .apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ .apply(
+ "Collect total bytes", ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")))
+ .apply("Collect end time metrics", ParDo.of(runtimeMonitor));
}
private static class UngroupAndReiterate
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
index a85f23bfaef6..316d626c33f7 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
@@ -25,7 +25,8 @@
import java.util.Optional;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.transforms.Combine;
@@ -107,23 +108,30 @@ private CombineLoadTest(String[] args) throws IOException
{
@Override
protected void loadTest() throws IOException {
- PTransform combiner =
createPerKeyCombiner(options.getPerKeyCombinerType());
-
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
pipeline
.apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
- .apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ .apply(
+ "Collect start time metric",
+ ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")))
+ .apply(
+ "Collect metrics",
+ ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int i = 0; i < options.getFanout(); i++) {
applyStepIfPresent(input, format("Step: %d", i), syntheticStep)
- .apply(format("Convert to BigInteger: %d", i), MapElements.via(new
ByteValueToLong()))
- .apply(format("Combine: %d", i), combiner);
+ .apply(format("Convert to Long: %d", i), MapElements.via(new
ByteValueToLong()))
+ .apply(format("Combine: %d", i),
getPerKeyCombiner(options.getPerKeyCombinerType()))
+ .apply(
+ "Collect end time metric",
+ ParDo.of(new TimeMonitor<byte[], Object>(METRICS_NAMESPACE,
"runtime")));
}
}
- private PTransform createPerKeyCombiner(CombinerType combinerType) {
+ private PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection>
getPerKeyCombiner(
+ CombinerType combinerType) {
switch (combinerType) {
case MEAN:
return Mean.perKey();
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
index fb0bf8411079..557cca7e067c 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
@@ -23,7 +23,7 @@
import java.util.Optional;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.transforms.DoFn;
@@ -84,15 +84,20 @@ void loadTest() throws IOException {
Optional<SyntheticStep> syntheticStep =
createStep(options.getStepOptions());
PCollection<KV<byte[], byte[]>> input =
- pipeline.apply(SyntheticBoundedIO.readFrom(sourceOptions));
+ pipeline
+ .apply(SyntheticBoundedIO.readFrom(sourceOptions))
+ .apply("Collect start time metrics", ParDo.of(runtimeMonitor))
+ .apply(
+ "Total bytes monitor",
+ ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int branch = 0; branch < options.getFanout(); branch++) {
applyStepIfPresent(input, format("Synthetic step (%s)", branch),
syntheticStep)
- .apply(ParDo.of(new MetricsMonitor(METRICS_NAMESPACE)))
.apply(format("Group by key (%s)", branch), GroupByKey.create())
.apply(
format("Ungroup and reiterate (%s)", branch),
- ParDo.of(new UngroupAndReiterate(options.getIterations())));
+ ParDo.of(new UngroupAndReiterate(options.getIterations())))
+ .apply(format("Collect end time metrics (%s)", branch),
ParDo.of(runtimeMonitor));
}
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index b3f2d1f02888..0f936d11583c 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
import org.apache.beam.sdk.loadtests.metrics.MetricsPublisher;
+import org.apache.beam.sdk.loadtests.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -38,24 +39,23 @@
private String metricsNamespace;
- OptionsT options;
+ protected TimeMonitor<byte[], byte[]> runtimeMonitor;
- SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
+ protected OptionsT options;
- SyntheticStep.Options stepOptions;
+ protected SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
- Pipeline pipeline;
+ protected SyntheticStep.Options stepOptions;
+
+ protected Pipeline pipeline;
LoadTest(String[] args, Class<OptionsT> testOptions, String
metricsNamespace) throws IOException {
this.metricsNamespace = metricsNamespace;
-
+ this.runtimeMonitor = new TimeMonitor<>(metricsNamespace, "runtime");
this.options = LoadTestOptions.readFromArgs(args, testOptions);
-
this.sourceOptions =
fromJsonString(options.getSourceOptions(),
SyntheticBoundedIO.SyntheticSourceOptions.class);
-
this.stepOptions = fromJsonString(options.getStepOptions(),
SyntheticStep.Options.class);
-
this.pipeline = Pipeline.create(options);
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
index 3916ad6eb119..3e51f2a73131 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/ParDoLoadTest.java
@@ -20,7 +20,7 @@
import java.io.IOException;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
import org.apache.beam.sdk.io.synthetic.SyntheticStep;
-import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.loadtests.metrics.ByteMonitor;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
@@ -73,13 +73,16 @@ private ParDoLoadTest(String[] args) throws IOException {
@Override
protected void loadTest() {
PCollection<KV<byte[], byte[]>> input =
- pipeline.apply("Read input",
SyntheticBoundedIO.readFrom(sourceOptions));
+ pipeline
+ .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
+ .apply(ParDo.of(runtimeMonitor))
+ .apply(ParDo.of(new ByteMonitor(METRICS_NAMESPACE,
"totalBytes.count")));
for (int i = 0; i < options.getNumberOfCounterOperations(); i++) {
input = input.apply(String.format("Step: %d", i), ParDo.of(new
SyntheticStep(stepOptions)));
}
- input.apply("Collect metrics", ParDo.of(new
MetricsMonitor(METRICS_NAMESPACE)));
+ input.apply(ParDo.of(runtimeMonitor));
}
public static void main(String[] args) throws IOException {
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
similarity index 72%
rename from
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
rename to
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
index b1785f2f8fea..26a3ef5e3529 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsMonitor.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/ByteMonitor.java
@@ -18,26 +18,27 @@
package org.apache.beam.sdk.loadtests.metrics;
import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
-/** Monitors various metrics from within a pipeline. */
-public class MetricsMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[],
byte[]>> {
-
- private Distribution timeDistribution;
+/**
+ * Monitor that records the number of bytes flowing through a PCollection.
+ *
+ * <p>To use: apply a monitor in a desired place in the pipeline. This will
capture how many bytes
+ * flew through this DoFn which then can be collected and written out using
{@link
+ * MetricsPublisher}.
+ */
+public class ByteMonitor extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
private Counter totalBytes;
- public MetricsMonitor(String namespace) {
- this.timeDistribution = Metrics.distribution(namespace, "runtime");
- this.totalBytes = Metrics.counter(namespace, "totalBytes.count");
+ public ByteMonitor(String namespace, String name) {
+ this.totalBytes = Metrics.counter(namespace, name);
}
@ProcessElement
public void processElement(ProcessContext c) {
- timeDistribution.update(System.currentTimeMillis());
totalBytes.inc(c.element().getKey().length +
c.element().getValue().length);
c.output(c.element());
}
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
index a6c05abbd01a..365b95babca2 100644
---
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/MetricsPublisher.java
@@ -23,12 +23,18 @@
/** Provides ways to publish metrics gathered during test invocation. */
public class MetricsPublisher {
+ /**
+ * This prints out metrics results to console. It will work only if metrics
with appropriate
+ * (conventional) names are present to be collected in {@link PipelineResult}
+ *
+ * <p>See {@link org.apache.beam.sdk.loadtests.GroupByKeyLoadTest} for hints
on how to use it.
+ */
public static void toConsole(PipelineResult result, String namespace) {
MetricsReader resultMetrics = new MetricsReader(result, namespace);
- long totalBytes = resultMetrics.getCounterMetric("totalBytes.count", -1);
- long startTime =
resultMetrics.getStartTimeMetric(System.currentTimeMillis(), "runtime");
- long endTime = resultMetrics.getEndTimeMetric(System.currentTimeMillis(),
"runtime");
+ long totalBytes = resultMetrics.getCounterMetric("totalBytes.count");
+ long startTime = resultMetrics.getStartTimeMetric("runtime");
+ long endTime = resultMetrics.getEndTimeMetric("runtime");
System.out.println(String.format("Total bytes: %s", totalBytes));
System.out.println(String.format("Total time (millis): %s", endTime -
startTime));
diff --git
a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
new file mode 100644
index 000000000000..128162da41c6
--- /dev/null
+++
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/metrics/TimeMonitor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests.metrics;
+
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Monitor that records processing time distribution in the pipeline.
+ *
+ * <p>To use: apply a monitor directly after each source and sink transform.
This will capture a
+ * distribution of element processing timestamps, which can be collected and
written out using
+ * {@link MetricsPublisher}.
+ */
+public class TimeMonitor<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
+
+ private Distribution timeDistribution;
+
+ public TimeMonitor(String namespace, String name) {
+ this.timeDistribution = Metrics.distribution(namespace, name);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ timeDistribution.update(System.currentTimeMillis());
+ c.output(c.element());
+ }
+}
diff --git
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 73b8344e21c3..a852881416f3 100644
---
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -252,20 +252,20 @@ private NexmarkPerf currentPerf(
MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
- long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix +
".elements", -1);
- long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix +
".bytes", -1);
- long eventStart = eventMetrics.getStartTimeMetric(now, eventMonitor.prefix
+ ".startTime");
- long eventEnd = eventMetrics.getEndTimeMetric(now, eventMonitor.prefix +
".endTime");
+ long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix +
".elements");
+ long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix +
".bytes");
+ long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix +
".startTime");
+ long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix +
".endTime");
MetricsReader resultMetrics = new MetricsReader(result,
resultMonitor.name);
- long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix +
".elements", -1);
- long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix
+ ".bytes", -1);
- long resultStart = resultMetrics.getStartTimeMetric(now,
resultMonitor.prefix + ".startTime");
- long resultEnd = resultMetrics.getEndTimeMetric(now, resultMonitor.prefix
+ ".endTime");
+ long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix +
".elements");
+ long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix
+ ".bytes");
+ long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix +
".startTime");
+ long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix +
".endTime");
long timestampStart =
- resultMetrics.getStartTimeMetric(now, resultMonitor.prefix +
".startTimestamp");
- long timestampEnd = resultMetrics.getEndTimeMetric(now,
resultMonitor.prefix + ".endTimestamp");
+ resultMetrics.getStartTimeMetric(resultMonitor.prefix +
".startTimestamp");
+ long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix +
".endTimestamp");
long effectiveEnd = -1;
if (eventEnd >= 0 && resultEnd >= 0) {
@@ -449,7 +449,12 @@ private NexmarkPerf monitor(NexmarkQuery query) {
if (options.isStreaming() && !waitingForShutdown) {
Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
- long fatalCount = new MetricsReader(job,
query.getName()).getCounterMetric("fatal", 0);
+ long fatalCount = new MetricsReader(job,
query.getName()).getCounterMetric("fatal");
+
+ if (fatalCount == -1) {
+ fatalCount = 0;
+ }
+
if (fatalCount > 0) {
NexmarkUtils.console("job has fatal errors, cancelling.");
errors.add(String.format("Pipeline reported %s fatal errors",
fatalCount));
diff --git
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
index 7dd8bcb5785e..e5d18544c87d 100644
---
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
+++
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.testutils.metrics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -34,25 +37,30 @@
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(MetricsReader.class);
- private enum DistributionType {
- MIN,
- MAX
- }
+ private static final long ERRONEOUS_METRIC_VALUE = -1;
private final PipelineResult result;
private final String namespace;
- public MetricsReader(PipelineResult result, String namespace) {
+ private final long now;
+
+ @VisibleForTesting
+ MetricsReader(PipelineResult result, String namespace, long now) {
this.result = result;
this.namespace = namespace;
+ this.now = now;
+ }
+
+ public MetricsReader(PipelineResult result, String namespace) {
+ this(result, namespace, System.currentTimeMillis());
}
/**
- * Return the current value for a long counter, or a default value if can't
be retrieved. Note
- * this uses only attempted metrics because some runners don't support
committed metrics.
+ * Return the current value for a long counter, or -1 if can't be retrieved.
Note this uses only
+ * attempted metrics because some runners don't support committed metrics.
*/
- public long getCounterMetric(String name, long defaultValue) {
+ public long getCounterMetric(String name) {
MetricQueryResults metrics =
result
.metrics()
@@ -70,30 +78,48 @@ public long getCounterMetric(String name, long
defaultValue) {
} catch (NoSuchElementException e) {
LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
}
- return defaultValue;
+ return ERRONEOUS_METRIC_VALUE;
}
/**
* Return start time metric by counting the difference between "now" and min
value from a
* distribution metric.
*/
- public long getStartTimeMetric(long now, String name) {
- return this.getTimestampMetric(now, this.getDistributionMetric(name,
DistributionType.MIN, -1));
+ public long getStartTimeMetric(String name) {
+ Iterable<MetricResult<DistributionResult>> timeDistributions =
getDistributions(name);
+ return getLowestMin(timeDistributions);
+ }
+
+ private Long getLowestMin(Iterable<MetricResult<DistributionResult>>
distributions) {
+ Optional<Long> lowestMin =
+ StreamSupport.stream(distributions.spliterator(), true)
+ .map(element -> element.getAttempted().getMin())
+ .filter(this::isCredible)
+ .min(Long::compareTo);
+
+ return lowestMin.orElse(ERRONEOUS_METRIC_VALUE);
}
/**
* Return end time metric by counting the difference between "now" and MAX
value from a
* distribution metric.
*/
- public long getEndTimeMetric(long now, String name) {
- return this.getTimestampMetric(now, this.getDistributionMetric(name,
DistributionType.MAX, -1));
+ public long getEndTimeMetric(String name) {
+ Iterable<MetricResult<DistributionResult>> timeDistributions =
getDistributions(name);
+ return getGreatestMax(timeDistributions);
}
- /**
- * Return the current value for a long counter, or a default value if can't
be retrieved. Note
- * this uses only attempted metrics because some runners don't support
committed metrics.
- */
- private long getDistributionMetric(String name, DistributionType distType,
long defaultValue) {
+ private Long getGreatestMax(Iterable<MetricResult<DistributionResult>>
distributions) {
+ Optional<Long> greatestMax =
+ StreamSupport.stream(distributions.spliterator(), true)
+ .map(element -> element.getAttempted().getMax())
+ .filter(this::isCredible)
+ .max(Long::compareTo);
+
+ return greatestMax.orElse(ERRONEOUS_METRIC_VALUE);
+ }
+
+ private Iterable<MetricResult<DistributionResult>> getDistributions(String
name) {
MetricQueryResults metrics =
result
.metrics()
@@ -101,24 +127,7 @@ private long getDistributionMetric(String name,
DistributionType distType, long
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(namespace, name))
.build());
- Iterable<MetricResult<DistributionResult>> distributions =
metrics.getDistributions();
-
- checkIfMetricResultIsUnique(name, distributions);
-
- try {
- MetricResult<DistributionResult> distributionResult =
distributions.iterator().next();
- switch (distType) {
- case MIN:
- return distributionResult.getAttempted().getMin();
- case MAX:
- return distributionResult.getAttempted().getMax();
- default:
- return defaultValue;
- }
- } catch (NoSuchElementException e) {
- LOG.error("Failed to get distribution metric {} for namespace {}", name,
namespace);
- }
- return defaultValue;
+ return metrics.getDistributions();
}
private <T> void checkIfMetricResultIsUnique(String name,
Iterable<MetricResult<T>> metricResult)
@@ -133,14 +142,12 @@ private long getDistributionMetric(String name,
DistributionType distType, long
resultCount);
}
- /** Return the current value for a time counter, or -1 if can't be
retrieved. */
- private long getTimestampMetric(long now, long value) {
- // timestamp metrics are used to monitor time of execution of transforms.
- // If result timestamp metric is too far from now, consider that metric is
erroneous
-
- if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
- return -1;
- }
- return value;
+ /**
+ * timestamp metrics are used to monitor time of execution of transforms. If
result timestamp
+ * metric is too far from now, consider that metric is erroneous private
boolean isCredible(long
+ * value) {
+ */
+ private boolean isCredible(long value) {
+ return (Math.abs(value - now) <= Duration.standardDays(10000).getMillis());
}
}
diff --git
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
index d0ad4a745246..e2fef254d130 100644
---
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
+++
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
@@ -28,7 +28,10 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,39 +54,49 @@ public void testCounterMetricReceivedFromPipelineResult() {
MetricsReader reader = new MetricsReader(result, NAMESPACE);
- assertEquals(5, reader.getCounterMetric("counter", -1));
+ assertEquals(5, reader.getCounterMetric("counter"));
}
@Test
- public void testStartTimeIsTheMinimumOfTheDistribution() {
+ public void testStartTimeIsTheMinimumFromAllCollectedDistributions() {
List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
- createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
- PipelineResult result = testPipeline.run();
-
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
+ createTestPipelineWithBranches(sampleInputData);
- assertEquals(1, reader.getStartTimeMetric(0, "timeDist"));
+ PipelineResult result = testPipeline.run();
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+ assertEquals(1, reader.getStartTimeMetric("timeDist"));
}
@Test
- public void testEndTimeIsTheMaximumOfTheDistribution() {
+ public void testEndTimeIsTheMaximumOfAllCollectedDistributions() {
List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
- createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+ createTestPipelineWithBranches(sampleInputData);
PipelineResult result = testPipeline.run();
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 0);
+ assertEquals(10, reader.getEndTimeMetric("timeDist"));
+ }
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
-
- assertEquals(5, reader.getEndTimeMetric(0, "timeDist"));
+ /**
+ * Branching pipelines ensure that multiple metric results of the same name
are created. Thanks to
+ * that it is possible to test if MetricsReader can collect metrics in such
case.
+ */
+ private void createTestPipelineWithBranches(List<Integer> sampleInputData) {
+ PCollection<Integer> inputData =
testPipeline.apply(Create.of(sampleInputData));
+ inputData.apply("Monitor #1", ParDo.of(new MonitorWithTimeDistribution()));
+
+ inputData
+ .apply("Multiply input", MapElements.via(new MultiplyElements()))
+ .apply("Monitor #2", ParDo.of(new MonitorWithTimeDistribution()));
}
@Test
public void doesntThrowIllegalStateExceptionWhenThereIsNoMetricFound() {
PipelineResult result = testPipeline.run();
MetricsReader reader = new MetricsReader(result, NAMESPACE);
- reader.getCounterMetric("nonexistent", -1);
+ reader.getCounterMetric("nonexistent");
}
@Test
@@ -93,10 +106,10 @@ public void
testTimeIsMinusOneIfTimeMetricIsTooFarFromNow() {
createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
PipelineResult result = testPipeline.run();
- MetricsReader reader = new MetricsReader(result, NAMESPACE);
+ MetricsReader reader = new MetricsReader(result, NAMESPACE, 900000000001L);
- assertEquals(-1, reader.getStartTimeMetric(900000000001L, "timeDist"));
- assertEquals(-1, reader.getEndTimeMetric(900000000001L, "timeDist"));
+ assertEquals(-1, reader.getStartTimeMetric("timeDist"));
+ assertEquals(-1, reader.getEndTimeMetric("timeDist"));
}
private void createTestPipeline(List<Integer> sampleInputData, DoFn<Integer,
Integer> monitor) {
@@ -122,4 +135,11 @@ public void processElement(ProcessContext c) {
timeDistribution.update(c.element().longValue());
}
}
+
+ private static class MultiplyElements extends SimpleFunction<Integer,
Integer> {
+ @Override
+ public Integer apply(Integer input) {
+ return input * 2;
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 170320)
Time Spent: 4h 10m (was: 4h)
> Runtime and total bytes metrics are not collected properly
> ----------------------------------------------------------
>
> Key: BEAM-6100
> URL: https://issues.apache.org/jira/browse/BEAM-6100
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Lukasz Gajowy
> Assignee: Lukasz Gajowy
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> Currently, we collect time (distribution) and bytes (counter) metrics from
> one ParDo (called MetricsMonitor) that is put in pipelines in one,
> arbitrarily chosen place (usually "in the middle" of the pipeline's graph). In
> some cases, an invalid time (or total bytes count) is registered.
> Taking [this|https://github.com/apache/beam/pull/6987#discussion_r231976671]
> discussion into account, ideally, we'd like to:
> - collect runtime by recording time at the root and sink(s) of the pipeline
> - collect total bytes in a separate ParDo that lets us decide which byte
> amount we actually want to collect (currently it is coupled to the
> time-collecting Monitor, which is inconvenient).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)