This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 498efc7a33 [GLUTEN-8216][CH] Fix OOM when cartesian product with empty data (#8219)
498efc7a33 is described below
commit 498efc7a332c0a7df4ea0a9c5c54987570c339ed
Author: Wenzheng Liu <[email protected]>
AuthorDate: Sun Dec 15 16:00:25 2024 +0800
[GLUTEN-8216][CH] Fix OOM when cartesian product with empty data (#8219)
* [GLUTEN-8216][CH] Fix OOM when cartesian product with empty data
* [GLUTEN-8216][CH] Fix CI
---
.../utils/RangePartitionerBoundsGenerator.scala | 11 ++++---
.../spark/sql/execution/utils/CHExecUtil.scala | 11 ++++---
.../GlutenClickHouseColumnarShuffleAQESuite.scala | 2 +-
.../execution/GlutenClickHouseJoinSuite.scala | 37 ++++++++++++++++++++++
...lutenClickHouseRSSColumnarShuffleAQESuite.scala | 2 +-
5 files changed, 52 insertions(+), 11 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
index 706cc5f341..694035b878 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/utils/RangePartitionerBoundsGenerator.scala
@@ -210,28 +210,31 @@ class RangePartitionerBoundsGenerator[K: Ordering: ClassTag, V](
arrayNode
}
- private def buildRangeBoundsJson(jsonMapper: ObjectMapper, arrayNode: ArrayNode): Unit = {
+ private def buildRangeBoundsJson(jsonMapper: ObjectMapper, arrayNode: ArrayNode): Int = {
val bounds = getRangeBounds
bounds.foreach {
bound =>
val row = bound.asInstanceOf[UnsafeRow]
arrayNode.add(buildRangeBoundJson(row, ordering, jsonMapper))
}
+ bounds.length
}
// Make a json structure that can be passed to native engine
- def getRangeBoundsJsonString: String = {
+ def getRangeBoundsJsonString: RangeBoundsInfo = {
val context = new SubstraitContext()
val mapper = new ObjectMapper
val rootNode = mapper.createObjectNode
val orderingArray = rootNode.putArray("ordering")
buildOrderingJson(context, ordering, inputAttributes, mapper, orderingArray)
val boundArray = rootNode.putArray("range_bounds")
- buildRangeBoundsJson(mapper, boundArray)
- mapper.writeValueAsString(rootNode)
+ val boundLength = buildRangeBoundsJson(mapper, boundArray)
+ RangeBoundsInfo(mapper.writeValueAsString(rootNode), boundLength)
}
}
+case class RangeBoundsInfo(json: String, boundsSize: Int)
+
object RangePartitionerBoundsGenerator {
def supportedFieldType(dataType: DataType): Boolean = {
dataType match {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index cc172ac4b5..83bde9d168 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -311,7 +311,7 @@ object CHExecUtil extends Logging {
rddForSampling,
sortingExpressions,
childOutputAttributes)
- val orderingAndRangeBounds = generator.getRangeBoundsJsonString
+ val rangeBoundsInfo = generator.getRangeBoundsJsonString
val attributePos = if (projectOutputAttributes != null) {
projectOutputAttributes.map(
attr =>
@@ -324,10 +324,11 @@ object CHExecUtil extends Logging {
}
new NativePartitioning(
GlutenShuffleUtils.RangePartitioningShortName,
- numPartitions,
+ rangeBoundsInfo.boundsSize + 1,
Array.empty[Byte],
- orderingAndRangeBounds.getBytes(),
- attributePos.mkString(",").getBytes)
+ rangeBoundsInfo.json.getBytes,
+ attributePos.mkString(",").getBytes
+ )
case p =>
throw new IllegalStateException(s"Unknow partition type: ${p.getClass.toString}")
}
@@ -368,7 +369,7 @@ object CHExecUtil extends Logging {
val dependency =
new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
rddWithPartitionKey,
- new PartitionIdPassthrough(newPartitioning.numPartitions),
+ new PartitionIdPassthrough(nativePartitioning.getNumPartitions),
serializer,
shuffleWriterProcessor =
ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
nativePartitioning = nativePartitioning,
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala
index b43fc2625f..f837484b0c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala
@@ -57,7 +57,7 @@ class GlutenClickHouseColumnarShuffleAQESuite
val coalescedPartitionSpec0 =
colCustomShuffleReaderExecs.head.partitionSpecs.head
.asInstanceOf[CoalescedPartitionSpec]
assert(coalescedPartitionSpec0.startReducerIndex == 0)
- assert(coalescedPartitionSpec0.endReducerIndex == 5)
+ assert(coalescedPartitionSpec0.endReducerIndex == 4)
val coalescedPartitionSpec1 =
colCustomShuffleReaderExecs(1).partitionSpecs.head
.asInstanceOf[CoalescedPartitionSpec]
assert(coalescedPartitionSpec1.startReducerIndex == 0)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala
index 1a276a26b2..7a9abd3bad 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala
@@ -21,8 +21,11 @@ import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.utils.UTSystemParameters
import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+import java.util.concurrent.atomic.AtomicInteger
+
class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSuite {
protected val tablesPath: String = basePath + "/tpch-data"
@@ -141,4 +144,38 @@ class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSui
sql("drop table if exists tj2")
}
+ test("GLUTEN-8216 Fix OOM when cartesian product with empty data") {
+ // prepare
+ spark.sql("create table test_join(a int, b int, c int) using parquet")
+ var overrideConfs = Map(
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+ "spark.sql.shuffle.partitions" -> "1"
+ )
+ if (isSparkVersionGE("3.5")) {
+ // Range partitions will not be reduced if EliminateSorts is enabled in spark35.
+ overrideConfs += "spark.sql.optimizer.excludedRules" ->
+ "org.apache.spark.sql.catalyst.optimizer.EliminateSorts"
+ }
+
+ withSQLConf(overrideConfs.toSeq: _*) {
+ val taskCount = new AtomicInteger(0)
+ val taskListener = new SparkListener {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ taskCount.incrementAndGet()
+ logDebug(s"Task ${taskEnd.taskInfo.id} finished. Total tasks completed: $taskCount")
+ }
+ }
+ spark.sparkContext.addSparkListener(taskListener)
+ spark
+ .sql(
+ "select * from " +
+ "(select a from test_join group by a order by a), " +
+ "(select b from test_join group by b order by b)" +
+ " limit 10000"
+ )
+ .collect()
+ assert(taskCount.get() < 500)
+ }
+ }
+
}
diff --git
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala
index 00f3bee8eb..e62dbdd2a5 100644
---
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala
+++
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala
@@ -62,7 +62,7 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite
.partitionSpecs(0)
.asInstanceOf[CoalescedPartitionSpec]
assert(coalescedPartitionSpec0.startReducerIndex == 0)
- assert(coalescedPartitionSpec0.endReducerIndex == 5)
+ assert(coalescedPartitionSpec0.endReducerIndex == 4)
val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
.partitionSpecs(0)
.asInstanceOf[CoalescedPartitionSpec]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]