This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9adc18d3b [GLUTEN-4882] ColumnarBroadcastExchangeExec should 
set/cancel with job tag for Spark3.5 (#4882)
9adc18d3b is described below

commit 9adc18d3b35c014146ac9466d503a542e1c88c70
Author: Xiduo You <[email protected]>
AuthorDate: Fri Mar 8 19:16:55 2024 +0800

    [GLUTEN-4882] ColumnarBroadcastExchangeExec should set/cancel with job tag 
for Spark3.5 (#4882)
---
 .../sql/execution/ColumnarBroadcastExchangeExec.scala |  9 +++------
 .../scala/io/glutenproject/sql/shims/SparkShims.scala | 13 +++++++++++--
 .../sql/shims/spark32/Spark32Shims.scala              | 19 ++++++++++++++++++-
 .../sql/shims/spark33/Spark33Shims.scala              | 19 ++++++++++++++++++-
 .../sql/shims/spark34/Spark34Shims.scala              | 19 ++++++++++++++++++-
 .../sql/shims/spark35/Spark35Shims.scala              | 18 ++++++++++++++++--
 6 files changed, 84 insertions(+), 13 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 8f43c9094..645bce76d 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.extension.{GlutenPlan, ValidationResult}
 import io.glutenproject.metrics.GlutenTimeMetric
+import io.glutenproject.sql.shims.SparkShimLoader
 
 import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.launcher.SparkLauncher
@@ -59,11 +60,7 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
       session,
       BroadcastExchangeExec.executionContext) {
       try {
-        // Setup a job group here so later it may get cancelled by groupId if necessary.
-        sparkContext.setJobGroup(
-          runId.toString,
-          s"broadcast exchange (runId $runId)",
-          interruptOnCancel = true)
+        SparkShimLoader.getSparkShims.setJobDescriptionOrTagForBroadcastExchange(sparkContext, this)
         val relation = GlutenTimeMetric.millis(longMetric("collectTime")) {
           _ =>
             // this created relation ignore HashedRelationBroadcastMode 
isNullAware, because we
@@ -169,7 +166,7 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
       case ex: TimeoutException =>
         logError(s"Could not execute broadcast in $timeout secs.", ex)
         if (!relationFuture.isDone) {
-          sparkContext.cancelJobGroup(runId.toString)
+          SparkShimLoader.getSparkShims.cancelJobGroupForBroadcastExchange(sparkContext, this)
           relationFuture.cancel(true)
         }
         throw new SparkException(
diff --git 
a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index 3653593fe..4894ce34e 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -18,7 +18,7 @@ package io.glutenproject.sql.shims
 
 import io.glutenproject.expression.Sig
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, 
GlobalLimitExec, Spar
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, 
WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -117,6 +117,15 @@ trait SparkShims {
 
   def createTestTaskContext(): TaskContext
 
+  // To be compatible with Spark-3.5 and later
+  // See https://github.com/apache/spark/pull/41440
+  def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit
+  def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit
+
   def getShuffleReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git 
a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 0199f752c..622335046 100644
--- 
a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -20,7 +20,7 @@ import 
io.glutenproject.execution.datasource.GlutenParquetWriterInjects
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, TaskContextUtils}
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.shuffle.ShuffleHandle
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -38,6 +38,7 @@ import 
org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -124,6 +125,22 @@ class Spark32Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Setup a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
   override def getShuffleReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git 
a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
index 8774f70b3..b580e792b 100644
--- 
a/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/io/glutenproject/sql/shims/spark33/Spark33Shims.scala
@@ -21,7 +21,7 @@ import 
io.glutenproject.execution.datasource.GlutenParquetWriterInjects
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleDependency, ShuffleUtils, SparkEnv, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleDependency, ShuffleUtils, SparkContext, SparkEnv, SparkException, TaskContext, TaskContextUtils}
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.ShuffleHandle
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -167,6 +168,22 @@ class Spark33Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Setup a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
   override def getShuffleReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git 
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index 658eb38ec..a1851b276 100644
--- 
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -20,7 +20,7 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -40,6 +40,7 @@ import 
org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
+import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -205,6 +206,22 @@ class Spark34Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Setup a job group here so later it may get cancelled by groupId if necessary.
+    sc.setJobGroup(
+      broadcastExchange.runId.toString,
+      s"broadcast exchange (runId ${broadcastExchange.runId})",
+      interruptOnCancel = true)
+  }
+
+  def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobGroup(broadcastExchange.runId.toString)
+  }
+
   override def getShuffleReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git 
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index 0e9aa50bd..8468b7a81 100644
--- 
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -20,7 +20,7 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -41,7 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -205,6 +205,20 @@ class Spark35Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  override def setJobDescriptionOrTagForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    // Setup a job tag here so later it may get cancelled by tag if necessary.
+    sc.addJobTag(broadcastExchange.jobTag)
+    sc.setInterruptOnCancel(true)
+  }
+
+  override def cancelJobGroupForBroadcastExchange(
+      sc: SparkContext,
+      broadcastExchange: BroadcastExchangeLike): Unit = {
+    sc.cancelJobsWithTag(broadcastExchange.jobTag)
+  }
+
   override def getShuffleReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to