[ 
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768569
 ]

ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/22 16:21
            Start Date: 10/May/22 16:21
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r866852800


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped
+   * during serialization potentially leading to confusing behavior. This is 
particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext 
providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext 
getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && 
!usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use 
SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already 
stopped.");
+        throw new IllegalStateException("The provided Spark context was 
already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any 
control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   is it missing a ! in the condition ?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped
+   * during serialization potentially leading to confusing behavior. This is 
particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext 
providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;

Review Comment:
   set sparkMaster from provided `SparkContext` ?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped
+   * during serialization potentially leading to confusing behavior. This is 
particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext 
providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;

Review Comment:
   clear `sparkMaster` as well ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new 
SparkContextOptionsRule();

Review Comment:
   name `contextRule` is missleading as a `SparkContextRule` class exists, 
pleaxe rename to `contextOptionsRule`



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the 
pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new 
SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {

Review Comment:
   naming is so cute :smile: please rename to CustomException



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -18,24 +18,31 @@
 package org.apache.beam.runners.spark;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 
-/** Provided Spark Context tests. */
+/**
+ * Provided Spark Context tests.
+ *
+ * <p>Note: These tests are run sequentially ordered by their name to reuse 
the Spark context and
+ * speed up testing.

Review Comment:
   clever !



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the 
pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new 
SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {
     MyCustomException(final String message) {
       super(message);
     }
   }
 
-  private final transient SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the 
batch intentionally";
-
-  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
-    return ParDo.of(
-        new DoFn<String, String>() {
-
-          @ProcessElement
-          public void processElement(final ProcessContext c) {
-            System.out.println(prefix + " " + c.element());
-          }
-        });
-  }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final 
SparkPipelineOptions options) {
-    final boolean doNotSyncWithWatermark = false;
-    return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), 
doNotSyncWithWatermark)
-            .nextBatch("one", "two")
-        : Create.of("one", "two");
+  private static class FailAlways extends SimpleFunction<String, String> {
+    @Override
+    public String apply(final String input) {
+      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+    }
   }
 
-  private SparkPipelineOptions getStreamingOptions() {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    return options;
-  }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the 
batch intentionally";
 
-  private SparkPipelineOptions getBatchOptions() {
+  private Pipeline createPipeline(
+      boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+    SparkContextOptions options = contextRule.createPipelineOptions();
     options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused 
throughout the test.
-    return options;
-  }
-
-  private Pipeline getPipeline(final SparkPipelineOptions options) {
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + 
options.isStreaming() + ")";
+    options.setStreaming(isStreaming);
 
-    
pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+    Pipeline pipeline = Pipeline.create(options);
+    PTransform<PBegin, PCollection<String>> values =
+        isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), 
false).nextBatch("one", "two")
+            : Create.of("one", "two");
 
+    PCollection<String> collection = 
pipeline.apply(values).setCoder(StringUtf8Coder.of());
+    if (mapFun != null) {
+      collection.apply(MapElements.via(mapFun));
+    }
     return pipeline;
   }
 
-  private void testFailedPipeline(final SparkPipelineOptions options) throws 
Exception {
-
-    SparkPipelineResult result = null;
-
-    try {
-      final Pipeline pipeline = Pipeline.create(options);
-      pipeline
-          .apply(getValues(options))
-          .setCoder(StringUtf8Coder.of())
-          .apply(
-              MapElements.via(
-                  new SimpleFunction<String, String>() {
-
-                    @Override
-                    public String apply(final String input) {
-                      throw new 
MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
-                    }
-                  }));
-
-      result = (SparkPipelineResult) pipeline.run();
-      result.waitUntilFinish();
-    } catch (final Exception e) {
-      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
-      assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), 
is(FAILED_THE_BATCH_INTENTIONALLY));
-      assertThat(result.getState(), is(PipelineResult.State.FAILED));
-      result.cancel();
-      return;
-    }
+  private void testFailedPipeline(boolean isStreaming) throws Exception {
+    Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
 
-    fail("An injected failure did not affect the pipeline as expected.");
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> 
result.waitUntilFinish());
+    assertThat(e.getCause(), instanceOf(MyCustomException.class));
+    assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+    assertThat(result.getState(), is(PipelineResult.State.FAILED));
+    result.cancel();
   }
 
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws 
Exception {
-
-    final Pipeline pipeline = getPipeline(options);
-
-    final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
-    result.waitUntilFinish(Duration.millis(1));
+  private void testTimeoutPipeline(boolean isStreaming) throws Exception {

Review Comment:
   I know you did not change this test functionnally but it is very strange as 
it does not assert any timeout as the TimeoutException is just ignored in 
`waitUntilFinish` and nothing is thrown and no state change is done. I think 
you should add a comment to explain that



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java:
##########
@@ -112,8 +110,6 @@ public class ResumeFromCheckpointStreamingTest implements 
Serializable {
 
   private transient TemporaryFolder temporaryFolder;
 
-  @Rule public final transient ReuseSparkContextRule noContextReuse = 
ReuseSparkContextRule.no();
-

Review Comment:
     default of your sparkContextRule is to reuse the context. So to disable 
that, you just don't add a context rule, right ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java:
##########
@@ -32,16 +33,15 @@
 @RunWith(JUnit4.class)
 public class EncoderHelpersTest {
 
+  @ClassRule public static SparkSessionRule sessionRule = new 
SparkSessionRule();
+
   @Test
   public void beamCoderToSparkEncoderTest() {
-    SparkSession sparkSession =
-        SparkSession.builder()
-            .appName("beamCoderToSparkEncoderTest")
-            .master("local[4]")
-            .getOrCreate();

Review Comment:
   Thanks, it is cleaner to use a session test rule than manually creating it. 
I just did the simplest when writing this test :smile: 



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a 
time. For that reason
+ * tests requiring a different context have to be forked using separate test 
classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", 
WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class 
to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    
options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for 
test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends 
SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = 
kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = 
registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to 
validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   the assertion here is just that no exception is thrown upon serde ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new 
SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();

Review Comment:
   should not be needed for batch pipelines but good to get used to call it



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new 
SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   why is this needed, adding the PAssert to the pipeline DAG and running the 
pipeline should be enough



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -68,7 +73,7 @@ public void testInBatchMode() throws Exception {
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNTS);
-    pipeline.run();
+    pipeline.run().waitUntilFinish();

Review Comment:
   good catch !



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable 
{
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) 
{
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);

Review Comment:
   get the app name from pipeline options



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable 
{
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) 
{
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);
+  }
+
+  public SparkContextRule(KV<String, String>... sparkConfig) {
+    this("local", sparkConfig);
+  }
+
+  public SparkContextRule(String sparkMaster, KV<String, String>... 
sparkConfig) {
+    this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, 
KV::getValue)));
+  }
+
+  public JavaSparkContext getSparkContext() {
+    if (sparkContext == null) {
+      throw new IllegalStateException("SparkContext not available");
+    }
+    return sparkContext;
+  }
+
+  public SparkContextOptions createPipelineOptions() {
+    return configure(TestPipeline.testingPipelineOptions());
+  }
+
+  public SparkContextOptions configure(PipelineOptions opts) {
+    SparkContextOptions ctxOpts = opts.as(SparkContextOptions.class);
+    ctxOpts.setUsesProvidedSparkContext(true);
+    ctxOpts.setProvidedSparkContext(getSparkContext());
+    return ctxOpts;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    sparkContext = new JavaSparkContext(sparkConf);
+    SparkContextFactory.setProvidedSparkContext(sparkContext);
+  }
+
+  @Override
+  protected void after() {
+    SparkContextFactory.clearProvidedSparkContext();
+    getSparkContext().stop();

Review Comment:
   if it is no more used, set to null to be sure that the GC will free memory ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable 
{
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) 
{
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");

Review Comment:
   appName should be fetchable from beam pipeline options



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped
+   * during serialization potentially leading to confusing behavior. This is 
particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext 
providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext 
getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && 
!usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use 
SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already 
stopped.");
+        throw new IllegalStateException("The provided Spark context was 
already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any 
control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   If by leak you mean that we keep a reference to the context and thus the GC 
can't free the object, would it be possible in that case to solve the leak for 
example by clearing the references that point to it (or using an alternative 
reference type) and keep this setting? 



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped

Review Comment:
   please add this comment and deprecation notice to 
`SparkContextOptions#setProvidedSparkContext`



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new 
SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);
     // A provided context must not be stopped after execution
-    assertFalse(jsc.sc().isStopped());
-    jsc.stop();
+    assertFalse(contextRule.getSparkContext().sc().isStopped());
   }
 
-  /** Provide a context and call pipeline run. */
+  /** A SparkRunner with a stopped provided Spark context cannot run 
pipelines. */
   @Test
-  public void testWithNullContext() throws Exception {
-    testWithInvalidContext(null);
+  public void testBWithStoppedProvidedContext() {
+    // Stop the provided Spark context
+    contextRule.getSparkContext().sc().stop();
+    assertThrows(
+        PROVIDED_CONTEXT_EXCEPTION,
+        RuntimeException.class,
+        () -> createPipeline().run().waitUntilFinish());
   }
 
-  /** A SparkRunner with a stopped provided Spark context cannot run 
pipelines. */
+  /** Provide a context and call pipeline run. */
   @Test
-  public void testWithStoppedProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
-    // Stop the provided Spark context directly
-    jsc.stop();
-    testWithInvalidContext(jsc);
+  public void testCWithNullContext() {
+    contextRule.getOptions().setProvidedSparkContext(null);
+    assertThrows(
+        PROVIDED_CONTEXT_EXCEPTION,
+        RuntimeException.class,
+        () -> createPipeline().run().waitUntilFinish());
   }
 
-  private void testWithValidProvidedContext(JavaSparkContext jsc) throws 
Exception {
-    SparkContextOptions options = getSparkContextOptions(jsc);
-
-    Pipeline p = Pipeline.create(options);
+  private Pipeline createPipeline() {
+    Pipeline p = Pipeline.create(contextRule.getOptions());
     PCollection<String> inputWords = 
p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output =
         inputWords
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
     // Run test from pipeline
-    PipelineResult result = p.run();
-
-    TestPipeline.verifyPAssertsSucceeded(p, result);
-  }
-
-  private void testWithInvalidContext(JavaSparkContext jsc) {
-    SparkContextOptions options = getSparkContextOptions(jsc);
-
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = 
p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
-    inputWords
-        .apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    try {
-      p.run().waitUntilFinish();
-      fail("Should throw an exception when The provided Spark context is null 
or stopped");
-    } catch (RuntimeException e) {
-      assert e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION);
-    }
-  }
-
-  private static SparkContextOptions getSparkContextOptions(JavaSparkContext 
jsc) {
-    final SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
-    options.setRunner(TestSparkRunner.class);
-    options.setUsesProvidedSparkContext(true);
-    options.setProvidedSparkContext(jsc);
-    options.setEnableSparkMetricSinks(false);
-    return options;
+    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+    return p;

Review Comment:
   yes I prefer this coding style too: preparing the pipeline in a util method 
and do the run and assertions inside the test method. + it avoids the pipeline 
code duplication that was in place



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java:
##########
@@ -86,7 +85,6 @@
 public class CreateStreamTest implements Serializable {
 
   @Rule public final transient TestPipeline p = TestPipeline.create();
-  @Rule public final transient ReuseSparkContextRule noContextResue = 
ReuseSparkContextRule.no();

Review Comment:
   and here again



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to 
{@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be 
enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for 
proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if 
{@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, 
it will be dropped
+   * during serialization potentially leading to confusing behavior. This is 
particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext 
providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext 
getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && 
!usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use 
SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already 
stopped.");
+        throw new IllegalStateException("The provided Spark context was 
already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any 
control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;

Review Comment:
   should be true ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java:
##########
@@ -53,8 +52,6 @@ public class SparkCoGroupByKeyStreamingTest {
   private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
   private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");
 
-  @Rule public final transient ReuseSparkContextRule noContextResue = 
ReuseSparkContextRule.no();
-

Review Comment:
   same here



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable 
{
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) 
{
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");
+  }
+
+  public SparkSessionRule(KV<String, String>... sparkConfig) {
+    this("local", sparkConfig);
+  }
+
+  public SparkSessionRule(String sparkMaster, KV<String, String>... 
sparkConfig) {
+    this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, 
KV::getValue)));
+  }
+
+  public SparkSession getSession() {
+    if (session == null) {
+      throw new IllegalStateException("SparkSession not available");
+    }
+    return session;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    session = builder.getOrCreate();
+  }
+
+  @Override
+  protected void after() {
+    getSession().stop();

Review Comment:
   set session to null to be sure that it is freed by GC ?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java:
##########
@@ -52,8 +51,6 @@ public class SparkMetricsPusherTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetricsPusherTest.class);
   private static final String COUNTER_NAME = "counter";
 
-  @Rule public final transient ReuseSparkContextRule noContextResue = 
ReuseSparkContextRule.no();
-

Review Comment:
   once again





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768569)
    Time Spent: 1h 40m  (was: 1.5h)

> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
>                 Key: BEAM-14334
>                 URL: https://issues.apache.org/jira/browse/BEAM-14334
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark, testing
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of 
> poor quality / bad code and should be avoided: 
>  * It significantly impacts performance when running tests.
>  * And it often hides resource leaks, either in code or worse in the runner 
> itself.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to