[ 
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768893
 ]

ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 08:26
            Start Date: 11/May/22 08:26
    Worklog Time Spent: 10m 
      Work Description: mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870015838


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the 
pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new 
SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {
     MyCustomException(final String message) {
       super(message);
     }
   }
 
-  private final transient SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the 
batch intentionally";
-
-  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
-    return ParDo.of(
-        new DoFn<String, String>() {
-
-          @ProcessElement
-          public void processElement(final ProcessContext c) {
-            System.out.println(prefix + " " + c.element());
-          }
-        });
-  }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final 
SparkPipelineOptions options) {
-    final boolean doNotSyncWithWatermark = false;
-    return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), 
doNotSyncWithWatermark)
-            .nextBatch("one", "two")
-        : Create.of("one", "two");
+  private static class FailAlways extends SimpleFunction<String, String> {
+    @Override
+    public String apply(final String input) {
+      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+    }
   }
 
-  private SparkPipelineOptions getStreamingOptions() {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    return options;
-  }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the 
batch intentionally";
 
-  private SparkPipelineOptions getBatchOptions() {
+  private Pipeline createPipeline(
+      boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+    SparkContextOptions options = contextRule.createPipelineOptions();
     options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused 
throughout the test.
-    return options;
-  }
-
-  private Pipeline getPipeline(final SparkPipelineOptions options) {
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + 
options.isStreaming() + ")";
+    options.setStreaming(isStreaming);
 
-    
pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+    Pipeline pipeline = Pipeline.create(options);
+    PTransform<PBegin, PCollection<String>> values =
+        isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), 
false).nextBatch("one", "two")
+            : Create.of("one", "two");
 
+    PCollection<String> collection = 
pipeline.apply(values).setCoder(StringUtf8Coder.of());
+    if (mapFun != null) {
+      collection.apply(MapElements.via(mapFun));
+    }
     return pipeline;
   }
 
-  private void testFailedPipeline(final SparkPipelineOptions options) throws 
Exception {
-
-    SparkPipelineResult result = null;
-
-    try {
-      final Pipeline pipeline = Pipeline.create(options);
-      pipeline
-          .apply(getValues(options))
-          .setCoder(StringUtf8Coder.of())
-          .apply(
-              MapElements.via(
-                  new SimpleFunction<String, String>() {
-
-                    @Override
-                    public String apply(final String input) {
-                      throw new 
MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
-                    }
-                  }));
-
-      result = (SparkPipelineResult) pipeline.run();
-      result.waitUntilFinish();
-    } catch (final Exception e) {
-      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
-      assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), 
is(FAILED_THE_BATCH_INTENTIONALLY));
-      assertThat(result.getState(), is(PipelineResult.State.FAILED));
-      result.cancel();
-      return;
-    }
+  private void testFailedPipeline(boolean isStreaming) throws Exception {
+    Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
 
-    fail("An injected failure did not affect the pipeline as expected.");
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> 
result.waitUntilFinish());
+    assertThat(e.getCause(), instanceOf(MyCustomException.class));
+    assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+    assertThat(result.getState(), is(PipelineResult.State.FAILED));
+    result.cancel();
   }
 
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws 
Exception {
-
-    final Pipeline pipeline = getPipeline(options);
-
-    final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
-    result.waitUntilFinish(Duration.millis(1));
+  private void testTimeoutPipeline(boolean isStreaming) throws Exception {

Review Comment:
   Agreed, this is strange. I added a comment and renamed the tests to make it
clear what they do.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768893)
    Time Spent: 3h 10m  (was: 3h)

> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
>                 Key: BEAM-14334
>                 URL: https://issues.apache.org/jira/browse/BEAM-14334
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark, testing
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Usage of *forkEvery 1* is typically a strong sign of poor-quality code and
> should be avoided:
>  * It significantly impacts performance when running tests.
>  * It often hides resource leaks, either in the code or, worse, in the runner
> itself.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to