mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870001724
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** The Spark context factory. */
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
public final class SparkContextFactory {
private static final Logger LOG =
LoggerFactory.getLogger(SparkContextFactory.class);
/**
* If the property {@code beam.spark.test.reuseSparkContext} is set to
{@code true} then the Spark
* context will be reused for beam pipelines. This property should only be
enabled for tests.
+ *
+ * @deprecated Please use {@link SparkContextOptions} instead to allow for
proper lifecycle
+ * control to not leak your SparkContext.
*/
+ @Deprecated
public static final String TEST_REUSE_SPARK_CONTEXT =
"beam.spark.test.reuseSparkContext";
// Spark allows only one context for JVM so this can be static.
- private static JavaSparkContext sparkContext;
- private static String sparkMaster;
- private static boolean usesProvidedSparkContext;
+ private static @Nullable JavaSparkContext sparkContext;
+ private static @Nullable String sparkMaster;
+
+ private static boolean hasProvidedSparkContext;
private SparkContextFactory() {}
+ /**
+ * Set an externally managed {@link JavaSparkContext} that will be used if
{@link
+ * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+ *
+ * <p>A Spark context can also be provided using {@link
+ * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However,
it will be dropped
+ * during serialization potentially leading to confusing behavior. This is
particularly the case
+ * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+ */
+ public static synchronized void setProvidedSparkContext(JavaSparkContext
providedSparkContext) {
+ sparkContext = checkNotNull(providedSparkContext);
+ hasProvidedSparkContext = true;
+ sparkMaster = null;
+ }
+
+ public static synchronized void clearProvidedSparkContext() {
+ hasProvidedSparkContext = false;
+ sparkContext = null;
+ }
+
public static synchronized JavaSparkContext
getSparkContext(SparkPipelineOptions options) {
SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
- usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
- // reuse should be ignored if the context is provided.
- if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) &&
!usesProvidedSparkContext) {
-
- // if the context is null or stopped for some reason, re-create it.
- if (sparkContext == null || sparkContext.sc().isStopped()) {
- sparkContext = createSparkContext(contextOptions);
+ if (contextOptions.getUsesProvidedSparkContext()) {
+ JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+ if (jsc != null) {
+ setProvidedSparkContext(jsc);
+ } else if (hasProvidedSparkContext) {
+ jsc = sparkContext;
+ }
+ if (jsc == null) {
+ throw new IllegalStateException(
+ "No Spark context was provided. Use
SparkContextFactory.setProvidedSparkContext to do so.");
+ } else if (jsc.sc().isStopped()) {
+ LOG.error("The provided Spark context " + jsc + " was already
stopped.");
+ throw new IllegalStateException("The provided Spark context was
already stopped");
+ }
+ LOG.info("Using a provided Spark Context");
+ return jsc;
+ } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+ // This is highly discouraged as it leaks SparkContexts without any
control.
+ // If the context is null or stopped for some reason, re-create it.
+ @Nullable JavaSparkContext jsc = sparkContext;
+ if (jsc == null || jsc.sc().isStopped()) {
+ sparkContext = jsc = createSparkContext(contextOptions);
sparkMaster = options.getSparkMaster();
+ hasProvidedSparkContext = false;
+ } else if (hasProvidedSparkContext) {
Review Comment:
No, this is correct... in this case there is a provided SparkContext, but
its usage is disabled according to
`SparkPipelineOptions.getUsesProvidedSparkContext()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]