This is an automated email from the ASF dual-hosted git repository.

marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b1e5115e5 [VL] Reduce Velox hash shuffle partition buffer memory by evicting large partitions after split (#12089)
0b1e5115e5 is described below

commit 0b1e5115e5a2f9a05ad9f898f42b81eb42cb87ee
Author: wankun <[email protected]>
AuthorDate: Tue May 19 16:57:41 2026 +0800

    [VL] Reduce Velox hash shuffle partition buffer memory by evicting large partitions after split (#12089)
---
 .../VeloxCelebornColumnarShuffleWriter.scala       |  1 +
 .../writer/VeloxUniffleColumnarShuffleWriter.java  |  1 +
 .../spark/shuffle/ColumnarShuffleWriter.scala      |  1 +
 cpp/core/jni/JniWrapper.cc                         |  4 ++-
 cpp/core/shuffle/Options.h                         | 14 +++++++---
 cpp/core/shuffle/Payload.cc                        |  2 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        | 32 ++++++++++++++++++++++
 cpp/velox/shuffle/VeloxHashShuffleWriter.h         |  6 +++-
 docs/Configuration.md                              |  1 +
 .../gluten/vectorized/ShuffleWriterJniWrapper.java |  1 +
 .../org/apache/gluten/config/GlutenConfig.scala    | 11 ++++++++
 11 files changed, 67 insertions(+), 7 deletions(-)

diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 783d68a00c..03208adbcf 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
           GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, 
context.partitionId),
           nativeBufferSize,
           GlutenConfig.get.columnarShuffleReallocThreshold,
+          GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
           partitionWriterHandle
         )
       case SortShuffleWriterType =>
diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index ef35818c7b..e01f97ba3e 100644
--- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -185,6 +185,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
                         columnarDep.nativePartitioning(), partitionId),
                     nativeBufferSize,
                     reallocThreshold,
+                    
GlutenConfig.get().columnarShufflePartitionBufferEvictThreshold(),
                     partitionWriterHandle);
           }
 
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 01f4bd06ba..ff1c66e037 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V](
                 taskContext.partitionId),
               nativeBufferSize,
               reallocThreshold,
+              GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
               partitionWriterHandle
             )
           }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f109865840..46b9d7603c 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jint startPartitionId,
     jint splitBufferSize,
     jdouble splitBufferReallocThreshold,
+    jint partitionBufferEvictThreshold,
     jlong partitionWriterHandle) {
   JNI_METHOD_START
   const auto ctx = getRuntime(env, wrapper);
@@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       toPartitioning(jStringToCString(env, partitioningNameJstr)),
       startPartitionId,
       splitBufferSize,
-      splitBufferReallocThreshold);
+      splitBufferReallocThreshold,
+      partitionBufferEvictThreshold);
 
   return ctx->saveObject(ctx->createShuffleWriter(numPartitions, 
partitionWriter, shuffleWriterOptions));
   JNI_METHOD_END(kInvalidObjectHandle)
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index ea3aff10cf..649a164774 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -27,6 +27,7 @@
 namespace gluten {
 
 static constexpr int16_t kDefaultBatchSize = 4096;
+static constexpr int32_t kDefaultPartitionBufferEvictThreshold = -1;
 static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
 static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
 static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
@@ -85,6 +86,7 @@ struct ShuffleWriterOptions {
 struct HashShuffleWriterOptions : ShuffleWriterOptions {
   int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
   double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold;
+  int32_t partitionBufferEvictThreshold = 
kDefaultPartitionBufferEvictThreshold;
 
   HashShuffleWriterOptions() : 
ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {}
 
@@ -92,10 +94,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
       Partitioning partitioning,
       int32_t startPartitionId,
       int32_t partitionBufferSize,
-      double partitionBufferReallocThreshold)
+      double partitionBufferReallocThreshold,
+      int32_t partitionBufferEvictThreshold = 
kDefaultPartitionBufferEvictThreshold)
       : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, 
startPartitionId),
         splitBufferSize(partitionBufferSize),
-        splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
+        splitBufferReallocThreshold(partitionBufferReallocThreshold),
+        partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
 
  protected:
   HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : 
ShuffleWriterOptions(shuffleWriterType) {}
@@ -105,10 +109,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
       Partitioning partitioning,
       int32_t startPartitionId,
       int32_t partitionBufferSize,
-      double partitionBufferReallocThreshold)
+      double partitionBufferReallocThreshold,
+      int32_t partitionBufferEvictThreshold = 
kDefaultPartitionBufferEvictThreshold)
       : ShuffleWriterOptions(shuffleWriterType, partitioning, 
startPartitionId),
         splitBufferSize(partitionBufferSize),
-        splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
+        splitBufferReallocThreshold(partitionBufferReallocThreshold),
+        partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
 };
 
 struct SortShuffleWriterOptions : ShuffleWriterOptions {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 230806cd2f..5d02a84483 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -60,7 +60,7 @@ arrow::Result<uint8_t> readPayloadType(arrow::io::InputStream* is) {
 }
 
 arrow::Result<int64_t> compressBuffer(
-    const std::shared_ptr<arrow::Buffer>& buffer,
+    const std::shared_ptr<arrow::Buffer> buffer,
     uint8_t* output,
     int64_t outputLength,
     arrow::util::Codec* codec) {
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e071e8a1c3..89f1dbcda6 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -441,9 +441,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector&
   printPartitionBuffer();
 
   setSplitState(SplitState::kInit);
+  if (partitionBufferEvictThreshold_ > 0) {
+    // After split, evict large partition buffers to free up memory for the 
next input RowVector.
+    const auto partitionBytes = estimatePartitionBufferBytes();
+    for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) {
+      if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= 
partitionBufferEvictThreshold_) {
+        RETURN_NOT_OK(evictPartitionBuffers(pid, false));
+      }
+    }
+  }
   return arrow::Status::OK();
 }
 
+std::vector<int64_t> VeloxHashShuffleWriter::estimatePartitionBufferBytes() 
const {
+  std::vector<int64_t> partitionBytes(numPartitions_, 0);
+
+  for (const auto& columnBuffers : partitionBuffers_) {
+    for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) {
+      for (const auto& buffer : columnBuffers[pid]) {
+        if (buffer) {
+          partitionBytes[pid] += buffer->capacity();
+        }
+      }
+    }
+  }
+
+  for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) {
+    const auto& buffer = complexTypeFlushBuffer_[pid];
+    if (buffer) {
+      partitionBytes[pid] += buffer->capacity();
+    }
+  }
+
+  return partitionBytes;
+}
+
 arrow::Status VeloxHashShuffleWriter::splitRowVector(const 
facebook::velox::RowVector& rv) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);
 
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 899c2be269..d2901019b7 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -278,7 +278,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
       MemoryManager* memoryManager)
       : VeloxShuffleWriter(numPartitions, partitionWriter, options, 
memoryManager),
         splitBufferSize_(options->splitBufferSize),
-        splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {
+        splitBufferReallocThreshold_(options->splitBufferReallocThreshold),
+        partitionBufferEvictThreshold_(options->partitionBufferEvictThreshold) 
{
     arenas_.resize(numPartitions);
   }
 
@@ -287,6 +288,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);
 
+  std::vector<int64_t> estimatePartitionBufferBytes() const;
+
   arrow::Status splitRowVector(const facebook::velox::RowVector& rv);
 
   arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
@@ -396,6 +399,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
  protected:
   int32_t splitBufferSize_;
   double splitBufferReallocThreshold_;
+  int32_t partitionBufferEvictThreshold_;
 
   std::shared_ptr<arrow::Schema> schema_;
 
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 9a7cc8af8c..1bd4dce5b4 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -95,6 +95,7 @@ nav_order: 15
 | spark.gluten.sql.columnar.shuffle.compression.threshold             | 100    
           | If number of rows in a batch falls below this threshold, will copy 
all buffers into one buffer to compress.                                        
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.shuffle.dictionary.enabled                | false  
           | Enable dictionary in hash-based shuffle.                           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.shuffle.merge.threshold                   | 0.25   
           |
+| spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold     | -1     
           | For Velox hash shuffle writer, evict partition buffers larger than 
this threshold after splitting an input batch. Use non-positive value to 
disable this feature.                                                           
                                                                                
                                                                                
                     [...]
 | spark.gluten.sql.columnar.shuffle.readerBufferSize                  | 1MB    
           | Buffer size in bytes for shuffle reader reading input stream from 
local or remote.                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.columnar.shuffle.realloc.threshold                 | 0.25   
           |
 | spark.gluten.sql.columnar.shuffle.sort.columns.threshold            | 100000 
           | The threshold to determine whether to use sort-based columnar 
shuffle. Sort-based shuffle will be used if the number of columns is greater 
than this threshold.                                                            
                                                                                
                                                                                
                      [...]
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index a389da6860..87685f8505 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -43,6 +43,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
       int startPartitionId,
       int splitBufferSize,
       double splitBufferReallocThreshold,
+      int partitionBufferEvictThreshold,
       long partitionWriterHandle);
 
   public native long createSortShuffleWriter(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 0bbbcead63..f7ac297dec 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -225,6 +225,9 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
 
   def columnarShuffleReallocThreshold: Double = 
getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
 
+  def columnarShufflePartitionBufferEvictThreshold: Int =
+    getConf(COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD)
+
   def columnarShuffleMergeThreshold: Double = 
getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
 
   def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC)
@@ -1074,6 +1077,14 @@ object GlutenConfig extends ConfigRegistry {
       .checkValue(v => v >= 0 && v <= 1, "Buffer reallocation threshold must 
between [0, 1]")
       .createWithDefault(0.25)
 
+  val COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD =
+    
buildConf("spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold")
+      .doc(
+        "For Velox hash shuffle writer, evict partition buffers larger than 
this threshold " +
+          "after splitting an input batch. Use non-positive value to disable 
this feature.")
+      .intConf
+      .createWithDefault(-1)
+
   val COLUMNAR_SHUFFLE_CODEC =
     buildConf("spark.gluten.sql.columnar.shuffle.codec")
       .doc(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to