This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d96ee1f [SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask
d96ee1f is described below
commit d96ee1f3b3d136971b1893741f4b022a9f15ae20
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Dec 30 23:09:45 2021 +0900
[SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask
### What changes were proposed in this pull request?
This PR proposes to add `Utils.isInRunningSparkTask` to see if the code
is running in a Spark task, e.g., on executors.
### Why are the changes needed?
There is currently no single call to see if we're in a running Spark task
(e.g., in executors). `TaskContext.get == null` is being used for that purpose. We
should explicitly factor this out into `Utils`.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Existing unit tests should cover this case.
Closes #35065 from HyukjinKwon/mindor-util-at-executor.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
core/src/main/scala/org/apache/spark/util/Utils.scala | 5 +++++
.../spark/sql/catalyst/expressions/EquivalentExpressions.scala | 4 ++--
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++--
.../src/main/scala/org/apache/spark/sql/execution/Columnar.scala | 3 ++-
6 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a14efa5..0c5fb0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2680,7 +2680,7 @@ object SparkContext extends Logging {
* Throws an exception if a SparkContext is about to be created in executors.
*/
private def assertOnDriver(): Unit = {
- if (TaskContext.get != null) {
+ if (Utils.isInRunningSparkTask) {
// we're accessing it during task execution, fail.
throw new IllegalStateException(
"SparkContext should only be created and accessed on the driver.")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6597750..4410fe7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -875,6 +875,11 @@ private[spark] object Utils extends Logging {
}
/**
+ * Returns if the current codes are running in a Spark task, e.g., in
executors.
+ */
+ def isInRunningSparkTask: Boolean = TaskContext.get() != null
+
+ /**
* Gets or creates the directories listed in spark.local.dir or
SPARK_LOCAL_DIRS,
* and returns only the directories that exist / could be created.
*
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 269ab31..59e2be4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -21,9 +21,9 @@ import java.util.Objects
import scala.collection.mutable
-import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
+import org.apache.spark.util.Utils
/**
* This class is used to compute equality of (sub)expression trees.
Expressions can be added
@@ -197,7 +197,7 @@ class EquivalentExpressions {
expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
// `PlanExpression` wraps query plan. To compare query plans of
`PlanExpression` on executor,
// can cause error like NPE.
- (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
+ (expr.isInstanceOf[PlanExpression[_]] && Utils.isInRunningSparkTask)
if (!skip && !updateExprInMap(expr, map, useCount)) {
val uc = useCount.signum
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2ca68c6..105a1c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -198,7 +198,7 @@ object SQLConf {
* run unit tests (that does not involve SparkSession) in serial order.
*/
def get: SQLConf = {
- if (TaskContext.get != null) {
+ if (Utils.isInRunningSparkTask) {
val conf = existingConf.get()
if (conf != null) {
conf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index cdd57d7..734b8e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1023,7 +1023,7 @@ object SparkSession extends Logging {
* @since 2.2.0
*/
def getActiveSession: Option[SparkSession] = {
- if (TaskContext.get != null) {
+ if (Utils.isInRunningSparkTask) {
// Return None when running on executors.
None
} else {
@@ -1039,7 +1039,7 @@ object SparkSession extends Logging {
* @since 2.2.0
*/
def getDefaultSession: Option[SparkSession] = {
- if (TaskContext.get != null) {
+ if (Utils.isInRunningSparkTask) {
// Return None when running on executors.
None
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 628d4a3..70a508e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.Utils
/**
* Holds a user defined rule that can be used to inject columnar
implementations of various
@@ -66,7 +67,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
*/
case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition
with CodegenSupport {
// supportsColumnar requires to be only called on driver side, see also
SPARK-37779.
- assert(TaskContext.get != null || child.supportsColumnar)
+ assert(Utils.isInRunningSparkTask || child.supportsColumnar)
override def output: Seq[Attribute] = child.output
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]