mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870015838
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
import java.io.Serializable;
+import javax.annotation.Nullable;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.rules.TestName;
/** This suite tests that various scenarios result in proper states of the pipeline. */
public class SparkPipelineStateTest implements Serializable {
- private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
+ private static class MyCustomException extends RuntimeException {
MyCustomException(final String message) {
super(message);
}
}
- private final transient SparkPipelineOptions options =
- PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
- @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
-
- private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
- return ParDo.of(
- new DoFn<String, String>() {
-
- @ProcessElement
- public void processElement(final ProcessContext c) {
- System.out.println(prefix + " " + c.element());
- }
- });
- }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
- final boolean doNotSyncWithWatermark = false;
- return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
- .nextBatch("one", "two")
- : Create.of("one", "two");
+ private static class FailAlways extends SimpleFunction<String, String> {
+ @Override
+ public String apply(final String input) {
+ throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+ }
}
- private SparkPipelineOptions getStreamingOptions() {
- options.setRunner(SparkRunner.class);
- options.setStreaming(true);
- return options;
- }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
- private SparkPipelineOptions getBatchOptions() {
+ private Pipeline createPipeline(
+ boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+ SparkContextOptions options = contextRule.createPipelineOptions();
options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused throughout the test.
- return options;
- }
-
- private Pipeline getPipeline(final SparkPipelineOptions options) {
-
- final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+ options.setStreaming(isStreaming);
-    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+ Pipeline pipeline = Pipeline.create(options);
+ PTransform<PBegin, PCollection<String>> values =
+ isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), false).nextBatch("one", "two")
+ : Create.of("one", "two");
+    PCollection<String> collection = pipeline.apply(values).setCoder(StringUtf8Coder.of());
+ if (mapFun != null) {
+ collection.apply(MapElements.via(mapFun));
+ }
return pipeline;
}
-  private void testFailedPipeline(final SparkPipelineOptions options) throws Exception {
-
- SparkPipelineResult result = null;
-
- try {
- final Pipeline pipeline = Pipeline.create(options);
- pipeline
- .apply(getValues(options))
- .setCoder(StringUtf8Coder.of())
- .apply(
- MapElements.via(
- new SimpleFunction<String, String>() {
-
- @Override
- public String apply(final String input) {
-                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
- }
- }));
-
- result = (SparkPipelineResult) pipeline.run();
- result.waitUntilFinish();
- } catch (final Exception e) {
- assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
- assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
- assertThat(result.getState(), is(PipelineResult.State.FAILED));
- result.cancel();
- return;
- }
+ private void testFailedPipeline(boolean isStreaming) throws Exception {
+ Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+ SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
- fail("An injected failure did not affect the pipeline as expected.");
+ PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish());
+ assertThat(e.getCause(), instanceOf(MyCustomException.class));
+ assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+ assertThat(result.getState(), is(PipelineResult.State.FAILED));
+ result.cancel();
}
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws Exception {
-
- final Pipeline pipeline = getPipeline(options);
-
- final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
- result.waitUntilFinish(Duration.millis(1));
+ private void testTimeoutPipeline(boolean isStreaming) throws Exception {
Review Comment:
Agree, this is strange ... I've added a comment and renamed the tests to make it clear what this does.
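
For context, a minimal sketch of the shape such a timeout test could take in the refactored style of this diff. The method name, the inline comments, and the asserted final state are illustrative assumptions, not the code actually added in this PR:

```java
// Illustrative sketch only: the name and the asserted state are assumptions.
private void testTimeoutPipeline(boolean isStreaming) throws Exception {
  // No failing map function here; this test only exercises the timeout path.
  Pipeline pipeline = createPipeline(isStreaming, null);
  SparkPipelineResult result = (SparkPipelineResult) pipeline.run();

  // Wait with a deliberately tiny timeout; the behavior under test is which
  // state the Spark runner reports once waitUntilFinish stops waiting.
  result.waitUntilFinish(millis(1));
  assertThat(result.getState(), is(PipelineResult.State.DONE));

  result.cancel();
}
```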