This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 408f4cbb0 [VL] Row based sort follow-up (#6579)
408f4cbb0 is described below

commit 408f4cbb0a9769ed8ea86d4cd6cb5cfa3eb62527
Author: Rong Ma <[email protected]>
AuthorDate: Mon Jul 29 16:14:51 2024 +0800

    [VL] Row based sort follow-up (#6579)
---
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |   1 +
 cpp/core/CMakeLists.txt                            |   1 -
 cpp/core/jni/JniWrapper.cc                         |   6 +-
 cpp/core/shuffle/Options.cc                        |  18 --
 cpp/core/shuffle/Options.h                         |   7 +
 cpp/velox/shuffle/RadixSort.h                      | 157 +++++++++++++++
 cpp/velox/shuffle/VeloxShuffleReader.cc            |   4 +-
 cpp/velox/shuffle/VeloxShuffleReader.h             |   3 +
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc        | 217 ++++++++++++++-------
 cpp/velox/shuffle/VeloxSortShuffleWriter.h         |  29 +--
 cpp/velox/tests/VeloxShuffleWriterTest.cc          |  11 +-
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h |   4 +-
 .../VeloxCelebornColumnarBatchSerializer.scala     |   2 +
 .../VeloxCelebornColumnarShuffleWriter.scala       |  17 +-
 .../gluten/vectorized/ShuffleWriterJniWrapper.java |  10 +
 .../spark/shuffle/ColumnarShuffleWriter.scala      |  11 +-
 .../writer/VeloxUniffleColumnarShuffleWriter.java  |   8 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     |   1 -
 18 files changed, 384 insertions(+), 123 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 30b08749e..c05ce7cdc 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -272,6 +272,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
to compress"),
       "decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"time to decompress"),
       "deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"time to deserialize"),
+      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"shuffle wall time"),
       // For hash shuffle writer, the peak bytes represents the maximum split 
buffer size.
       // For sort shuffle writer, the peak bytes represents the maximum
       // row buffer + sort buffer size.
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index caad8db1e..ef21ccbe8 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
     shuffle/FallbackRangePartitioner.cc
     shuffle/HashPartitioner.cc
     shuffle/LocalPartitionWriter.cc
-    shuffle/Options.cc
     shuffle/Partitioner.cc
     shuffle/Partitioning.cc
     shuffle/Payload.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3d6da31c7..f39f9c923 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -755,6 +755,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jint compressionLevel,
     jint compressionThreshold,
     jstring compressionModeJstr,
+    jint sortBufferInitialSize,
+    jboolean useRadixSort,
     jstring dataFileJstr,
     jint numSubDirs,
     jstring localDirsJstr,
@@ -780,7 +782,9 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .partitioning = gluten::toPartitioning(jStringToCString(env, 
partitioningNameJstr)),
       .taskAttemptId = (int64_t)taskAttemptId,
       .startPartitionId = startPartitionId,
-      .shuffleWriterType = 
gluten::ShuffleWriter::stringToType(jStringToCString(env, 
shuffleWriterTypeJstr))};
+      .shuffleWriterType = 
gluten::ShuffleWriter::stringToType(jStringToCString(env, 
shuffleWriterTypeJstr)),
+      .sortBufferInitialSize = sortBufferInitialSize,
+      .useRadixSort = static_cast<bool>(useRadixSort)};
 
   // Build PartitionWriterOptions.
   auto partitionWriterOptions = PartitionWriterOptions{
diff --git a/cpp/core/shuffle/Options.cc b/cpp/core/shuffle/Options.cc
deleted file mode 100644
index 8e05a10d6..000000000
--- a/cpp/core/shuffle/Options.cc
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "shuffle/Options.h"
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 757950d03..11fa037eb 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -35,9 +35,12 @@ static constexpr int32_t kDefaultBufferAlignment = 64;
 static constexpr double kDefaultBufferReallocThreshold = 0.25;
 static constexpr double kDefaultMergeBufferThreshold = 0.25;
 static constexpr bool kEnableBufferedWrite = true;
+static constexpr bool kDefaultUseRadixSort = true;
+static constexpr int32_t kDefaultSortBufferSize = 4096;
 
 enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
 enum PartitionWriterType { kLocal, kRss };
+enum SortAlgorithm { kRadixSort, kQuickSort };
 
 struct ShuffleReaderOptions {
   arrow::Compression::type compressionType = 
arrow::Compression::type::LZ4_FRAME;
@@ -56,6 +59,10 @@ struct ShuffleWriterOptions {
   int32_t startPartitionId = 0;
   int64_t threadId = -1;
   ShuffleWriterType shuffleWriterType = kHashShuffle;
+
+  // Sort shuffle writer.
+  int32_t sortBufferInitialSize = kDefaultSortBufferSize;
+  bool useRadixSort = kDefaultUseRadixSort;
 };
 
 struct PartitionWriterOptions {
diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h
new file mode 100644
index 000000000..17f05d349
--- /dev/null
+++ b/cpp/velox/shuffle/RadixSort.h
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <cassert>
+#include <iostream>
+#include <vector>
+
+namespace gluten {
+
+template <typename Element>
+class RadixSort {
+ public:
+  /**
+   * Sorts a given array of longs using least-significant-digit radix sort. 
This routine assumes
+   * you have extra space at the end of the array at least equal to the number 
of records. The
+   * sort is destructive and may relocate the data positioned within the array.
+   *
+   * @param array array of long elements followed by at least that many empty 
slots.
+   * @param numRecords number of data records in the array.
+   * @param startByteIndex the first byte (in range [0, 7]) to sort each long 
by, counting from the
+   *                       least significant byte.
+   * @param endByteIndex the last byte (in range [0, 7]) to sort each long by, 
counting from the
+   *                     least significant byte. Must be greater than 
startByteIndex.
+   *
+   * @return The starting index of the sorted data within the given array. We 
return this instead
+   *         of always copying the data back to position zero for efficiency.
+   */
+  static int32_t sort(Element* array, size_t size, int64_t numRecords, int32_t 
startByteIndex, int32_t endByteIndex) {
+    assert(startByteIndex >= 0 && "startByteIndex should >= 0");
+    assert(endByteIndex <= 7 && "endByteIndex should <= 7");
+    assert(endByteIndex > startByteIndex);
+    assert(numRecords * 2 <= size);
+
+    int64_t inIndex = 0;
+    int64_t outIndex = numRecords;
+
+    if (numRecords > 0) {
+      auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex);
+
+      for (auto i = startByteIndex; i <= endByteIndex; i++) {
+        if (!counts[i].empty()) {
+          sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex);
+          std::swap(inIndex, outIndex);
+        }
+      }
+    }
+
+    return static_cast<int32_t>(inIndex);
+  }
+
+ private:
+  /**
+   * Performs a partial sort by copying data into destination offsets for each 
byte value at the
+   * specified byte offset.
+   *
+   * @param array array to partially sort.
+   * @param numRecords number of data records in the array.
+   * @param counts counts for each byte value. This routine destructively 
modifies this array.
+   * @param byteIdx the byte in a long to sort at, counting from the least 
significant byte.
+   * @param inIndex the starting index in the array where input data is 
located.
+   * @param outIndex the starting index where sorted output data should be 
written.
+   */
+  static void sortAtByte(
+      Element* array,
+      int64_t numRecords,
+      std::vector<int64_t>& counts,
+      int32_t byteIdx,
+      int64_t inIndex,
+      int64_t outIndex) {
+    assert(counts.size() == 256);
+
+    auto offsets = transformCountsToOffsets(counts, outIndex);
+
+    for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) {
+      auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff;
+      array[offsets[bucket]++] = array[offset];
+    }
+  }
+
+  /**
+   * Computes a value histogram for each byte in the given array.
+   *
+   * @param array array to count records in.
+   * @param numRecords number of data records in the array.
+   * @param startByteIndex the first byte to compute counts for (the prior are 
skipped).
+   * @param endByteIndex the last byte to compute counts for.
+   *
+   * @return a vector of eight 256-element count arrays, one for each byte 
starting from the least
+   *         significant byte. If the byte does not need sorting the vector 
entry will be empty.
+   */
+  static std::vector<std::vector<int64_t>>
+  getCounts(Element* array, int64_t numRecords, int32_t startByteIndex, 
int32_t endByteIndex) {
+    std::vector<std::vector<int64_t>> counts;
+    counts.resize(8);
+
+    // Optimization: do a fast pre-pass to determine which byte indices we can 
skip for sorting.
+    // If all the byte values at a particular index are the same we don't need 
to count it.
+    int64_t bitwiseMax = 0;
+    int64_t bitwiseMin = -1L;
+    for (auto offset = 0; offset < numRecords; ++offset) {
+      auto value = array[offset];
+      bitwiseMax |= value;
+      bitwiseMin &= value;
+    }
+    auto bitsChanged = bitwiseMin ^ bitwiseMax;
+
+    // Compute counts for each byte index.
+    for (auto i = startByteIndex; i <= endByteIndex; i++) {
+      if (((bitsChanged >> (i * 8)) & 0xff) != 0) {
+        counts[i].resize(256);
+        for (auto offset = 0; offset < numRecords; ++offset) {
+          counts[i][(array[offset] >> (i * 8)) & 0xff]++;
+        }
+      }
+    }
+
+    return counts;
+  }
+
+  /**
+   * Transforms counts into the proper output offsets for the sort type.
+   *
+   * @param counts counts for each byte value. This routine destructively 
modifies this vector.
+   * @param numRecords number of data records in the original data array.
+   * @param outputOffset output offset in bytes from the base array object.
+   *
+   * @return the input counts vector.
+   */
+  static std::vector<int64_t>& transformCountsToOffsets(std::vector<int64_t>& 
counts, int64_t outputOffset) {
+    assert(counts.size() == 256);
+
+    int64_t pos = outputOffset;
+    for (auto i = 0; i < 256; i++) {
+      auto tmp = counts[i & 0xff];
+      counts[i & 0xff] = pos;
+      pos += tmp;
+    }
+
+    return counts;
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index e55f4d01d..ab93d9a33 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -428,8 +428,8 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::deserializeTo
     auto buffer = cur->second;
     const auto* rawBuffer = buffer->as<char>();
     while (rowOffset_ < cur->first && readRows < batchSize_) {
-      auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_);
-      byteOffset_ += sizeof(uint32_t);
+      auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_) - 
sizeof(RowSizeType);
+      byteOffset_ += sizeof(RowSizeType);
       data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
       byteOffset_ += rowSize;
       ++rowOffset_;
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 962739485..2be913aa1 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -20,6 +20,7 @@
 #include "operators/serializer/VeloxColumnarBatchSerializer.h"
 #include "shuffle/Payload.h"
 #include "shuffle/ShuffleReader.h"
+#include "shuffle/VeloxSortShuffleWriter.h"
 #include "velox/type/Type.h"
 #include "velox/vector/ComplexVector.h"
 
@@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
 
 class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
  public:
+  using RowSizeType = VeloxSortShuffleWriter::RowSizeType;
+
   VeloxSortShuffleReaderDeserializer(
       std::shared_ptr<arrow::io::InputStream> in,
       const std::shared_ptr<arrow::Schema>& schema,
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 7c033fb98..0015ba9d3 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -29,6 +29,10 @@ namespace gluten {
 namespace {
 constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1;
 constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
+constexpr uint32_t kPartitionIdStartByteIndex = 5;
+constexpr uint32_t kPartitionIdEndByteIndex = 7;
+
+constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024;
 
 uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t 
offsetInPage) {
   // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) |
@@ -62,9 +66,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
     ShuffleWriterOptions options,
     std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
     arrow::MemoryPool* pool)
-    : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), std::move(veloxPool), pool),
-      
allocator_{std::make_unique<facebook::velox::HashStringAllocator>(veloxPool_.get())},
-      array_{SortArray{Allocator(allocator_.get())}} {}
+    : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), std::move(veloxPool), pool) {}
 
 arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, 
int64_t memLimit) {
   ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
@@ -76,10 +78,14 @@ arrow::Status 
VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, i
 arrow::Status VeloxSortShuffleWriter::stop() {
   ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable, 
arrow::Status::Invalid("Unevictable state in stop."));
 
-  EvictGuard evictGuard{evictState_};
-
   stopped_ = true;
-  RETURN_NOT_OK(evictAllPartitions());
+  if (offset_ > 0) {
+    RETURN_NOT_OK(evictAllPartitions());
+  }
+  array_.reset();
+  sortedBuffer_.reset();
+  pages_.clear();
+  pageAddresses_.clear();
   RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
   return arrow::Status::OK();
 }
@@ -89,7 +95,6 @@ arrow::Status 
VeloxSortShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac
     *actual = 0;
     return arrow::Status::OK();
   }
-  EvictGuard evictGuard{evictState_};
   auto beforeReclaim = veloxPool_->usedBytes();
   RETURN_NOT_OK(evictAllPartitions());
   *actual = beforeReclaim - veloxPool_->usedBytes();
@@ -100,7 +105,9 @@ arrow::Status VeloxSortShuffleWriter::init() {
   ARROW_RETURN_IF(
       options_.partitioning == Partitioning::kSingle,
       arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single 
partition."));
-  array_.resize(initialSize_);
+  initArray();
+  sortedBuffer_ = 
facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, 
veloxPool_.get());
+  rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
   return arrow::Status::OK();
 }
 
@@ -108,6 +115,9 @@ void VeloxSortShuffleWriter::initRowType(const 
facebook::velox::RowVectorPtr& rv
   if (UNLIKELY(!rowType_)) {
     rowType_ = facebook::velox::asRowType(rv->type());
     fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
+    if (fixedRowSize_) {
+      *fixedRowSize_ += sizeof(RowSizeType);
+    }
   }
 }
 
@@ -148,11 +158,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const 
facebook::velox::RowVectorPtr
   facebook::velox::row::CompactRow row(vector);
 
   if (!fixedRowSize_) {
-    rowSizes_.resize(inputRows + 1);
-    rowSizes_[0] = 0;
+    rowSize_.resize(inputRows);
+    rowSizePrefixSum_.resize(inputRows + 1);
+    rowSizePrefixSum_[0] = 0;
     for (auto i = 0; i < inputRows; ++i) {
-      rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i);
+      auto rowSize = row.rowSize(i) + sizeof(RowSizeType);
+      rowSize_[i] = rowSize;
+      rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize;
     }
+  } else {
+    rowSize_.resize(inputRows, *fixedRowSize_);
   }
 
   uint32_t rowOffset = 0;
@@ -160,13 +175,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const 
facebook::velox::RowVectorPtr
     auto remainingRows = inputRows - rowOffset;
     auto rows = maxRowsToInsert(rowOffset, remainingRows);
     if (rows == 0) {
-      auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : 
rowSizes_[rowOffset + 1] - rowSizes_[rowOffset];
+      auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : 
rowSize_[rowOffset];
       acquireNewBuffer((uint64_t)memLimit, minSizeRequired);
       rows = maxRowsToInsert(rowOffset, remainingRows);
       ARROW_RETURN_IF(
           rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining 
rows: " + std::to_string(remainingRows)));
     }
+    // Spill to avoid offset_ overflow.
     RETURN_NOT_OK(maybeSpill(rows));
+    // Allocate newArray can trigger spill.
+    growArrayIfNecessary(rows);
     insertRows(row, rowOffset, rows);
     rowOffset += rows;
   }
@@ -174,35 +192,48 @@ arrow::Status VeloxSortShuffleWriter::insert(const 
facebook::velox::RowVectorPtr
 }
 
 void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, 
uint32_t offset, uint32_t rows) {
-  // Allocate newArray can trigger spill.
-  growArrayIfNecessary(rows);
+  VELOX_CHECK(!pages_.empty());
   for (auto i = offset; i < offset + rows; ++i) {
+    auto pid = row2Partition_[i];
+    arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
+    // size(RowSize) | bytes
+    memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
+    pageCursor_ += sizeof(RowSizeType);
     auto size = row.serialize(i, currentPage_ + pageCursor_);
-    array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, 
pageCursor_), size};
     pageCursor_ += size;
+    VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
   }
 }
 
 arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) {
   if ((uint64_t)offset_ + nextRows > std::numeric_limits<uint32_t>::max()) {
-    EvictGuard evictGuard{evictState_};
     RETURN_NOT_OK(evictAllPartitions());
   }
   return arrow::Status::OK();
 }
 
 arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
+  EvictGuard evictGuard{evictState_};
+
+  auto numRecords = offset_;
+  int32_t begin = 0;
   {
     ScopedTimer timer(&sortTime_);
-    // TODO: Add radix sort to align with Spark.
-    std::sort(array_.begin(), array_.begin() + offset_);
+    if (options_.useRadixSort) {
+      begin = RadixSort<uint64_t>::sort(
+          arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, 
kPartitionIdEndByteIndex);
+    } else {
+      auto ptr = arrayPtr_;
+      qsort(ptr, numRecords, sizeof(uint64_t), compare);
+      (void)ptr;
+    }
   }
 
-  size_t begin = 0;
-  size_t cur = 0;
-  auto pid = extractPartitionId(array_[begin].first);
-  while (++cur < offset_) {
-    auto curPid = extractPartitionId(array_[cur].first);
+  auto end = begin + numRecords;
+  auto cur = begin;
+  auto pid = extractPartitionId(arrayPtr_[begin]);
+  while (++cur < end) {
+    auto curPid = extractPartitionId(arrayPtr_[cur]);
     if (curPid != pid) {
       RETURN_NOT_OK(evictPartition(pid, begin, cur));
       pid = curPid;
@@ -211,53 +242,61 @@ arrow::Status 
VeloxSortShuffleWriter::evictAllPartitions() {
   }
   RETURN_NOT_OK(evictPartition(pid, begin, cur));
 
-  pageCursor_ = 0;
-  pages_.clear();
-  pageAddresses_.clear();
-
-  offset_ = 0;
-  array_.clear();
-
-  sortedBuffer_ = nullptr;
-
   if (!stopped_) {
-    // Allocate array_ can trigger spill.
-    array_.resize(initialSize_);
+    // Preserve the last page for use.
+    auto numPages = pages_.size();
+    while (--numPages) {
+      pages_.pop_front();
+    }
+    auto& page = pages_.back();
+    // Clear page for serialization.
+    memset(page->asMutable<char>(), 0, page->size());
+    // currentPage_ should always point to the last page.
+    VELOX_CHECK(currentPage_ == page->asMutable<char>());
+
+    pageAddresses_.resize(1);
+    pageAddresses_[0] = currentPage_;
+    pageNumber_ = 0;
+    pageCursor_ = 0;
+
+    // Reset and reallocate array_ to minimal size. Allocate array_ can 
trigger spill.
+    offset_ = 0;
+    initArray();
   }
   return arrow::Status::OK();
 }
 
 arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, 
size_t begin, size_t end) {
+  ScopedTimer timer(&sortTime_);
   // Serialize [begin, end)
-  uint32_t numRows = end - begin;
-  uint64_t rawSize = numRows * sizeof(RowSizeType);
-  for (auto i = begin; i < end; ++i) {
-    rawSize += array_[i].second;
-  }
-
-  if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
-    sortedBuffer_ = nullptr;
-    sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(rawSize, 
veloxPool_.get());
-  }
-  auto* rawBuffer = sortedBuffer_->asMutable<char>();
-
   uint64_t offset = 0;
-  for (auto i = begin; i < end; ++i) {
-    // size(size_t) | bytes
-    auto size = array_[i].second;
-    memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
-    offset += sizeof(RowSizeType);
-    auto index = extractPageNumberAndOffset(array_[i].first);
-    memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, 
size);
+  char* addr;
+  uint32_t size;
+
+  auto index = begin;
+  while (index < end) {
+    auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
+    addr = pageAddresses_[pageIndex.first] + pageIndex.second;
+    size = *(RowSizeType*)addr;
+    if (offset + size > kSortedBufferSize) {
+      VELOX_CHECK(offset > 0);
+      auto payload = std::make_unique<InMemoryPayload>(
+          index - begin,
+          nullptr,
+          
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
 offset)});
+      RETURN_NOT_OK(
+          partitionWriter_->evict(partitionId, std::move(payload), 
Evict::type::kSortSpill, false, false, stopped_));
+      begin = index;
+      offset = 0;
+    }
+    gluten::fastCopy(rawBuffer_ + offset, addr, size);
     offset += size;
+    index++;
   }
-  VELOX_CHECK_EQ(offset, rawSize);
-
-  auto rawData = sortedBuffer_->as<uint8_t>();
-  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
-  buffers.push_back(std::make_shared<arrow::Buffer>(rawData, rawSize));
-
-  auto payload = std::make_unique<InMemoryPayload>(numRows, nullptr, 
std::move(buffers));
+  auto payload = std::make_unique<InMemoryPayload>(
+      end - begin,
+      nullptr,
+      
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
 offset)});
   RETURN_NOT_OK(
       partitionWriter_->evict(partitionId, std::move(payload), 
Evict::type::kSortSpill, false, false, stopped_));
   return arrow::Status::OK();
@@ -272,8 +311,8 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t 
offset, uint32_t rows)
   if (fixedRowSize_) {
     return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), 
rows);
   }
-  auto beginIter = rowSizes_.begin() + 1 + offset;
-  auto iter = std::upper_bound(beginIter, rowSizes_.end(), remainingBytes);
+  auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
+  auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), 
remainingBytes);
   return iter - beginIter;
 }
 
@@ -281,23 +320,54 @@ void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t 
memLimit, uint64_t minSiz
   auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024), 
minSizeRequired);
   // Allocating new buffer can trigger spill.
   auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, 
veloxPool_.get(), 0);
+  // If spill triggered, clear pages_.
+  if (offset_ == 0 && pages_.size() > 0) {
+    pageAddresses_.clear();
+    pages_.clear();
+  }
+  currentPage_ = newBuffer->asMutable<char>();
+  pageAddresses_.emplace_back(currentPage_);
   pages_.emplace_back(std::move(newBuffer));
+
   pageCursor_ = 0;
   pageNumber_ = pages_.size() - 1;
-  currentPage_ = pages_.back()->asMutable<char>();
-  pageAddresses_.emplace_back(currentPage_);
+  currenPageSize_ = pages_.back()->size();
 }
 
 void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
-  auto arraySize = (uint32_t)array_.size();
-  auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
-  while (offset_ + rows > usableCapacity) {
-    arraySize <<= 1;
-    usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
+  auto newSize = newArraySize(rows);
+  if (newSize > arraySize_) {
+    // May trigger spill.
+    auto newSizeBytes = newSize * sizeof(uint64_t);
+    auto newArray = 
facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
+    // Check if already satisfies.
+    if (newArraySize(rows) > arraySize_) {
+      auto newPtr = newArray->asMutable<uint64_t>();
+      if (offset_ > 0) {
+        gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
+      }
+      arraySize_ = newSize;
+      arrayPtr_ = newPtr;
+      array_.reset();
+      array_.swap(newArray);
+    }
   }
-  if (arraySize != array_.size()) {
-    array_.resize(arraySize);
+}
+
+uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
+  auto newSize = arraySize_;
+  auto usableCapacity = options_.useRadixSort ? newSize / 2 : newSize;
+  while (offset_ + rows > usableCapacity) {
+    newSize <<= 1;
+    usableCapacity = options_.useRadixSort ? newSize / 2 : newSize;
   }
+  return newSize;
+}
+
+void VeloxSortShuffleWriter::initArray() {
+  arraySize_ = options_.sortBufferInitialSize;
+  array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * 
sizeof(uint64_t), veloxPool_.get());
+  arrayPtr_ = array_->asMutable<uint64_t>();
 }
 
 int64_t VeloxSortShuffleWriter::peakBytesAllocated() const {
@@ -311,4 +381,9 @@ int64_t VeloxSortShuffleWriter::totalSortTime() const {
 int64_t VeloxSortShuffleWriter::totalC2RTime() const {
   return c2rTime_;
 }
+
+int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
+  // No same values.
+  return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
+}
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 6ac5308d0..747593ae4 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "shuffle/RadixSort.h"
 #include "shuffle/VeloxShuffleWriter.h"
 
 #include <arrow/status.h>
@@ -31,6 +32,8 @@ namespace gluten {
 
 class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
  public:
+  using RowSizeType = uint32_t;
+
   static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
@@ -80,27 +83,28 @@ class VeloxSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   void growArrayIfNecessary(uint32_t rows);
 
-  using RowSizeType = uint32_t;
-  using ElementType = std::pair<uint64_t, RowSizeType>;
-  using Allocator = facebook::velox::StlAllocator<ElementType>;
-  using SortArray = std::vector<ElementType, Allocator>;
+  uint32_t newArraySize(uint32_t rows);
+
+  void initArray();
+
+  static int compare(const void* a, const void* b);
 
-  std::unique_ptr<facebook::velox::HashStringAllocator> allocator_;
   // Stores compact row id -> row
-  SortArray array_;
+  facebook::velox::BufferPtr array_;
+  uint64_t* arrayPtr_;
+  uint32_t arraySize_;
   uint32_t offset_{0};
 
-  std::vector<facebook::velox::BufferPtr> pages_;
+  std::list<facebook::velox::BufferPtr> pages_;
   std::vector<char*> pageAddresses_;
   char* currentPage_;
   uint32_t pageNumber_;
   uint32_t pageCursor_;
-
-  // FIXME: Use configuration to replace hardcode.
-  uint32_t initialSize_ = 4096;
-  bool useRadixSort_ = false;
+  // For debug.
+  uint32_t currenPageSize_;
 
   facebook::velox::BufferPtr sortedBuffer_;
+  uint8_t* rawBuffer_;
 
   // Row ID -> Partition ID
   // subscript: The index of row in the current input RowVector
@@ -110,7 +114,8 @@ class VeloxSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   std::shared_ptr<const facebook::velox::RowType> rowType_;
   std::optional<int32_t> fixedRowSize_;
-  std::vector<uint64_t> rowSizes_;
+  std::vector<RowSizeType> rowSize_;
+  std::vector<uint64_t> rowSizePrefixSum_;
 
   int64_t c2rTime_{0};
   int64_t sortTime_{0};
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 6b86f6a0a..af9d5a58d 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -70,10 +70,12 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
   std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
+    for (auto useRadixSort : {true, false}) {
+      params.push_back(ShuffleTestParams{
+          ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, 
compression, 0, 0, useRadixSort});
+    }
     params.push_back(
-        ShuffleTestParams{ShuffleWriterType::kSortShuffle, 
PartitionWriterType::kLocal, compression, 0, 0});
-    params.push_back(
-        ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, 
PartitionWriterType::kRss, compression, 0, 0});
+        ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, 
PartitionWriterType::kRss, compression, 0, 0, false});
     for (const auto compressionThreshold : compressionThresholds) {
       for (const auto mergeBufferSize : mergeBufferSizes) {
         params.push_back(ShuffleTestParams{
@@ -81,7 +83,8 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
             PartitionWriterType::kLocal,
             compression,
             compressionThreshold,
-            mergeBufferSize});
+            mergeBufferSize,
+            false /* unused */});
       }
       params.push_back(ShuffleTestParams{
           ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, 
compression, compressionThreshold, 0});
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index f9c2b1d07..d32e32721 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -68,12 +68,13 @@ struct ShuffleTestParams {
   arrow::Compression::type compressionType;
   int32_t compressionThreshold;
   int32_t mergeBufferSize;
+  bool useRadixSort;
 
   std::string toString() const {
     std::ostringstream out;
     out << "shuffleWriterType = " << shuffleWriterType << ", 
partitionWriterType = " << partitionWriterType
         << ", compressionType = " << compressionType << ", 
compressionThreshold = " << compressionThreshold
-        << ", mergeBufferSize = " << mergeBufferSize;
+        << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << 
(useRadixSort ? "true" : "false");
     return out.str();
   }
 };
@@ -250,6 +251,7 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
     RETURN_NOT_OK(VeloxShuffleWriterTestBase::initShuffleWriterOptions());
 
     ShuffleTestParams params = GetParam();
+    shuffleWriterOptions_.useRadixSort = params.useRadixSort;
     partitionWriterOptions_.compressionType = params.compressionType;
     switch (partitionWriterOptions_.compressionType) {
       case arrow::Compression::UNCOMPRESSED:
diff --git 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index dd4904964..6f21b528f 100644
--- 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.shuffle
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.GlutenConfig.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, 
GLUTEN_SORT_SHUFFLE_WRITER}
 import org.apache.gluten.exec.Runtimes
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.ArrowAbiUtil
@@ -84,6 +85,7 @@ private class CelebornColumnarBatchSerializerInstance(
     val compressionCodecBackend =
       GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
     val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType
+      .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
     val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
     val batchSize = GlutenConfig.getConf.maxBatchSize
     val handle = jniWrapper
diff --git 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index baf61b8a1..8f613c728 100644
--- 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget, 
Spiller, Spillers}
 import org.apache.gluten.vectorized._
 
 import org.apache.spark._
+import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE, 
SHUFFLE_SORT_USE_RADIXSORT}
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
@@ -48,6 +49,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
     celebornConf,
     client,
     writeMetrics) {
+  private val isSort = 
!GlutenConfig.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType)
 
   private val runtime = Runtimes.contextInstance("CelebornShuffleWriter")
 
@@ -72,7 +74,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
         val handle = ColumnarBatches.getNativeHandle(cb)
         val startTime = System.nanoTime()
         jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, 
availableOffHeapPerTask())
-        dep.metrics("splitTime").add(System.nanoTime() - startTime)
+        dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
         // This metric is important, AQE use it to decide if EliminateLimit
@@ -84,10 +86,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
     val startTime = System.nanoTime()
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 
-    dep
-      .metrics("splitTime")
+    dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
+    val nativeMetrics = if (isSort) {
+      dep.metrics("sortTime")
+    } else {
+      dep.metrics("splitTime")
+    }
+    nativeMetrics
       .add(
-        System.nanoTime() - startTime - splitResult.getTotalPushTime -
+        dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime -
           splitResult.getTotalWriteTime -
           splitResult.getTotalCompressTime)
     dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
@@ -108,6 +115,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
       compressionLevel,
       bufferCompressThreshold,
       GlutenConfig.getConf.columnarShuffleCompressionMode,
+      conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
+      conf.get(SHUFFLE_SORT_USE_RADIXSORT),
       clientPushBufferMaxSize,
       clientPushSortMemoryThreshold,
       celebornPartitionPusher,
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 883fc6001..1d622d491 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -61,6 +61,8 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
       int compressionLevel,
       int bufferCompressThreshold,
       String compressionMode,
+      int sortBufferInitialSize,
+      boolean useRadixSort,
       String dataFile,
       int subDirsPerLocalDir,
       String localDirs,
@@ -80,6 +82,8 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
         compressionLevel,
         bufferCompressThreshold,
         compressionMode,
+        sortBufferInitialSize,
+        useRadixSort,
         dataFile,
         subDirsPerLocalDir,
         localDirs,
@@ -109,6 +113,8 @@ public class ShuffleWriterJniWrapper implements 
RuntimeAware {
       int compressionLevel,
       int bufferCompressThreshold,
       String compressionMode,
+      int sortBufferInitialSize,
+      boolean useRadixSort,
       int pushBufferMaxSize,
       long sortBufferMaxSize,
       Object pusher,
@@ -129,6 +135,8 @@ public class ShuffleWriterJniWrapper implements 
RuntimeAware {
         compressionLevel,
         bufferCompressThreshold,
         compressionMode,
+        sortBufferInitialSize,
+        useRadixSort,
         null,
         0,
         null,
@@ -154,6 +162,8 @@ public class ShuffleWriterJniWrapper implements 
RuntimeAware {
       int compressionLevel,
       int bufferCompressThreshold,
       String compressionMode,
+      int sortBufferInitialSize,
+      boolean useRadixSort,
       String dataFile,
       int subDirsPerLocalDir,
       String localDirs,
diff --git 
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
 
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 8a8248f23..d62ff1d68 100644
--- 
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ 
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.vectorized._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.SHUFFLE_COMPRESS
+import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, 
SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT}
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -151,6 +151,8 @@ class ColumnarShuffleWriter[K, V](
             compressionLevel,
             bufferCompressThreshold,
             GlutenConfig.getConf.columnarShuffleCompressionMode,
+            conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
+            conf.get(SHUFFLE_SORT_USE_RADIXSORT),
             dataTmp.getAbsolutePath,
             blockManager.subDirsPerLocalDir,
             localDirs,
@@ -176,9 +178,7 @@ class ColumnarShuffleWriter[K, V](
         }
         val startTime = System.nanoTime()
         jniWrapper.write(nativeShuffleWriter, rows, handle, 
availableOffHeapPerTask())
-        if (!isSort) {
-          dep.metrics("splitTime").add(System.nanoTime() - startTime)
-        }
+        dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(rows)
         dep.metrics("inputBatches").add(1)
         // This metric is important, AQE use it to decide if EliminateLimit
@@ -191,11 +191,12 @@ class ColumnarShuffleWriter[K, V](
     assert(nativeShuffleWriter != -1L)
     splitResult = jniWrapper.stop(nativeShuffleWriter)
     closeShuffleWriter()
+    dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
     if (!isSort) {
       dep
         .metrics("splitTime")
         .add(
-          System.nanoTime() - startTime - splitResult.getTotalSpillTime -
+          dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime 
-
             splitResult.getTotalWriteTime -
             splitResult.getTotalCompressTime)
     } else {
diff --git 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 62e197163..b84c9d4ee 100644
--- 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -148,6 +148,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
                   compressionLevel,
                   compressThreshold,
                   GlutenConfig.getConf().columnarShuffleCompressionMode(),
+                  (int) (long) 
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+                  (boolean) 
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()),
                   bufferSize,
                   bufferSize,
                   partitionPusher,
@@ -180,7 +182,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
         LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(), 
bytes);
         columnarDep.metrics().get("dataSize").get().add(bytes);
         // this metric replace part of uniffle shuffle write time
-        columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - 
startTime);
+        
columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - 
startTime);
         columnarDep.metrics().get("numInputRows").get().add(cb.numRows());
         columnarDep.metrics().get("inputBatches").get().add(1);
         shuffleWriteMetrics.incRecordsWritten(cb.numRows());
@@ -193,13 +195,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
       throw new IllegalStateException("nativeShuffleWriter should not be -1L");
     }
     splitResult = jniWrapper.stop(nativeShuffleWriter);
+    columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - 
startTime);
     columnarDep
         .metrics()
         .get("splitTime")
         .get()
         .add(
-            System.nanoTime()
-                - startTime
+            columnarDep.metrics().get("shuffleWallTime").get().value()
                 - splitResult.getTotalPushTime()
                 - splitResult.getTotalWriteTime()
                 - splitResult.getTotalCompressTime());
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5547feafe..586a277f7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -137,7 +137,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
     conf
       .getConfString("spark.celeborn.client.spark.shuffle.writer", 
GLUTEN_HASH_SHUFFLE_WRITER)
       .toLowerCase(Locale.ROOT)
-      .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
 
   def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to