[BEAM-1763] Verify PAssert execution in runners which support metrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e Branch: refs/heads/master Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919 Parents: 48c8ed1 Author: Aviem Zur <aviem...@gmail.com> Authored: Tue May 2 19:00:29 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Thu May 4 20:48:56 2017 +0300 ---------------------------------------------------------------------- .../apache/beam/runners/flink/FlinkRunner.java | 3 ++ .../beam/runners/spark/TestSparkRunner.java | 47 -------------------- .../ResumeFromCheckpointStreamingTest.java | 12 +++-- .../beam/sdk/metrics/MetricsEnvironment.java | 5 +++ .../apache/beam/sdk/testing/TestPipeline.java | 46 ++++++++++++++++--- 5 files changed, 57 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 181ffda..a5972ef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -31,6 +31,7 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; @@ -103,6 +104,8 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> { public PipelineResult run(Pipeline pipeline) { logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); + MetricsEnvironment.setMetricsSupported(true); + LOG.info("Executing pipeline using FlinkRunner."); FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options); http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 10e98b8..1e67813 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { } SparkPipelineResult result = null; - int expectedNumberOfAssertions = PAssert.countAsserts(pipeline); - // clear state 
of Aggregators, Metrics and Watermarks if exists. AggregatorsAccumulator.clear(); MetricsAccumulator.clear(); @@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { String.format("Finish state %s is not allowed.", finishState), finishState, isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE)); - - // validate assertion succeeded (at least once). - long successAssertions = 0; - Iterable<MetricResult<Long>> counterResults = result.metrics().queryMetrics( - MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) - .build()).counters(); - for (MetricResult<Long> counter : counterResults) { - if (counter.attempted().longValue() > 0) { - successAssertions++; - } - } - Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null - ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions; - assertThat( - String.format( - "Expected %d successful assertions, but found %d.", - expectedAssertions, successAssertions), - successAssertions, - is(expectedAssertions.longValue())); - // validate assertion didn't fail. - long failedAssertions = 0; - Iterable<MetricResult<Long>> failCounterResults = result.metrics().queryMetrics( - MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER)) - .build()).counters(); - for (MetricResult<Long> counter : failCounterResults) { - if (counter.attempted().longValue() > 0) { - failedAssertions++; - } - } - assertThat( - String.format("Found %d failed assertions.", failedAssertions), - failedAssertions, - is(0L)); - - LOG.info( - String.format( - "Successfully asserted pipeline %s with %d successful assertions.", - testSparkPipelineOptions.getJobName(), - successAssertions)); } finally { try { // cleanup checkpoint dir. 
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 7d7fd08..33571f0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -180,7 +180,8 @@ public class ResumeFromCheckpointStreamingTest { long successAssertions = 0; Iterable<MetricResult<Long>> counterResults = res.metrics().queryMetrics( MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) + .addNameFilter( + MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER)) .build()).counters(); for (MetricResult<Long> counter : counterResults) { if (counter.attempted().longValue() > 0) { @@ -196,7 +197,8 @@ public class ResumeFromCheckpointStreamingTest { long failedAssertions = 0; Iterable<MetricResult<Long>> failCounterResults = res.metrics().queryMetrics( MetricsFilter.builder() - .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER)) + .addNameFilter(MetricNameFilter.named( + PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER)) .build()).counters(); for (MetricResult<Long> counter : failCounterResults) { if (counter.attempted().longValue() > 0) { @@ -330,8 +332,10 @@ public class ResumeFromCheckpointStreamingTest { } private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> { - private final Counter success = Metrics.counter(PAssert.class, 
PAssert.SUCCESS_COUNTER); - private final Counter failure = Metrics.counter(PAssert.class, PAssert.FAILURE_COUNTER); + private final Counter success = + Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER); + private final Counter failure = + Metrics.counter(PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER); private final T[] expected; AssertDoFn(T[] expected) { http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index 2942578..a4b311f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -70,6 +70,11 @@ public class MetricsEnvironment { METRICS_SUPPORTED.set(supported); } + /** Indicates whether metrics reporting is supported. */ + public static boolean isMetricsSupported() { + return METRICS_SUPPORTED.get(); + } + /** * Set the {@link MetricsContainer} for the current thread. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 868dcbd..d8fe51d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.TreeNode; @@ -41,6 +43,10 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; @@ -186,8 +192,8 @@ public class TestPipeline extends Pipeline implements TestRule { if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) { final boolean hasDanglingPAssert = FluentIterable.from(pipelineNodes) - .filter(Predicates.not(Predicates.in(runVisitedNodes))) - .anyMatch(isPAssertNode); + .filter(Predicates.not(Predicates.in(runVisitedNodes))) + .anyMatch(isPAssertNode); if (hasDanglingPAssert) { throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); } else { @@ -319,12 +325,13 @@ public class TestPipeline extends Pipeline 
implements TestRule { checkState( enforcement.isPresent(), "Is your TestPipeline declaration missing a @Rule annotation? Usage: " - + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); + + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); final PipelineResult pipelineResult; try { enforcement.get().beforePipelineExecution(); pipelineResult = super.run(); + verifyPAssertsSucceeded(pipelineResult); } catch (RuntimeException exc) { Throwable cause = exc.getCause(); if (cause instanceof AssertionError) { @@ -385,8 +392,8 @@ public class TestPipeline extends Pipeline implements TestRule { Strings.isNullOrEmpty(beamTestPipelineOptions) ? PipelineOptionsFactory.create() : PipelineOptionsFactory.fromArgs( - MAPPER.readValue(beamTestPipelineOptions, String[].class)) - .as(TestPipelineOptions.class); + MAPPER.readValue(beamTestPipelineOptions, String[].class)) + .as(TestPipelineOptions.class); options.as(ApplicationNameOptions.class).setAppName(getAppName()); // If no options were specified, set some reasonable defaults @@ -488,6 +495,35 @@ public class TestPipeline extends Pipeline implements TestRule { return firstInstanceAfterTestPipeline; } + /** + * Verifies all {@link PAssert PAsserts} in the pipeline have been executed and were successful. + * + * <p>Note this only runs for runners which support Metrics. Runners which do not should verify + * this in some other way.
See: https://issues.apache.org/jira/browse/BEAM-2001</p> + */ + private void verifyPAssertsSucceeded(PipelineResult pipelineResult) { + if (MetricsEnvironment.isMetricsSupported()) { + long expectedNumberOfAssertions = (long) PAssert.countAsserts(this); + + long successfulAssertions = 0; + Iterable<MetricResult<Long>> successCounterResults = + pipelineResult.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) + .build()) + .counters(); + for (MetricResult<Long> counter : successCounterResults) { + if (counter.attempted() > 0) { + successfulAssertions++; + } + } + + assertThat(String + .format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions, + successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions)); + } + } + private static class IsEmptyVisitor extends PipelineVisitor.Defaults { private boolean empty = true;