This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9adc18d3b [GLUTEN-4882] ColumnarBroadcastExchangeExec should
set/cancel with job tag for Spark3.5 (#4882)
9adc18d3b is described below
commit 9adc18d3b35c014146ac9466d503a542e1c88c70
Author: Xiduo You <[email protected]>
AuthorDate: Fri Mar 8 19:16:55 2024 +0800
[GLUTEN-4882] ColumnarBroadcastExchangeExec should set/cancel with job tag
for Spark3.5 (#4882)
---
.../sql/execution/ColumnarBroadcastExchangeExec.scala | 9 +++------
.../scala/io/glutenproject/sql/shims/SparkShims.scala | 13 +++++++++++--
.../sql/shims/spark32/Spark32Shims.scala | 19 ++++++++++++++++++-
.../sql/shims/spark33/Spark33Shims.scala | 19 ++++++++++++++++++-
.../sql/shims/spark34/Spark34Shims.scala | 19 ++++++++++++++++++-
.../sql/shims/spark35/Spark35Shims.scala | 18 ++++++++++++++++--
6 files changed, 84 insertions(+), 13 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 8f43c9094..645bce76d 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.metrics.GlutenTimeMetric
+import io.glutenproject.sql.shims.SparkShimLoader
import org.apache.spark.{broadcast, SparkException}
import org.apache.spark.launcher.SparkLauncher
@@ -59,11 +60,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
session,
BroadcastExchangeExec.executionContext) {
try {
- // Setup a job group here so later it may get cancelled by groupId if
necessary.
- sparkContext.setJobGroup(
- runId.toString,
- s"broadcast exchange (runId $runId)",
- interruptOnCancel = true)
+
SparkShimLoader.getSparkShims.setJobDescriptionOrTagForBroadcastExchange(sparkContext,
this)
val relation = GlutenTimeMetric.millis(longMetric("collectTime")) {
_ =>
// this created relation ignore HashedRelationBroadcastMode
isNullAware, because we
@@ -169,7 +166,7 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
case ex: TimeoutException =>
logError(s"Could not execute broadcast in $timeout secs.", ex)
if (!relationFuture.isDone) {
- sparkContext.cancelJobGroup(runId.toString)
+
SparkShimLoader.getSparkShims.cancelJobGroupForBroadcastExchange(sparkContext,
this)
relationFuture.cancel(true)
}
throw new SparkException(
diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index 3653593fe..4894ce34e 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -18,7 +18,7 @@ package io.glutenproject.sql.shims
import io.glutenproject.expression.Sig
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, Spar
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex,
WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -117,6 +117,15 @@ trait SparkShims {
def createTestTaskContext(): TaskContext
+ // Broadcast exchanges register the jobs they launch so that those jobs can be
+ // cancelled later (e.g. when the broadcast times out). The Spark 3.5+ shim uses a
+ // job tag for this; the shims for older Spark versions use a job group keyed by the
+ // exchange's runId.
+ // See https://github.com/apache/spark/pull/41440
+ /**
+ * Registers, on `sc`, the broadcast jobs of `broadcastExchange` (as a job group or a
+ * job tag, depending on the Spark version) so that they can later be cancelled with
+ * [[cancelJobGroupForBroadcastExchange]].
+ */
+ def setJobDescriptionOrTagForBroadcastExchange(
+ sc: SparkContext,
+ broadcastExchange: BroadcastExchangeLike): Unit
+ /**
+ * Cancels the jobs registered by [[setJobDescriptionOrTagForBroadcastExchange]] for
+ * `broadcastExchange`. Despite its name, the Spark 3.5+ shim cancels by job tag
+ * rather than by job group.
+ */
+ def cancelJobGroupForBroadcastExchange(
+ sc: SparkContext,
+ broadcastExchange: BroadcastExchangeLike): Unit
+
def getShuffleReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 0199f752c..622335046 100644
--- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -20,7 +20,7 @@ import io.glutenproject.execution.datasource.GlutenParquetWriterInjects
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
-import org.apache.spark.{ShuffleUtils, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext,
TaskContextUtils}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.shuffle.ShuffleHandle
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -124,6 +125,22 @@ class Spark32Shims extends SparkShims {
TaskContextUtils.createTestTaskContext()
}
+  /**
+   * Places the jobs of `broadcastExchange` into a job group keyed by its runId, so they
+   * can later be cancelled by group id via [[cancelJobGroupForBroadcastExchange]].
+   */
+  override def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Set up a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  /** Cancels the job group created by [[setJobDescriptionOrTagForBroadcastExchange]]. */
+  override def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
override def getShuffleReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index 8774f70b3..b580e792b 100644
--- a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -21,7 +21,7 @@ import io.glutenproject.execution.datasource.GlutenParquetWriterInjects
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
-import org.apache.spark.{ShuffleDependency, ShuffleUtils, SparkEnv,
SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleDependency, ShuffleUtils, SparkContext,
SparkEnv, SparkException, TaskContext, TaskContextUtils}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.ShuffleHandle
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -167,6 +168,22 @@ class Spark33Shims extends SparkShims {
TaskContextUtils.createTestTaskContext()
}
+  /**
+   * Places the jobs of `broadcastExchange` into a job group keyed by its runId, so they
+   * can later be cancelled by group id via [[cancelJobGroupForBroadcastExchange]].
+   */
+  override def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Set up a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  /** Cancels the job group created by [[setJobDescriptionOrTagForBroadcastExchange]]. */
+  override def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
override def getShuffleReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index 658eb38ec..a1851b276 100644
--- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -20,7 +20,7 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
-import org.apache.spark.{ShuffleUtils, SparkException, TaskContext,
TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkException,
TaskContext, TaskContextUtils}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.TaskInfo
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -205,6 +206,22 @@ class Spark34Shims extends SparkShims {
TaskContextUtils.createTestTaskContext()
}
+  /**
+   * Places the jobs of `broadcastExchange` into a job group keyed by its runId, so they
+   * can later be cancelled by group id via [[cancelJobGroupForBroadcastExchange]].
+   */
+  override def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Set up a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  /** Cancels the job group created by [[setJobDescriptionOrTagForBroadcastExchange]]. */
+  override def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
override def getShuffleReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index 0e9aa50bd..8468b7a81 100644
--- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -20,7 +20,7 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.expression.{ExpressionNames, Sig}
import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
-import org.apache.spark.{ShuffleUtils, SparkException, TaskContext,
TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkException,
TaskContext, TaskContextUtils}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.TaskInfo
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -205,6 +205,20 @@ class Spark35Shims extends SparkShims {
TaskContextUtils.createTestTaskContext()
}
+  /**
+   * Labels the jobs of `broadcastExchange` with its job tag (the Spark 3.5+ mechanism),
+   * so they can later be cancelled via [[cancelJobGroupForBroadcastExchange]].
+   */
+  override def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Set up a job tag so the jobs may later be cancelled by tag if necessary; cancelling
+    // also interrupts, matching interruptOnCancel = true in the job-group based shims.
+    val tag = broadcastExchange.jobTag
+    sc.addJobTag(tag)
+    sc.setInterruptOnCancel(true)
+  }
+
+  /** Cancels every job carrying the job tag of `broadcastExchange` (not a job group). */
+  override def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit =
+    sc.cancelJobsWithTag(broadcastExchange.jobTag)
+
override def getShuffleReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]