This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d35d1dc5e [CH] Adaptive sort memory control and support memory sort 
shuffle (#5893)
d35d1dc5e is described below

commit d35d1dc5e4450fdf58b8092ea26a0c928de29a48
Author: LiuNeng <[email protected]>
AuthorDate: Thu May 30 13:09:25 2024 +0800

    [CH] Adaptive sort memory control and support memory sort shuffle (#5893)
    
    * optimize sort and shuffle
    * change block size config
    * support memory sort local shuffle
    * fix bug
    * support memory sort shuffle
    * update ch version
    * fix check style
    * fix bug
    * fix bug
    ---------
    
    Co-authored-by: liuneng1994 <[email protected]>
---
 .../vectorized/CHShuffleSplitterJniWrapper.java    |  21 +-
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   2 +-
 .../backendsapi/clickhouse/CHTransformerApi.scala  |   6 -
 .../spark/shuffle/CHColumnarShuffleWriter.scala    |  10 +-
 ...ickHouseColumnarExternalSortShuffleSuite.scala} |  63 +---
 ...ClickHouseColumnarMemorySortShuffleSuite.scala} |  63 +---
 .../GlutenClickHouseMergeTreeOptimizeSuite.scala   |  20 +-
 cpp-ch/clickhouse.version                          |   2 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  21 ++
 cpp-ch/local-engine/Common/CHUtil.h                |   4 +
 cpp-ch/local-engine/Parser/SortRelParser.cpp       |   9 +-
 .../local-engine/Shuffle/CachedShuffleWriter.cpp   |  79 +++--
 cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h  |  15 +-
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp    | 338 +++++++++++++++++----
 cpp-ch/local-engine/Shuffle/PartitionWriter.h      | 124 ++++++--
 cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp    |   6 +-
 cpp-ch/local-engine/Shuffle/SelectorBuilder.h      |   5 +-
 cpp-ch/local-engine/Shuffle/ShuffleSplitter.h      |   3 +-
 .../Shuffle/SortedPartitionDataMerger.h            |   2 +
 cpp-ch/local-engine/local_engine_jni.cpp           |  15 +-
 .../CHCelebornHashBasedColumnarShuffleWriter.scala |   4 +-
 ...HouseRSSColumnarExternalSortShuffleSuite.scala} |   4 +-
 ...ickHouseRSSColumnarMemorySortShuffleSuite.scala | 136 +++++++++
 .../scala/org/apache/gluten/GlutenConfig.scala     |  31 +-
 24 files changed, 704 insertions(+), 279 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index f81ec88c2..815bf472c 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -37,7 +37,8 @@ public class CHShuffleSplitterJniWrapper {
       boolean flushBlockBufferBeforeEvict,
       long maxSortBufferSize,
       boolean spillFirstlyBeforeStop,
-      boolean forceSort) {
+      boolean forceExternalSort,
+      boolean forceMemorySort) {
     return nativeMake(
         part.getShortName(),
         part.getNumPartitions(),
@@ -57,7 +58,8 @@ public class CHShuffleSplitterJniWrapper {
         flushBlockBufferBeforeEvict,
         maxSortBufferSize,
         spillFirstlyBeforeStop,
-        forceSort);
+        forceExternalSort,
+        forceMemorySort);
   }
 
   public long makeForRSS(
@@ -70,7 +72,9 @@ public class CHShuffleSplitterJniWrapper {
       String hashAlgorithm,
       Object pusher,
       boolean throwIfMemoryExceed,
-      boolean flushBlockBufferBeforeEvict) {
+      boolean flushBlockBufferBeforeEvict,
+      boolean forceExternalSort,
+      boolean forceMemorySort) {
     return nativeMakeForRSS(
         part.getShortName(),
         part.getNumPartitions(),
@@ -84,7 +88,9 @@ public class CHShuffleSplitterJniWrapper {
         hashAlgorithm,
         pusher,
         throwIfMemoryExceed,
-        flushBlockBufferBeforeEvict);
+        flushBlockBufferBeforeEvict,
+        forceExternalSort,
+        forceMemorySort);
   }
 
   public native long nativeMake(
@@ -106,7 +112,8 @@ public class CHShuffleSplitterJniWrapper {
       boolean flushBlockBufferBeforeEvict,
       long maxSortBufferSize,
       boolean spillFirstlyBeforeStop,
-      boolean forceSort);
+      boolean forceSort,
+      boolean forceMemorySort);
 
   public native long nativeMakeForRSS(
       String shortName,
@@ -121,7 +128,9 @@ public class CHShuffleSplitterJniWrapper {
       String hashAlgorithm,
       Object pusher,
       boolean throwIfMemoryExceed,
-      boolean flushBlockBufferBeforeEvict);
+      boolean flushBlockBufferBeforeEvict,
+      boolean forceSort,
+      boolean forceMemorySort);
 
   public native void split(long splitterId, long block);
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index bc0c8d1c0..e5f68a869 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -127,7 +127,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
   val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
     GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
       ".runtime_config.max_source_concatenate_bytes"
-  val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = -1
+  val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 
256
 
   def affinityMode: String = {
     SparkEnv.get.conf
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index c75cf4788..ea3398e77 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -91,12 +91,6 @@ class CHTransformerApi extends TransformerApi with Logging {
       val offHeapSize =
         
nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", 
"0").toLong
       if (offHeapSize > 0) {
-        // Only set default max_bytes_before_external_sort for CH when it is 
not set explicitly.
-        val sortSpillKey = settingPrefix + "max_bytes_before_external_sort";
-        if (!nativeConfMap.containsKey(sortSpillKey)) {
-          val sortSpillValue = offHeapSize * 0.5
-          nativeConfMap.put(sortSpillKey, sortSpillValue.toLong.toString)
-        }
 
         // Only set default max_bytes_before_external_group_by for CH when it 
is not set explicitly.
         val groupBySpillKey = settingPrefix + 
"max_bytes_before_external_group_by";
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index 3a80e18bd..4a1adbec7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -61,7 +61,8 @@ class CHColumnarShuffleWriter[K, V](
     GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict
   private val maxSortBufferSize = 
GlutenConfig.getConf.chColumnarMaxSortBufferSize
   private val spillFirstlyBeforeStop = 
GlutenConfig.getConf.chColumnarSpillFirstlyBeforeStop
-  private val forceSortShuffle = 
GlutenConfig.getConf.chColumnarForceSortShuffle
+  private val forceExternalSortShuffle = 
GlutenConfig.getConf.chColumnarForceExternalSortShuffle
+  private val forceMemorySortShuffle = 
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
   private val spillThreshold = 
GlutenConfig.getConf.chColumnarShuffleSpillThreshold
   private val jniWrapper = new CHShuffleSplitterJniWrapper
   // Are we in the process of stopping? Because map tasks can call stop() with 
success = true
@@ -115,7 +116,8 @@ class CHColumnarShuffleWriter[K, V](
         flushBlockBufferBeforeEvict,
         maxSortBufferSize,
         spillFirstlyBeforeStop,
-        forceSortShuffle
+        forceExternalSortShuffle,
+        forceMemorySortShuffle
       )
       CHNativeMemoryAllocators.createSpillable(
         "ShuffleWriter",
@@ -127,9 +129,9 @@ class CHColumnarShuffleWriter[K, V](
                   "is created. This behavior should be optimized by moving 
memory " +
                   "allocations from make() to split()")
             }
-            logInfo(s"Gluten shuffle writer: Trying to spill $size bytes of 
data")
+            logError(s"Gluten shuffle writer: Trying to spill $size bytes of 
data")
             val spilled = splitterJniWrapper.evict(nativeSplitter);
-            logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of 
data")
+            logError(s"Gluten shuffle writer: Spilled $spilled / $size bytes 
of data")
             spilled
           }
 
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
similarity index 52%
copy from 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
copy to 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
index 098c7117b..be36cd998 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarExternalSortShuffleSuite.scala
@@ -17,10 +17,9 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
-class GlutenClickHouseColumnarSortShuffleAQESuite
+class GlutenClickHouseColumnarExternalSortShuffleSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
@@ -36,29 +35,11 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle", 
"true")
   }
 
   test("TPCH Q1") {
-    runTPCHQuery(1) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-
-        val colCustomShuffleReaderExecs = 
collect(df.queryExecution.executedPlan) {
-          case csr: AQEShuffleReadExec => csr
-        }
-        assert(colCustomShuffleReaderExecs.size == 2)
-        val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec0.startReducerIndex == 0)
-        assert(coalescedPartitionSpec0.endReducerIndex == 5)
-        val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec1.startReducerIndex == 0)
-        assert(coalescedPartitionSpec1.endReducerIndex == 5)
-    }
+    runTPCHQuery(1) { df => }
   }
 
   test("TPCH Q2") {
@@ -98,14 +79,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q11") {
-    runTPCHQuery(11) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(11) { df => }
   }
 
   test("TPCH Q12") {
@@ -121,14 +95,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q15") {
-    runTPCHQuery(15) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(15) { df => }
   }
 
   test("TPCH Q16") {
@@ -140,13 +107,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q18") {
-    runTPCHQuery(18) {
-      df =>
-        val hashAggregates = collect(df.queryExecution.executedPlan) {
-          case hash: HashAggregateExecBaseTransformer => hash
-        }
-        assert(hashAggregates.size == 3)
-    }
+    runTPCHQuery(18) { df => }
   }
 
   test("TPCH Q19") {
@@ -162,14 +123,6 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q22") {
-    runTPCHQuery(22) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 3)
-        assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
-    }
+    runTPCHQuery(22) { df => }
   }
 }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
similarity index 52%
rename from 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
rename to 
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
index 098c7117b..b9d580c72 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarSortShuffleAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala
@@ -17,10 +17,9 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
-class GlutenClickHouseColumnarSortShuffleAQESuite
+class GlutenClickHouseColumnarMemorySortShuffleSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
@@ -36,29 +35,11 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", 
"true")
   }
 
   test("TPCH Q1") {
-    runTPCHQuery(1) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-
-        val colCustomShuffleReaderExecs = 
collect(df.queryExecution.executedPlan) {
-          case csr: AQEShuffleReadExec => csr
-        }
-        assert(colCustomShuffleReaderExecs.size == 2)
-        val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec0.startReducerIndex == 0)
-        assert(coalescedPartitionSpec0.endReducerIndex == 5)
-        val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec1.startReducerIndex == 0)
-        assert(coalescedPartitionSpec1.endReducerIndex == 5)
-    }
+    runTPCHQuery(1) { df => }
   }
 
   test("TPCH Q2") {
@@ -98,14 +79,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q11") {
-    runTPCHQuery(11) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(11) { df => }
   }
 
   test("TPCH Q12") {
@@ -121,14 +95,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q15") {
-    runTPCHQuery(15) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(15) { df => }
   }
 
   test("TPCH Q16") {
@@ -140,13 +107,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q18") {
-    runTPCHQuery(18) {
-      df =>
-        val hashAggregates = collect(df.queryExecution.executedPlan) {
-          case hash: HashAggregateExecBaseTransformer => hash
-        }
-        assert(hashAggregates.size == 3)
-    }
+    runTPCHQuery(18) { df => }
   }
 
   test("TPCH Q19") {
@@ -162,14 +123,6 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }
 
   test("TPCH Q22") {
-    runTPCHQuery(22) {
-      df =>
-        
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = 
collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 3)
-        assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
-    }
+    runTPCHQuery(22) { df => }
   }
 }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index d4302193f..f016f9dc5 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -157,9 +157,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 812)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 372)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 232)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 239)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
     // the second VACUUM will remove some empty folders
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) 
== 220)
@@ -188,11 +188,11 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       assert(ret.apply(0).get(0) == 600572)
 
       spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 398)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 516)
       spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 286)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 306)
       spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 270)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) 
== 276)
       spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p3").collect()
@@ -219,11 +219,11 @@ class GlutenClickHouseMergeTreeOptimizeSuite
       assert(ret.apply(0).get(0) == 600572)
 
       spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 398)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 516)
       spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 286)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 306)
       spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
-      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 270)
+      assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) 
== 276)
       spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
       val ret2 = spark.sql("select count(*) from 
lineitem_mergetree_optimize_p4").collect()
@@ -313,12 +313,12 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
     spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) 
== {
-      if (sparkVersion.equals("3.2")) 931 else 1014
+      if (sparkVersion.equals("3.2")) 499 else 528
     })
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) 
== {
-      if (sparkVersion.equals("3.2")) 439 else 445
+      if (sparkVersion.equals("3.2")) 315 else 321
     })
     spark.sql(s"set ${GlutenConfig.GLUTEN_ENABLED.key}=true")
 
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index b5d3aac8b..5125aabe5 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20240527
-CH_COMMIT=dd16f9435bf
+CH_COMMIT=55b10ba376274f2a61a4c1daf1a2fb744155bd32
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 9e2ce6304..317adda2a 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -555,6 +555,12 @@ DB::Context::ConfigurationPtr 
BackendInitializerUtil::initConfig(std::map<std::s
             config->setString(key.substr(CH_RUNTIME_CONFIG_PREFIX.size()), 
value);
         }
     }
+
+    if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
+    {
+        config->setString(CH_TASK_MEMORY, 
backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
+    }
+
     return config;
 }
 
@@ -672,6 +678,21 @@ void 
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
     settings.set("function_json_value_return_type_allow_complex", true);
     settings.set("function_json_value_return_type_allow_nullable", true);
     settings.set("precise_float_parsing", true);
+    if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
+    {
+        auto task_memory = 
std::stoull(backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
+        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + 
"max_bytes_before_external_sort"))
+        {
+            settings.max_bytes_before_external_sort = static_cast<size_t>(0.8 
* task_memory);
+        }
+        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + 
"prefer_external_sort_block_bytes"))
+        {
+            auto mem_gb = task_memory / static_cast<double>(1_GiB);
+            // 2.8x+5, Heuristics calculate the block size of external sort, 
[8,16]
+            settings.prefer_external_sort_block_bytes = std::max(std::min(
+                static_cast<size_t>(2.8*mem_gb + 5), 16ul), 8ul) * 1024 * 1024;
+        }
+    }
 }
 
 void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
diff --git a/cpp-ch/local-engine/Common/CHUtil.h 
b/cpp-ch/local-engine/Common/CHUtil.h
index edbd91c50..458eec9d3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -1,3 +1,4 @@
+/*
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -168,6 +169,9 @@ public:
     inline static const std::string S3A_PREFIX = "fs.s3a.";
     inline static const std::string SPARK_DELTA_PREFIX = 
"spark.databricks.delta.";
 
+    inline static const String GLUTEN_TASK_OFFHEAP = 
"spark.gluten.memory.task.offHeap.size.in.bytes";
+    inline static const String CH_TASK_MEMORY = "off_heap_per_task";
+
     /// On yarn mode, native writing on hdfs cluster takes yarn container user 
as the user passed to libhdfs3, which
     /// will cause permission issue because yarn container user is not the 
owner of the hdfs dir to be written.
     /// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Parser/SortRelParser.cpp 
b/cpp-ch/local-engine/Parser/SortRelParser.cpp
index 88141d030..ea29e72d1 100644
--- a/cpp-ch/local-engine/Parser/SortRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/SortRelParser.cpp
@@ -40,8 +40,15 @@ SortRelParser::parse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, st
     size_t limit = parseLimit(rel_stack_);
     const auto & sort_rel = rel.sort();
     auto sort_descr = parseSortDescription(sort_rel.sorts(), 
query_plan->getCurrentDataStream().header);
+    SortingStep::Settings settings(*getContext());
+    size_t offheap_per_task = 
getContext()->getConfigRef().getUInt64("off_heap_per_task");
+    double spill_mem_ratio = 
getContext()->getConfigRef().getDouble("spill_mem_ratio", 0.9);
+    settings.worth_external_sort = [offheap_per_task, spill_mem_ratio]() -> 
bool
+    {
+        return CurrentMemoryTracker::current_memory() > offheap_per_task * 
spill_mem_ratio;
+    };
     auto sorting_step = std::make_unique<DB::SortingStep>(
-        query_plan->getCurrentDataStream(), sort_descr, limit, 
SortingStep::Settings(*getContext()), false);
+        query_plan->getCurrentDataStream(), sort_descr, limit, settings, 
false);
     sorting_step->setStepDescription("Sorting step");
     steps.emplace_back(sorting_step.get());
     query_plan->addStep(std::move(sorting_step));
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp 
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
index 5a8629434..559d90318 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -25,25 +25,23 @@
 
 namespace DB
 {
-
 namespace ErrorCodes
 {
-    extern const int BAD_ARGUMENTS;
+extern const int BAD_ARGUMENTS;
 }
 }
 
 
 namespace local_engine
 {
-
 using namespace DB;
 
-CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const 
SplitOptions & options_, jobject rss_pusher) : options(options_)
+CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const 
SplitOptions & options_, jobject rss_pusher)
+    : options(options_)
 {
-    bool use_external_sort_shuffle = (options.force_sort) && !rss_pusher;
     if (short_name == "rr")
     {
-        partitioner = 
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num, 
use_external_sort_shuffle);
+        partitioner = 
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
     }
     else if (short_name == "hash")
     {
@@ -53,15 +51,15 @@ CachedShuffleWriter::CachedShuffleWriter(const String & 
short_name, const SplitO
         {
             hash_fields.push_back(std::stoi(expr));
         }
-        partitioner = 
std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields, 
options_.hash_algorithm, use_external_sort_shuffle);
+        partitioner = 
std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields, 
options_.hash_algorithm);
     }
     else if (short_name == "single")
     {
         options.partition_num = 1;
-        partitioner = 
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num, 
use_external_sort_shuffle);
+        partitioner = 
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
     }
     else if (short_name == "range")
-        partitioner = 
std::make_unique<RangeSelectorBuilder>(options.hash_exprs, 
options.partition_num, use_external_sort_shuffle);
+        partitioner = 
std::make_unique<RangeSelectorBuilder>(options.hash_exprs, 
options.partition_num);
     else
         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "unsupported 
splitter {}", short_name);
 
@@ -77,32 +75,17 @@ CachedShuffleWriter::CachedShuffleWriter(const String & 
short_name, const SplitO
         jmethodID celeborn_push_partition_data_method =
             GetMethodID(env, celeborn_partition_pusher_class, 
"pushPartitionData", "(I[BI)I");
         CLEAN_JNIENV
-        auto celeborn_client = std::make_unique<CelebornClient>(rss_pusher, 
celeborn_push_partition_data_method);
-        if (use_external_sort_shuffle)
-        {
-            partition_writer = 
std::make_unique<ExternalSortCelebornPartitionWriter>(this, 
std::move(celeborn_client));
-            sort_shuffle = true;
-        }
-        else
-            partition_writer = std::make_unique<CelebornPartitionWriter>(this, 
std::move(celeborn_client));
-    }
-    else
-    {
-        if (use_external_sort_shuffle)
-        {
-            partition_writer = 
std::make_unique<ExternalSortLocalPartitionWriter>(this);
-            sort_shuffle = true;
-        }
-        else
-            partition_writer = std::make_unique<LocalPartitionWriter>(this);
+        celeborn_client = std::make_unique<CelebornClient>(rss_pusher, 
celeborn_push_partition_data_method);
     }
 
+
     split_result.partition_lengths.resize(options.partition_num, 0);
     split_result.raw_partition_lengths.resize(options.partition_num, 0);
 }
 
 void CachedShuffleWriter::split(DB::Block & block)
 {
+    lazyInitPartitionWriter(block);
     auto block_info = block.info;
     initOutputIfNeeded(block);
 
@@ -145,18 +128,50 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & 
block)
     }
 }
 
-SplitResult CachedShuffleWriter::stop()
+void CachedShuffleWriter::lazyInitPartitionWriter(Block & input_sample)
 {
-    partition_writer->stop();
+    if (partition_writer)
+        return;
+
+    auto avg_row_size = input_sample.allocatedBytes() / input_sample.rows();
+    auto overhead_memory = std::max(avg_row_size, input_sample.columns() * 16) 
* options.split_size * options.partition_num;
+    auto use_sort_shuffle = overhead_memory > options.spill_threshold * 0.5 || 
options.partition_num >= 300;
+    auto use_external_sort_shuffle = options.force_external_sort;
+    auto use_memory_sort_shuffle = options.force_mermory_sort || 
use_sort_shuffle;
+    sort_shuffle = use_memory_sort_shuffle || use_external_sort_shuffle;
+    if (celeborn_client)
+    {
+        if (use_external_sort_shuffle)
+            partition_writer = 
std::make_unique<ExternalSortCelebornPartitionWriter>(this, 
std::move(celeborn_client));
+        else if (use_memory_sort_shuffle)
+            partition_writer = 
std::make_unique<MemorySortCelebornPartitionWriter>(this, 
std::move(celeborn_client));
+        else
+            partition_writer = std::make_unique<CelebornPartitionWriter>(this, 
std::move(celeborn_client));
+    }
+    else
+    {
+        if (use_external_sort_shuffle)
+            partition_writer = 
std::make_unique<ExternalSortLocalPartitionWriter>(this);
+        else if (use_memory_sort_shuffle)
+            partition_writer = 
std::make_unique<MemorySortLocalPartitionWriter>(this);
+        else
+            partition_writer = std::make_unique<LocalPartitionWriter>(this);
+    }
+    partitioner->setUseSortShuffle(sort_shuffle);
+    LOG_INFO(logger, "Use Partition Writer {}", partition_writer->getName());
+}
 
-    static auto * logger = &Poco::Logger::get("CachedShuffleWriter");
+SplitResult CachedShuffleWriter::stop()
+{
+    if (partition_writer)
+        partition_writer->stop();
     LOG_INFO(logger, "CachedShuffleWriter stop, split result: {}", 
split_result.toString());
     return split_result;
 }
 
 size_t CachedShuffleWriter::evictPartitions()
 {
+    if (!partition_writer) return 0;
     return partition_writer->evictPartitions(true, 
options.flush_block_buffer_before_evict);
 }
-
-}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h 
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
index d1dd4ff2f..e6395c8e4 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
@@ -24,10 +24,10 @@
 
 namespace local_engine
 {
-
-class PartitionWriter;
-class LocalPartitionWriter;
-class CelebornPartitionWriter;
+    class CelebornClient;
+    class PartitionWriter;
+    class LocalPartitionWriter;
+    class CelebornPartitionWriter;
 
 class CachedShuffleWriter : public ShuffleWriterBase
 {
@@ -35,8 +35,12 @@ public:
     friend class PartitionWriter;
     friend class LocalPartitionWriter;
     friend class CelebornPartitionWriter;
+    friend class SortBasedPartitionWriter;
+    friend class MemorySortLocalPartitionWriter;
+    friend class MemorySortCelebornPartitionWriter;
     friend class ExternalSortLocalPartitionWriter;
     friend class ExternalSortCelebornPartitionWriter;
+    friend class Spillable;
 
     explicit CachedShuffleWriter(const String & short_name, const SplitOptions 
& options,  jobject rss_pusher = nullptr);
     ~CachedShuffleWriter() override = default;
@@ -47,6 +51,7 @@ public:
 
 private:
     void initOutputIfNeeded(DB::Block & block);
+    void lazyInitPartitionWriter(DB::Block & input_sample);
 
     bool stopped = false;
     DB::Block output_header;
@@ -55,7 +60,9 @@ private:
     std::unique_ptr<SelectorBuilder> partitioner;
     std::vector<size_t> output_columns_indicies;
     std::unique_ptr<PartitionWriter> partition_writer;
+    std::unique_ptr<CelebornClient> celeborn_client;
     bool sort_shuffle = false;
+    Poco::Logger* logger = &Poco::Logger::get("CachedShuffleWriter");
 };
 }
 
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp 
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index e0b69316d..d02c79e0a 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -49,6 +49,7 @@ extern const int LOGICAL_ERROR;
 using namespace DB;
 namespace local_engine
 {
+static const String PARTITION_COLUMN_NAME = "partition";
 
 void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & 
block)
 {
@@ -120,7 +121,7 @@ void PartitionWriter::write(const PartitionInfo & 
partition_info, DB::Block & bl
     }
 
     /// Only works for local partition writer
-    if (!supportsEvictSinglePartition() && options->spill_threshold && 
current_cached_bytes >= options->spill_threshold)
+    if (!supportsEvictSinglePartition() && options->spill_threshold && 
CurrentMemoryTracker::current_memory() >= options->spill_threshold)
         unsafeEvictPartitions(false, options->flush_block_buffer_before_evict);
 
     shuffle_writer->split_result.total_split_time += 
watch.elapsedNanoseconds();
@@ -157,20 +158,18 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool 
for_memory_spill, bool f
             if (buffer->empty())
                 continue;
 
-            PartitionSpillInfo partition_spill_info;
-            partition_spill_info.start = output.count();
+            std::pair<size_t, size_t> offsets;
+            offsets.first = output.count();
             spilled_bytes += buffer->bytes();
 
             size_t written_bytes = buffer->spill(writer);
             res += written_bytes;
 
             compressed_output.sync();
-            partition_spill_info.length = output.count() - 
partition_spill_info.start;
+            offsets.second = output.count() - offsets.first;
             shuffle_writer->split_result.raw_partition_lengths[partition_id] 
+= written_bytes;
-            partition_spill_info.partition_id = partition_id;
-            info.partition_spill_infos.emplace_back(partition_spill_info);
+            info.partition_spill_infos[partition_id] = offsets;
         }
-
         spill_infos.emplace_back(info);
         shuffle_writer->split_result.total_compress_time += 
compressed_output.getCompressTime();
         shuffle_writer->split_result.total_write_time += 
compressed_output.getWriteTime();
@@ -182,8 +181,7 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool 
for_memory_spill, bool f
     {
         // escape memory track from current thread status; add untracked 
memory limit for create thread object, avoid trigger memory spill again
         IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
-        ThreadFromGlobalPool thread(spill_to_file);
-        thread.join();
+        spill_to_file();
     }
     else
     {
@@ -194,20 +192,35 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool 
for_memory_spill, bool f
     return res;
 }
 
-std::vector<UInt64> LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
+String Spillable::getNextSpillFile()
+{
+    auto file_name = std::to_string(split_options.shuffle_id) + "_" + 
std::to_string(split_options.map_id) + "_" + std::to_string(spill_infos.size());
+    std::hash<std::string> hasher;
+    auto hash = hasher(file_name);
+    auto dir_id = hash % split_options.local_dirs_list.size();
+    auto sub_dir_id = (hash / split_options.local_dirs_list.size()) % 
split_options.num_sub_dirs;
+
+    std::string dir = 
std::filesystem::path(split_options.local_dirs_list[dir_id]) / 
std::format("{:02x}", sub_dir_id);
+    if (!std::filesystem::exists(dir))
+        std::filesystem::create_directories(dir);
+    return std::filesystem::path(dir) / file_name;
+}
+
+std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter * 
shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
 {
     auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+
     CompressedWriteBuffer compressed_output(data_file, codec, 
shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
 
     std::vector<UInt64> 
partition_length(shuffle_writer->options.partition_num, 0);
 
-    std::vector<ReadBufferPtr> spill_inputs;
+    std::vector<std::shared_ptr<ReadBufferFromFile>> spill_inputs;
     spill_inputs.reserve(spill_infos.size());
     for (const auto & spill : spill_infos)
     {
         // only use readBig
-        
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFile>(spill.spilled_file,
 0));
+        
spill_inputs.emplace_back(std::make_shared<ReadBufferFromFilePRead>(spill.spilled_file,
 0));
     }
 
     Stopwatch write_time_watch;
@@ -215,33 +228,46 @@ std::vector<UInt64> 
LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
     Stopwatch serialization_time_watch;
     size_t merge_io_time = 0;
     String buffer;
-    for (size_t partition_id = 0; partition_id < 
partition_block_buffer.size(); ++partition_id)
+    for (size_t partition_id = 0; partition_id < split_options.partition_num; 
++partition_id)
     {
         auto size_before = data_file.count();
 
         io_time_watch.restart();
         for (size_t i = 0; i < spill_infos.size(); ++i)
         {
-            size_t size = 
spill_infos[i].partition_spill_infos[partition_id].length;
+            if (!spill_infos[i].partition_spill_infos.contains(partition_id))
+            {
+                continue;
+            }
+            size_t size = 
spill_infos[i].partition_spill_infos[partition_id].second;
+            size_t offset = 
spill_infos[i].partition_spill_infos[partition_id].first;
+            if (!size)
+            {
+                continue;
+            }
             buffer.reserve(size);
-            auto count = spill_inputs[i]->readBig(buffer.data(), size);
+            auto count = spill_inputs[i]->readBigAt(buffer.data(), size, 
offset, nullptr);
+
+            chassert(count == size);
             data_file.write(buffer.data(), count);
         }
         merge_io_time += io_time_watch.elapsedNanoseconds();
 
         serialization_time_watch.restart();
-        if (!partition_block_buffer[partition_id]->empty())
+        if (!extra_data.partition_block_buffer.empty() && 
!extra_data.partition_block_buffer[partition_id]->empty())
         {
-            Block block = 
partition_block_buffer[partition_id]->releaseColumns();
-            partition_buffer[partition_id]->addBlock(std::move(block));
+            Block block = 
extra_data.partition_block_buffer[partition_id]->releaseColumns();
+            
extra_data.partition_buffer[partition_id]->addBlock(std::move(block));
+        }
+        if (!extra_data.partition_buffer.empty())
+        {
+            size_t raw_size = 
extra_data.partition_buffer[partition_id]->spill(writer);
+            shuffle_writer->split_result.raw_partition_lengths[partition_id] 
+= raw_size;
         }
-        size_t raw_size = partition_buffer[partition_id]->spill(writer);
-
         compressed_output.sync();
         partition_length[partition_id] = data_file.count() - size_before;
         shuffle_writer->split_result.total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
         shuffle_writer->split_result.total_bytes_written += 
partition_length[partition_id];
-        shuffle_writer->split_result.raw_partition_lengths[partition_id] += 
raw_size;
     }
 
     shuffle_writer->split_result.total_write_time += 
write_time_watch.elapsedNanoseconds();
@@ -253,32 +279,43 @@ std::vector<UInt64> 
LocalPartitionWriter::mergeSpills(WriteBuffer & data_file)
 
     for (const auto & spill : spill_infos)
         std::filesystem::remove(spill.spilled_file);
-
     return partition_length;
 }
 
-LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter * 
shuffle_writer_) : PartitionWriter(shuffle_writer_)
+void SortBasedPartitionWriter::write(const PartitionInfo & info, DB::Block & 
block)
 {
+    Stopwatch write_time_watch;
+    if (output_header.columns() == 0)
+        output_header = block.cloneEmpty();
+    auto partition_column = ColumnUInt64::create();
+    partition_column->reserve(block.rows());
+    
partition_column->getData().insert_assume_reserved(info.src_partition_num.begin(),
 info.src_partition_num.end());
+    block.insert({std::move(partition_column), 
std::make_shared<DataTypeUInt64>(), PARTITION_COLUMN_NAME});
+    if (sort_header.columns() == 0)
+    {
+        sort_header = block.cloneEmpty();
+        
sort_description.emplace_back(SortColumnDescription(PARTITION_COLUMN_NAME));
+    }
+    // partial sort
+    sortBlock(block, sort_description);
+    Chunk chunk;
+    chunk.setColumns(block.getColumns(), block.rows());
+    accumulated_blocks.emplace_back(std::move(chunk));
+    current_accumulated_bytes += accumulated_blocks.back().allocatedBytes();
+    current_accumulated_rows += accumulated_blocks.back().getNumRows();
+    shuffle_writer->split_result.total_write_time += 
write_time_watch.elapsedNanoseconds();
+    if (options->spill_threshold && CurrentMemoryTracker::current_memory() >= 
options->spill_threshold)
+        unsafeEvictPartitions(false, false);
 }
 
-String LocalPartitionWriter::getNextSpillFile()
+LocalPartitionWriter::LocalPartitionWriter(CachedShuffleWriter * 
shuffle_writer_) : PartitionWriter(shuffle_writer_), 
Spillable(shuffle_writer_->options)
 {
-    auto file_name = std::to_string(options->shuffle_id) + "_" + 
std::to_string(options->map_id) + "_" + std::to_string(spill_infos.size());
-    std::hash<std::string> hasher;
-    auto hash = hasher(file_name);
-    auto dir_id = hash % options->local_dirs_list.size();
-    auto sub_dir_id = (hash / options->local_dirs_list.size()) % 
options->num_sub_dirs;
-
-    std::string dir = std::filesystem::path(options->local_dirs_list[dir_id]) 
/ std::format("{:02x}", sub_dir_id);
-    if (!std::filesystem::exists(dir))
-        std::filesystem::create_directories(dir);
-    return std::filesystem::path(dir) / file_name;
 }
 
 void LocalPartitionWriter::unsafeStop()
 {
     WriteBufferFromFile output(options->data_file, options->io_buffer_size);
-    auto offsets = mergeSpills(output);
+    auto offsets = mergeSpills(shuffle_writer, output, 
{partition_block_buffer, partition_buffer});
     shuffle_writer->split_result.partition_lengths = offsets;
 }
 
@@ -335,30 +372,211 @@ size_t PartitionWriter::bytes() const
     return bytes;
 }
 
-void ExternalSortLocalPartitionWriter::write(const PartitionInfo & info, 
DB::Block & block)
+size_t MemorySortLocalPartitionWriter::unsafeEvictPartitions(bool 
for_memory_spill, bool  /*flush_block_buffer*/)
 {
-    Stopwatch write_time_watch;
-    if (output_header.columns() == 0)
-        output_header = block.cloneEmpty();
-    static const String partition_column_name = "partition";
-    auto partition_column = ColumnUInt64::create();
-    partition_column->reserve(block.rows());
-    
partition_column->getData().insert_assume_reserved(info.src_partition_num.begin(),
 info.src_partition_num.end());
-    block.insert({std::move(partition_column), 
std::make_shared<DataTypeUInt64>(), partition_column_name});
-    if (sort_header.columns() == 0)
+    size_t res = 0;
+    size_t spilled_bytes = 0;
+
+    auto spill_to_file = [this, &res, &spilled_bytes]()
     {
-        sort_header = block.cloneEmpty();
-        
sort_description.emplace_back(SortColumnDescription(partition_column_name));
+        if (accumulated_blocks.empty())
+            return;
+        auto file = getNextSpillFile();
+        WriteBufferFromFile output(file, 
shuffle_writer->options.io_buffer_size);
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
+        NativeWriter writer(compressed_output, output_header);
+
+        SpillInfo info;
+        info.spilled_file = file;
+
+        Stopwatch serialization_time_watch;
+        MergeSorter sorter(sort_header, std::move(accumulated_blocks), 
sort_description, adaptiveBlockSize(), 0);
+        size_t cur_partition_id = 0;
+        info.partition_spill_infos[cur_partition_id] = {0,0};
+        while (auto data = sorter.read())
+        {
+            Block serialized_block = 
sort_header.cloneWithColumns(data.detachColumns());
+            const auto partitions = 
serialized_block.getByName(PARTITION_COLUMN_NAME).column;
+            serialized_block.erase(PARTITION_COLUMN_NAME);
+            size_t row_offset = 0;
+            while (row_offset < serialized_block.rows())
+            {
+                auto last_idx = searchLastPartitionIdIndex(partitions, 
row_offset, cur_partition_id);
+                if (last_idx < 0)
+                {
+                    auto& last = info.partition_spill_infos[cur_partition_id];
+                    compressed_output.sync();
+                    last.second = output.count() - last.first;
+                    cur_partition_id++;
+                    info.partition_spill_infos[cur_partition_id] = {last.first 
+ last.second, 0};
+                    continue;
+                }
+
+                if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
+                {
+                    auto count = writer.write(serialized_block);
+                    
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+                    break;
+                }
+                else
+                {
+                    auto cut_block = 
serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
+
+                    auto count = writer.write(cut_block);
+                    
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+                    row_offset = last_idx + 1;
+                    if (last_idx != serialized_block.rows() - 1)
+                    {
+                        auto& last = 
info.partition_spill_infos[cur_partition_id];
+                        compressed_output.sync();
+                        last.second = output.count() - last.first;
+                        cur_partition_id++;
+                        info.partition_spill_infos[cur_partition_id] = 
{last.first + last.second, 0};
+                    }
+                }
+            }
+        }
+        compressed_output.sync();
+        auto& last = info.partition_spill_infos[cur_partition_id];
+        last.second = output.count() - last.first;
+        spilled_bytes = current_accumulated_bytes;
+        res = current_accumulated_bytes;
+        current_accumulated_bytes = 0;
+        current_accumulated_rows = 0;
+        std::erase_if(info.partition_spill_infos, [](const auto & item)
+        {
+            auto const& [key, value] = item;
+           return value.second == 0;
+        });
+        spill_infos.emplace_back(info);
+        shuffle_writer->split_result.total_compress_time += 
compressed_output.getCompressTime();
+        shuffle_writer->split_result.total_io_time += 
compressed_output.getWriteTime();
+        shuffle_writer->split_result.total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+    };
+
+    Stopwatch spill_time_watch;
+    if (for_memory_spill && options->throw_if_memory_exceed)
+    {
+        // escape memory track from current thread status; add untracked 
memory limit for create thread object, avoid trigger memory spill again
+        IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+        spill_to_file();
     }
-    // partial sort
-    sortBlock(block, sort_description);
-    Chunk chunk;
-    chunk.setColumns(block.getColumns(), block.rows());
-    accumulated_blocks.emplace_back(std::move(chunk));
-    current_accumulated_bytes += accumulated_blocks.back().allocatedBytes();
-    if (current_accumulated_bytes >= max_sort_buffer_size)
-        unsafeEvictPartitions(false, false);
-    shuffle_writer->split_result.total_write_time += 
write_time_watch.elapsedNanoseconds();
+    else
+    {
+        spill_to_file();
+    }
+    shuffle_writer->split_result.total_spill_time += 
spill_time_watch.elapsedNanoseconds();
+    shuffle_writer->split_result.total_bytes_spilled += spilled_bytes;
+    return res;
+}
+
+void MemorySortLocalPartitionWriter::unsafeStop()
+{
+    unsafeEvictPartitions(false, false);
+    WriteBufferFromFile output(options->data_file, options->io_buffer_size);
+    auto offsets = mergeSpills(shuffle_writer, output);
+    shuffle_writer->split_result.partition_lengths = offsets;
+}
+
+size_t MemorySortCelebornPartitionWriter::unsafeEvictPartitions(bool 
for_memory_spill, bool flush_block_buffer)
+{
+    size_t res = 0;
+    size_t spilled_bytes = 0;
+    auto spill_to_celeborn = [this, for_memory_spill, flush_block_buffer, 
&res, &spilled_bytes]()
+    {
+        Stopwatch serialization_time_watch;
+
+        /// Skip empty buffer
+        if (accumulated_blocks.empty())
+            return;
+
+        WriteBufferFromOwnString output;
+        auto codec = 
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
 {});
+        CompressedWriteBuffer compressed_output(output, codec, 
shuffle_writer->options.io_buffer_size);
+        NativeWriter writer(compressed_output, shuffle_writer->output_header);
+
+        MergeSorter sorter(sort_header, std::move(accumulated_blocks), 
sort_description, adaptiveBlockSize(), 0);
+        size_t cur_partition_id = 0;
+        auto push_to_celeborn = [&]()
+        {
+            compressed_output.sync();
+            auto& data = output.str();
+            if (!data.empty())
+            {
+                Stopwatch push_time_watch;
+                celeborn_client->pushPartitionData(cur_partition_id, 
data.data(), data.size());
+                shuffle_writer->split_result.total_io_time += 
push_time_watch.elapsedNanoseconds();
+                
shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
+            }
+            output.restart();
+        };
+
+        while (auto data = sorter.read())
+        {
+            Block serialized_block = 
sort_header.cloneWithColumns(data.detachColumns());
+            const auto partitions = 
serialized_block.getByName(PARTITION_COLUMN_NAME).column;
+            serialized_block.erase(PARTITION_COLUMN_NAME);
+            size_t row_offset = 0;
+            while (row_offset < serialized_block.rows())
+            {
+                auto last_idx = searchLastPartitionIdIndex(partitions, 
row_offset, cur_partition_id);
+                if (last_idx < 0)
+                {
+                    push_to_celeborn();
+                    cur_partition_id++;
+                    continue;
+                }
+
+                if (row_offset == 0 && last_idx == serialized_block.rows() - 1)
+                {
+                    auto count = writer.write(serialized_block);
+                    
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+                    break;
+                }
+                auto cut_block = 
serialized_block.cloneWithCutColumns(row_offset, last_idx - row_offset + 1);
+                auto count = writer.write(cut_block);
+                
shuffle_writer->split_result.raw_partition_lengths[cur_partition_id] += count;
+                row_offset = last_idx + 1;
+                if (last_idx != serialized_block.rows() - 1)
+                {
+                    push_to_celeborn();
+                    cur_partition_id++;
+                }
+            }
+        }
+        push_to_celeborn();
+        spilled_bytes = current_accumulated_bytes;
+        res = current_accumulated_bytes;
+        current_accumulated_bytes = 0;
+        current_accumulated_rows = 0;
+
+        shuffle_writer->split_result.total_compress_time += 
compressed_output.getCompressTime();
+        shuffle_writer->split_result.total_io_time += 
compressed_output.getWriteTime();
+
+        shuffle_writer->split_result.total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+    };
+
+    Stopwatch spill_time_watch;
+    if (for_memory_spill && options->throw_if_memory_exceed)
+    {
+        // escape memory track from current thread status; add untracked 
memory limit for create thread object, avoid trigger memory spill again
+        IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+        spill_to_celeborn();
+    }
+    else
+    {
+        spill_to_celeborn();
+    }
+
+    shuffle_writer->split_result.total_spill_time += 
spill_time_watch.elapsedNanoseconds();
+    shuffle_writer->split_result.total_bytes_spilled += spilled_bytes;
+    return res;
+}
+
+void MemorySortCelebornPartitionWriter::unsafeStop()
+{
+    unsafeEvictPartitions(false, false);
 }
 
 size_t ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
@@ -367,6 +585,10 @@ size_t 
ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
     IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
     if (accumulated_blocks.empty())
         return 0;
+    if (max_merge_block_bytes)
+    {
+        max_merge_block_size = std::max(max_merge_block_bytes / 
(current_accumulated_bytes / current_accumulated_rows), 128UL);
+    }
     Stopwatch watch;
     MergeSorter sorter(sort_header, std::move(accumulated_blocks), 
sort_description, max_merge_block_size, 0);
     streams.emplace_back(&tmp_data->createStream(sort_header));
@@ -378,6 +600,7 @@ size_t 
ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
     streams.back()->finishWriting();
     auto result = current_accumulated_bytes;
     current_accumulated_bytes = 0;
+    current_accumulated_rows = 0;
     shuffle_writer->split_result.total_spill_time += 
watch.elapsedNanoseconds();
     return result;
 }
@@ -562,8 +785,7 @@ size_t 
CelebornPartitionWriter::unsafeEvictSinglePartition(bool for_memory_spill
     {
         // escape memory track from current thread status; add untracked 
memory limit for create thread object, avoid trigger memory spill again
         IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
-        ThreadFromGlobalPool thread(spill_to_celeborn);
-        thread.join();
+        spill_to_celeborn();
     }
     else
     {
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h 
b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index 9c4e75db6..5b4285afd 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -17,7 +17,6 @@
 #pragma once
 #include <cstddef>
 #include <memory>
-#include <mutex>
 #include <vector>
 #include <Core/Block.h>
 #include <IO/WriteBuffer.h>
@@ -26,6 +25,8 @@
 #include <jni/CelebornClient.h>
 #include <Parser/SerializedPlanParser.h>
 
+#include "CachedShuffleWriter.h"
+
 namespace DB
 {
 class MergingSortedAlgorithm;
@@ -33,17 +34,11 @@ class MergingSortedAlgorithm;
 
 namespace local_engine
 {
-struct PartitionSpillInfo
-{
-    size_t partition_id;
-    size_t start;
-    size_t length; // in Bytes
-};
 
 struct SpillInfo
 {
     std::string spilled_file;
-    std::vector<PartitionSpillInfo> partition_spill_infos;
+    std::map<size_t, std::pair<size_t, size_t>> partition_spill_infos;
 };
 
 class Partition
@@ -113,7 +108,28 @@ protected:
     size_t last_partition_id;
 };
 
-class LocalPartitionWriter : public PartitionWriter
+class Spillable
+{
+public:
+    struct ExtraData
+    {
+        std::vector<ColumnsBufferPtr> partition_block_buffer;
+        std::vector<PartitionPtr> partition_buffer;
+    };
+
+    Spillable(SplitOptions options_) : split_options(std::move(options_)) {}
+    virtual ~Spillable() = default;
+
+protected:
+    String getNextSpillFile();
+    std::vector<UInt64> mergeSpills(CachedShuffleWriter * shuffle_writer, 
WriteBuffer & data_file, ExtraData extra_data = {});
+    std::vector<SpillInfo> spill_infos;
+
+private:
+    const SplitOptions split_options;
+};
+
+class LocalPartitionWriter : public PartitionWriter, public Spillable
 {
 public:
     explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer);
@@ -124,16 +140,79 @@ public:
 protected:
     size_t unsafeEvictPartitions(bool for_memory_spill, bool 
flush_block_buffer) override;
     void unsafeStop() override;
+};
 
-    String getNextSpillFile();
-    std::vector<UInt64> mergeSpills(DB::WriteBuffer & data_file);
+class SortBasedPartitionWriter : public PartitionWriter
+{
+public:
+    explicit SortBasedPartitionWriter(CachedShuffleWriter * shuffle_writer_) : 
PartitionWriter(shuffle_writer_)
+    {
+        max_merge_block_size = options->split_size;
+        max_sort_buffer_size = options->max_sort_buffer_size;
+        max_merge_block_bytes = 
SerializedPlanParser::global_context->getSettings().prefer_external_sort_block_bytes;
+    }
 
-    std::vector<SpillInfo> spill_infos;
+    String getName() const override { return "SortBasedPartitionWriter"; }
+    void write(const PartitionInfo & info, DB::Block & block) override;
+    size_t adaptiveBlockSize()
+    {
+        size_t res = max_merge_block_size;
+        if (max_merge_block_bytes)
+        {
+            res = std::min(std::max(max_merge_block_bytes / 
(current_accumulated_bytes / current_accumulated_rows), 128UL), res);
+        }
+        return res;
+    }
+
+protected:
+    size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
+    size_t max_sort_buffer_size = 1_GiB;
+    size_t max_merge_block_bytes = 0;
+    size_t current_accumulated_bytes = 0;
+    size_t current_accumulated_rows = 0;
+    Chunks accumulated_blocks;
+    Block output_header;
+    Block sort_header;
+    SortDescription sort_description;
+};
+
+class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public 
Spillable
+{
+public:
+    explicit MemorySortLocalPartitionWriter(CachedShuffleWriter* 
shuffle_writer_)
+        : SortBasedPartitionWriter(shuffle_writer_), 
Spillable(shuffle_writer_->options)
+    {
+    }
+
+    ~MemorySortLocalPartitionWriter() override = default;
+    String getName() const override { return "MemorySortLocalPartitionWriter"; 
}
+
+protected:
+    size_t unsafeEvictPartitions(bool for_memory_spill, bool 
flush_block_buffer) override;
+    void unsafeStop() override;
+};
+
+class MemorySortCelebornPartitionWriter : public SortBasedPartitionWriter
+{
+public:
+    explicit MemorySortCelebornPartitionWriter(CachedShuffleWriter* 
shuffle_writer_, std::unique_ptr<CelebornClient> celeborn_client_)
+        : SortBasedPartitionWriter(shuffle_writer_), 
celeborn_client(std::move(celeborn_client_))
+    {
+    }
+
+    ~MemorySortCelebornPartitionWriter() override = default;
+
+protected:
+    size_t unsafeEvictPartitions(bool for_memory_spill, bool 
flush_block_buffer) override;
+    void unsafeStop() override;
+
+private:
+    std::unique_ptr<CelebornClient> celeborn_client;
 };
 
 class SortedPartitionDataMerger;
 
-class ExternalSortLocalPartitionWriter : public PartitionWriter
+class ExternalSortLocalPartitionWriter : public SortBasedPartitionWriter
 {
 public:
     struct MergeContext
@@ -142,37 +221,30 @@ public:
         std::unique_ptr<SortedPartitionDataMerger> merger;
     };
 
-    explicit ExternalSortLocalPartitionWriter(CachedShuffleWriter * 
shuffle_writer_) : PartitionWriter(shuffle_writer_)
+    explicit ExternalSortLocalPartitionWriter(CachedShuffleWriter * 
shuffle_writer_) : SortBasedPartitionWriter(shuffle_writer_)
     {
         max_merge_block_size = options->split_size;
         max_sort_buffer_size = options->max_sort_buffer_size;
+        max_merge_block_bytes = 
SerializedPlanParser::global_context->getSettings().prefer_external_sort_block_bytes;
         tmp_data = 
std::make_unique<TemporaryDataOnDisk>(SerializedPlanParser::global_context->getTempDataOnDisk());
     }
 
     ~ExternalSortLocalPartitionWriter() override = default;
 
     String getName() const override { return 
"ExternalSortLocalPartitionWriter"; }
-    void write(const PartitionInfo & info, DB::Block & block) override;
 
 protected:
     size_t unsafeEvictPartitions(bool for_memory_spill, bool 
flush_block_buffer) override;
     /// Prepare for data merging, spill the remaining memory data, and create a 
merger object.
     MergeContext prepareMerge();
     void unsafeStop() override;
-    std::queue<DB::Block> mergeDataInMemory();
+    std::queue<Block> mergeDataInMemory();
 
-    size_t max_sort_buffer_size = 1_GiB;
-    size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
-    size_t current_accumulated_bytes = 0;
-    DB::Chunks accumulated_blocks;
-    DB::Block output_header;
-    DB::Block sort_header;
-    DB::SortDescription sort_description;
-    DB::TemporaryDataOnDiskPtr tmp_data;
-    std::vector<DB::TemporaryFileStream *> streams;
+    TemporaryDataOnDiskPtr tmp_data;
+    std::vector<TemporaryFileStream *> streams;
 };
 
-class  ExternalSortCelebornPartitionWriter : public 
ExternalSortLocalPartitionWriter
+class ExternalSortCelebornPartitionWriter : public 
ExternalSortLocalPartitionWriter
 {
 public:
     explicit ExternalSortCelebornPartitionWriter(CachedShuffleWriter * 
shuffle_writer_, std::unique_ptr<CelebornClient> celeborn_client_)
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp 
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
index 5a5e969e1..7e3642dac 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
@@ -81,7 +81,7 @@ PartitionInfo RoundRobinSelectorBuilder::build(DB::Block & 
block)
         pid = pid_selection;
         pid_selection = (pid_selection + 1) % parts_num;
     }
-    return PartitionInfo::fromSelector(std::move(result), parts_num, 
use_external_sort_shuffle);
+    return PartitionInfo::fromSelector(std::move(result), parts_num, 
use_sort_shuffle);
 }
 
 HashSelectorBuilder::HashSelectorBuilder(
@@ -156,7 +156,7 @@ PartitionInfo HashSelectorBuilder::build(DB::Block & block)
             }
         }
     }
-    return PartitionInfo::fromSelector(std::move(partition_ids), parts_num, 
use_external_sort_shuffle);
+    return PartitionInfo::fromSelector(std::move(partition_ids), parts_num, 
use_sort_shuffle);
 }
 
 
@@ -177,7 +177,7 @@ PartitionInfo RangeSelectorBuilder::build(DB::Block & block)
 {
     DB::IColumn::Selector result;
     computePartitionIdByBinarySearch(block, result);
-    return PartitionInfo::fromSelector(std::move(result), partition_num, 
use_external_sort_shuffle);
+    return PartitionInfo::fromSelector(std::move(result), partition_num, 
use_sort_shuffle);
 }
 
 void RangeSelectorBuilder::initSortInformation(Poco::JSON::Array::Ptr 
orderings)
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.h 
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
index 9510291c8..97894daa3 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.h
@@ -46,11 +46,12 @@ struct PartitionInfo
 class SelectorBuilder
 {
 public:
-    explicit SelectorBuilder(bool use_external_sort_shuffle) : 
use_external_sort_shuffle(use_external_sort_shuffle) { }
+    explicit SelectorBuilder(bool use_external_sort_shuffle) : 
use_sort_shuffle(use_external_sort_shuffle) { }
     virtual ~SelectorBuilder() = default;
     virtual PartitionInfo build(DB::Block & block) = 0;
+    void setUseSortShuffle(bool use_external_sort_shuffle_) { use_sort_shuffle 
= use_external_sort_shuffle_; }
 protected:
-    bool use_external_sort_shuffle = false;
+    bool use_sort_shuffle = false;
 };
 
 class RoundRobinSelectorBuilder : public SelectorBuilder
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h 
b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
index cfd406261..75edea325 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h
@@ -53,7 +53,8 @@ struct SplitOptions
     size_t max_sort_buffer_size = 1_GiB;
     // Whether to spill first before stopping the external sort shuffle.
     bool spill_firstly_before_stop = true;
-    bool force_sort = true;
+    bool force_external_sort = false;
+    bool force_mermory_sort = false;
 };
 
 class ColumnsBuffer
diff --git a/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h 
b/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
index 31f5547fb..e38f58647 100644
--- a/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
+++ b/cpp-ch/local-engine/Shuffle/SortedPartitionDataMerger.h
@@ -22,6 +22,8 @@
 
 namespace local_engine
 {
+
+int64_t searchLastPartitionIdIndex(DB::ColumnPtr column, size_t start, size_t 
partition_id);
 class SortedPartitionDataMerger;
 using SortedPartitionDataMergerPtr = 
std::unique_ptr<SortedPartitionDataMerger>;
 class SortedPartitionDataMerger
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index c7721b470..00a7b1b0a 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -671,7 +671,8 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jboolean flush_block_buffer_before_evict,
     jlong max_sort_buffer_size,
     jboolean spill_firstly_before_stop,
-    jboolean force_sort)
+    jboolean force_external_sort,
+    jboolean force_memory_sort)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     std::string hash_exprs;
@@ -718,7 +719,8 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .flush_block_buffer_before_evict = 
static_cast<bool>(flush_block_buffer_before_evict),
         .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
         .spill_firstly_before_stop = 
static_cast<bool>(spill_firstly_before_stop),
-        .force_sort = static_cast<bool>(force_sort)
+        .force_external_sort = static_cast<bool>(force_external_sort),
+        .force_mermory_sort = static_cast<bool>(force_memory_sort)
     };
     auto name = jstring2string(env, short_name);
     local_engine::SplitterHolder * splitter;
@@ -745,7 +747,9 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jstring hash_algorithm,
     jobject pusher,
     jboolean throw_if_memory_exceed,
-    jboolean flush_block_buffer_before_evict)
+    jboolean flush_block_buffer_before_evict,
+    jboolean force_external_sort,
+    jboolean force_memory_sort)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     std::string hash_exprs;
@@ -780,7 +784,10 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed),
-        .flush_block_buffer_before_evict = 
static_cast<bool>(flush_block_buffer_before_evict)};
+        .flush_block_buffer_before_evict = 
static_cast<bool>(flush_block_buffer_before_evict),
+        .force_external_sort = static_cast<bool>(force_external_sort),
+        .force_mermory_sort = static_cast<bool>(force_memory_sort)
+    };
     auto name = jstring2string(env, short_name);
     local_engine::SplitterHolder * splitter;
     splitter = new local_engine::SplitterHolder{.splitter = 
std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
index 75efa3553..524a3ee2e 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
@@ -78,7 +78,9 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
         CHBackendSettings.shuffleHashAlgorithm,
         celebornPartitionPusher,
         GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
-        GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict
+        GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
+        GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
+        GlutenConfig.getConf.chColumnarForceMemorySortShuffle
       )
       CHNativeMemoryAllocators.createSpillable(
         "CelebornShuffleWriter",
diff --git 
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
 
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
similarity index 97%
rename from 
gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
rename to 
gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
index 0072fe8c9..3ecf1fc1a 100644
--- 
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
+++ 
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarExternalSortShuffleSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.CoalescedPartitionSpec
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
 import org.apache.spark.sql.internal.SQLConf
 
-class GlutenClickHouseRSSColumnarSortShuffleAQESuite
+class GlutenClickHouseRSSColumnarExternalSortShuffleSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
@@ -45,7 +45,7 @@ class GlutenClickHouseRSSColumnarSortShuffleAQESuite
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.shuffle.service.enabled", "false")
       .set("spark.celeborn.client.spark.shuffle.writer", "hash")
-      .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle", 
"true")
   }
 
   test("TPCH Q1") {
diff --git 
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
 
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
new file mode 100644
index 000000000..ddef1d87c
--- /dev/null
+++ 
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarMemorySortShuffleSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class GlutenClickHouseRSSColumnarMemorySortShuffleSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val tablesPath: String = basePath + "/tpch-data-ch"
+  override protected val tpchQueries: String = rootPath + 
"queries/tpch-queries-ch"
+  override protected val queriesResults: String =
+    rootPath + 
"../../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"
+
+  override protected val parquetTableDataPath: String =
+    "../../../../../gluten-core/src/test/resources/tpch-data"
+
+  /** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(
+        "spark.shuffle.manager",
+        "org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.shuffle.service.enabled", "false")
+      .set("spark.celeborn.client.spark.shuffle.writer", "hash")
+      .set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", 
"true")
+  }
+
+  test("TPCH Q1") {
+    runTPCHQuery(1) { df => }
+  }
+
+  test("TPCH Q2") {
+    runTPCHQuery(2) { df => }
+  }
+
+  test("TPCH Q3") {
+    runTPCHQuery(3) { df => }
+  }
+
+  test("TPCH Q4") {
+    runTPCHQuery(4) { df => }
+  }
+
+  test("TPCH Q5") {
+    runTPCHQuery(5) { df => }
+  }
+
+  test("TPCH Q6") {
+    runTPCHQuery(6) { df => }
+  }
+
+  test("TPCH Q7") {
+    runTPCHQuery(7) { df => }
+  }
+
+  test("TPCH Q8") {
+    runTPCHQuery(8) { df => }
+  }
+
+  test("TPCH Q9") {
+    runTPCHQuery(9) { df => }
+  }
+
+  test("TPCH Q10") {
+    runTPCHQuery(10) { df => }
+  }
+
+  test("TPCH Q11") {
+    runTPCHQuery(11) { df => }
+  }
+
+  test("TPCH Q12") {
+    runTPCHQuery(12) { df => }
+  }
+
+  test("TPCH Q13") {
+    runTPCHQuery(13) { df => }
+  }
+
+  test("TPCH Q14") {
+    runTPCHQuery(14) { df => }
+  }
+
+  test("TPCH Q15") {
+    runTPCHQuery(15) { df => }
+  }
+
+  test("TPCH Q16") {
+    runTPCHQuery(16, noFallBack = false) { df => }
+  }
+
+  test("TPCH Q17") {
+    runTPCHQuery(17) { df => }
+  }
+
+  test("TPCH Q18") {
+    runTPCHQuery(18) { df => }
+  }
+
+  test("TPCH Q19") {
+    runTPCHQuery(19) { df => }
+  }
+
+  test("TPCH Q20") {
+    runTPCHQuery(20) { df => }
+  }
+
+  test("TPCH Q21") {
+    runTPCHQuery(21, noFallBack = false) { df => }
+  }
+
+  test("TPCH Q22") {
+    runTPCHQuery(22) { df => }
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index c9a62b8b7..6659a42c7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -297,7 +297,14 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def chColumnarShufflePreferSpill: Boolean = 
conf.getConf(COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED)
 
-  def chColumnarShuffleSpillThreshold: Long = 
conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+  def chColumnarShuffleSpillThreshold: Long = {
+    val threshold = conf.getConf(COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD)
+    if (threshold == 0) {
+      (conf.getConf(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES) * 0.9).toLong
+    } else {
+      threshold
+    }
+  }
 
   def chColumnarThrowIfMemoryExceed: Boolean = 
conf.getConf(COLUMNAR_CH_THROW_IF_MEMORY_EXCEED)
 
@@ -309,7 +316,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def chColumnarSpillFirstlyBeforeStop: Boolean =
     conf.getConf(COLUMNAR_CH_SPILL_FIRSTLY_BEFORE_STOP)
 
-  def chColumnarForceSortShuffle: Boolean = 
conf.getConf(COLUMNAR_CH_FORCE_SORT_SHUFFLE)
+  def chColumnarForceExternalSortShuffle: Boolean =
+    conf.getConf(COLUMNAR_CH_FORCE_EXTERNAL_SORT_SHUFFLE)
+
+  def chColumnarForceMemorySortShuffle: Boolean =
+    conf.getConf(COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE)
 
   def cartesianProductTransformerEnabled: Boolean =
     conf.getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
@@ -1416,7 +1427,7 @@ object GlutenConfig {
       .internal()
       .doc("The maximum size of sort shuffle buffer in CH backend.")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("1GB")
+      .createWithDefaultString("0")
 
   val COLUMNAR_CH_SPILL_FIRSTLY_BEFORE_STOP =
     buildConf("spark.gluten.sql.columnar.backend.ch.spillFirstlyBeforeStop")
@@ -1425,11 +1436,17 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_CH_FORCE_SORT_SHUFFLE =
-    buildConf("spark.gluten.sql.columnar.backend.ch.forceSortShuffle")
+  val COLUMNAR_CH_FORCE_EXTERNAL_SORT_SHUFFLE =
+    buildConf("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle")
+      .internal()
+      .doc("Whether to force to use external sort shuffle in CH backend. ")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COLUMNAR_CH_FORCE_MEMORY_SORT_SHUFFLE =
+    buildConf("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle")
       .internal()
-      .doc("Whether to force to use sort shuffle in CH backend. " +
-        "Sort shuffle will enable When partition num greater than 300.")
+      .doc("Whether to force to use memory sort shuffle in CH backend. ")
       .booleanConf
       .createWithDefault(false)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to