This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 86efa45 [SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
86efa45 is described below
commit 86efa456d8e11a2b7e10bce70d4ead20c75acbe1
Author: Takuya UESHIN <[email protected]>
AuthorDate: Thu Jul 9 15:51:56 2020 +0900
[SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
### What changes were proposed in this pull request?
This PR proposes to disallow creating `SparkContext` in executors, e.g., in UDFs.
### Why are the changes needed?
Currently, executors can create a `SparkContext`, but they should not be able to.
```scala
sc.range(0, 1).foreach { _ =>
  new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
}
```
### Does this PR introduce _any_ user-facing change?
Yes, users won't be able to create `SparkContext` in executors.
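As an illustration, here is a minimal PySpark sketch of the new behavior, adapted from the test added in `python/pyspark/tests/test_context.py` (the `local-cluster[3, 1, 1024]` master is only used so that tasks run in separate executor processes):
```python
from pyspark import SparkContext

# Driver-side context; the tasks launched below run on executors.
with SparkContext("local-cluster[3, 1, 1024]") as sc:
    # Constructing a SparkContext inside a task now raises an exception:
    # "SparkContext should only be created and accessed on the driver."
    sc.range(2).foreach(lambda _: SparkContext())
```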
### How was this patch tested?
Added tests.
Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit cfecc2030d8b4774c1f4754fe81f57fbc61c9c75)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../main/scala/org/apache/spark/SparkContext.scala | 16 ++++++++++
.../scala/org/apache/spark/SparkContextSuite.scala | 12 ++++++++
python/pyspark/context.py | 14 +++++++++
python/pyspark/tests/test_context.py | 8 +++++
.../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
.../ExternalAppendOnlyUnsafeRowArraySuite.scala | 35 ++++++++++------------
6 files changed, 67 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bcbb7e4..2761f0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -82,6 +82,9 @@ class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
+ // In order to prevent SparkContext from being created in executors.
+ SparkContext.assertOnDriver()
+
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2540,6 +2543,19 @@ object SparkContext extends Logging {
}
/**
+ * Called to ensure that SparkContext is created or accessed only on the Driver.
+ *
+ * Throws an exception if a SparkContext is about to be created in executors.
+ */
+ private def assertOnDriver(): Unit = {
+ if (TaskContext.get != null) {
+ // we're accessing it during task execution, fail.
+ throw new IllegalStateException(
+ "SparkContext should only be created and accessed on the driver.")
+ }
+ }
+
+ /**
* This function may be used to get or instantiate a SparkContext and register it as a
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 9f8fa89..2b1e110 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -950,6 +950,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
}
+
+ test("SPARK-32160: Disallow to create SparkContext in executors") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+ val error = intercept[SparkException] {
+ sc.range(0, 1).foreach { _ =>
+ new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ }
+ }.getMessage()
+
+ assert(error.contains("SparkContext should only be created and accessed on the driver."))
+ }
}
object SparkContextSuite {
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5bb991e..ecd171a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -38,6 +38,7 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deseria
from pyspark.storagelevel import StorageLevel
from pyspark.resource import ResourceInformation
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
+from pyspark.taskcontext import TaskContext
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ class SparkContext(object):
...
ValueError:...
"""
+ # In order to prevent SparkContext from being created in executors.
+ SparkContext._assert_on_driver()
+
self._callsite = first_spark_call() or CallSite(None, None, None)
if gateway is not None and gateway.gateway_parameters.auth_token is None:
raise ValueError(
@@ -1145,6 +1149,16 @@ class SparkContext(object):
resources[name] = ResourceInformation(name, addrs)
return resources
+ @staticmethod
+ def _assert_on_driver():
+ """
+ Called to ensure that SparkContext is created only on the Driver.
+
+ Throws an exception if a SparkContext is about to be created in executors.
+ """
+ if TaskContext.get() is not None:
+ raise Exception("SparkContext should only be created and accessed
on the driver.")
+
def _test():
import atexit
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index c7f435a..303635d 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -267,6 +267,14 @@ class ContextTests(unittest.TestCase):
resources = sc.resources
self.assertEqual(len(resources), 0)
+ def test_disallow_to_create_spark_context_in_executors(self):
+ # SPARK-32160: SparkContext should not be created in executors.
+ with SparkContext("local-cluster[3, 1, 1024]") as sc:
+ with self.assertRaises(Exception) as context:
+ sc.range(2).foreach(lambda _: SparkContext())
+ self.assertIn("SparkContext should only be created and accessed on
the driver.",
+ str(context.exception))
+
class ContextTestsWithResources(unittest.TestCase):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 60a6037..e5d8710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1086,7 +1086,7 @@ object SparkSession extends Logging {
}
private def assertOnDriver(): Unit = {
- if (Utils.isTesting && TaskContext.get != null) {
+ if (TaskContext.get != null) {
// we're accessing it during task execution, fail.
throw new IllegalStateException(
"SparkSession should only be created and accessed on the driver.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index b29de9c..98aba3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -27,32 +27,29 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
private val random = new java.util.Random()
- private var taskContext: TaskContext = _
-
- override def afterAll(): Unit = try {
- TaskContext.unset()
- } finally {
- super.afterAll()
- }
private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
(f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
sc = new SparkContext("local", "test", new SparkConf(false))
- taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+ val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
TaskContext.setTaskContext(taskContext)
- val array = new ExternalAppendOnlyUnsafeRowArray(
- taskContext.taskMemoryManager(),
- SparkEnv.get.blockManager,
- SparkEnv.get.serializerManager,
- taskContext,
- 1024,
- SparkEnv.get.memoryManager.pageSizeBytes,
- inMemoryThreshold,
- spillThreshold)
- try f(array) finally {
- array.clear()
+ try {
+ val array = new ExternalAppendOnlyUnsafeRowArray(
+ taskContext.taskMemoryManager(),
+ SparkEnv.get.blockManager,
+ SparkEnv.get.serializerManager,
+ taskContext,
+ 1024,
+ SparkEnv.get.memoryManager.pageSizeBytes,
+ inMemoryThreshold,
+ spillThreshold)
+ try f(array) finally {
+ array.clear()
+ }
+ } finally {
+ TaskContext.unset()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]