This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new f52885898  [CORE] Port "SPARK-39983 Should not cache unserialized broadcast relations on the driver" (#5149)
f52885898 is described below
commit f52885898c20e99684503e37fe1db97a13244e11
Author: Xiduo You <[email protected]>
AuthorDate: Thu Mar 28 08:55:10 2024 +0800
    [CORE] Port "SPARK-39983 Should not cache unserialized broadcast relations on the driver" (#5149)
---
.../execution/ColumnarBroadcastExchangeExec.scala | 4 +-
.../apache/spark/sql/GlutenSQLTestsBaseTrait.scala | 70 ++++++++++++----------
.../utils/velox/VeloxTestSettings.scala | 3 +-
.../execution/GlutenBroadcastExchangeSuite.scala | 39 +++++++++++-
.../io/glutenproject/sql/shims/SparkShims.scala | 10 ++++
.../sql/shims/spark34/Spark34Shims.scala | 9 ++-
.../scala/org/apache/spark/SparkContextUtils.scala | 12 +++-
.../sql/shims/spark35/Spark35Shims.scala | 9 ++-
.../scala/org/apache/spark/SparkContextUtils.scala | 12 +++-
9 files changed, 124 insertions(+), 44 deletions(-)
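For context before the diff: SPARK-39983 (new in Spark 3.4) added a
private[spark] method SparkContext.broadcastInternal(value, serializedOnly).
With serializedOnly = true the driver keeps only the serialized broadcast
blocks instead of also caching the deserialized relation, which matters for
large columnar broadcast relations. Below is a minimal sketch of the call the
new shims route to on Spark 3.4+ (hypothetical object name; the package
clause is required because the API is private[spark]):

    package org.apache.spark // broadcastInternal is private[spark]

    import org.apache.spark.broadcast.Broadcast

    import scala.reflect.ClassTag

    // Sketch only; mirrors this commit's SparkContextUtils helper.
    object SerializedOnlyBroadcast {
      def apply[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] =
        sc.broadcastInternal(value, serializedOnly = true)
    }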
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 645bce76d..b90ff4967 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -80,7 +80,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
       val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) {
         _ =>
           // Broadcast the relation
-          sparkContext.broadcast(relation.asInstanceOf[Any])
+          SparkShimLoader.getSparkShims.broadcastInternal(
+            sparkContext,
+            relation.asInstanceOf[Any])
       }
       // Update driver metrics
diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
index 68fa08879..5cd9f3e9c 100644
--- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
+++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
@@ -45,39 +45,7 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTra
   }
 
   override def sparkConf: SparkConf = {
-    // Native SQL configs
-    val conf = super.sparkConf
-      .setAppName("Gluten-UT")
-      .set("spark.driver.memory", "1G")
-      .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.sql.shuffle.partitions", "1")
-      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      .set("spark.memory.offHeap.enabled", "true")
-      .set("spark.memory.offHeap.size", "1024MB")
-      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
-      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.sql.warehouse.dir", warehouse)
-      .set("spark.ui.enabled", "false")
-      .set("spark.gluten.ui.enabled", "false")
-    // Avoid static evaluation by spark catalyst. But there are some UT issues
-    // coming from spark, e.g., expecting SparkException is thrown, but the wrapped
-    // exception is thrown.
-    // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," +
-    //   NullPropagation.ruleName)
-
-    if (BackendTestUtils.isCHBackendLoaded()) {
-      conf
-        .set("spark.io.compression.codec", "LZ4")
-        .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-        .set("spark.gluten.sql.enable.native.validation", "false")
-        .set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
-        .set("spark.sql.files.openCostInBytes", "134217728")
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-    } else {
-      conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
-    }
-
-    conf
+    GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse)
   }
 
   /**
@@ -126,3 +94,39 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTra
     }
   }
 }
+
+object GlutenSQLTestsBaseTrait {
+  def nativeSparkConf(origin: SparkConf, warehouse: String): SparkConf = {
+    // Native SQL configs
+    val conf = origin
+      .setAppName("Gluten-UT")
+      .set("spark.driver.memory", "1G")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.sql.files.maxPartitionBytes", "134217728")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "1024MB")
+      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.warehouse.dir", warehouse)
+      .set("spark.ui.enabled", "false")
+      .set("spark.gluten.ui.enabled", "false")
+    // Avoid static evaluation by spark catalyst. But there are some UT issues
+    // coming from spark, e.g., expecting SparkException is thrown, but the wrapped
+    // exception is thrown.
+    // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," +
+    //   NullPropagation.ruleName)
+
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      conf
+        .set("spark.io.compression.codec", "LZ4")
+        .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
+        .set("spark.gluten.sql.enable.native.validation", "false")
+        .set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
+        .set("spark.sql.files.openCostInBytes", "134217728")
+        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    } else {
+      conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    }
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 852ba27f2..84fc83083 100644
--- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPr [...]
 import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapa [...]
 import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
-import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
+import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
 import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
 import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1Ag [...]
 import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
@@ -827,6 +827,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOuterJoinSuite]
   enableSuite[FallbackStrategiesSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
+  enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
     .excludeByPrefix("determining the number of reducers")
   enableSuite[GlutenExchangeSuite]
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
index 481863354..0f953b6d6 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
@@ -16,6 +16,43 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.broadcast.TorrentBroadcast
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait, SparkSession}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.broadcast
 
 class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {}
+
+// Additional tests run in 'local-cluster' mode.
+class GlutenLocalBroadcastExchangeSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with GlutenTestsBaseTrait
+  with AdaptiveSparkPlanHelper {
+
+  def newSparkConf(): SparkConf = {
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]")
+    GlutenSQLTestsBaseTrait.nativeSparkConf(conf, warehouse)
+  }
+
+  test("SPARK-39983 - Broadcasted relation is not cached on the driver") {
+    // Use a distributed cluster, since in local mode the broadcast value is actually cached.
+    val conf = newSparkConf()
+    sc = new SparkContext(conf)
+    val spark = new SparkSession(sc)
+
+    val df = spark.range(1).toDF()
+    val joinDF = df.join(broadcast(df), "id")
+    joinDF.collect()
+    val broadcastExchangeExec = collect(joinDF.queryExecution.executedPlan) {
+      case p: ColumnarBroadcastExchangeExec => p
+    }
+    assert(broadcastExchangeExec.size == 1, "one and only ColumnarBroadcastExchangeExec")
+
+    // The broadcasted relation should not be cached on the driver.
+    val broadcasted =
+      broadcastExchangeExec(0).relationFuture.get().asInstanceOf[TorrentBroadcast[Any]]
+    assert(!broadcasted.hasCachedValue)
+  }
+}
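The final assertion relies on TorrentBroadcast.hasCachedValue, also added by
SPARK-39983, which reports whether the unserialized value is cached locally.
Here is a standalone sketch of the behavior the suite checks, assuming Spark
3.4+ and a SparkContext backed by a 'local-cluster' master (in plain local
mode the driver caches the value regardless, as the comment in the test
notes); both members are private[spark], hence the package clause:

    package org.apache.spark

    import org.apache.spark.broadcast.TorrentBroadcast

    object HasCachedValueSketch {
      def demo(sc: SparkContext): Unit = {
        val lean = sc
          .broadcastInternal(Seq(1, 2, 3), serializedOnly = true)
          .asInstanceOf[TorrentBroadcast[Seq[Int]]]
        assert(!lean.hasCachedValue) // only serialized blocks on the driver
        assert(lean.value == Seq(1, 2, 3)) // still readable on demand
      }
    }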
diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index df1f3e451..7d71346a7 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -19,6 +19,7 @@ package io.glutenproject.sql.shims
 import io.glutenproject.expression.Sig
 
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.shuffle.ShuffleHandle
@@ -46,6 +47,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
+import scala.reflect.ClassTag
+
 sealed abstract class ShimDescriptor
 
 case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor {
@@ -123,6 +126,13 @@ trait SparkShims {
 
   def createTestTaskContext(): TaskContext
 
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
+    // Since Spark 3.4, SparkContext provides `broadcastInternal`, which can skip
+    // caching the unserialized broadcast value on the driver. See SPARK-39983.
+    // TODO: remove this shim once support for Spark 3.3 and earlier is dropped.
+    sc.broadcast(value)
+  }
+
   // To be compatible with Spark-3.5 and later
   // See https://github.com/apache/spark/pull/41440
   def setJobDescriptionOrTagForBroadcastExchange(
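The default above keeps the pre-3.4 behavior; the Spark 3.4 and 3.5 shims in
the following hunks override it. A condensed sketch of the dispatch pattern
with hypothetical names (the commit's real types are SparkShims,
Spark34Shims/Spark35Shims, and SparkContextUtils):

    package org.apache.spark // the override needs a private[spark] API

    import org.apache.spark.broadcast.Broadcast

    import scala.reflect.ClassTag

    trait BroadcastShim {
      // Default for Spark 3.3 and earlier, where broadcastInternal does not
      // exist: fall back to the public API, which caches on the driver.
      def broadcastCompat[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] =
        sc.broadcast(value)
    }

    class Spark34PlusShim extends BroadcastShim {
      override def broadcastCompat[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] =
        sc.broadcastInternal(value, serializedOnly = true)
    }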
diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index 027e88cbc..2732c190f 100644
--- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -53,6 +54,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import java.time.ZoneOffset
 import java.util.{HashMap => JHashMap, Map => JMap}
 
+import scala.reflect.ClassTag
+
 class Spark34Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -241,6 +244,10 @@ class Spark34Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
+    SparkContextUtils.broadcastInternal(sc, value)
+  }
+
   def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 74%
copy from gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
copy to shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 481863354..3cbf2b602 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -14,8 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.spark
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.broadcast.Broadcast
 
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {}
+import scala.reflect.ClassTag
+
+object SparkContextUtils {
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
+    sc.broadcastInternal(value, serializedOnly = true)
+  }
+}
diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index 93dce4fdc..1c87db39f 100644
--- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -52,6 +53,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import java.time.ZoneOffset
 import java.util.{HashMap => JHashMap, Map => JMap}
 
+import scala.reflect.ClassTag
+
 class Spark35Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
@@ -238,6 +241,10 @@ class Spark35Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
+    SparkContextUtils.broadcastInternal(sc, value)
+  }
+
   override def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 74%
copy from gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
copy to shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 481863354..3cbf2b602 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -14,8 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.spark
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.broadcast.Broadcast
 
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {}
+import scala.reflect.ClassTag
+
+object SparkContextUtils {
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
+    sc.broadcastInternal(value, serializedOnly = true)
+  }
+}