[ https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768952 ]
ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/22 09:51
Start Date: 11/May/22 09:51
Worklog Time Spent: 10m
Work Description: mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870102329
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
   private SparkContextFactory() {}
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {
Review Comment:
done
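For reference, the new entry point can be exercised roughly as follows. This is a minimal sketch, assuming a caller-owned local-mode Spark context; the demo class name and the elided pipeline body are illustrative, not part of the PR:
{code:java}
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedContextExample { // hypothetical demo class
  public static void main(String[] args) {
    // The caller creates and owns the context.
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"));
    try {
      // Register the externally managed context; unlike
      // SparkContextOptions#setProvidedSparkContext, this survives serialization.
      SparkContextFactory.setProvidedSparkContext(jsc);

      SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
      options.setRunner(SparkRunner.class);
      options.setUsesProvidedSparkContext(true);

      Pipeline pipeline = Pipeline.create(options);
      // ... build the pipeline ...
      pipeline.run().waitUntilFinish();
    } finally {
      // Lifecycle control stays with the caller: unregister, then stop Spark.
      SparkContextFactory.clearProvidedSparkContext();
      jsc.stop();
    }
  }
}
{code}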
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
Review Comment:
done
Issue Time Tracking
-------------------
Worklog Id: (was: 768952)
Time Spent: 6h (was: 5h 50m)
> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
> Key: BEAM-14334
> URL: https://issues.apache.org/jira/browse/BEAM-14334
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark, testing
> Reporter: Moritz Mack
> Assignee: Moritz Mack
> Priority: P2
> Time Spent: 6h
> Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of poor-quality code and should be avoided:
> * It significantly slows down test runs.
> * It often hides resource leaks, either in the code under test or, worse, in the runner itself.
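A sketch of the alternative this work enables, assuming JUnit 4 (the test class and names are illustrative): instead of isolating each test in its own JVM with forkEvery 1, a suite can share one externally managed context and release it deterministically.
{code:java}
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class SharedSparkContextTest { // hypothetical test suite
  private static JavaSparkContext jsc;

  @BeforeClass
  public static void setUpClass() {
    // One context for the whole suite instead of one JVM per test.
    jsc = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("beam-tests"));
    SparkContextFactory.setProvidedSparkContext(jsc);
  }

  @AfterClass
  public static void tearDownClass() {
    // Deterministic cleanup, so nothing leaks across suites.
    SparkContextFactory.clearProvidedSparkContext();
    jsc.stop();
  }

  // @Test methods would run pipelines with
  // SparkContextOptions#setUsesProvidedSparkContext(true) against jsc.
}
{code}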
--
This message was sent by Atlassian Jira
(v8.20.7#820007)