[
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768893
]
ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/22 08:26
Start Date: 11/May/22 08:26
Worklog Time Spent: 10m
Work Description: mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870015838
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
import java.io.Serializable;
+import javax.annotation.Nullable;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.rules.TestName;
/** This suite tests that various scenarios result in proper states of the pipeline. */
public class SparkPipelineStateTest implements Serializable {
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
+  private static class MyCustomException extends RuntimeException {
     MyCustomException(final String message) {
       super(message);
     }
   }
-  private final transient SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
-
-  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
-    return ParDo.of(
-        new DoFn<String, String>() {
-
-          @ProcessElement
-          public void processElement(final ProcessContext c) {
-            System.out.println(prefix + " " + c.element());
-          }
-        });
-  }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
-    final boolean doNotSyncWithWatermark = false;
-    return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
-            .nextBatch("one", "two")
-        : Create.of("one", "two");
+  private static class FailAlways extends SimpleFunction<String, String> {
+    @Override
+    public String apply(final String input) {
+      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+    }
   }
-  private SparkPipelineOptions getStreamingOptions() {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    return options;
-  }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
-  private SparkPipelineOptions getBatchOptions() {
+  private Pipeline createPipeline(
+      boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+    SparkContextOptions options = contextRule.createPipelineOptions();
     options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused throughout the test.
-    return options;
-  }
-
-  private Pipeline getPipeline(final SparkPipelineOptions options) {
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+    options.setStreaming(isStreaming);
-
-    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+    Pipeline pipeline = Pipeline.create(options);
+    PTransform<PBegin, PCollection<String>> values =
+        isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), false).nextBatch("one", "two")
+            : Create.of("one", "two");
+    PCollection<String> collection = pipeline.apply(values).setCoder(StringUtf8Coder.of());
+    if (mapFun != null) {
+      collection.apply(MapElements.via(mapFun));
+    }
     return pipeline;
   }
-  private void testFailedPipeline(final SparkPipelineOptions options) throws Exception {
-
-    SparkPipelineResult result = null;
-
-    try {
-      final Pipeline pipeline = Pipeline.create(options);
-      pipeline
-          .apply(getValues(options))
-          .setCoder(StringUtf8Coder.of())
-          .apply(
-              MapElements.via(
-                  new SimpleFunction<String, String>() {
-
-                    @Override
-                    public String apply(final String input) {
-                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
-                    }
-                  }));
-
-      result = (SparkPipelineResult) pipeline.run();
-      result.waitUntilFinish();
-    } catch (final Exception e) {
-      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
-      assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
-      assertThat(result.getState(), is(PipelineResult.State.FAILED));
-      result.cancel();
-      return;
-    }
+  private void testFailedPipeline(boolean isStreaming) throws Exception {
+    Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-    fail("An injected failure did not affect the pipeline as expected.");
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish());
+    assertThat(e.getCause(), instanceOf(MyCustomException.class));
+    assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+    assertThat(result.getState(), is(PipelineResult.State.FAILED));
+    result.cancel();
   }
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws Exception {
-
-    final Pipeline pipeline = getPipeline(options);
-
-    final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
-    result.waitUntilFinish(Duration.millis(1));
+  private void testTimeoutPipeline(boolean isStreaming) throws Exception {
Review Comment:
Agree, this is strange ... I added a comment and renamed the tests to make it clear what they do.
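
For reference, a sketch of what the timeout scenario under discussion might look like after the refactoring. It builds on the {{createPipeline(boolean, SimpleFunction)}} helper from the diff above; the test name and assertions here are illustrative assumptions, not the PR's actual code:
{code:java}
// Illustrative sketch only; relies on the createPipeline helper and static
// imports shown in the diff above. Name and assertions are assumptions.
@Test
public void testPipelineStillRunningAfterWaitTimeout() throws Exception {
  // Streaming pipeline, no failing map function injected.
  Pipeline pipeline = createPipeline(true, null);
  SparkPipelineResult result = (SparkPipelineResult) pipeline.run();

  // A 1 ms timeout elapses long before the streaming pipeline could reach a
  // terminal state, so waitUntilFinish returns without the pipeline ending.
  result.waitUntilFinish(millis(1));

  // The pipeline is therefore expected to still be RUNNING, and must be
  // cancelled explicitly so it releases the shared context.
  assertThat(result.getState(), is(PipelineResult.State.RUNNING));
  result.cancel();
}
{code}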
Issue Time Tracking
-------------------
Worklog Id: (was: 768893)
Time Spent: 3h 10m (was: 3h)
> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
> Key: BEAM-14334
> URL: https://issues.apache.org/jira/browse/BEAM-14334
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark, testing
> Reporter: Moritz Mack
> Assignee: Moritz Mack
> Priority: P2
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of
> poor-quality code and should be avoided (a shared-context alternative is
> sketched below):
> * It significantly impacts performance when running tests.
> * It often hides resource leaks, either in the code under test or, worse, in
> the runner itself.
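
For context: {{forkEvery 1}} tells Gradle to fork a fresh JVM for every test class, so per-class process teardown can mask leaked resources instead of surfacing them. The alternative this PR leans on is a shared JUnit {{@ClassRule}}. Below is a minimal sketch of such a rule, assuming a plain {{JavaSparkContext}} rather than Beam's actual {{SparkContextRule}}, whose API may differ:
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.rules.ExternalResource;

// Minimal sketch of a shared Spark context rule; Beam's actual
// SparkContextRule may differ. Used as a @ClassRule, the resource is created
// once before the first test in the class and stopped once after the last,
// so tests share one JVM and leaks surface instead of being hidden by
// per-test process teardown (forkEvery 1).
public class SharedSparkContextRule extends ExternalResource {
  private JavaSparkContext jsc;

  public JavaSparkContext getSparkContext() {
    return jsc;
  }

  @Override
  protected void before() {
    jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("shared-test-context"));
  }

  @Override
  protected void after() {
    jsc.stop();
  }
}
{code}
A test class then declares {{@ClassRule public static SharedSparkContextRule context = new SharedSparkContextRule();}}, much as the diff above does with {{SparkContextRule}}.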
--
This message was sent by Atlassian Jira
(v8.20.7#820007)