[ https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768926&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768926 ]

ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 09:27
            Start Date: 11/May/22 09:27
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870077639


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
  private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
   * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
   * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control and avoid leaking your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
  // Spark allows only one context per JVM, so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization, potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use 
SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already 
stopped.");
+        throw new IllegalStateException("The provided Spark context was 
already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   Ok, thanks for the explanation. I think these details are worth capturing in the comments and/or javadocs.
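
[Editor's note] For illustration, a minimal sketch (not part of the PR) of how a caller could use the new factory methods together with SparkContextOptions. It assumes a setter SparkContextOptions#setUsesProvidedSparkContext mirroring the getter referenced in the diff above; all other names come from the diff itself.

// Hypothetical usage sketch: register an externally managed SparkContext via the
// new SparkContextFactory.setProvidedSparkContext instead of the deprecated
// beam.spark.test.reuseSparkContext property, keeping full lifecycle control.
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedContextExample {
  public static void main(String[] args) {
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("example"));

    // Register the externally managed context. Unlike
    // SparkContextOptions#setProvidedSparkContext, this survives serialization
    // of the pipeline options (see the javadoc in the diff above).
    SparkContextFactory.setProvidedSparkContext(jsc);

    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setUsesProvidedSparkContext(true);
    // ... run pipelines against `options` ...

    // The caller keeps lifecycle control and must clean up explicitly.
    SparkContextFactory.clearProvidedSparkContext();
    jsc.stop();
  }
}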





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768926)
    Time Spent: 4h 50m  (was: 4h 40m)

> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
>                 Key: BEAM-14334
>                 URL: https://issues.apache.org/jira/browse/BEAM-14334
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark, testing
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of poor-quality code and should be avoided:
>  * It significantly slows down test runs.
>  * It often hides resource leaks, either in the code or, worse, in the runner itself (see the sketch below).
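
[Editor's note] A hypothetical JUnit 4 sketch (not from the issue) of the alternative the PR above enables: one shared SparkContext per test class with explicit setup and teardown, so leaks surface instead of being masked by a fresh JVM per test. Names follow the factory methods introduced in the diff.

// Share one SparkContext across a test class with explicit lifecycle control
// instead of isolating every test in its own JVM via `forkEvery 1`.
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class SharedContextTest {
  private static JavaSparkContext jsc;

  @BeforeClass
  public static void setUpContext() {
    jsc = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("shared"));
    SparkContextFactory.setProvidedSparkContext(jsc);
  }

  @AfterClass
  public static void tearDownContext() {
    // Explicit cleanup makes leaks visible instead of hiding them behind fresh JVMs.
    SparkContextFactory.clearProvidedSparkContext();
    jsc.stop();
  }

  // ... tests running pipelines against the shared context ...
}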



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
