mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869996630


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization, potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   No, GC is pretty much irrelevant ... by leak I mean a resource leak. There's a SparkContext left behind that is never stopped, and users have no way to stop it. The problem is that this will prevent any further attempt to create a new SparkContext in the same JVM.
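
   For illustration only, a rough sketch of the lifecycle the provided-context API in this PR enables, in contrast to the leaked context above. The wrapper class, master/app name, and the commented-out pipeline run are placeholders, not part of this change:

   ```java
   import org.apache.beam.runners.spark.SparkContextOptions;
   import org.apache.beam.runners.spark.SparkRunner;
   import org.apache.beam.runners.spark.translation.SparkContextFactory;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.spark.SparkConf;
   import org.apache.spark.api.java.JavaSparkContext;

   // Hypothetical harness, only to show who owns the context lifecycle.
   public class ProvidedSparkContextExample {
     public static void main(String[] args) {
       // The caller creates and owns the SparkContext; Beam only borrows it.
       JavaSparkContext jsc =
           new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("provided-context"));
       try {
         // Register the externally managed context with the factory (new API in this PR).
         SparkContextFactory.setProvidedSparkContext(jsc);

         SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);

         // Run one or more pipelines against the same context here, e.g.
         // Pipeline p = Pipeline.create(options); p.run().waitUntilFinish();
       } finally {
         // Unlike TEST_REUSE_SPARK_CONTEXT, the caller decides when the context dies,
         // so nothing is left behind and a new SparkContext can be created in this JVM later.
         SparkContextFactory.clearProvidedSparkContext();
         jsc.stop();
       }
     }
   }
   ```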


