This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 408f4cbb0 [VL] Row based sort follow-up (#6579)
408f4cbb0 is described below
commit 408f4cbb0a9769ed8ea86d4cd6cb5cfa3eb62527
Author: Rong Ma <[email protected]>
AuthorDate: Mon Jul 29 16:14:51 2024 +0800
[VL] Row based sort follow-up (#6579)
---
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 1 +
cpp/core/CMakeLists.txt | 1 -
cpp/core/jni/JniWrapper.cc | 6 +-
cpp/core/shuffle/Options.cc | 18 --
cpp/core/shuffle/Options.h | 7 +
cpp/velox/shuffle/RadixSort.h | 157 +++++++++++++++
cpp/velox/shuffle/VeloxShuffleReader.cc | 4 +-
cpp/velox/shuffle/VeloxShuffleReader.h | 3 +
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 217 ++++++++++++++-------
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 29 +--
cpp/velox/tests/VeloxShuffleWriterTest.cc | 11 +-
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 4 +-
.../VeloxCelebornColumnarBatchSerializer.scala | 2 +
.../VeloxCelebornColumnarShuffleWriter.scala | 17 +-
.../gluten/vectorized/ShuffleWriterJniWrapper.java | 10 +
.../spark/shuffle/ColumnarShuffleWriter.scala | 11 +-
.../writer/VeloxUniffleColumnarShuffleWriter.java | 8 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 1 -
18 files changed, 384 insertions(+), 123 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 30b08749e..c05ce7cdc 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -272,6 +272,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time
to compress"),
"decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext,
"time to decompress"),
"deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext,
"time to deserialize"),
+ "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext,
"shuffle wall time"),
// For hash shuffle writer, the peak bytes represents the maximum split
buffer size.
// For sort shuffle writer, the peak bytes represents the maximum
// row buffer + sort buffer size.
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index caad8db1e..ef21ccbe8 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/FallbackRangePartitioner.cc
shuffle/HashPartitioner.cc
shuffle/LocalPartitionWriter.cc
- shuffle/Options.cc
shuffle/Partitioner.cc
shuffle/Partitioning.cc
shuffle/Payload.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3d6da31c7..f39f9c923 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -755,6 +755,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jint compressionLevel,
jint compressionThreshold,
jstring compressionModeJstr,
+ jint sortBufferInitialSize,
+ jboolean useRadixSort,
jstring dataFileJstr,
jint numSubDirs,
jstring localDirsJstr,
@@ -780,7 +782,9 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.partitioning = gluten::toPartitioning(jStringToCString(env,
partitioningNameJstr)),
.taskAttemptId = (int64_t)taskAttemptId,
.startPartitionId = startPartitionId,
- .shuffleWriterType =
gluten::ShuffleWriter::stringToType(jStringToCString(env,
shuffleWriterTypeJstr))};
+ .shuffleWriterType =
gluten::ShuffleWriter::stringToType(jStringToCString(env,
shuffleWriterTypeJstr)),
+ .sortBufferInitialSize = sortBufferInitialSize,
+ .useRadixSort = static_cast<bool>(useRadixSort)};
// Build PartitionWriterOptions.
auto partitionWriterOptions = PartitionWriterOptions{
diff --git a/cpp/core/shuffle/Options.cc b/cpp/core/shuffle/Options.cc
deleted file mode 100644
index 8e05a10d6..000000000
--- a/cpp/core/shuffle/Options.cc
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "shuffle/Options.h"
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 757950d03..11fa037eb 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -35,9 +35,12 @@ static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;
+static constexpr bool kDefaultUseRadixSort = true;
+static constexpr int32_t kDefaultSortBufferSize = 4096;
enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
enum PartitionWriterType { kLocal, kRss };
+enum SortAlgorithm { kRadixSort, kQuickSort };
struct ShuffleReaderOptions {
arrow::Compression::type compressionType =
arrow::Compression::type::LZ4_FRAME;
@@ -56,6 +59,10 @@ struct ShuffleWriterOptions {
int32_t startPartitionId = 0;
int64_t threadId = -1;
ShuffleWriterType shuffleWriterType = kHashShuffle;
+
+ // Sort shuffle writer.
+ int32_t sortBufferInitialSize = kDefaultSortBufferSize;
+ bool useRadixSort = kDefaultUseRadixSort;
};
struct PartitionWriterOptions {
diff --git a/cpp/velox/shuffle/RadixSort.h b/cpp/velox/shuffle/RadixSort.h
new file mode 100644
index 000000000..17f05d349
--- /dev/null
+++ b/cpp/velox/shuffle/RadixSort.h
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#pragma once

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <type_traits>
#include <utility>
#include <vector>

namespace gluten {

/**
 * Least-significant-digit (LSD) radix sort over integral keys, one byte per pass.
 *
 * Sorting happens inside the caller-provided array. The only extra allocations are the
 * 256-bucket histograms, one per byte that actually needs a pass.
 */
template <typename Element>
class RadixSort {
  static_assert(std::is_integral<Element>::value, "RadixSort requires an integral element type");
  static_assert(sizeof(Element) <= sizeof(uint64_t), "RadixSort supports elements of at most 8 bytes");

 public:
  /**
   * Sorts the first `numRecords` elements of `array` by the bytes in [startByteIndex, endByteIndex].
   *
   * The sort ping-pongs between the regions [0, numRecords) and [numRecords, 2 * numRecords) of
   * `array`. It is destructive: both regions may be overwritten, and the sorted data ends up in
   * whichever region the last pass wrote to. Each pass is a forward counting sort, so the overall
   * sort is stable with respect to the bytes outside the sorted range.
   *
   * @param array array of elements followed by at least `numRecords` free slots.
   * @param size total number of elements (not bytes) available in `array`; must be >= 2 * numRecords.
   * @param numRecords number of data records at the start of `array`.
   * @param startByteIndex the first byte (in range [0, 7]) to sort each element by, counting from the
   *                       least significant byte.
   * @param endByteIndex the last byte (in range [0, 7]) to sort each element by, counting from the
   *                     least significant byte. Must be greater than startByteIndex.
   *
   * @return The index within `array` where the sorted data starts (either 0 or numRecords). It is
   *         returned instead of copying the data back to position zero, for efficiency.
   */
  static int32_t sort(Element* array, size_t size, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) {
    assert(startByteIndex >= 0 && "startByteIndex should >= 0");
    assert(endByteIndex <= 7 && "endByteIndex should <= 7");
    assert(endByteIndex > startByteIndex);
    assert(numRecords >= 0);
    assert(static_cast<uint64_t>(numRecords) * 2 <= size);
    // The start index (0 or numRecords) is returned as int32_t; larger inputs would be truncated.
    assert(numRecords <= std::numeric_limits<int32_t>::max());

    int64_t inIndex = 0;
    int64_t outIndex = numRecords;

    if (numRecords > 0) {
      auto counts = getCounts(array, numRecords, startByteIndex, endByteIndex);

      for (auto i = startByteIndex; i <= endByteIndex; ++i) {
        // An empty histogram means every record holds the same value at byte i: skip the pass.
        if (!counts[i].empty()) {
          sortAtByte(array, numRecords, counts[i], i, inIndex, outIndex);
          std::swap(inIndex, outIndex);
        }
      }
    }

    return static_cast<int32_t>(inIndex);
  }

 private:
  /**
   * Performs one counting-sort pass: copies every record from the input region into its bucket's
   * next slot in the output region, keyed by the value of the given byte.
   *
   * @param array array to partially sort.
   * @param numRecords number of data records in the array.
   * @param counts counts for each of the 256 byte values. This routine destructively turns them into
   *               write offsets.
   * @param byteIdx the byte of each element to sort at, counting from the least significant byte.
   * @param inIndex the starting index in the array where input data is located.
   * @param outIndex the starting index where sorted output data should be written.
   */
  static void sortAtByte(
      Element* array,
      int64_t numRecords,
      std::vector<int64_t>& counts,
      int32_t byteIdx,
      int64_t inIndex,
      int64_t outIndex) {
    assert(counts.size() == 256);

    // Bind by reference: the helper rewrites `counts` in place and returns it, so no copy is needed.
    auto& offsets = transformCountsToOffsets(counts, outIndex);
    const int32_t shift = byteIdx * 8;

    for (int64_t offset = inIndex; offset < inIndex + numRecords; ++offset) {
      const auto bucket = (static_cast<uint64_t>(array[offset]) >> shift) & 0xff;
      array[offsets[bucket]++] = array[offset];
    }
  }

  /**
   * Computes a 256-bucket value histogram for each byte index in [startByteIndex, endByteIndex].
   *
   * @param array array to count records in.
   * @param numRecords number of data records in the array.
   * @param startByteIndex the first byte to compute counts for (the prior are skipped).
   * @param endByteIndex the last byte to compute counts for.
   *
   * @return a vector of eight count vectors, one per byte starting from the least significant byte.
   *         An entry is left empty if the byte is outside the range, or if it holds the same value in
   *         every record and so needs no sorting pass.
   */
  static std::vector<std::vector<int64_t>>
  getCounts(Element* array, int64_t numRecords, int32_t startByteIndex, int32_t endByteIndex) {
    std::vector<std::vector<int64_t>> counts(8);

    // Fast pre-pass: a bit that differs between records is set in (OR of all) ^ (AND of all).
    // Byte positions with no such bit are identical in every record and can be skipped.
    uint64_t bitwiseMax = 0;
    uint64_t bitwiseMin = ~uint64_t{0};
    for (int64_t offset = 0; offset < numRecords; ++offset) {
      const auto value = static_cast<uint64_t>(array[offset]);
      bitwiseMax |= value;
      bitwiseMin &= value;
    }
    const uint64_t bitsChanged = bitwiseMin ^ bitwiseMax;

    // Compute counts for each byte index that actually varies.
    for (auto i = startByteIndex; i <= endByteIndex; ++i) {
      const int32_t shift = i * 8;
      if (((bitsChanged >> shift) & 0xff) != 0) {
        counts[i].resize(256);
        for (int64_t offset = 0; offset < numRecords; ++offset) {
          counts[i][(static_cast<uint64_t>(array[offset]) >> shift) & 0xff]++;
        }
      }
    }

    return counts;
  }

  /**
   * Converts per-bucket counts into the first write position of each bucket, in place. The result
   * is an exclusive prefix sum shifted by `outputOffset`.
   *
   * @param counts counts for each of the 256 byte values; overwritten with the offsets.
   * @param outputOffset element index (not a byte offset) in the array where output starts.
   *
   * @return a reference to `counts`, which now holds the offsets.
   */
  static std::vector<int64_t>& transformCountsToOffsets(std::vector<int64_t>& counts, int64_t outputOffset) {
    assert(counts.size() == 256);

    int64_t pos = outputOffset;
    for (auto& count : counts) {
      const auto bucketSize = count;
      count = pos;
      pos += bucketSize;
    }

    return counts;
  }
};

} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index e55f4d01d..ab93d9a33 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -428,8 +428,8 @@ std::shared_ptr<ColumnarBatch>
VeloxSortShuffleReaderDeserializer::deserializeTo
auto buffer = cur->second;
const auto* rawBuffer = buffer->as<char>();
while (rowOffset_ < cur->first && readRows < batchSize_) {
- auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_);
- byteOffset_ += sizeof(uint32_t);
+ auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_) -
sizeof(RowSizeType);
+ byteOffset_ += sizeof(RowSizeType);
data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
byteOffset_ += rowSize;
++rowOffset_;
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 962739485..2be913aa1 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -20,6 +20,7 @@
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "shuffle/Payload.h"
#include "shuffle/ShuffleReader.h"
+#include "shuffle/VeloxSortShuffleWriter.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
@@ -64,6 +65,8 @@ class VeloxHashShuffleReaderDeserializer final : public
ColumnarBatchIterator {
class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
public:
+ using RowSizeType = VeloxSortShuffleWriter::RowSizeType;
+
VeloxSortShuffleReaderDeserializer(
std::shared_ptr<arrow::io::InputStream> in,
const std::shared_ptr<arrow::Schema>& schema,
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 7c033fb98..0015ba9d3 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -29,6 +29,10 @@ namespace gluten {
namespace {
constexpr uint32_t kMaskLower27Bits = (1 << 27) - 1;
constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
+constexpr uint32_t kPartitionIdStartByteIndex = 5;
+constexpr uint32_t kPartitionIdEndByteIndex = 7;
+
+constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024;
uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t
offsetInPage) {
// |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) |
@@ -62,9 +66,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
ShuffleWriterOptions options,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
arrow::MemoryPool* pool)
- : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool),
-
allocator_{std::make_unique<facebook::velox::HashStringAllocator>(veloxPool_.get())},
- array_{SortArray{Allocator(allocator_.get())}} {}
+ : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool) {}
arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
@@ -76,10 +78,14 @@ arrow::Status
VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, i
arrow::Status VeloxSortShuffleWriter::stop() {
ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable,
arrow::Status::Invalid("Unevictable state in stop."));
- EvictGuard evictGuard{evictState_};
-
stopped_ = true;
- RETURN_NOT_OK(evictAllPartitions());
+ if (offset_ > 0) {
+ RETURN_NOT_OK(evictAllPartitions());
+ }
+ array_.reset();
+ sortedBuffer_.reset();
+ pages_.clear();
+ pageAddresses_.clear();
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
return arrow::Status::OK();
}
@@ -89,7 +95,6 @@ arrow::Status
VeloxSortShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac
*actual = 0;
return arrow::Status::OK();
}
- EvictGuard evictGuard{evictState_};
auto beforeReclaim = veloxPool_->usedBytes();
RETURN_NOT_OK(evictAllPartitions());
*actual = beforeReclaim - veloxPool_->usedBytes();
@@ -100,7 +105,9 @@ arrow::Status VeloxSortShuffleWriter::init() {
ARROW_RETURN_IF(
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single
partition."));
- array_.resize(initialSize_);
+ initArray();
+ sortedBuffer_ =
facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize,
veloxPool_.get());
+ rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
return arrow::Status::OK();
}
@@ -108,6 +115,9 @@ void VeloxSortShuffleWriter::initRowType(const
facebook::velox::RowVectorPtr& rv
if (UNLIKELY(!rowType_)) {
rowType_ = facebook::velox::asRowType(rv->type());
fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
+ if (fixedRowSize_) {
+ *fixedRowSize_ += sizeof(RowSizeType);
+ }
}
}
@@ -148,11 +158,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const
facebook::velox::RowVectorPtr
facebook::velox::row::CompactRow row(vector);
if (!fixedRowSize_) {
- rowSizes_.resize(inputRows + 1);
- rowSizes_[0] = 0;
+ rowSize_.resize(inputRows);
+ rowSizePrefixSum_.resize(inputRows + 1);
+ rowSizePrefixSum_[0] = 0;
for (auto i = 0; i < inputRows; ++i) {
- rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i);
+ auto rowSize = row.rowSize(i) + sizeof(RowSizeType);
+ rowSize_[i] = rowSize;
+ rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize;
}
+ } else {
+ rowSize_.resize(inputRows, *fixedRowSize_);
}
uint32_t rowOffset = 0;
@@ -160,13 +175,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const
facebook::velox::RowVectorPtr
auto remainingRows = inputRows - rowOffset;
auto rows = maxRowsToInsert(rowOffset, remainingRows);
if (rows == 0) {
- auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() :
rowSizes_[rowOffset + 1] - rowSizes_[rowOffset];
+ auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() :
rowSize_[rowOffset];
acquireNewBuffer((uint64_t)memLimit, minSizeRequired);
rows = maxRowsToInsert(rowOffset, remainingRows);
ARROW_RETURN_IF(
rows == 0, arrow::Status::Invalid("Failed to insert rows. Remaining
rows: " + std::to_string(remainingRows)));
}
+ // Spill to avoid offset_ overflow.
RETURN_NOT_OK(maybeSpill(rows));
+ // Allocate newArray can trigger spill.
+ growArrayIfNecessary(rows);
insertRows(row, rowOffset, rows);
rowOffset += rows;
}
@@ -174,35 +192,48 @@ arrow::Status VeloxSortShuffleWriter::insert(const
facebook::velox::RowVectorPtr
}
void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row,
uint32_t offset, uint32_t rows) {
- // Allocate newArray can trigger spill.
- growArrayIfNecessary(rows);
+ VELOX_CHECK(!pages_.empty());
for (auto i = offset; i < offset + rows; ++i) {
+ auto pid = row2Partition_[i];
+ arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
+ // size(RowSize) | bytes
+ memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
+ pageCursor_ += sizeof(RowSizeType);
auto size = row.serialize(i, currentPage_ + pageCursor_);
- array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_,
pageCursor_), size};
pageCursor_ += size;
+ VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
}
arrow::Status VeloxSortShuffleWriter::maybeSpill(int32_t nextRows) {
if ((uint64_t)offset_ + nextRows > std::numeric_limits<uint32_t>::max()) {
- EvictGuard evictGuard{evictState_};
RETURN_NOT_OK(evictAllPartitions());
}
return arrow::Status::OK();
}
arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
+ EvictGuard evictGuard{evictState_};
+
+ auto numRecords = offset_;
+ int32_t begin = 0;
{
ScopedTimer timer(&sortTime_);
- // TODO: Add radix sort to align with Spark.
- std::sort(array_.begin(), array_.begin() + offset_);
+ if (options_.useRadixSort) {
+ begin = RadixSort<uint64_t>::sort(
+ arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex,
kPartitionIdEndByteIndex);
+ } else {
+ auto ptr = arrayPtr_;
+ qsort(ptr, numRecords, sizeof(uint64_t), compare);
+ (void)ptr;
+ }
}
- size_t begin = 0;
- size_t cur = 0;
- auto pid = extractPartitionId(array_[begin].first);
- while (++cur < offset_) {
- auto curPid = extractPartitionId(array_[cur].first);
+ auto end = begin + numRecords;
+ auto cur = begin;
+ auto pid = extractPartitionId(arrayPtr_[begin]);
+ while (++cur < end) {
+ auto curPid = extractPartitionId(arrayPtr_[cur]);
if (curPid != pid) {
RETURN_NOT_OK(evictPartition(pid, begin, cur));
pid = curPid;
@@ -211,53 +242,61 @@ arrow::Status
VeloxSortShuffleWriter::evictAllPartitions() {
}
RETURN_NOT_OK(evictPartition(pid, begin, cur));
- pageCursor_ = 0;
- pages_.clear();
- pageAddresses_.clear();
-
- offset_ = 0;
- array_.clear();
-
- sortedBuffer_ = nullptr;
-
if (!stopped_) {
- // Allocate array_ can trigger spill.
- array_.resize(initialSize_);
+ // Preserve the last page for use.
+ auto numPages = pages_.size();
+ while (--numPages) {
+ pages_.pop_front();
+ }
+ auto& page = pages_.back();
+ // Clear page for serialization.
+ memset(page->asMutable<char>(), 0, page->size());
+ // currentPage_ should always point to the last page.
+ VELOX_CHECK(currentPage_ == page->asMutable<char>());
+
+ pageAddresses_.resize(1);
+ pageAddresses_[0] = currentPage_;
+ pageNumber_ = 0;
+ pageCursor_ = 0;
+
+ // Reset and reallocate array_ to minimal size. Allocate array_ can
trigger spill.
+ offset_ = 0;
+ initArray();
}
return arrow::Status::OK();
}
arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId,
size_t begin, size_t end) {
+ ScopedTimer timer(&sortTime_);
// Serialize [begin, end)
- uint32_t numRows = end - begin;
- uint64_t rawSize = numRows * sizeof(RowSizeType);
- for (auto i = begin; i < end; ++i) {
- rawSize += array_[i].second;
- }
-
- if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
- sortedBuffer_ = nullptr;
- sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(rawSize,
veloxPool_.get());
- }
- auto* rawBuffer = sortedBuffer_->asMutable<char>();
-
uint64_t offset = 0;
- for (auto i = begin; i < end; ++i) {
- // size(size_t) | bytes
- auto size = array_[i].second;
- memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
- offset += sizeof(RowSizeType);
- auto index = extractPageNumberAndOffset(array_[i].first);
- memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second,
size);
+ char* addr;
+ uint32_t size;
+
+ auto index = begin;
+ while (index < end) {
+ auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
+ addr = pageAddresses_[pageIndex.first] + pageIndex.second;
+ size = *(RowSizeType*)addr;
+ if (offset + size > kSortedBufferSize) {
+ VELOX_CHECK(offset > 0);
+ auto payload = std::make_unique<InMemoryPayload>(
+ index - begin,
+ nullptr,
+
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
+ RETURN_NOT_OK(
+ partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
+ begin = index;
+ offset = 0;
+ }
+ gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
+ index++;
}
- VELOX_CHECK_EQ(offset, rawSize);
-
- auto rawData = sortedBuffer_->as<uint8_t>();
- std::vector<std::shared_ptr<arrow::Buffer>> buffers;
- buffers.push_back(std::make_shared<arrow::Buffer>(rawData, rawSize));
-
- auto payload = std::make_unique<InMemoryPayload>(numRows, nullptr,
std::move(buffers));
+ auto payload = std::make_unique<InMemoryPayload>(
+ end - begin,
+ nullptr,
+
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
@@ -272,8 +311,8 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t
offset, uint32_t rows)
if (fixedRowSize_) {
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())),
rows);
}
- auto beginIter = rowSizes_.begin() + 1 + offset;
- auto iter = std::upper_bound(beginIter, rowSizes_.end(), remainingBytes);
+ auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
+ auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(),
remainingBytes);
return iter - beginIter;
}
@@ -281,23 +320,54 @@ void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t
memLimit, uint64_t minSiz
auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024),
minSizeRequired);
// Allocating new buffer can trigger spill.
auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size,
veloxPool_.get(), 0);
+ // If spill triggered, clear pages_.
+ if (offset_ == 0 && pages_.size() > 0) {
+ pageAddresses_.clear();
+ pages_.clear();
+ }
+ currentPage_ = newBuffer->asMutable<char>();
+ pageAddresses_.emplace_back(currentPage_);
pages_.emplace_back(std::move(newBuffer));
+
pageCursor_ = 0;
pageNumber_ = pages_.size() - 1;
- currentPage_ = pages_.back()->asMutable<char>();
- pageAddresses_.emplace_back(currentPage_);
+ currenPageSize_ = pages_.back()->size();
}
void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
- auto arraySize = (uint32_t)array_.size();
- auto usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
- while (offset_ + rows > usableCapacity) {
- arraySize <<= 1;
- usableCapacity = useRadixSort_ ? arraySize / 2 : arraySize;
+ auto newSize = newArraySize(rows);
+ if (newSize > arraySize_) {
+ // May trigger spill.
+ auto newSizeBytes = newSize * sizeof(uint64_t);
+ auto newArray =
facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
+ // Check if already satisfies.
+ if (newArraySize(rows) > arraySize_) {
+ auto newPtr = newArray->asMutable<uint64_t>();
+ if (offset_ > 0) {
+ gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
+ }
+ arraySize_ = newSize;
+ arrayPtr_ = newPtr;
+ array_.reset();
+ array_.swap(newArray);
+ }
}
- if (arraySize != array_.size()) {
- array_.resize(arraySize);
+}
+
+uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
+ auto newSize = arraySize_;
+ auto usableCapacity = options_.useRadixSort ? newSize / 2 : newSize;
+ while (offset_ + rows > usableCapacity) {
+ newSize <<= 1;
+ usableCapacity = options_.useRadixSort ? newSize / 2 : newSize;
}
+ return newSize;
+}
+
+void VeloxSortShuffleWriter::initArray() {
+ arraySize_ = options_.sortBufferInitialSize;
+ array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ *
sizeof(uint64_t), veloxPool_.get());
+ arrayPtr_ = array_->asMutable<uint64_t>();
}
int64_t VeloxSortShuffleWriter::peakBytesAllocated() const {
@@ -311,4 +381,9 @@ int64_t VeloxSortShuffleWriter::totalSortTime() const {
int64_t VeloxSortShuffleWriter::totalC2RTime() const {
return c2rTime_;
}
+
+int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
+ // No same values.
+ return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
+}
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 6ac5308d0..747593ae4 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -17,6 +17,7 @@
#pragma once
+#include "shuffle/RadixSort.h"
#include "shuffle/VeloxShuffleWriter.h"
#include <arrow/status.h>
@@ -31,6 +32,8 @@ namespace gluten {
class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
public:
+ using RowSizeType = uint32_t;
+
static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
@@ -80,27 +83,28 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
void growArrayIfNecessary(uint32_t rows);
- using RowSizeType = uint32_t;
- using ElementType = std::pair<uint64_t, RowSizeType>;
- using Allocator = facebook::velox::StlAllocator<ElementType>;
- using SortArray = std::vector<ElementType, Allocator>;
+ uint32_t newArraySize(uint32_t rows);
+
+ void initArray();
+
+ static int compare(const void* a, const void* b);
- std::unique_ptr<facebook::velox::HashStringAllocator> allocator_;
// Stores compact row id -> row
- SortArray array_;
+ facebook::velox::BufferPtr array_;
+ uint64_t* arrayPtr_;
+ uint32_t arraySize_;
uint32_t offset_{0};
- std::vector<facebook::velox::BufferPtr> pages_;
+ std::list<facebook::velox::BufferPtr> pages_;
std::vector<char*> pageAddresses_;
char* currentPage_;
uint32_t pageNumber_;
uint32_t pageCursor_;
-
- // FIXME: Use configuration to replace hardcode.
- uint32_t initialSize_ = 4096;
- bool useRadixSort_ = false;
+ // For debug.
+ uint32_t currenPageSize_;
facebook::velox::BufferPtr sortedBuffer_;
+ uint8_t* rawBuffer_;
// Row ID -> Partition ID
// subscript: The index of row in the current input RowVector
@@ -110,7 +114,8 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
std::shared_ptr<const facebook::velox::RowType> rowType_;
std::optional<int32_t> fixedRowSize_;
- std::vector<uint64_t> rowSizes_;
+ std::vector<RowSizeType> rowSize_;
+ std::vector<uint64_t> rowSizePrefixSum_;
int64_t c2rTime_{0};
int64_t sortTime_{0};
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 6b86f6a0a..af9d5a58d 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -70,10 +70,12 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
for (const auto& compression : compressions) {
+ for (auto useRadixSort : {true, false}) {
+ params.push_back(ShuffleTestParams{
+ ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal,
compression, 0, 0, useRadixSort});
+ }
params.push_back(
- ShuffleTestParams{ShuffleWriterType::kSortShuffle,
PartitionWriterType::kLocal, compression, 0, 0});
- params.push_back(
- ShuffleTestParams{ShuffleWriterType::kRssSortShuffle,
PartitionWriterType::kRss, compression, 0, 0});
+ ShuffleTestParams{ShuffleWriterType::kRssSortShuffle,
PartitionWriterType::kRss, compression, 0, 0, false});
for (const auto compressionThreshold : compressionThresholds) {
for (const auto mergeBufferSize : mergeBufferSizes) {
params.push_back(ShuffleTestParams{
@@ -81,7 +83,8 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
PartitionWriterType::kLocal,
compression,
compressionThreshold,
- mergeBufferSize});
+ mergeBufferSize,
+ false /* unused */});
}
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss,
compression, compressionThreshold, 0});
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index f9c2b1d07..d32e32721 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -68,12 +68,13 @@ struct ShuffleTestParams {
arrow::Compression::type compressionType;
int32_t compressionThreshold;
int32_t mergeBufferSize;
+ bool useRadixSort;
std::string toString() const {
std::ostringstream out;
out << "shuffleWriterType = " << shuffleWriterType << ",
partitionWriterType = " << partitionWriterType
<< ", compressionType = " << compressionType << ",
compressionThreshold = " << compressionThreshold
- << ", mergeBufferSize = " << mergeBufferSize;
+ << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " <<
(useRadixSort ? "true" : "false");
return out.str();
}
};
@@ -250,6 +251,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
RETURN_NOT_OK(VeloxShuffleWriterTestBase::initShuffleWriterOptions());
ShuffleTestParams params = GetParam();
+ shuffleWriterOptions_.useRadixSort = params.useRadixSort;
partitionWriterOptions_.compressionType = params.compressionType;
switch (partitionWriterOptions_.compressionType) {
case arrow::Compression::UNCOMPRESSED:
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index dd4904964..6f21b528f 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.GlutenConfig.{GLUTEN_RSS_SORT_SHUFFLE_WRITER,
GLUTEN_SORT_SHUFFLE_WRITER}
import org.apache.gluten.exec.Runtimes
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.ArrowAbiUtil
@@ -84,6 +85,7 @@ private class CelebornColumnarBatchSerializerInstance(
val compressionCodecBackend =
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
val shuffleWriterType = GlutenConfig.getConf.celebornShuffleWriterType
+ .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val batchSize = GlutenConfig.getConf.maxBatchSize
val handle = jniWrapper
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index baf61b8a1..8f613c728 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{MemoryTarget,
Spiller, Spillers}
import org.apache.gluten.vectorized._
import org.apache.spark._
+import org.apache.spark.internal.config.{SHUFFLE_SORT_INIT_BUFFER_SIZE,
SHUFFLE_SORT_USE_RADIXSORT}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
@@ -48,6 +49,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
celebornConf,
client,
writeMetrics) {
+ private val isSort =
!GlutenConfig.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType)
private val runtime = Runtimes.contextInstance("CelebornShuffleWriter")
@@ -72,7 +74,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
val handle = ColumnarBatches.getNativeHandle(cb)
val startTime = System.nanoTime()
jniWrapper.write(nativeShuffleWriter, cb.numRows, handle,
availableOffHeapPerTask())
- dep.metrics("splitTime").add(System.nanoTime() - startTime)
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
// This metric is important, AQE use it to decide if EliminateLimit
@@ -84,10 +86,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
val startTime = System.nanoTime()
splitResult = jniWrapper.stop(nativeShuffleWriter)
- dep
- .metrics("splitTime")
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
+ val nativeMetrics = if (isSort) {
+ dep.metrics("sortTime")
+ } else {
+ dep.metrics("splitTime")
+ }
+ nativeMetrics
.add(
- System.nanoTime() - startTime - splitResult.getTotalPushTime -
+ dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime -
splitResult.getTotalWriteTime -
splitResult.getTotalCompressTime)
dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
@@ -108,6 +115,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
compressionLevel,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
+ conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
+ conf.get(SHUFFLE_SORT_USE_RADIXSORT),
clientPushBufferMaxSize,
clientPushSortMemoryThreshold,
celebornPartitionPusher,
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 883fc6001..1d622d491 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -61,6 +61,8 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
int compressionLevel,
int bufferCompressThreshold,
String compressionMode,
+ int sortBufferInitialSize,
+ boolean useRadixSort,
String dataFile,
int subDirsPerLocalDir,
String localDirs,
@@ -80,6 +82,8 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
compressionLevel,
bufferCompressThreshold,
compressionMode,
+ sortBufferInitialSize,
+ useRadixSort,
dataFile,
subDirsPerLocalDir,
localDirs,
@@ -109,6 +113,8 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
int compressionLevel,
int bufferCompressThreshold,
String compressionMode,
+ int sortBufferInitialSize,
+ boolean useRadixSort,
int pushBufferMaxSize,
long sortBufferMaxSize,
Object pusher,
@@ -129,6 +135,8 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
compressionLevel,
bufferCompressThreshold,
compressionMode,
+ sortBufferInitialSize,
+ useRadixSort,
null,
0,
null,
@@ -154,6 +162,8 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
int compressionLevel,
int bufferCompressThreshold,
String compressionMode,
+ int sortBufferInitialSize,
+ boolean useRadixSort,
String dataFile,
int subDirsPerLocalDir,
String localDirs,
diff --git
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 8a8248f23..d62ff1d68 100644
---
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.vectorized._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.SHUFFLE_COMPRESS
+import org.apache.spark.internal.config.{SHUFFLE_COMPRESS,
SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -151,6 +151,8 @@ class ColumnarShuffleWriter[K, V](
compressionLevel,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
+ conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
+ conf.get(SHUFFLE_SORT_USE_RADIXSORT),
dataTmp.getAbsolutePath,
blockManager.subDirsPerLocalDir,
localDirs,
@@ -176,9 +178,7 @@ class ColumnarShuffleWriter[K, V](
}
val startTime = System.nanoTime()
jniWrapper.write(nativeShuffleWriter, rows, handle,
availableOffHeapPerTask())
- if (!isSort) {
- dep.metrics("splitTime").add(System.nanoTime() - startTime)
- }
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(rows)
dep.metrics("inputBatches").add(1)
// This metric is important, AQE use it to decide if EliminateLimit
@@ -191,11 +191,12 @@ class ColumnarShuffleWriter[K, V](
assert(nativeShuffleWriter != -1L)
splitResult = jniWrapper.stop(nativeShuffleWriter)
closeShuffleWriter()
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
if (!isSort) {
dep
.metrics("splitTime")
.add(
- System.nanoTime() - startTime - splitResult.getTotalSpillTime -
+ dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime
-
splitResult.getTotalWriteTime -
splitResult.getTotalCompressTime)
} else {
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 62e197163..b84c9d4ee 100644
---
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -148,6 +148,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
compressionLevel,
compressThreshold,
GlutenConfig.getConf().columnarShuffleCompressionMode(),
+ (int) (long)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+ (boolean)
sparkConf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()),
bufferSize,
bufferSize,
partitionPusher,
@@ -180,7 +182,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(),
bytes);
columnarDep.metrics().get("dataSize").get().add(bytes);
// this metric replace part of uniffle shuffle write time
- columnarDep.metrics().get("splitTime").get().add(System.nanoTime() -
startTime);
+
columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() -
startTime);
columnarDep.metrics().get("numInputRows").get().add(cb.numRows());
columnarDep.metrics().get("inputBatches").get().add(1);
shuffleWriteMetrics.incRecordsWritten(cb.numRows());
@@ -193,13 +195,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
}
splitResult = jniWrapper.stop(nativeShuffleWriter);
+ columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() -
startTime);
columnarDep
.metrics()
.get("splitTime")
.get()
.add(
- System.nanoTime()
- - startTime
+ columnarDep.metrics().get("shuffleWallTime").get().value()
- splitResult.getTotalPushTime()
- splitResult.getTotalWriteTime()
- splitResult.getTotalCompressTime());
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5547feafe..586a277f7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -137,7 +137,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
conf
.getConfString("spark.celeborn.client.spark.shuffle.writer",
GLUTEN_HASH_SHUFFLE_WRITER)
.toLowerCase(Locale.ROOT)
- .replace(GLUTEN_SORT_SHUFFLE_WRITER, GLUTEN_RSS_SORT_SHUFFLE_WRITER)
def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]