This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d35d1dc5e [CH] Adaptive sort memory controll and support memory sort
shuffle (#5893)
d35d1dc5e is described below
commit d35d1dc5e4450fdf58b8092ea26a0c928de29a48
Author: LiuNeng <[email protected]>
AuthorDate: Thu May 30 13:09:25 2024 +0800
[CH] Adaptive sort memory controll and support memory sort shuffle (#5893)
* optimize sort and shuffle
* change block size config
* support memory sort local shuffle
* fix bug
* support memory sort shuffle
* update ch version
* fix check style
* fix bug
* fix bug
---------
Co-authored-by: liuneng1994 <[email protected]>
---
.../vectorized/CHShuffleSplitterJniWrapper.java | 21 +-
.../gluten/backendsapi/clickhouse/CHBackend.scala | 2 +-
.../backendsapi/clickhouse/CHTransformerApi.scala | 6 -
.../spark/shuffle/CHColumnarShuffleWriter.scala | 10 +-
...ickHouseColumnarExternalSortShuffleSuite.scala} | 63 +---
...ClickHouseColumnarMemorySortShuffleSuite.scala} | 63 +---
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 20 +-
cpp-ch/clickhouse.version | 2 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 21 ++
cpp-ch/local-engine/Common/CHUtil.h | 4 +
cpp-ch/local-engine/Parser/SortRelParser.cpp | 9 +-
.../local-engine/Shuffle/CachedShuffleWriter.cpp | 79 +++--
cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h | 15 +-
cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 338 +++++++++++++++++----
cpp-ch/local-engine/Shuffle/PartitionWriter.h | 124 ++++++--
cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp | 6 +-
cpp-ch/local-engine/Shuffle/SelectorBuilder.h | 5 +-
cpp-ch/local-engine/Shuffle/ShuffleSplitter.h | 3 +-
.../Shuffle/SortedPartitionDataMerger.h | 2 +
cpp-ch/local-engine/local_engine_jni.cpp | 15 +-
.../CHCelebornHashBasedColumnarShuffleWriter.scala | 4 +-
...HouseRSSColumnarExternalSortShuffleSuite.scala} | 4 +-
...ickHouseRSSColumnarMemorySortShuffleSuite.scala | 136 +++++++++
.../scala/org/apache/gluten/GlutenConfig.scala | 31 +-
24 files changed, 704 insertions(+), 279 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index f81ec88c2..815bf472c 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -37,7 +37,8 @@ public class CHShuffleSplitterJniWrapper {
boolean flushBlockBufferBeforeEvict,
long maxSortBufferSize,
boolean spillFirstlyBeforeStop,
- boolean forceSort) {
+ boolean forceExternalSort,
+ boolean forceMemorySort) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
@@ -57,7 +58,8 @@ public class CHShuffleSplitterJniWrapper {
flushBlockBufferBeforeEvict,
maxSortBufferSize,
spillFirstlyBeforeStop,
- forceSort);
+ forceExternalSort,
+ forceMemorySort);
}
public long makeForRSS(
@@ -70,7 +72,9 @@ public class CHShuffleSplitterJniWrapper {
String hashAlgorithm,
Object pusher,
boolean throwIfMemoryExceed,
- boolean flushBlockBufferBeforeEvict) {
+ boolean flushBlockBufferBeforeEvict,
+ boolean forceExternalSort,
+ boolean forceMemorySort) {
return nativeMakeForRSS(
part.getShortName(),
part.getNumPartitions(),
@@ -84,7 +88,9 @@ public class CHShuffleSplitterJniWrapper {
hashAlgorithm,
pusher,
throwIfMemoryExceed,
- flushBlockBufferBeforeEvict);
+ flushBlockBufferBeforeEvict,
+ forceExternalSort,
+ forceMemorySort);
}
public native long nativeMake(
@@ -106,7 +112,8 @@ public class CHShuffleSplitterJniWrapper {
boolean flushBlockBufferBeforeEvict,
long maxSortBufferSize,
boolean spillFirstlyBeforeStop,
- boolean forceSort);
+ boolean forceSort,
+ boolean forceMemorySort);
public native long nativeMakeForRSS(
String shortName,
@@ -121,7 +128,9 @@ public class CHShuffleSplitterJniWrapper {
String hashAlgorithm,
Object pusher,
boolean throwIfMemoryExceed,
- boolean flushBlockBufferBeforeEvict);
+ boolean flushBlockBufferBeforeEvict,
+ boolean forceSort,
+ boolean forceMemorySort);
public native void split(long splitterId, long block);
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index bc0c8d1c0..e5f68a869 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -127,7 +127,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
".runtime_config.max_source_concatenate_bytes"
- val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = -1
+ val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT *
256
def affinityMode: String = {
SparkEnv.get.conf
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index c75cf4788..ea3398e77 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -91,12 +91,6 @@ class CHTransformerApi extends TransformerApi with Logging {
val offHeapSize =
nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes",
"0").toLong
if (offHeapSize > 0) {
- // Only set default max_bytes_before_external_sort for CH when it is
not set explicitly.
- val sortSpillKey = settingPrefix + "max_bytes_before_external_sort";
- if (!nativeConfMap.containsKey(sortSpillKey)) {
- val sortSpillValue = offHeapSize * 0.5
- nativeConfMap.put(sortSpillKey, sortSpillValue.toLong.toString)
- }
// Only set default max_bytes_before_external_group_by for CH when it
is not set explicitly.
val groupBySpillKey = settingPrefix +
"max_bytes_before_external_group_by";
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index 3a80e18bd..4a1adbec7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -61,7 +61,8 @@ class CHColumnarShuffleWriter[K, V](
GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict
private val maxSortBufferSize =
GlutenConfig.getConf.chColumnarMaxSortBufferSize
private val spillFirstlyBeforeStop =
GlutenConfig.getConf.chColumnarSpillFirstlyBeforeStop
- private val forceSortShuffle =
GlutenConfig.getConf.chColumnarForceSortShuffle
+ private val forceExternalSortShuffle =
GlutenConfig.getConf.chColumnarForceExternalSortShuffle
+ private val forceMemorySortShuffle =
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
private val spillThreshold =
GlutenConfig.getConf.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with
success = true
@@ -115,7 +116,8 @@ class CHColumnarShuffleWriter[K, V](
flushBlockBufferBeforeEvict,
maxSortBufferSize,
spillFirstlyBeforeStop,
- forceSortShuffle
+ forceExternalSortShuffle,
+ forceMemorySortShuffle
)
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
@@ -127,9 +129,9 @@ class CHColumnarShuffleWriter[K, V](
"is created. This behavior should be optimized by moving
memory " +
"allocations from make() to split()")
}
- logInfo(s"Gluten shuffle writer: Trying to spill $size bytes of
data")
+ logError(s"Gluten shuffle writer: Trying to spill $size bytes of
data")
val spilled = splitterJniWrapper.evict(nativeSplitter);
- logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of
data")
+ logError(s"Gluten shuffle writer: Spilled $spilled / $size bytes
of data")
spilled
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
similarity index 52%
copy from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
copy to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
index 098c7117b..be36cd998 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
@@ -17,10 +17,9 @@
package org.apache.gluten.execution
import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-class GlutenClickHouseColumnarSortShuffleAQESuite
+class GlutenClickHouseColumnarExternalSortShuffleSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -36,29 +35,11 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle",
"true")
}
test("TPCH Q1") {
- runTPCHQuery(1) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-
- val colCustomShuffleReaderExecs =
collect(df.queryExecution.executedPlan) {
- case csr: AQEShuffleReadExec => csr
- }
- assert(colCustomShuffleReaderExecs.size == 2)
- val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
- .partitionSpecs(0)
- .asInstanceOf[CoalescedPartitionSpec]
- assert(coalescedPartitionSpec0.startReducerIndex == 0)
- assert(coalescedPartitionSpec0.endReducerIndex == 5)
- val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
- .partitionSpecs(0)
- .asInstanceOf[CoalescedPartitionSpec]
- assert(coalescedPartitionSpec1.startReducerIndex == 0)
- assert(coalescedPartitionSpec1.endReducerIndex == 5)
- }
+ runTPCHQuery(1) { df => }
}
test("TPCH Q2") {
@@ -98,14 +79,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q11") {
- runTPCHQuery(11) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 2)
- }
+ runTPCHQuery(11) { df => }
}
test("TPCH Q12") {
@@ -121,14 +95,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q15") {
- runTPCHQuery(15) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 2)
- }
+ runTPCHQuery(15) { df => }
}
test("TPCH Q16") {
@@ -140,13 +107,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q18") {
- runTPCHQuery(18) {
- df =>
- val hashAggregates = collect(df.queryExecution.executedPlan) {
- case hash: HashAggregateExecBaseTransformer => hash
- }
- assert(hashAggregates.size == 3)
- }
+ runTPCHQuery(18) { df => }
}
test("TPCH Q19") {
@@ -162,14 +123,6 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q22") {
- runTPCHQuery(22) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 3)
- assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
- }
+ runTPCHQuery(22) { df => }
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
similarity index 52%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
index 098c7117b..b9d580c72 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
@@ -17,10 +17,9 @@
package org.apache.gluten.execution
import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-class GlutenClickHouseColumnarSortShuffleAQESuite
+class GlutenClickHouseColumnarMemorySortShuffleSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -36,29 +35,11 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
- .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle",
"true")
}
test("TPCH Q1") {
- runTPCHQuery(1) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-
- val colCustomShuffleReaderExecs =
collect(df.queryExecution.executedPlan) {
- case csr: AQEShuffleReadExec => csr
- }
- assert(colCustomShuffleReaderExecs.size == 2)
- val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
- .partitionSpecs(0)
- .asInstanceOf[CoalescedPartitionSpec]
- assert(coalescedPartitionSpec0.startReducerIndex == 0)
- assert(coalescedPartitionSpec0.endReducerIndex == 5)
- val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
- .partitionSpecs(0)
- .asInstanceOf[CoalescedPartitionSpec]
- assert(coalescedPartitionSpec1.startReducerIndex == 0)
- assert(coalescedPartitionSpec1.endReducerIndex == 5)
- }
+ runTPCHQuery(1) { df => }
}
test("TPCH Q2") {
@@ -98,14 +79,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q11") {
- runTPCHQuery(11) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 2)
- }
+ runTPCHQuery(11) { df => }
}
test("TPCH Q12") {
@@ -121,14 +95,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q15") {
- runTPCHQuery(15) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 2)
- }
+ runTPCHQuery(15) { df => }
}
test("TPCH Q16") {
@@ -140,13 +107,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q18") {
- runTPCHQuery(18) {
- df =>
- val hashAggregates = collect(df.queryExecution.executedPlan) {
- case hash: HashAggregateExecBaseTransformer => hash
- }
- assert(hashAggregates.size == 3)
- }
+ runTPCHQuery(18) { df => }
}
test("TPCH Q19") {
@@ -162,14 +123,6 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
}
test("TPCH Q22") {
- runTPCHQuery(22) {
- df =>
-
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
- val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
- case adaptive: AdaptiveSparkPlanExec => adaptive
- }
- assert(adaptiveSparkPlanExec.size == 3)
- assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
- }
+ runTPCHQuery(22) { df => }
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index d4302193f..f016f9dc5 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -157,9 +157,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 812)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 372)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 232)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 239)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
// the second VACUUM will remove some empty folders
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2"))
== 220)
@@ -188,11 +188,11 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 398)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 286)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 306)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 270)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3"))
== 276)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p3").collect()
@@ -219,11 +219,11 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 398)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 516)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 286)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 306)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 270)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4"))
== 276)
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
val ret2 = spark.sql("select count(*) from
lineitem_mergetree_optimize_p4").collect()
@@ -313,12 +313,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6"))
== {
- if (sparkVersion.equals("3.2")) 931 else 1014
+ if (sparkVersion.equals("3.2")) 499 else 528
})
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6"))
== {
- if (sparkVersion.equals("3.2")) 439 else 445
+ if (sparkVersion.equals("3.2")) 315 else 321
})
spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index b5d3aac8b..5125aabe5 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240527
-CH_COMMIT=dd16f9435bf
+CH_COMMIT=55b10ba376274f2a61a4c1daf1a2fb744155bd32
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 9e2ce6304..317adda2a 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -555,6 +555,12 @@ DB::Context::ConfigurationPtr
BackendInitializerUtil::initConfig(std::map<std::s
config->setString(key.substr(CH_RUNTIME_CONFIG_PREFIX.size()),
value);
}
}
+
+ if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
+ {
+ config->setString(CH_TASK_MEMORY,
backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
+ }
+
return config;
}
@@ -672,6 +678,21 @@ void
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
settings.set("function_json_value_return_type_allow_complex", true);
settings.set("function_json_value_return_type_allow_nullable", true);
settings.set("precise_float_parsing", true);
+ if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
+ {
+ auto task_memory =
std::stoull(backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
+ if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX +
"max_bytes_before_external_sort"))
+ {
+ settings.max_bytes_before_external_sort = static_cast<size_t>(0.8
* task_memory);
+ }
+ if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX +
"prefer_external_sort_block_bytes"))
+ {
+ auto mem_gb = task_memory / static_cast<double>(1_GiB);
+ // 2.8x+5, Heuristics calculate the block size of external sort,
[8,16]
+ settings.prefer_external_sort_block_bytes = std::max(std::min(
+ static_cast<size_t>(2.8*mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
+ }
+ }
}
void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index edbd91c50..458eec9d3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -1,3 +1,4 @@
+/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -168,6 +169,9 @@ public:
inline static const std::string S3A_PREFIX = "fs.s3a.";
inline static const std::string SPARK_DELTA_PREFIX =
"spark.databricks.delta.";
+ inline static const String GLUTEN_TASK_OFFHEAP =
"spark.gluten.memory.task.offHeap.size.in.bytes";
+ inline static const String CH_TASK_MEMORY = "off_heap_per_task";
+
/// On yarn mode, native writing on hdfs cluster takes yarn container user
as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the
owner of the hdfs dir to be written.
/// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Parser/SortRelParser.cpp
b/cpp-ch/local-engine/Parser/SortRelParser.cpp
index 88141d030..ea29e72d1 100644
--- a/cpp-ch/local-engine/Parser/SortRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/SortRelParser.cpp
@@ -40,8 +40,15 @@ SortRelParser::parse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel, st
size_t limit = parseLimit(rel_stack_);
const auto & sort_rel = rel.sort();
auto sort_descr = parseSortDescription(sort_rel.sorts(),
query_plan->getCurrentDataStream().header);
+ SortingStep::Settings settings(*getContext());
+ size_t offheap_per_task =
getContext()->getConfigRef().getUInt64("off_heap_per_task");
+ double spill_mem_ratio =
getContext()->getConfigRef().getDouble("spill_mem_ratio", 0.9);
+ settings.worth_external_sort = [offheap_per_task, spill_mem_ratio]() ->
bool
+ {
+ return CurrentMemoryTracker::current_memory() > offheap_per_task *
spill_mem_ratio;
+ };
auto sorting_step = std::make_unique<DB::SortingStep>(
- query_plan->getCurrentDataStream(), sort_descr, limit,
SortingStep::Settings(*getContext()), false);
+ query_plan->getCurrentDataStream(), sort_descr, limit, settings,
false);
sorting_step->setStepDescription("Sorting step");
steps.emplace_back(sorting_step.get());
query_plan->addStep(std::move(sorting_step));
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
index 5a8629434..559d90318 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -25,25 +25,23 @@
namespace DB
{
-
namespace ErrorCodes
{
- extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
}
}
namespace local_engine
{
-
using namespace DB;
-CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const
SplitOptions & options_, jobject rss_pusher) : options(options_)
+CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const
SplitOptions & options_, jobject rss_pusher)
+ : options(options_)
{
- bool use_external_sort_shuffle = (options.force_sort) && !rss_pusher;
if (short_name == "rr")
{
- partitioner =
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num,
use_external_sort_shuffle);
+ partitioner =
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
}
else if (short_name == "hash")
{
@@ -53,15 +51,15 @@ CachedShuffleWriter::CachedShuffleWriter(const String &
short_name, const SplitO
{
hash_fields.push_back(std::stoi(expr));
}
- partitioner =
std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields,
options_.hash_algorithm, use_external_sort_shuffle);
+ partitioner =
std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields,
options_.hash_algorithm);
}
else if (short_name == "single")
{
options.partition_num = 1;
- partitioner =
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num,
use_external_sort_shuffle);
+ partitioner =
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
}
else if (short_name == "range")
- partitioner =
std::make_unique<RangeSelectorBuilder>(options.hash_exprs,
options.partition_num, use_external_sort_shuffle);
+ partitioner =
std::make_unique<RangeSelectorBuilder>(options.hash_exprs,
options.partition_num);
else
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "unsupported
splitter {}", short_name);
@@ -77,32 +75,17 @@ CachedShuffleWriter::CachedShuffleWriter(const String &
short_name, const SplitO
jmethodID celeborn_push_partition_data_method =
GetMethodID(env, celeborn_partition_pusher_class,
"pushPartitionData", "(I[BI)I");
CLEAN_JNIENV
- auto celeborn_client = std::make_unique<CelebornClient>(rss_pusher,
celeborn_push_partition_data_method);
- if (use_external_sort_shuffle)
- {
- partition_writer =
std::make_unique<ExternalSortCelebornPartitionWriter>(this,
std::move(celeborn_client));
- sort_shuffle = true;
- }
- else
- partition_writer = std::make_unique<CelebornPartitionWriter>(this,
std::move(celeborn_client));
- }
- else
- {
- if (use_external_sort_shuffle)
- {
- partition_writer =
std::make_unique<ExternalSortLocalPartitionWriter>(this);
- sort_shuffle = true;
- }
- else
- partition_writer = std::make_unique<LocalPartitionWriter>(this);
+ celeborn_client = std::make_unique<CelebornClient>(rss_pusher,
celeborn_push_partition_data_method);
}
+
split_result.partition_lengths.resize(options.partition_num, 0);
split_result.raw_partition_lengths.resize(options.partition_num, 0);
}
void CachedShuffleWriter::split(DB::Block & block)
{
+ lazyInitPartitionWriter(block);
auto block_info = block.info;
initOutputIfNeeded(block);
@@ -145,18 +128,50 @@ void CachedShuffleWriter::initOutputIfNeeded(Block &
block)
}
}
-SplitResult CachedShuffleWriter::stop()
+void CachedShuffleWriter::lazyInitPartitionWriter(Block & input_sample)
{
- partition_writer->stop();
+ if (partition_writer)
+ return;
+
+ auto avg_row_size = input_sample.allocatedBytes() / input_sample.rows();
+ auto overhead_memory = std::max(avg_row_size, input_sample.columns() * 16)
* options.split_size * options.partition_num;
+ auto use_sort_shuffle = overhead_memory > options.spill_threshold * 0.5 ||
options.partition_num >= 300;
+ auto use_external_sort_shuffle = options.force_external_sort;
+ auto use_memory_sort_shuffle = options.force_mermory_sort ||
use_sort_shuffle;
+ sort_shuffle = use_memory_sort_shuffle || use_external_sort_shuffle;
+ if (celeborn_client)
+ {
+ if (use_external_sort_shuffle)
+ partition_writer =
std::make_unique<ExternalSortCelebornPartitionWriter>(this,
std::move(celeborn_client));
+ else if (use_memory_sort_shuffle)
+ partition_writer =
std::make_unique<MemorySortCelebornPartitionWriter>(this,
std::move(celeborn_client));
+ else
+ partition_writer = std::make_unique<CelebornPartitionWriter>(this,
std::move(celeborn_client));
+ }
+ else
+ {
+ if (use_external_sort_shuffle)
+ partition_writer =
std::make_unique<ExternalSortLocalPartitionWriter>(this);
+ else if (use_memory_sort_shuffle)
+ partition_writer =
std::make_unique<MemorySortLocalPartitionWriter>(this);
+ else
+ partition_writer = std::make_unique<LocalPartitionWriter>(this);
+ }
+ partitioner->setUseSortShuffle(sort_shuffle);
+ LOG_INFO(logger, "Use Partition Writer {}", partition_writer->getName());
+}
- static auto * logger = &Poco::Logger::get("CachedShuffleWriter");
+SplitResult CachedShuffleWriter::stop()
+{
+ if (partition_writer)
+ partition_writer->stop();
LOG_INFO(logger, "CachedShuffleWriter stop, split result: {}",
split_result.toString());
return split_result;
}
size_t CachedShuffleWriter::evictPartitions()
{
+ if (!partition_writer) return 0;
return partition_writer->evictPartitions(true,
options.flush_block_buffer_before_evict);
}
-
-}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
index d1dd4ff2f..e6395c8e4 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
@@ -24,10 +24,10 @@
namespace local_engine
{
-
-class PartitionWriter;
-class LocalPartitionWriter;
-class CelebornPartitionWriter;
+ class CelebornClient;
+ class PartitionWriter;
+ class LocalPartitionWriter;
+ class CelebornPartitionWriter;
class CachedShuffleWriter : public ShuffleWriterBase
{
@@ -35,8 +35,12 @@ public:
friend class PartitionWriter;
friend class LocalPartitionWriter;
friend class CelebornPartitionWriter;
+ friend class SortBasedPartitionWriter;
+ friend class MemorySortLocalPartitionWriter;
+ friend class MemorySortCelebornPartitionWriter;
friend class ExternalSortLocalPartitionWriter;
friend class ExternalSortCelebornPartitionWriter;
+ friend class Spillable;
explicit CachedShuffleWriter(const String & short_name, const SplitOptions
& options, jobject rss_pusher = nullptr);
~CachedShuffleWriter() override = default;
@@ -47,6 +51,7 @@ public:
private:
void initOutputIfNeeded(DB::Block & block);
+ void lazyInitPartitionWriter(DB::Block & input_sample);
bool stopped = false;
DB::Block output_header;
@@ -55,7 +60,9 @@ private:
std::unique_ptr<SelectorBuilder> partitioner;
std::vector<size_t> output_columns_indicies;
std::unique_ptr<PartitionWriter> partition_writer;
+ std::unique_ptr<CelebornClient> celeborn_client;
bool sort_shuffle = false;
+ Poco::Logger* logger = &Poco::Logger::get("CachedShuffleWriter");
};
}
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index e0b69316d..d02c79e0a 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -49,6 +49,7 @@ extern const int LOGICAL_ERROR;
using namespace DB;
namespace local_engine
{
+static const String PARTITION_COLUMN_NAME = "partition";
void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block &
block)
{
@@ -120,7 +121,7 @@ void PartitionWriter::write(const PartitionInfo &
partition_info, DB::Block & bl
}
/// Only works for local partition writer
- if (!supportsEvictSinglePartition() && options->spill_threshold &&
current_cached_bytes >= options->spill_threshold)
+ if (!supportsEvictSinglePartition() && options->spill_threshold &&
CurrentMemoryTracker::current_memory() >= options->spill_threshold)
unsafeEvictPartitions(false, options->flush_block_buffer_before_evict);
shuffle_writer->split_result.total_split_time +=
watch.elapsedNanoseconds();
@@ -157,20 +158,18 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool f
if (buffer->empty())
continue;
- PartitionSpillInfo partition_spill_info;
- partition_spill_info.start = output.count();
+ std::pair<size_t, size_t> offsets;
+ offsets.first = output.count();
spilled_bytes += buffer->bytes();
size_t written_bytes = buffer->spill(writer);
res += written_bytes;
compressed_output.sync();
- partition_spill_info.length = output.count() -
partition_spill_info.start;
+ offsets.second = output.count() - offsets.first;
shuffle_writer->split_result.raw_partition_lengths[partition_id]
+= written_bytes;
- partition_spill_info.partition_id = partition_id;
- info.partition_spill_infos.emplace_back(partition_spill_info);
+ info.partition_spill_infos[partition_id] = offsets;
}
-
spill_infos.emplace_back(info);
shuffle_writer->split_result.total_compress_time +=
compressed_output.getCompressTime();
shuffle_writer->split_result.total_write_time +=
compressed_output.getWriteTime();
@@ -182,8 +181,7 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool f
{
// escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
- ThreadFromGlobalPool thread(spill_to_file);
- thread.join();
+ spill_to_file();
}
else
{
@@ -194,20 +192,35 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool f
return res;
}
-std::vector<UInt64> LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
+String Spillable::getNextSpillFile()
+{
+ auto file_name = std::to_string(split_options.shuffle_id) + "_" +
std::to_string(split_options.map_id) + "_" + std::to_string(spill_infos.size());
+ std::hash<std::string> hasher;
+ auto hash = hasher(file_name);
+ auto dir_id = hash % split_options.local_dirs_list.size();
+ auto sub_dir_id = (hash / split_options.local_dirs_list.size()) %
split_options.num_sub_dirs;
+
+ std::string dir =
std::filesystem::path(split_options.local_dirs_list[dir_id]) /
std::format("{:02x}", sub_dir_id);
+ if (!std::filesystem::exists(dir))
+ std::filesystem::create_directories(dir);
+ return std::filesystem::path(dir) / file_name;
+}
+
+std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter *
shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
{
auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+
CompressedWriteBuffer compressed_output(data_file, codec,
shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
std::vector<UInt64>
partition_length(shuffle_writer->options.partition_num, 0);
- std::vector<ReadBufferPtr> spill_inputs;
+ std::vector<std::shared_ptr<ReadBufferFromFile>> spill_inputs;
spill_inputs.reserve(spill_infos.size());
for (const auto & spill : spill_infos)
{
// only use readBig
-
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFile>(spill.spilled_file,
0));
+
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFilePRead>(spill.spilled_file,
0));
}
Stopwatch write_time_watch;
@@ -215,33 +228,46 @@ std::vector<UInt64>
LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
Stopwatch serialization_time_watch;
size_t merge_io_time = 0;
String buffer;
- for (size_t partition_id = 0; partition_id <
partition_block_buffer.size(); ++partition_id)
+ for (size_t partition_id = 0; partition_id < split_options.partition_num;
++partition_id)
{
auto size_before = data_file.count();
io_time_watch.restart();
for (size_t i = 0; i < spill_infos.size(); ++i)
{
- size_t size =
spill_infos[i].partition_spill_infos[partition_id].length;
+ if (!spill_infos[i].partition_spill_infos.contains(partition_id))
+ {
+ continue;
+ }
+ size_t size =
spill_infos[i].partition_spill_infos[partition_id].second;
+ size_t offset =
spill_infos[i].partition_spill_infos[partition_id].first;
+ if (!size)
+ {
+ continue;
+ }
buffer.reserve(size);
- auto count = spill_inputs[i]->readBig(buffer.data(), size);
+ auto count = spill_inputs[i]->readBigAt(buffer.data(), size,
offset, nullptr);
+
+ chassert(count == size);
data_file.write(buffer.data(), count);
}
merge_io_time += io_time_watch.elapsedNanoseconds();
serialization_time_watch.restart();
- if (!partition_block_buffer[partition_id]->empty())
+ if (!extra_data.partition_block_buffer.empty() &&
!extra_data.partition_block_buffer[partition_id]->empty())
{
- Block block =
partition_block_buffer[partition_id]->releaseColumns();
- partition_buffer[partition_id]->addBlock(std::move(block));
+ Block block =
extra_data.partition_block_buffer[partition_id]->releaseColumns();
+
extra_data.partition_buffer[partition_id]->addBlock(std::move(block));
+ }
+ if (!extra_data.partition_buffer.empty())
+ {
+ size_t raw_size =
extra_data.partition_buffer[partition_id]->spill(writer);
+ shuffle_writer->split_result.raw_partition_lengths[partition_id]
+= raw_size;
}
- size_t raw_size = partition_buffer[partition_id]->spill(writer);
-
compressed_output.sync();
partition_length[partition_id] = data_file.count() - size_before;
shuffle_writer->split_result.total_serialize_time +=
serialization_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_bytes_written +=
partition_length[partition_id];
- shuffle_writer->split_result.raw_partition_lengths[partition_id] +=
raw_size;
}
shuffle_writer->split_result.total_write_time +=
write_time_watch.elapsedNanoseconds();
@@ -253,32 +279,43 @@ std::vector<UInt64>
LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
for (const auto & spill : spill_infos)
std::filesystem::remove(spill.spilled_file);
-
return partition_length;
}
-LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter *
shuffle_writer_) : PartitionWriter(shuffle_writer_)
+void SortBasedPartitionWriter::write(const PartitionInfo & info, DB::Block &
block)
{
+ Stopwatch write_time_watch;
+ if (output_header.columns() == 0)
+ output_header = block.cloneEmpty();
+ auto partition_column = ColumnUInt64::create();
+ partition_column->reserve(block.rows());
+
partition_column->getData().insert_assume_reserved(info.src_partition_num.begin(),
info.src_partition_num.end());
+ block.insert({std::move(partition_column),
std::make_shared<DataTypeUInt64>(), PARTITION_COLUMN_NAME});
+ if (sort_header.columns() == 0)
+ {
+ sort_header = block.cloneEmpty();
+
sort_description.emplace_back(SortColumnDescription(PARTITION_COLUMN_NAME));
+ }
+ // partial sort
+ sortBlock(block, sort_description);
+ Chunk chunk;
+ chunk.setColumns(block.getColumns(), block.rows());
+ accumulated_blocks.emplace_back(std::move(chunk));
+ current_accumulated_bytes += accumulated_blocks.back().allocatedBytes();
+ current_accumulated_rows += accumulated_blocks.back().getNumRows();
+ shuffle_writer->split_result.total_write_time +=
write_time_watch.elapsedNanoseconds();
+ if (options->spill_threshold && CurrentMemoryTracker::current_memory() >=
options->spill_threshold)
+ unsafeEvictPartitions(false, false);
}
-String LocalPartitionWriter::getNextSpillFile()
+LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter *
shuffle_writer_) : PartitionWriter(shuffle_writer_),
Spillable(shuffle_writer_->options)
{
- auto file_name = std::to_string(options->shuffle_id) + "_" +
std::to_string(options->map_id) + "_" + std::to_string(spill_infos.size());
- std::hash<std::string> hasher;
- auto hash = hasher(file_name);
- auto dir_id = hash % options->local_dirs_list.size();
- auto sub_dir_id = (hash / options->local_dirs_list.size()) %
options->num_sub_dirs;
-
- std::string dir = std::filesystem::path(options->local_dirs_list[dir_id])
/ std::format("{:02x}", sub_dir_id);
- if (!std::filesystem::exists(dir))
- std::filesystem::create_directories(dir);
- return std::filesystem::path(dir) / file_name;
}
void LocalPartitionWriter::unsafeStop()
{
WriteBufferFromFile output(options->data_file, options->io_buffer_size);
- auto offsets = mergeSpills(output);
+ auto offsets = mergeSpills(shuffle_writer, output,
{partition_block_buffer, partition_buffer});
shuffle_writer->split_result.partition_lengths = offsets;
}
@@ -335,30 +372,211 @@ size_t PartitionWriter::bytes() const
return bytes;
}
-void ExternalSortLocalPartitionWriter::write(const PartitionInfo & info,
DB::Block & block)
+size_t MemorySortLocalPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool /*flush_block_buffer*/)
{
- Stopwatch write_time_watch;
- if (output_header.columns() == 0)
- output_header = block.cloneEmpty();
- static const String partition_column_name = "partition";
- auto partition_column = ColumnUInt64::create();
- partition_column->reserve(block.rows());
-
partition_column->getData().insert_assume_reserved(info.src_partition_num.begin(),
info.src_partition_num.end());
- block.insert({std::move(partition_column),
std::make_shared<DataTypeUInt64>(), partition_column_name});
- if (sort_header.columns() == 0)
+ size_t res = 0;
+ size_t spilled_bytes = 0;
+
+ auto spill_to_file = [this, &res, &spilled_bytes]()
{
- sort_header = block.cloneEmpty();
-
sort_description.emplace_back(SortColumnDescription(partition_column_name));
+ if (accumulated_blocks.empty())
+ return;
+ auto file = getNextSpillFile();
+ WriteBufferFromFile output(file,
shuffle_writer->options.io_buffer_size);
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
+ NativeWriter writer(compressed_output, output_header);
+
+ SpillInfo info;
+ info.spilled_file = file;
+
+ Stopwatch serialization_time_watch;
+ MergeSorter sorter(sort_header, std::move(accumulated_blocks),
sort_description, adaptiveBlockSize(), 0);
+ size_t cur_partition_id = 0;
+ info.partition_spill_infos[cur_partition_id] = {0,0};
+ while (auto data = sorter.read())
+ {
+ Block serialized_block =
sort_header.cloneWithColumns(data.detachColumns());
+ const auto partitions =
serialized_block.getByName(PARTITION_COLUMN_NAME).column;
+ serialized_block.erase(PARTITION_COLUMN_NAME);
+ size_t row_offset = 0;
+ while (row_offset < serialized_block.rows())
+ {
+ auto last_idx = searchLastPartitionIdIndex(partitions,
row_offset, cur_partition_id);
+ if (last_idx < 0)
+ {
+ auto& last = info.partition_spill_infos[cur_partition_id];
+ compressed_output.sync();
+ last.second = output.count() - last.first;
+ cur_partition_id++;
+ info.partition_spill_infos[cur_partition_id] = {last.first
+ last.second, 0};
+ continue;
+ }
+
+ if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
+ {
+ auto count = writer.write(serialized_block);
+
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+ break;
+ }
+ else
+ {
+ auto cut_block =
serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
+
+ auto count = writer.write(cut_block);
+
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+ row_offset = last_idx + 1;
+ if (last_idx != serialized_block.rows() - 1)
+ {
+ auto& last =
info.partition_spill_infos[cur_partition_id];
+ compressed_output.sync();
+ last.second = output.count() - last.first;
+ cur_partition_id++;
+ info.partition_spill_infos[cur_partition_id] =
{last.first + last.second, 0};
+ }
+ }
+ }
+ }
+ compressed_output.sync();
+ auto& last = info.partition_spill_infos[cur_partition_id];
+ last.second = output.count() - last.first;
+ spilled_bytes = current_accumulated_bytes;
+ res = current_accumulated_bytes;
+ current_accumulated_bytes = 0;
+ current_accumulated_rows = 0;
+ std::erase_if(info.partition_spill_infos, [](const auto & item)
+ {
+ auto const& [key, value] = item;
+ return value.second == 0;
+ });
+ spill_infos.emplace_back(info);
+ shuffle_writer->split_result.total_compress_time +=
compressed_output.getCompressTime();
+ shuffle_writer->split_result.total_io_time +=
compressed_output.getWriteTime();
+ shuffle_writer->split_result.total_serialize_time +=
serialization_time_watch.elapsedNanoseconds();
+ };
+
+ Stopwatch spill_time_watch;
+ if (for_memory_spill && options->throw_if_memory_exceed)
+ {
+ // escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+ spill_to_file();
}
- // partial sort
- sortBlock(block, sort_description);
- Chunk chunk;
- chunk.setColumns(block.getColumns(), block.rows());
- accumulated_blocks.emplace_back(std::move(chunk));
- current_accumulated_bytes += accumulated_blocks.back().allocatedBytes();
- if (current_accumulated_bytes >= max_sort_buffer_size)
- unsafeEvictPartitions(false, false);
- shuffle_writer->split_result.total_write_time +=
write_time_watch.elapsedNanoseconds();
+ else
+ {
+ spill_to_file();
+ }
+ shuffle_writer->split_result.total_spill_time +=
spill_time_watch.elapsedNanoseconds();
+ shuffle_writer->split_result.total_bytes_spilled += spilled_bytes;
+ return res;
+}
+
+void MemorySortLocalPartitionWriter::unsafeStop()
+{
+ unsafeEvictPartitions(false, false);
+ WriteBufferFromFile output(options->data_file, options->io_buffer_size);
+ auto offsets = mergeSpills(shuffle_writer, output);
+ shuffle_writer->split_result.partition_lengths = offsets;
+}
+
+size_t MemorySortCelebornPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool flush_block_buffer)
+{
+ size_t res = 0;
+ size_t spilled_bytes = 0;
+ auto spill_to_celeborn = [this, for_memory_spill, flush_block_buffer,
&res, &spilled_bytes]()
+ {
+ Stopwatch serialization_time_watch;
+
+ /// Skip empty buffer
+ if (accumulated_blocks.empty())
+ return;
+
+ WriteBufferFromOwnString output;
+ auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
+ NativeWriter writer(compressed_output, shuffle_writer->output_header);
+
+ MergeSorter sorter(sort_header, std::move(accumulated_blocks),
sort_description, adaptiveBlockSize(), 0);
+ size_t cur_partition_id = 0;
+ auto push_to_celeborn = [&]()
+ {
+ compressed_output.sync();
+ auto& data = output.str();
+ if (!data.empty())
+ {
+ Stopwatch push_time_watch;
+ celeborn_client->pushPartitionData(cur_partition_id,
data.data(), data.size());
+ shuffle_writer->split_result.total_io_time +=
push_time_watch.elapsedNanoseconds();
+
shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
+ }
+ output.restart();
+ };
+
+ while (auto data = sorter.read())
+ {
+ Block serialized_block =
sort_header.cloneWithColumns(data.detachColumns());
+ const auto partitions =
serialized_block.getByName(PARTITION_COLUMN_NAME).column;
+ serialized_block.erase(PARTITION_COLUMN_NAME);
+ size_t row_offset = 0;
+ while (row_offset < serialized_block.rows())
+ {
+ auto last_idx = searchLastPartitionIdIndex(partitions,
row_offset, cur_partition_id);
+ if (last_idx < 0)
+ {
+ push_to_celeborn();
+ cur_partition_id++;
+ continue;
+ }
+
+ if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
+ {
+ auto count = writer.write(serialized_block);
+
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+ break;
+ }
+ auto cut_block =
serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
+ auto count = writer.write(cut_block);
+
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+ row_offset = last_idx + 1;
+ if (last_idx != serialized_block.rows() - 1)
+ {
+ push_to_celeborn();
+ cur_partition_id++;
+ }
+ }
+ }
+ push_to_celeborn();
+ spilled_bytes = current_accumulated_bytes;
+ res = current_accumulated_bytes;
+ current_accumulated_bytes = 0;
+ current_accumulated_rows = 0;
+
+ shuffle_writer->split_result.total_compress_time +=
compressed_output.getCompressTime();
+ shuffle_writer->split_result.total_io_time +=
compressed_output.getWriteTime();
+
+ shuffle_writer->split_result.total_serialize_time +=
serialization_time_watch.elapsedNanoseconds();
+ };
+
+ Stopwatch spill_time_watch;
+ if (for_memory_spill && options->throw_if_memory_exceed)
+ {
+ // escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+ spill_to_celeborn();
+ }
+ else
+ {
+ spill_to_celeborn();
+ }
+
+ shuffle_writer->split_result.total_spill_time +=
spill_time_watch.elapsedNanoseconds();
+ shuffle_writer->split_result.total_bytes_spilled += spilled_bytes;
+ return res;
+}
+
+void MemorySortCelebornPartitionWriter::unsafeStop()
+{
+ unsafeEvictPartitions(false, false);
}
size_t ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
@@ -367,6 +585,10 @@ size_t
ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
if (accumulated_blocks.empty())
return 0;
+ if (max_merge_block_bytes)
+ {
+ max_merge_block_size = std::max(max_merge_block_bytes /
(current_accumulated_bytes / current_accumulated_rows), 128UL);
+ }
Stopwatch watch;
MergeSorter sorter(sort_header, std::move(accumulated_blocks),
sort_description, max_merge_block_size, 0);
streams.emplace_back(&tmp_data->createStream(sort_header));
@@ -378,6 +600,7 @@ size_t
ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
streams.back()->finishWriting();
auto result = current_accumulated_bytes;
current_accumulated_bytes = 0;
+ current_accumulated_rows = 0;
shuffle_writer->split_result.total_spill_time +=
watch.elapsedNanoseconds();
return result;
}
@@ -562,8 +785,7 @@ size_t
CelebornPartitionWriter::unsafeEvictSinglePartition(bool for_memory_spill
{
// escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
- ThreadFromGlobalPool thread(spill_to_celeborn);
- thread.join();
+ spill_to_celeborn();
}
else
{
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index 9c4e75db6..5b4285afd 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -17,7 +17,6 @@
#pragma once
#include <cstddef>
#include <memory>
-#include <mutex>
#include <vector>
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
@@ -26,6 +25,8 @@
#include <jni/CelebornClient.h>
#include <Parser/SerializedPlanParser.h>
+#include "CachedShuffleWriter.h"
+
namespace DB
{
class MergingSortedAlgorithm;
@@ -33,17 +34,11 @@ class MergingSortedAlgorithm;
namespace local_engine
{
-struct PartitionSpillInfo
-{
- size_t partition_id;
- size_t start;
- size_t length; // in Bytes
-};
struct SpillInfo
{
std::string spilled_file;
- std::vector<PartitionSpillInfo> partition_spill_infos;
+ std::map<size_t, std::pair<size_t, size_t>> partition_spill_infos;
};
class Partition
@@ -113,7 +108,28 @@ protected:
size_t last_partition_id;
};
-class LocalPartitionWriter : public PartitionWriter
+class Spillable
+{
+public:
+ struct ExtraData
+ {
+ std::vector<ColumnsBufferPtr> partition_block_buffer;
+ std::vector<PartitionPtr> partition_buffer;
+ };
+
+ Spillable(SplitOptions options_) : split_options(std::move(options_)) {}
+ virtual ~Spillable() = default;
+
+protected:
+ String getNextSpillFile();
+ std::vector<UInt64> mergeSpills(CachedShuffleWriter * shuffle_writer,
WriteBuffer & data_file, ExtraData extra_data = {});
+ std::vector<SpillInfo> spill_infos;
+
+private:
+ const SplitOptions split_options;
+};
+
+class LocalPartitionWriter : public PartitionWriter, public Spillable
{
public:
explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer);
@@ -124,16 +140,79 @@ public:
protected:
size_t unsafeEvictPartitions(bool for_memory_spill, bool
flush_block_buffer) override;
void unsafeStop() override;
+};
- String getNextSpillFile();
- std::vector<UInt64> mergeSpills(DB::WriteBuffer & data_file);
+class SortBasedPartitionWriter : public PartitionWriter
+{
+public:
+ explicit SortBasedPartitionWriter(CachedShuffleWriter * shuffle_writer_) :
PartitionWriter(shuffle_writer_)
+ {
+ max_merge_block_size = options->split_size;
+ max_sort_buffer_size = options->max_sort_buffer_size;
+ max_merge_block_bytes =
SerializedPlanParser::global_context->getSettings().prefer_external_sort_block_bytes;
+ }
- std::vector<SpillInfo> spill_infos;
+ String getName() const override { return "SortBasedPartitionWriter"; }
+ void write(const PartitionInfo & info, DB::Block & block) override;
+ size_t adaptiveBlockSize()
+ {
+ size_t res = max_merge_block_size;
+ if (max_merge_block_bytes)
+ {
+ res = std::min(std::max(max_merge_block_bytes /
(current_accumulated_bytes / current_accumulated_rows), 128UL), res);
+ }
+ return res;
+ }
+
+protected:
+ size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
+ size_t max_sort_buffer_size = 1_GiB;
+ size_t max_merge_block_bytes = 0;
+ size_t current_accumulated_bytes = 0;
+ size_t current_accumulated_rows = 0;
+ Chunks accumulated_blocks;
+ Block output_header;
+ Block sort_header;
+ SortDescription sort_description;
+};
+
+class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public
Spillable
+{
+public:
+ explicit MemorySortLocalPartitionWriter(CachedShuffleWriter*
shuffle_writer_)
+ : SortBasedPartitionWriter(shuffle_writer_),
Spillable(shuffle_writer_->options)
+ {
+ }
+
+ ~MemorySortLocalPartitionWriter() override = default;
+ String getName() const override { return "MemorySortLocalPartitionWriter";
}
+
+protected:
+ size_t unsafeEvictPartitions(bool for_memory_spill, bool
flush_block_buffer) override;
+ void unsafeStop() override;
+};
+
+class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter
+{
+public:
+ explicit MemorySortCelebornPartitionWriter(CachedShuffleWriter*
shuffle_writer_, std::unique_ptr<CelebornClient> celeborn_client_)
+ : SortBasedPartitionWriter(shuffle_writer_),
celeborn_client(std::move(celeborn_client_))
+ {
+ }
+
+ ~MemorySortCelebornPartitionWriter() override = default;
+
+protected:
+ size_t unsafeEvictPartitions(bool for_memory_spill, bool
flush_block_buffer) override;
+ void unsafeStop() override;
+
+private:
+ std::unique_ptr<CelebornClient> celeborn_client;
};
class SortedPartitionDataMerger;
-class ExternalSortLocalPartitionWriter : public PartitionWriter
+class ExternalSortLocalPartitionWriter : public SortBasedPartitionWriter
{
public:
struct MergeContext
@@ -142,37 +221,30 @@ public:
std::unique_ptr<SortedPartitionDataMerger> merger;
};
- explicit ExternalSortLocalPartitionWriter(CachedShuffleWriter *
shuffle_writer_) : PartitionWriter(shuffle_writer_)
+ explicit ExternalSortLocalPartitionWriter(CachedShuffleWriter *
shuffle_writer_) : SortBasedPartitionWriter(shuffle_writer_)
{
max_merge_block_size = options->split_size;
max_sort_buffer_size = options->max_sort_buffer_size;
+ max_merge_block_bytes =
SerializedPlanParser::global_context->getSettings().prefer_external_sort_block_bytes;
tmp_data =
std::make_unique<TemporaryDataOnDisk>(SerializedPlanParser::global_context->getTempDataOnDisk());
}
~ExternalSortLocalPartitionWriter() override = default;
String getName() const override { return
"ExternalSortLocalPartitionWriter"; }
- void write(const PartitionInfo & info, DB::Block & block) override;
protected:
size_t unsafeEvictPartitions(bool for_memory_spill, bool
flush_block_buffer) override;
/// Prepare for data merging, spill the remaining memory dataļ¼and create a
merger object.
MergeContext prepareMerge();
void unsafeStop() override;
- std::queue<DB::Block> mergeDataInMemory();
+ std::queue<Block> mergeDataInMemory();
- size_t max_sort_buffer_size = 1_GiB;
- size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
- size_t current_accumulated_bytes = 0;
- DB::Chunks accumulated_blocks;
- DB::Block output_header;
- DB::Block sort_header;
- DB::SortDescription sort_description;
- DB::TemporaryDataOnDiskPtr tmp_data;
- std::vector<DB::TemporaryFileStream *> streams;
+ TemporaryDataOnDiskPtr tmp_data;
+ std::vector<TemporaryFileStream *> streams;
};
-class ExternalSortCelebornPartitionWriter : public
ExternalSortLocalPartitionWriter
+class ExternalSortCelebornPartitionWriter : public
ExternalSortLocalPartitionWriter
{
public:
explicit ExternalSortCelebornPartitionWriter(CachedShuffleWriter *
shuffle_writer_, std::unique_ptr<CelebornClient> celeborn_client_)
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
index 5a5e969e1..7e3642dac 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
@@ -81,7 +81,7 @@ PartitionInfo RoundRobinSelectorBuilder::build(DB::Block &
block)
pid = pid_selection;
pid_selection = (pid_selection + 1) % parts_num;
}
- return PartitionInfo::fromSelector(std::move(result), parts_num,
use_external_sort_shuffle);
+ return PartitionInfo::fromSelector(std::move(result), parts_num,
use_sort_shuffle);
}
HashSelectorBuilder::HashSelectorBuilder(
@@ -156,7 +156,7 @@ PartitionInfo HashSelectorBuilder::build(DB::Block & block)
}
}
}
- return PartitionInfo::fromSelector(std::move(partition_ids), parts_num,
use_external_sort_shuffle);
+ return PartitionInfo::fromSelector(std::move(partition_ids), parts_num,
use_sort_shuffle);
}
@@ -177,7 +177,7 @@ PartitionInfo RangeSelectorBuilder::build(DB::Block & block)
{
DB::IColumn::Selector result;
computePartitionIdByBinarySearch(block, result);
- return PartitionInfo::fromSelector(std::move(result), partition_num,
use_external_sort_shuffle);
+ return PartitionInfo::fromSelector(std::move(result), partition_num,
use_sort_shuffle);
}
void RangeSelectorBuilder::initSortInformation(Poco::JSON::Array::Ptr
orderings)
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
index 9510291c8..97894daa3 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
@@ -46,11 +46,12 @@ struct PartitionInfo
class SelectorBuilder
{
public:
- explicit SelectorBuilder(bool use_external_sort_shuffle) :
use_external_sort_shuffle(use_external_sort_shuffle) { }
+ explicit SelectorBuilder(bool use_external_sort_shuffle) :
use_sort_shuffle(use_external_sort_shuffle) { }
virtual ~SelectorBuilder() = default;
virtual PartitionInfo build(DB::Block & block) = 0;
+ void setUseSortShuffle(bool use_external_sort_shuffle_) { use_sort_shuffle
= use_external_sort_shuffle_; }
protected:
- bool use_external_sort_shuffle = false;
+ bool use_sort_shuffle = false;
};
class RoundRobinSelectorBuilder : public SelectorBuilder
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
index cfd406261..75edea325 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -53,7 +53,8 @@ struct SplitOptions
size_t max_sort_buffer_size = 1_GiB;
// Whether to spill firstly before stop external sort shuffle.
bool spill_firstly_before_stop = true;
- bool force_sort = true;
+ bool force_external_sort = false;
+ bool force_mermory_sort = false;
};
class ColumnsBuffer
diff --git a/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
b/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
index 31f5547fb..e38f58647 100644
--- a/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
+++ b/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
@@ -22,6 +22,8 @@
namespace local_engine
{
+
+int64_t searchLastPartitionIdIndex(DB::ColumnPtr column, size_t start, size_t
partition_id);
class SortedPartitionDataMerger;
using SortedPartitionDataMergerPtr =
std::unique_ptr<SortedPartitionDataMerger>;
class SortedPartitionDataMerger
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index c7721b470..00a7b1b0a 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -671,7 +671,8 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jboolean flush_block_buffer_before_evict,
jlong max_sort_buffer_size,
jboolean spill_firstly_before_stop,
- jboolean force_sort)
+ jboolean force_external_sort,
+ jboolean force_memory_sort)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
@@ -718,7 +719,8 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.flush_block_buffer_before_evict =
static_cast<bool>(flush_block_buffer_before_evict),
.max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
.spill_firstly_before_stop =
static_cast<bool>(spill_firstly_before_stop),
- .force_sort = static_cast<bool>(force_sort)
+ .force_external_sort = static_cast<bool>(force_external_sort),
+ .force_mermory_sort = static_cast<bool>(force_memory_sort)
};
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
@@ -745,7 +747,9 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jstring hash_algorithm,
jobject pusher,
jboolean throw_if_memory_exceed,
- jboolean flush_block_buffer_before_evict)
+ jboolean flush_block_buffer_before_evict,
+ jboolean force_external_sort,
+ jboolean force_memory_sort)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
@@ -780,7 +784,10 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed),
- .flush_block_buffer_before_evict =
static_cast<bool>(flush_block_buffer_before_evict)};
+ .flush_block_buffer_before_evict =
static_cast<bool>(flush_block_buffer_before_evict),
+ .force_external_sort = static_cast<bool>(force_external_sort),
+ .force_mermory_sort = static_cast<bool>(force_memory_sort)
+ };
auto name = jstring2string(env, short_name);
local_engine::SplitterHolder * splitter;
splitter = new local_engine::SplitterHolder{.splitter =
std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
diff --git
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
index 75efa3553..524a3ee2e 100644
---
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
+++
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
@@ -78,7 +78,9 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
- GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict
+ GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
+ GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
+ GlutenConfig.getConf.chColumnarForceMemorySortShuffle
)
CHNativeMemoryAllocators.createSpillable(
"CelebornShuffleWriter",
diff --git
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
similarity index 97%
rename from
gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
rename to
gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
index 0072fe8c9..3ecf1fc1a 100644
---
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
+++
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.CoalescedPartitionSpec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
import org.apache.spark.sql.internal.SQLConf
-class GlutenClickHouseRSSColumnarSortShuffleAQESuite
+class GlutenClickHouseRSSColumnarExternalSortShuffleSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -45,7 +45,7 @@ class GlutenClickHouseRSSColumnarSortShuffleAQESuite
.set("spark.sql.adaptive.enabled", "true")
.set("spark.shuffle.service.enabled", "false")
.set("spark.celeborn.client.spark.shuffle.writer", "hash")
- .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle",
"true")
}
test("TPCH Q1") {
diff --git
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
new file mode 100644
index 000000000..ddef1d87c
--- /dev/null
+++
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class GlutenClickHouseRSSColumnarMemorySortShuffleSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val tablesPath: String = basePath + "/tpch-data-ch"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String =
+ rootPath +
"../../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"
+
+ override protected val parquetTableDataPath: String =
+ "../../../../../gluten-core/src/test/resources/tpch-data"
+
+ /** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.shuffle.service.enabled", "false")
+ .set("spark.celeborn.client.spark.shuffle.writer", "hash")
+ .set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle",
"true")
+ }
+
+ test("TPCH Q1") {
+ runTPCHQuery(1) { df => }
+ }
+
+ test("TPCH Q2") {
+ runTPCHQuery(2) { df => }
+ }
+
+ test("TPCH Q3") {
+ runTPCHQuery(3) { df => }
+ }
+
+ test("TPCH Q4") {
+ runTPCHQuery(4) { df => }
+ }
+
+ test("TPCH Q5") {
+ runTPCHQuery(5) { df => }
+ }
+
+ test("TPCH Q6") {
+ runTPCHQuery(6) { df => }
+ }
+
+ test("TPCH Q7") {
+ runTPCHQuery(7) { df => }
+ }
+
+ test("TPCH Q8") {
+ runTPCHQuery(8) { df => }
+ }
+
+ test("TPCH Q9") {
+ runTPCHQuery(9) { df => }
+ }
+
+ test("TPCH Q10") {
+ runTPCHQuery(10) { df => }
+ }
+
+ test("TPCH Q11") {
+ runTPCHQuery(11) { df => }
+ }
+
+ test("TPCH Q12") {
+ runTPCHQuery(12) { df => }
+ }
+
+ test("TPCH Q13") {
+ runTPCHQuery(13) { df => }
+ }
+
+ test("TPCH Q14") {
+ runTPCHQuery(14) { df => }
+ }
+
+ test("TPCH Q15") {
+ runTPCHQuery(15) { df => }
+ }
+
+ test("TPCH Q16") {
+ runTPCHQuery(16, noFallBack = false) { df => }
+ }
+
+ test("TPCH Q17") {
+ runTPCHQuery(17) { df => }
+ }
+
+ test("TPCH Q18") {
+ runTPCHQuery(18) { df => }
+ }
+
+ test("TPCH Q19") {
+ runTPCHQuery(19) { df => }
+ }
+
+ test("TPCH Q20") {
+ runTPCHQuery(20) { df => }
+ }
+
+ test("TPCH Q21") {
+ runTPCHQuery(21, noFallBack = false) { df => }
+ }
+
+ test("TPCH Q22") {
+ runTPCHQuery(22) { df => }
+ }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index c9a62b8b7..6659a42c7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -297,7 +297,14 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def chColumnarShufflePreferSpill: Boolean =
conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED)
- def chColumnarShuffleSpillThreshold: Long =
conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+ def chColumnarShuffleSpillThreshold: Long = {
+ val threshold = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+ if (threshold == 0) {
+ (conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
+ } else {
+ threshold
+ }
+ }
def chColumnarThrowIfMemoryExceed: Boolean =
conf.getConf(COLUMNAR_CH_THROW_IF_MEMORY_EXCEED)
@@ -309,7 +316,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def chColumnarSpillFirstlyBeforeStop: Boolean =
conf.getConf(COLUMNAR_CH_SPILL_FIRSTLY_BEFORE_STOP)
- def chColumnarForceSortShuffle: Boolean =
conf.getConf(COLUMNAR_CH_FORCE_SORT_SHUFFLE)
+ def chColumnarForceExternalSortShuffle: Boolean =
+ conf.getConf(COLUMNAR_CH_FORCE_EXTERNAL_SORT_SHUFFLE)
+
+ def chColumnarForceMemorySortShuffle: Boolean =
+ conf.getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
def cartesianProductTransformerEnabled: Boolean =
conf.getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
@@ -1416,7 +1427,7 @@ object GlutenConfig {
.internal()
.doc("The maximum size of sort shuffle buffer in CH backend.")
.bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("1GB")
+ .createWithDefaultString("0")
val COLUMNAR_CH_SPILL_FIRSTLY_BEFORE_STOP =
buildConf("spark.gluten.sql.columnar.backend.ch.spillFirstlyBeforeStop")
@@ -1425,11 +1436,17 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
- val COLUMNAR_CH_FORCE_SORT_SHUFFLE =
- buildConf("spark.gluten.sql.columnar.backend.ch.forceSortShuffle")
+ val COLUMNAR_CH_FORCE_EXTERNAL_SORT_SHUFFLE =
+ buildConf("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle")
+ .internal()
+ .doc("Whether to force to use external sort shuffle in CH backend. ")
+ .booleanConf
+ .createWithDefault(false)
+
+ val COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE =
+ buildConf("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle")
.internal()
- .doc("Whether to force to use sort shuffle in CH backend. " +
- "Sort shuffle will enable When partition num greater than 300.")
+ .doc("Whether to force to use memory sort shuffle in CH backend. ")
.booleanConf
.createWithDefault(false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]