Repository: incubator-beam
Updated Branches:
  refs/heads/master ef9871c36 -> 6893a7270
[BEAM-1000, BEAM-1050] Fixed PipelineResult.State FAILED for streaming; added support for non-blocking cancel()/waitUntilFinish() in batch. Added a SparkPipelineResult class to address PipelineResult#waitUntilFinish() and SparkRunner#run() semantics.

* Simplified beamExceptionFrom() to abstract away SparkExceptions.
* Reordered methods according to access level.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1a67934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1a67934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1a67934

Branch: refs/heads/master
Commit: b1a67934d1496e221718599301635c38f8e3b7ec
Parents: ef9871c
Author: Stas Levin <[email protected]>
Authored: Mon Nov 28 11:11:10 2016 +0200
Committer: Sela <[email protected]>
Committed: Mon Dec 5 12:56:39 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/EvaluationResult.java    |  67 ------
 .../beam/runners/spark/SparkPipelineResult.java | 179 +++++++++++++++
 .../apache/beam/runners/spark/SparkRunner.java  |  98 +++++----
 .../beam/runners/spark/TestSparkRunner.java     |  10 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../spark/translation/EvaluationContext.java    | 119 ++--------
 .../spark/translation/SparkContextFactory.java  |   2 +-
 .../runners/spark/ProvidedSparkContextTest.java |   6 +-
 .../runners/spark/SparkPipelineStateTest.java   | 219 +++++++++++++++++++
 .../metrics/sink/ClearAggregatorsRule.java      |   2 +-
 .../metrics/sink/NamedAggregatorsTest.java      |   2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 .../spark/translation/SideEffectsTest.java      |  59 -----
 .../streaming/EmptyStreamAssertionTest.java     |   4 +
 .../ResumeFromCheckpointStreamingTest.java      |   8 +-
 .../streaming/utils/PAssertStreaming.java       |   9 +-
 18 files changed, 500 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
deleted file mode 100644
index 52606a3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.beam.runners.spark; - -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; - -/** - * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between - * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or collections of Ts. - */ -public interface EvaluationResult extends PipelineResult { - /** - * Retrieves an iterable of results associated with the PCollection passed in. - * - * @param pcollection Collection we wish to translate. - * @param <T> Type of elements contained in collection. - * @return Natively types result associated with collection. - */ - <T> Iterable<T> get(PCollection<T> pcollection); - - /** - * Retrieve an object of Type T associated with the PValue passed in. - * - * @param pval PValue to retrieve associated data for. - * @param <T> Type of object to return. - * @return Native object. - */ - <T> T get(PValue pval); - - /** - * Retrieves the final value of the aggregator. - * - * @param aggName name of aggregator. - * @param resultType Class of final result of aggregation. - * @param <T> Type of final result of aggregation. - * @return Result of aggregation associated with specified name. - */ - <T> T getAggregatorValue(String aggName, Class<T> resultType); - - /** - * Releases any runtime resources, including distributed-execution contexts currently held by - * this EvaluationResult; once close() has been called, - * {@link EvaluationResult#get(PCollection)} might - * not work for subsequent calls. - * - * @param gracefully true if Spark should finish all ongoing work before closing. - */ - void close(boolean gracefully); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java new file mode 100644 index 0000000..ec0610c --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.spark.SparkException;
+import org.joda.time.Duration;
+
+/**
+ * Represents a Spark pipeline execution result.
+ */
+public abstract class SparkPipelineResult implements PipelineResult {
+
+  protected final Future pipelineExecution;
+  protected final EvaluationContext context;
+
+  protected PipelineResult.State state;
+
+  SparkPipelineResult(final Future<?> pipelineExecution,
+                      final EvaluationContext evaluationContext) {
+    this.pipelineExecution = pipelineExecution;
+    this.context = evaluationContext;
+    // pipelineExecution is expected to have started executing eagerly.
+    state = State.RUNNING;
+  }
+
+  private RuntimeException runtimeExceptionFrom(Throwable e) {
+    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
+  }
+
+  private RuntimeException beamExceptionFrom(Throwable e) {
+    // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+    // won't let you catch something that is not declared, so we can't catch
+    // SparkException directly, instead we do an instanceof check.
+    return (e instanceof SparkException)
+        ? new Pipeline.PipelineExecutionException(e.getCause() != null ? e.getCause() : e)
+        : runtimeExceptionFrom(e);
+  }
+
+  protected abstract void stop();
+
+  protected abstract State awaitTermination(Duration duration)
+      throws TimeoutException, ExecutionException, InterruptedException;
+
+  public <T> T getAggregatorValue(String named, Class<T> resultType) {
+    return context.getAggregatorValue(named, resultType);
+  }
+
+  @Override
+  public PipelineResult.State getState() {
+    return state;
+  }
+
+  @Override
+  public PipelineResult.State waitUntilFinish() {
+    return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    try {
+      state = awaitTermination(duration);
+    } catch (TimeoutException e) {
+      state = null;
+    } catch (ExecutionException e) {
+      state = PipelineResult.State.FAILED;
+      throw beamExceptionFrom(e.getCause());
+    } catch (Exception e) {
+      state = PipelineResult.State.FAILED;
+      throw beamExceptionFrom(e);
+    } finally {
+      stop();
+    }
+
+    return state;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    return context.getAggregatorValues(aggregator);
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+  }
+
+  @Override
+  public PipelineResult.State cancel() throws IOException {
+    if (state != null && !state.isTerminal()) {
+      stop();
+      state = PipelineResult.State.CANCELLED;
+    }
+
+    return state;
+  }
+
+  /**
+   * Represents the result of running a batch pipeline.
+   */
+  static class BatchMode extends SparkPipelineResult {
+
+    BatchMode(final Future<?> pipelineExecution,
+              final EvaluationContext evaluationContext) {
+      super(pipelineExecution, evaluationContext);
+    }
+
+    @Override
+    protected void stop() {
+      SparkContextFactory.stopSparkContext(context.getSparkContext());
+    }
+
+    @Override
+    protected State awaitTermination(Duration duration)
+        throws TimeoutException, ExecutionException, InterruptedException {
+      pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+      return PipelineResult.State.DONE;
+    }
+  }
+
+  /**
+   * Represents a streaming Spark pipeline result.
+   */
+  static class StreamingMode extends SparkPipelineResult {
+
+    StreamingMode(final Future<?> pipelineExecution,
+                  final EvaluationContext evaluationContext) {
+      super(pipelineExecution, evaluationContext);
+    }
+
+    @Override
+    protected void stop() {
+      context.getStreamingContext().stop(false, true);
+      SparkContextFactory.stopSparkContext(context.getSparkContext());
+    }
+
+    @Override
+    protected State awaitTermination(Duration duration) throws TimeoutException,
+        ExecutionException, InterruptedException {
+      pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+      if (context.getStreamingContext().awaitTerminationOrTimeout(duration.getMillis())) {
+        return State.DONE;
+      } else {
+        return null;
+      }
+    }
+
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 63f77c0..a8c600e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.spark;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -36,14 +39,12 @@
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
 *
 * {@code
 * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
 * }
 *
 * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
 *
@@ -68,10 +69,10 @@ import org.slf4j.LoggerFactory;
 * Pipeline p = [logic for pipeline creation]
 * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
 * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
 * }
 */
-public final class SparkRunner extends PipelineRunner<EvaluationResult> {
+public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
 
   /**
@@ -122,50 +123,57 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   }
 
   @Override
-  public EvaluationResult run(Pipeline pipeline) {
-    try {
-      LOG.info("Executing pipeline using the SparkRunner.");
-
-      detectTranslationMode(pipeline);
-      if (mOptions.isStreaming()) {
-        SparkRunnerStreamingContextFactory contextFactory =
-            new SparkRunnerStreamingContextFactory(pipeline, mOptions);
-        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(),
-            contextFactory);
-
-        LOG.info("Starting streaming pipeline execution.");
-        jssc.start();
-
-        // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sparkContext(),
-            pipeline, jssc) : contextFactory.getCtxt();
-      } else {
-        JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
-        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
-        SparkPipelineTranslator translator = new TransformTranslator.Translator();
-        pipeline.traverseTopologically(new Evaluator(translator, ctxt));
-        ctxt.computeOutputs();
+  public SparkPipelineResult run(final Pipeline pipeline) {
+    LOG.info("Executing pipeline using the SparkRunner.");
+
+    final SparkPipelineResult result;
+    final EvaluationContext evaluationContext;
+    final Future<?> startPipeline;
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    detectTranslationMode(pipeline);
+
+    if (mOptions.isStreaming()) {
+      final SparkRunnerStreamingContextFactory contextFactory =
+          new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+      final JavaStreamingContext jssc =
+          JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+
+      // if recovering from checkpoint, we have to reconstruct the EvaluationContext instance.
+      evaluationContext =
+          contextFactory.getCtxt() == null
+              ? new EvaluationContext(jssc.sparkContext(), pipeline, jssc)
+              : contextFactory.getCtxt();
+
+      startPipeline = executorService.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          LOG.info("Starting streaming pipeline execution.");
+          jssc.start();
+        }
+      });
 
-        LOG.info("Pipeline execution complete.");
+      result = new SparkPipelineResult.StreamingMode(startPipeline, evaluationContext);
+    } else {
+      final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
+      evaluationContext = new EvaluationContext(jsc, pipeline);
 
-        return ctxt;
-      }
-    } catch (Exception e) {
-      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
-      // won't let you catch something that is not declared, so we can't catch
-      // SparkException here. Instead we do an instanceof check.
-      // Then we find the cause by seeing if it's a user exception (wrapped by Beam's
-      // UserCodeException), or just use the SparkException cause.
- if (e instanceof SparkException && e.getCause() != null) { - if (e.getCause() instanceof UserCodeException && e.getCause().getCause() != null) { - throw UserCodeException.wrap(e.getCause().getCause()); - } else { - throw new RuntimeException(e.getCause()); + startPipeline = executorService.submit(new Runnable() { + + @Override + public void run() { + pipeline.traverseTopologically(new Evaluator(new TransformTranslator.Translator(), + evaluationContext)); + evaluationContext.computeOutputs(); + LOG.info("Batch pipeline execution complete."); } - } - // otherwise just wrap in a RuntimeException - throw new RuntimeException(e); + }); + + result = new SparkPipelineResult.BatchMode(startPipeline, evaluationContext); } + + return result; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index a4ddca0..9a67f9c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.POutput; * * {@code * Pipeline p = [logic for pipeline creation] - * EvaluationResult result = (EvaluationResult) p.run(); + * SparkPipelineResult result = (SparkPipelineResult) p.run(); * } * * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url @@ -47,10 +47,10 @@ import org.apache.beam.sdk.values.POutput; * Pipeline p = [logic for pipeline creation] * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); * options.setSparkMaster("spark://host:port"); - * EvaluationResult result = (EvaluationResult) p.run(); + * SparkPipelineResult result = (SparkPipelineResult) p.run(); * } */ -public final class TestSparkRunner extends PipelineRunner<EvaluationResult> { +public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { private SparkRunner delegate; @@ -72,9 +72,9 @@ public final class TestSparkRunner extends PipelineRunner<EvaluationResult> { }; @Override - public EvaluationResult run(Pipeline pipeline) { + public SparkPipelineResult run(Pipeline pipeline) { TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); - EvaluationResult result = delegate.run(pipeline); + SparkPipelineResult result = delegate.run(pipeline); assertThat(result, testPipelineOptions.getOnCreateMatcher()); assertThat(result, testPipelineOptions.getOnSuccessMatcher()); return result; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java index 0ae78f2..38dae38 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java @@ -132,6 +132,6 @@ public class WordCount { .apply(MapElements.via(new FormatAsTextFn())) .apply("WriteCounts", TextIO.Write.to(options.getOutput())); - 
p.run(); + p.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index ae45609..425f114 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -21,12 +21,10 @@ package org.apache.beam.runners.spark.translation; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.Iterables; -import java.io.IOException; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; @@ -34,7 +32,6 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -46,15 +43,13 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.streaming.StreamingContextState; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.joda.time.Duration; - /** - * Evaluation context allows us to define how pipeline instructions. + * The EvaluationContext allows us to define pipeline instructions and translate between + * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or DStreams/RDDs of Ts. */ -public class EvaluationContext implements EvaluationResult { +public class EvaluationContext { private final JavaSparkContext jsc; private JavaStreamingContext jssc; private final SparkRuntimeContext runtime; @@ -66,24 +61,20 @@ public class EvaluationContext implements EvaluationResult { private final Map<PValue, Object> pobjects = new LinkedHashMap<>(); private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>(); private AppliedPTransform<?, ?, ?> currentTransform; - private State state; public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) { this.jsc = jsc; this.pipeline = pipeline; this.runtime = new SparkRuntimeContext(pipeline, jsc); - // A batch pipeline is blocking by nature - this.state = State.DONE; } public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc) { this(jsc, pipeline); this.jssc = jssc; - this.state = State.RUNNING; } - JavaSparkContext getSparkContext() { + public JavaSparkContext getSparkContext() { return jsc; } @@ -179,8 +170,14 @@ public class EvaluationContext implements EvaluationResult { } } + /** + * Retrieve an object of Type T associated with the PValue passed in. + * + * @param value PValue to retrieve associated data for. 
+   * @param <T> Type of object to return.
+   * @return Native object.
+   */
   @SuppressWarnings("unchecked")
-  @Override
   public <T> T get(PValue value) {
     if (pobjects.containsKey(value)) {
       T result = (T) pobjects.get(value);
@@ -195,23 +192,24 @@
     throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
   }
 
-  @Override
-  public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(AccumulatorSingleton.getInstance(jsc), named, resultType);
-  }
-
-  @Override
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
       throws AggregatorRetrievalException {
    return runtime.getAggregatorValues(AccumulatorSingleton.getInstance(jsc), aggregator);
   }
 
-  @Override
-  public MetricResults metrics() {
-    throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+  public <T> T getAggregatorValue(String named, Class<T> resultType) {
+    return runtime.getAggregatorValue(AccumulatorSingleton.getInstance(jsc),
+        named,
+        resultType);
   }
 
-  @Override
+  /**
+   * Retrieves an iterable of results associated with the PCollection passed in.
+   *
+   * @param pcollection Collection we wish to translate.
+   * @param <T> Type of elements contained in collection.
+   * @return Natively typed result associated with collection.
+   */
   public <T> Iterable<T> get(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
@@ -225,79 +223,6 @@
     return boundedDataset.getValues(pcollection);
   }
 
-  @Override
-  public void close(boolean gracefully) {
-    // Stopping streaming job if running
-    if (isStreamingPipeline() && !state.isTerminal()) {
-      try {
-        cancel(gracefully);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to cancel streaming job", e);
-      }
-    }
-    SparkContextFactory.stopSparkContext(jsc);
-  }
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  @Override
-  public State cancel() throws IOException {
-    return cancel(true);
-  }
-
-  private State cancel(boolean gracefully) throws IOException {
-    if (isStreamingPipeline()) {
-      if (!state.isTerminal()) {
-        jssc.stop(false, gracefully);
-        state = State.CANCELLED;
-      }
-      return state;
-    } else {
-      // Batch is currently blocking so
-      // there is no way to cancel a batch job
-      // will be handled at BEAM-1000
-      throw new UnsupportedOperationException(
-          "Spark runner EvaluationContext does not support cancel.");
-    }
-  }
-
-  @Override
-  public State waitUntilFinish() {
-    return waitUntilFinish(Duration.ZERO);
-  }
-
-  @Override
-  public State waitUntilFinish(Duration duration) {
-    if (isStreamingPipeline()) {
-      // According to PipelineResult: Provide a value less than 1 ms for an infinite wait
-      if (duration.getMillis() < 1L) {
-        jssc.awaitTermination();
-        state = State.DONE;
-      } else {
-        jssc.awaitTermination(duration.getMillis());
-        // According to PipelineResult: The final state of the pipeline or null on timeout
-        if (jssc.getState().equals(StreamingContextState.STOPPED)) {
-          state = State.DONE;
-        } else {
-          return null;
-        }
-      }
-      return state;
-    } else {
-      // This is no-op, since Spark runner in batch is blocking.
- // It needs to be updated once SparkRunner supports non-blocking execution: - // https://issues.apache.org/jira/browse/BEAM-595 - return State.DONE; - } - } - - private boolean isStreamingPipeline() { - return jssc != null; - } - private String storageLevel() { return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index c7f90b4..67839a8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -66,7 +66,7 @@ public final class SparkContextFactory { } } - static synchronized void stopSparkContext(JavaSparkContext context) { + public static synchronized void stopSparkContext(JavaSparkContext context) { if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { context.stop(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index fe73aba..2982844 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -76,7 +76,7 @@ public class ProvidedSparkContextTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); // Run test from pipeline - p.run(); + p.run().waitUntilFinish(); jsc.stop(); } @@ -100,7 +100,7 @@ public class ProvidedSparkContextTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); try { - p.run(); + p.run().waitUntilFinish(); fail("Should throw an exception when The provided Spark context is null"); } catch (RuntimeException e){ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); @@ -128,7 +128,7 @@ public class ProvidedSparkContextTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); try { - p.run(); + p.run().waitUntilFinish(); fail("Should throw an exception when The provided Spark context is stopped"); } catch (RuntimeException e){ assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java new file mode 100644 index 0000000..69cf1c4 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * This suite tests that various scenarios result in proper states of the pipeline. + */ +public class SparkPipelineStateTest implements Serializable { + + private static class UserException extends RuntimeException { + + UserException(String message) { + super(message); + } + } + + @Rule + public transient SparkTestPipelineOptions commonOptions = new SparkTestPipelineOptions(); + + @Rule + public transient TestName testName = new TestName(); + + private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally"; + + private static final List<String> BATCH_WORDS = Arrays.asList("one", "two"); + + private static final List<Iterable<String>> STREAMING_WORDS = + Lists.<Iterable<String>>newArrayList(BATCH_WORDS); + + private ParDo.Bound<String, String> printParDo(final String prefix) { + return ParDo.of(new DoFn<String, String>() { + + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println(prefix + " " + c.element()); + } + }); + } + + private PTransform<PBegin, PCollection<String>> getValues(SparkPipelineOptions options) { + return options.isStreaming() + ? 
CreateStream.fromQueue(STREAMING_WORDS) + : Create.of(BATCH_WORDS); + } + + private SparkPipelineOptions getStreamingOptions() { + final SparkPipelineOptions options = commonOptions.getOptions(); + options.setStreaming(true); + return options; + } + + private SparkPipelineOptions getBatchOptions() { + return commonOptions.getOptions(); + } + + private Pipeline getPipeline(SparkPipelineOptions options) { + + final Pipeline pipeline = Pipeline.create(options); + final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")"; + + pipeline + .apply(getValues(options)).setCoder(StringUtf8Coder.of()) + .apply(printParDo(name)); + + return pipeline; + } + + private void testFailedPipeline(SparkPipelineOptions options) throws Exception { + + SparkPipelineResult result = null; + + try { + final Pipeline pipeline = Pipeline.create(options); + pipeline + .apply(getValues(options)).setCoder(StringUtf8Coder.of()) + .apply(MapElements.via(new SimpleFunction<String, String>() { + + @Override + public String apply(String input) { + throw new UserException(FAILED_THE_BATCH_INTENTIONALLY); + } + })); + + result = (SparkPipelineResult) pipeline.run(); + result.waitUntilFinish(); + } catch (Exception e) { + assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class)); + assertThat(e.getCause(), instanceOf(UserCodeException.class)); + assertThat(e.getCause().getCause(), instanceOf(UserException.class)); + assertThat(e.getCause().getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY)); + assertThat(result.getState(), is(PipelineResult.State.FAILED)); + result.cancel(); + return; + } + + fail("An injected failure did not affect the pipeline as expected."); + } + + private void testTimeoutPipeline(SparkPipelineOptions options) throws Exception { + + final Pipeline pipeline = getPipeline(options); + + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); + + result.waitUntilFinish(Duration.millis(1)); + + assertThat(result.getState(), nullValue()); + + result.cancel(); + } + + private void testCanceledPipeline(SparkPipelineOptions options) throws Exception { + + final Pipeline pipeline = getPipeline(options); + + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); + + result.cancel(); + + assertThat(result.getState(), is(PipelineResult.State.CANCELLED)); + } + + private void testRunningPipeline(SparkPipelineOptions options) throws Exception { + + final Pipeline pipeline = getPipeline(options); + + SparkPipelineResult result = (SparkPipelineResult) pipeline.run(); + + assertThat(result.getState(), is(PipelineResult.State.RUNNING)); + + result.cancel(); + } + + @Test + public void testStreamingPipelineRunningState() throws Exception { + testRunningPipeline(getStreamingOptions()); + } + + @Test + public void testBatchPipelineRunningState() throws Exception { + testRunningPipeline(getBatchOptions()); + } + + @Test + public void testStreamingPipelineCanceledState() throws Exception { + testCanceledPipeline(getStreamingOptions()); + } + + @Test + public void testBatchPipelineCanceledState() throws Exception { + testCanceledPipeline(getBatchOptions()); + } + + @Test + public void testStreamingPipelineFailedState() throws Exception { + testFailedPipeline(getStreamingOptions()); + } + + @Test + public void testBatchPipelineFailedState() throws Exception { + testFailedPipeline(getBatchOptions()); + } + + @Test + public void testStreamingPipelineTimeoutState() throws Exception { + testTimeoutPipeline(getStreamingOptions()); + } + + @Test + public void 
testBatchPipelineTimeoutState() throws Exception { + testTimeoutPipeline(getBatchOptions()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java index 79c58a7..52ae019 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java @@ -25,7 +25,7 @@ import org.junit.rules.ExternalResource; * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton} * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s. */ -class ClearAggregatorsRule extends ExternalResource { +public class ClearAggregatorsRule extends ExternalResource { @Override protected void before() throws Throwable { AccumulatorSingleton.clear(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index c16574c..6b36bcc 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -78,7 +78,7 @@ public class NamedAggregatorsTest { PAssert.that(output).containsInAnyOrder(expectedCounts); - pipeline.run(); + pipeline.run().waitUntilFinish(); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 03f96d5..c5bb583 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -76,7 +76,7 @@ public class AvroPipelineTest { PCollection<GenericRecord> input = p.apply( AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); - p.run(); + p.run().waitUntilFinish(); List<GenericRecord> records = readGenericFile(); assertEquals(Lists.newArrayList(savedRecord), records); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 4e5435f..34d6818 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -74,7 +74,7 @@ public class NumShardsTest { PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); - p.run(); + p.run().waitUntilFinish(); int count = 0; Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", "bob: 2"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index b68e8d4..9efc670 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -88,7 +88,7 @@ public class HadoopFileFormatPipelineTest { HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), outputFormatClass, IntWritable.class, Text.class); input.apply(write.withoutSharding()); - p.run(); + p.run().waitUntilFinish(); IntWritable key = new IntWritable(); Text value = new Text(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java deleted file mode 100644 index 3b79d03..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark.translation; - -import static org.hamcrest.core.Is.isA; - -import java.io.Serializable; -import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -/** - * Side effects test. - */ -public class SideEffectsTest implements Serializable { - private static class UserException extends RuntimeException { - } - - @Rule - public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); - @Rule - public final transient ExpectedException expectedException = ExpectedException.none(); - - @Test - public void test() throws Exception { - Pipeline p = Pipeline.create(pipelineOptions.getOptions()); - - p.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - throw new UserException(); - } - })); - - expectedException.expectCause(isA(UserException.class)); - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index 656107a..e3561d6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.Serializable; import java.util.Collections; import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.aggregators.metrics.sink.ClearAggregatorsRule; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; @@ -54,6 +55,9 @@ public class EmptyStreamAssertionTest implements Serializable { public SparkTestPipelineOptionsForStreaming commonOptions = new SparkTestPipelineOptionsForStreaming(); + @Rule + public ClearAggregatorsRule clearAggregatorsRule = new ClearAggregatorsRule(); + @Test public void testAssertion() throws Exception { SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index b57787f..e0d71d4 100644 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -27,8 +27,8 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; @@ -118,7 +118,7 @@ public class ResumeFromCheckpointStreamingTest { options.setCheckpointDurationMillis(options.getBatchIntervalMillis()); // first run will read from Kafka backlog - "auto.offset.reset=smallest" - EvaluationResult res = run(options); + SparkPipelineResult res = run(options); long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class); assertThat(String.format("Expected %d processed messages count but " + "found %d", EXPECTED_AGG_FIRST, processedMessages1), processedMessages1, @@ -132,14 +132,14 @@ public class ResumeFromCheckpointStreamingTest { equalTo(EXPECTED_AGG_FIRST)); } - private static EvaluationResult runAgain(SparkPipelineOptions options) { + private static SparkPipelineResult runAgain(SparkPipelineOptions options) { AccumulatorSingleton.clear(); // sleep before next run. Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); return run(options); } - private static EvaluationResult run(SparkPipelineOptions options) { + private static SparkPipelineResult run(SparkPipelineOptions options) { // write to Kafka produce(); Map<String, Object> consumerProps = ImmutableMap.<String, Object>of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 23aca43..471ec92 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import java.io.Serializable; -import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; @@ -55,7 +55,7 @@ public final class PAssertStreaming implements Serializable { * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all * windows. 
*/ - public static <T> EvaluationResult runAndAssertContents(Pipeline p, + public static <T> SparkPipelineResult runAndAssertContents(Pipeline p, PCollection<T> actual, T[] expected, Duration timeout, @@ -69,9 +69,8 @@ public final class PAssertStreaming implements Serializable { .apply(ParDo.of(new AssertDoFn<>(expected))); // run the pipeline. - EvaluationResult res = (EvaluationResult) p.run(); + SparkPipelineResult res = (SparkPipelineResult) p.run(); res.waitUntilFinish(timeout); - res.close(stopGracefully); // validate assertion succeeded (at least once). int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); Assert.assertThat("Success aggregator should be greater than zero.", success, not(0)); @@ -87,7 +86,7 @@ public final class PAssertStreaming implements Serializable { * Default to stop gracefully so that tests will finish processing even if slower for reasons * such as a slow runtime environment. */ - public static <T> EvaluationResult runAndAssertContents(Pipeline p, + public static <T> SparkPipelineResult runAndAssertContents(Pipeline p, PCollection<T> actual, T[] expected, Duration timeout) {
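
----------------------------------------------------------------------

For reference, a minimal driver-side sketch of the semantics this commit
introduces (illustrative only, not part of the change; the class name and
options wiring below are hypothetical). SparkRunner#run() now submits
execution to a background executor and returns immediately,
waitUntilFinish(duration) returns null if the timeout elapses first, a
user-code failure surfaces as a Pipeline.PipelineExecutionException with
getState() reporting FAILED, and cancel() is supported in both batch and
streaming:

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class ResultSemanticsSketch {

  public static void main(String[] args) throws Exception {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);
    // [transforms applied here]

    // run() is non-blocking and returns a SparkPipelineResult in state RUNNING.
    SparkPipelineResult result = (SparkPipelineResult) p.run();

    try {
      // Returns the terminal state, or null if the timeout elapsed first
      // (per the PipelineResult contract).
      PipelineResult.State state = result.waitUntilFinish(Duration.standardSeconds(30));
      System.out.println("Pipeline terminated in state: " + state);
    } catch (Pipeline.PipelineExecutionException e) {
      // beamExceptionFrom() unwraps the SparkException, so the original
      // user-code failure is available as the cause.
      System.err.println("Pipeline failed (" + result.getState() + "): " + e.getCause());
    }

    // cancel() stops the job and reports CANCELLED while the pipeline is
    // still RUNNING; once a terminal state (or a timeout) has been reached
    // it simply returns the current state.
    result.cancel();
  }
}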
