This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dfac04f6e [VL] Support celeborn sort based shuffle (#5675)
dfac04f6e is described below
commit dfac04f6e9f471a67248a2b9a2e582c9d6f22597
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri May 17 13:44:54 2024 +0800
[VL] Support celeborn sort based shuffle (#5675)
---
cpp/core/jni/JniCommon.h | 18 +-
cpp/core/jni/JniWrapper.cc | 30 +-
cpp/core/shuffle/FallbackRangePartitioner.cc | 18 +
cpp/core/shuffle/FallbackRangePartitioner.h | 6 +
cpp/core/shuffle/HashPartitioner.cc | 49 ++-
cpp/core/shuffle/HashPartitioner.h | 6 +
cpp/core/shuffle/LocalPartitionWriter.cc | 4 +
cpp/core/shuffle/LocalPartitionWriter.h | 2 +
cpp/core/shuffle/Options.h | 16 +-
cpp/core/shuffle/PartitionWriter.h | 6 +
cpp/core/shuffle/Partitioner.h | 9 +
cpp/core/shuffle/RoundRobinPartitioner.cc | 16 +
cpp/core/shuffle/RoundRobinPartitioner.h | 6 +
cpp/core/shuffle/ShuffleReader.cc | 4 +
cpp/core/shuffle/ShuffleReader.h | 6 +-
cpp/core/shuffle/ShuffleWriter.h | 12 +-
cpp/core/shuffle/SinglePartitioner.cc | 9 +
cpp/core/shuffle/SinglePartitioner.h | 6 +
cpp/core/shuffle/rss/RssClient.h | 2 +-
cpp/core/shuffle/rss/RssPartitionWriter.cc | 7 +
cpp/core/shuffle/rss/RssPartitionWriter.h | 2 +
cpp/velox/CMakeLists.txt | 3 +-
cpp/velox/benchmarks/GenericBenchmark.cc | 5 +-
cpp/velox/benchmarks/ShuffleSplitBenchmark.cc | 9 +-
cpp/velox/compute/VeloxRuntime.cc | 33 +-
...fleWriter.cc => VeloxHashBasedShuffleWriter.cc} | 161 +++-----
...uffleWriter.h => VeloxHashBasedShuffleWriter.h} | 92 +----
cpp/velox/shuffle/VeloxShuffleReader.cc | 115 +++++-
cpp/velox/shuffle/VeloxShuffleReader.h | 51 ++-
cpp/velox/shuffle/VeloxShuffleWriter.h | 405 ++++-----------------
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 317 ++++++++++++++++
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 117 ++++++
cpp/velox/tests/VeloxShuffleWriterTest.cc | 24 +-
cpp/velox/utils/tests/LocalRssClient.h | 2 +-
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 102 ++++--
.../gluten/celeborn/CelebornShuffleManager.java | 5 -
.../CelebornHashBasedColumnarShuffleWriter.scala | 8 +
.../VeloxCelebornColumnarBatchSerializer.scala | 6 +-
...loxCelebornHashBasedColumnarShuffleWriter.scala | 10 +-
.../gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +-
.../gluten/vectorized/ShuffleWriterJniWrapper.java | 16 +-
.../vectorized/ColumnarBatchSerializer.scala | 4 +-
.../spark/shuffle/ColumnarShuffleWriter.scala | 2 +-
.../writer/VeloxUniffleColumnarShuffleWriter.java | 6 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 5 +
45 files changed, 1104 insertions(+), 631 deletions(-)
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 29c38689c..aa3b2b884 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -280,6 +280,21 @@ static inline arrow::Compression::type
getCompressionType(JNIEnv* env, jstring c
return compressionType;
}
+// Returns the codec name in lowercase, or "none" when codecJstr is null.
+static inline std::string getCompressionTypeStr(JNIEnv* env, jstring codecJstr) {
+ if (codecJstr == nullptr) {
+ return "none";
+ }
+ auto codec = env->GetStringUTFChars(codecJstr, nullptr);
+
+ // Convert codec string into lowercase; go through unsigned char because ::tolower on a negative char is UB.
+ std::string codecLower;
+ std::transform(codec, codec + std::strlen(codec), std::back_inserter(codecLower), [](unsigned char c) { return static_cast<char>(::tolower(c)); });
+
+ env->ReleaseStringUTFChars(codecJstr, codec);
+ return codecLower;
+}
+
static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring
codecJstr) {
if (codecJstr == nullptr) {
return gluten::CodecBackend::NONE;
@@ -444,7 +459,7 @@ class JavaRssClient : public RssClient {
env->DeleteGlobalRef(array_);
}
- int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size)
override {
+ int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t
size) override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
throw gluten::GlutenException("JNIEnv was not attached to current
thread");
@@ -457,7 +472,7 @@ class JavaRssClient : public RssClient {
array_ = env->NewByteArray(size);
array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
}
- env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes));
+ env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<const jbyte*>(bytes));
jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_,
javaPushPartitionData_, partitionId, array_, size);
checkException(env);
return static_cast<int32_t>(javaBytesSize);
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 7363a9da0..e70a017e0 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -831,8 +831,10 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jlong taskAttemptId,
jint startPartitionId,
jint pushBufferMaxSize,
+ jlong sortBufferMaxSize,
jobject partitionPusher,
- jstring partitionWriterTypeJstr) {
+ jstring partitionWriterTypeJstr,
+ jstring shuffleWriterTypeJstr) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
@@ -866,10 +868,12 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.mergeThreshold = mergeThreshold,
.compressionThreshold = compressionThreshold,
.compressionType = getCompressionType(env, codecJstr),
+ .compressionTypeStr = getCompressionTypeStr(env, codecJstr),
.compressionLevel = compressionLevel,
.bufferedWrite = true,
.numSubDirs = numSubDirs,
- .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize :
kDefaultShuffleWriterBufferSize};
+ .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize :
kDefaultPushMemoryThreshold,
+ .sortBufferMaxSize = sortBufferMaxSize > 0 ? sortBufferMaxSize :
kDefaultSortBufferThreshold};
if (codecJstr != NULL) {
partitionWriterOptions.codecBackend = getCodecBackend(env,
codecBackendJstr);
partitionWriterOptions.compressionMode = getCompressionMode(env,
compressionModeJstr);
@@ -879,6 +883,15 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr,
JNI_FALSE);
auto partitionWriterType = std::string(partitionWriterTypeC);
env->ReleaseStringUTFChars(partitionWriterTypeJstr, partitionWriterTypeC);
+
+ auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr,
JNI_FALSE);
+ auto shuffleWriterType = std::string(shuffleWriterTypeC);
+ env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC);
+
+ if (shuffleWriterType == "sort") {
+ shuffleWriterOptions.shuffleWriterType = kSortShuffle;
+ }
+
if (partitionWriterType == "local") {
if (dataFileJstr == NULL) {
throw gluten::GlutenException(std::string("Shuffle DataFile can't be
null"));
@@ -962,7 +975,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
JNI_METHOD_END(kInvalidResourceHandle)
}
-JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_split( // NOLINT
+JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_write( // NOLINT
JNIEnv* env,
jobject wrapper,
jlong shuffleWriterHandle,
@@ -981,7 +994,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
// The column batch maybe VeloxColumnBatch or
ArrowCStructColumnarBatch(FallbackRangeShuffleWriter)
auto batch = ctx->objectStore()->retrieve<ColumnarBatch>(batchHandle);
auto numBytes = batch->numBytes();
- gluten::arrowAssertOkOrThrow(shuffleWriter->split(batch, memLimit), "Native
split: shuffle writer split failed");
+ gluten::arrowAssertOkOrThrow(shuffleWriter->write(batch, memLimit), "Native
write: shuffle writer failed");
return numBytes;
JNI_METHOD_END(kInvalidResourceHandle)
}
@@ -1058,7 +1071,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jlong memoryManagerHandle,
jstring compressionType,
jstring compressionBackend,
- jint batchSize) {
+ jint batchSize,
+ jstring shuffleWriterType) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
@@ -1066,11 +1080,16 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
auto pool = memoryManager->getArrowMemoryPool();
ShuffleReaderOptions options = ShuffleReaderOptions{};
options.compressionType = getCompressionType(env, compressionType);
+ options.compressionTypeStr = getCompressionTypeStr(env, compressionType);
if (compressionType != nullptr) {
options.codecBackend = getCodecBackend(env, compressionBackend);
}
options.batchSize = batchSize;
// TODO: Add coalesce option and maximum coalesced size.
+
+ if (jStringToCString(env, shuffleWriterType) == "sort") {
+ options.shuffleWriterType = kSortShuffle;
+ }
std::shared_ptr<arrow::Schema> schema =
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct
ArrowSchema*>(cSchema)));
@@ -1085,7 +1104,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jobject jniIn) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
-
auto reader =
ctx->objectStore()->retrieve<ShuffleReader>(shuffleReaderHandle);
std::shared_ptr<arrow::io::InputStream> in =
std::make_shared<JavaInputStreamAdaptor>(env, reader->getPool(), jniIn);
auto outItr = reader->readStream(in);
diff --git a/cpp/core/shuffle/FallbackRangePartitioner.cc
b/cpp/core/shuffle/FallbackRangePartitioner.cc
index 4bad50b51..677fcd114 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.cc
+++ b/cpp/core/shuffle/FallbackRangePartitioner.cc
@@ -39,4 +39,26 @@ arrow::Status gluten::FallbackRangePartitioner::compute(
return arrow::Status::OK();
}
+arrow::Status gluten::FallbackRangePartitioner::compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+ // Each entry packs the batch index (high 32 bits) with the row index (low 32 bits).
+ auto index = static_cast<int64_t>(vectorIndex) << 32;
+ for (int64_t i = 0; i < numRows; ++i) {
+ auto pid = pidArr[i];
+ // Validate before inserting so an out-of-range id never creates an entry in rowVectorIndexMap.
+ if (pid < 0) {
+ return arrow::Status::Invalid("Partition id ", std::to_string(pid), " is negative");
+ }
+ if (pid >= numPartitions_) {
+ return arrow::Status::Invalid(
+ "Partition id ", std::to_string(pid), " is equal or greater than ", std::to_string(numPartitions_));
+ }
+ int64_t combined = index | (i & 0xFFFFFFFFLL);
+ rowVectorIndexMap[pid].push_back(combined);
+ }
+ return arrow::Status::OK();
+}
} // namespace gluten
diff --git a/cpp/core/shuffle/FallbackRangePartitioner.h
b/cpp/core/shuffle/FallbackRangePartitioner.h
index f54dd1abc..b06ce7e17 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.h
+++ b/cpp/core/shuffle/FallbackRangePartitioner.h
@@ -30,6 +30,12 @@ class FallbackRangePartitioner final : public Partitioner {
const int64_t numRows,
std::vector<uint32_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) override;
+
+ arrow::Status compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap)
override;
};
} // namespace gluten
diff --git a/cpp/core/shuffle/HashPartitioner.cc
b/cpp/core/shuffle/HashPartitioner.cc
index c62e3185f..4a26dc67b 100644
--- a/cpp/core/shuffle/HashPartitioner.cc
+++ b/cpp/core/shuffle/HashPartitioner.cc
@@ -19,6 +19,28 @@
namespace gluten {
+// Maps a raw (possibly negative) value from pidArr to a partition id in [0, numPartitions).
+static inline int32_t computePid(const int32_t* pidArr, int64_t i, int32_t numPartitions) {
+ auto pid = pidArr[i] % numPartitions;
+#if defined(__x86_64__)
+ // force to generate ASM: branch-free "if (pid < 0) pid += numPartitions".
+ // tmp is scratch written by lea, so it must be an early-clobber output operand, never an input.
+ int32_t tmp;
+ __asm__(
+ "lea (%[num_partitions],%[pid],1),%[tmp]\n"
+ "test %[pid],%[pid]\n"
+ "cmovs %[tmp],%[pid]\n"
+ : [pid] "+r"(pid), [tmp] "=&r"(tmp)
+ : [num_partitions] "r"(numPartitions)
+ : "cc");
+#else
+ if (pid < 0) {
+ pid += numPartitions;
+ }
+#endif
+ return pid;
+}
+
arrow::Status gluten::HashPartitioner::compute(
const int32_t* pidArr,
const int64_t numRows,
@@ -28,20 +50,7 @@ arrow::Status gluten::HashPartitioner::compute(
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
for (auto i = 0; i < numRows; ++i) {
- auto pid = pidArr[i] % numPartitions_;
-#if defined(__x86_64__)
- // force to generate ASM
- __asm__(
- "lea (%[num_partitions],%[pid],1),%[tmp]\n"
- "test %[pid],%[pid]\n"
- "cmovs %[tmp],%[pid]\n"
- : [pid] "+r"(pid)
- : [num_partitions] "r"(numPartitions_), [tmp] "r"(0));
-#else
- if (pid < 0) {
- pid += numPartitions_;
- }
-#endif
+ auto pid = computePid(pidArr, i, numPartitions_);
row2partition[i] = pid;
}
@@ -52,4 +61,20 @@ arrow::Status gluten::HashPartitioner::compute(
return arrow::Status::OK();
}
+arrow::Status gluten::HashPartitioner::compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+ // Each entry packs the batch index (high 32 bits) with the row index (low 32 bits).
+ auto index = static_cast<int64_t>(vectorIndex) << 32;
+ for (int64_t i = 0; i < numRows; ++i) {
+ auto pid = computePid(pidArr, i, numPartitions_);
+ int64_t combined = index | (i & 0xFFFFFFFFLL);
+ rowVectorIndexMap[pid].push_back(combined);
+ }
+
+ return arrow::Status::OK();
+}
+
} // namespace gluten
diff --git a/cpp/core/shuffle/HashPartitioner.h
b/cpp/core/shuffle/HashPartitioner.h
index fff01f939..6cd664634 100644
--- a/cpp/core/shuffle/HashPartitioner.h
+++ b/cpp/core/shuffle/HashPartitioner.h
@@ -30,6 +30,12 @@ class HashPartitioner final : public Partitioner {
const int64_t numRows,
std::vector<uint32_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) override;
+
+ arrow::Status compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap)
override;
};
} // namespace gluten
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 0582ce0e5..2fa0b954f 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}
+arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t
rawSize, const char* data, int64_t length) {
+ return arrow::Status::NotImplemented("Invalid code path for local shuffle
writer: sort based is not supported.");
+}
+
arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t*
actual) {
// Finish last spiller.
RETURN_NOT_OK(finishSpill());
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index 2cf4f2fd9..c2bfacd4b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -42,6 +42,8 @@ class LocalPartitionWriter : public PartitionWriter {
bool reuseBuffers,
bool hasComplexType) override;
+ arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data,
int64_t length) override;
+
/// The stop function performs several tasks:
/// 1. Opens the final data file.
/// 2. Iterates over each partition ID (pid) to:
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index d8fe1c802..4317ed631 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -25,18 +25,25 @@
namespace gluten {
static constexpr int16_t kDefaultBatchSize = 4096;
-static constexpr int16_t kDefaultShuffleWriterBufferSize = 4096;
+static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
+// Default for PartitionWriterOptions::sortBufferMaxSize (64e9). TODO(review): confirm the unit and that such a large default is intended.
+static constexpr int64_t kDefaultSortBufferThreshold = 64000000000;
+static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
+inline const std::string kDefaultCompressionTypeStr = "lz4"; // inline: one shared definition instead of a copy per TU.
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;
+enum ShuffleWriterType { kHashShuffle, kSortShuffle };
enum PartitionWriterType { kLocal, kRss };
struct ShuffleReaderOptions {
arrow::Compression::type compressionType =
arrow::Compression::type::LZ4_FRAME;
+ std::string compressionTypeStr = kDefaultCompressionTypeStr;
+ ShuffleWriterType shuffleWriterType = kHashShuffle;
CodecBackend codecBackend = CodecBackend::NONE;
int32_t batchSize = kDefaultBatchSize;
};
@@ -44,18 +51,20 @@ struct ShuffleReaderOptions {
struct ShuffleWriterOptions {
int32_t bufferSize = kDefaultShuffleWriterBufferSize;
double bufferReallocThreshold = kDefaultBufferReallocThreshold;
+ int64_t pushMemoryThreshold = kDefaultPushMemoryThreshold;
Partitioning partitioning = Partitioning::kRoundRobin;
int64_t taskAttemptId = -1;
int32_t startPartitionId = 0;
int64_t threadId = -1;
+ ShuffleWriterType shuffleWriterType = kHashShuffle;
};
struct PartitionWriterOptions {
int32_t mergeBufferSize = kDefaultShuffleWriterBufferSize;
double mergeThreshold = kDefaultMergeBufferThreshold;
-
int32_t compressionThreshold = kDefaultCompressionThreshold;
arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
+ std::string compressionTypeStr = kDefaultCompressionTypeStr;
CodecBackend codecBackend = CodecBackend::NONE;
int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
CompressionMode compressionMode = CompressionMode::BUFFER;
@@ -64,7 +73,9 @@ struct PartitionWriterOptions {
int32_t numSubDirs = kDefaultNumSubDirs;
- int32_t pushBufferMaxSize = kDefaultShuffleWriterBufferSize;
+ int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;
+
+ int64_t sortBufferMaxSize = kDefaultSortBufferThreshold;
};
struct ShuffleWriterMetrics {
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index 42e97cf06..93a6d04fe 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -49,10 +49,18 @@ class PartitionWriter : public Reclaimable {
bool reuseBuffers,
bool hasComplexType) = 0;
+ /// Evicts `length` bytes of already-serialized `data` for `partitionId`; added for the sort-based shuffle writer.
+ /// `rawSize` is added to the raw partition length (presumably the pre-compression size). LocalPartitionWriter returns NotImplemented.
+ virtual arrow::Status evict(uint32_t partitionId, int64_t rawSize, const
char* data, int64_t length) = 0;
+
uint64_t cachedPayloadSize() {
return payloadPool_->bytes_allocated();
}
+ PartitionWriterOptions& options() {
+ return options_;
+ }
+
protected:
uint32_t numPartitions_;
PartitionWriterOptions options_;
diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h
index 8331b8a91..b233f5b82 100644
--- a/cpp/core/shuffle/Partitioner.h
+++ b/cpp/core/shuffle/Partitioner.h
@@ -18,7 +18,10 @@
#pragma once
#include <arrow/result.h>
+#include <folly/container/F14Map.h>
+
#include <memory>
+#include <unordered_map>
#include <vector>
#include "shuffle/Partitioning.h"
@@ -40,6 +43,14 @@ class Partitioner {
std::vector<uint32_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) = 0;
+ /// Sort-based variant: for each of the `numRows` rows, appends ((int64_t)vectorIndex << 32) | rowIndex
+ /// to rowVectorIndexMap[partitionId]. SinglePartitioner's override is a no-op.
+ virtual arrow::Status compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) =
0;
+
protected:
Partitioner(int32_t numPartitions, bool hasPid) :
numPartitions_(numPartitions), hasPid_(hasPid) {}
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.cc
b/cpp/core/shuffle/RoundRobinPartitioner.cc
index b00680a18..196f9308d 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.cc
+++ b/cpp/core/shuffle/RoundRobinPartitioner.cc
@@ -39,4 +39,21 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
return arrow::Status::OK();
}
+arrow::Status gluten::RoundRobinPartitioner::compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+ // pidArr is not consulted: rows are dealt round-robin, resuming from the pidSelection_ left by the previous call.
+ auto index = static_cast<int64_t>(vectorIndex) << 32;
+ for (int64_t i = 0; i < numRows; ++i) {
+ int64_t combined = index | (i & 0xFFFFFFFFLL);
+ auto& vec = rowVectorIndexMap[pidSelection_];
+ vec.push_back(combined);
+ pidSelection_ = (pidSelection_ + 1) % numPartitions_;
+ }
+
+ return arrow::Status::OK();
+}
+
} // namespace gluten
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h
b/cpp/core/shuffle/RoundRobinPartitioner.h
index 5afd2832a..126a08eb9 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.h
+++ b/cpp/core/shuffle/RoundRobinPartitioner.h
@@ -32,6 +32,12 @@ class RoundRobinPartitioner final : public Partitioner {
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) override;
+ arrow::Status compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap)
override;
+
private:
friend class RoundRobinPartitionerTest;
diff --git a/cpp/core/shuffle/ShuffleReader.cc
b/cpp/core/shuffle/ShuffleReader.cc
index 471409d6d..faa81b522 100644
--- a/cpp/core/shuffle/ShuffleReader.cc
+++ b/cpp/core/shuffle/ShuffleReader.cc
@@ -48,6 +48,10 @@ int64_t ShuffleReader::getIpcTime() const {
return ipcTime_;
}
+ShuffleWriterType ShuffleReader::getShuffleWriterType() const {
+ return factory_->getShuffleWriterType();
+}
+
int64_t ShuffleReader::getDeserializeTime() const {
return factory_->getDeserializeTime();
}
diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h
index 4ba105712..5cef14768 100644
--- a/cpp/core/shuffle/ShuffleReader.h
+++ b/cpp/core/shuffle/ShuffleReader.h
@@ -39,6 +39,8 @@ class DeserializerFactory {
virtual int64_t getDecompressTime() = 0;
virtual int64_t getDeserializeTime() = 0;
+
+ virtual ShuffleWriterType getShuffleWriterType() = 0;
};
class ShuffleReader {
@@ -60,13 +62,15 @@ class ShuffleReader {
arrow::MemoryPool* getPool() const;
+ ShuffleWriterType getShuffleWriterType() const;
+
protected:
arrow::MemoryPool* pool_;
int64_t decompressTime_ = 0;
int64_t ipcTime_ = 0;
int64_t deserializeTime_ = 0;
- ShuffleReaderOptions options_;
+ ShuffleWriterType shuffleWriterType_ = kHashShuffle;
private:
std::shared_ptr<arrow::Schema> schema_;
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index bcf0c2c3b..a7987ce3e 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -37,7 +37,7 @@ class ShuffleWriter : public Reclaimable {
public:
static constexpr int64_t kMinMemLimit = 128LL * 1024 * 1024;
- virtual arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t
memLimit) = 0;
+ virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t
memLimit) = 0;
virtual arrow::Status stop() = 0;
@@ -45,6 +45,10 @@ class ShuffleWriter : public Reclaimable {
return numPartitions_;
}
+ ShuffleWriterOptions& options() {
+ return options_;
+ }
+
int64_t partitionBufferSize() const {
return partitionBufferPool_->bytes_allocated();
}
@@ -81,7 +85,10 @@ class ShuffleWriter : public Reclaimable {
return metrics_.rawPartitionLengths;
}
- virtual const uint64_t cachedPayloadSize() const = 0;
+ /// Sum of metrics_.rawPartitionLengths over all partitions (std::accumulate needs <numeric>).
+ int64_t rawPartitionBytes() const {
+ return std::accumulate(metrics_.rawPartitionLengths.begin(),
metrics_.rawPartitionLengths.end(), 0LL);
+ }
protected:
ShuffleWriter(
@@ -108,6 +115,8 @@ class ShuffleWriter : public Reclaimable {
std::unique_ptr<PartitionWriter> partitionWriter_;
+ std::vector<int64_t> rowVectorLengths_;
+
std::shared_ptr<arrow::Schema> schema_;
// Column index, partition id, buffers.
diff --git a/cpp/core/shuffle/SinglePartitioner.cc
b/cpp/core/shuffle/SinglePartitioner.cc
index c4f80ce79..981a5b8e4 100644
--- a/cpp/core/shuffle/SinglePartitioner.cc
+++ b/cpp/core/shuffle/SinglePartitioner.cc
@@ -28,4 +28,13 @@ arrow::Status gluten::SinglePartitioner::compute(
return arrow::Status::OK();
}
+arrow::Status gluten::SinglePartitioner::compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+ // Nothing to do here. NOTE(review): rowVectorIndexMap is left empty, so callers must handle single partitioning separately; confirm.
+ return arrow::Status::OK();
+}
+
} // namespace gluten
diff --git a/cpp/core/shuffle/SinglePartitioner.h
b/cpp/core/shuffle/SinglePartitioner.h
index d3d2c29f7..e5d7a920f 100644
--- a/cpp/core/shuffle/SinglePartitioner.h
+++ b/cpp/core/shuffle/SinglePartitioner.h
@@ -29,5 +29,11 @@ class SinglePartitioner final : public Partitioner {
const int64_t numRows,
std::vector<uint32_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) override;
+
+ arrow::Status compute(
+ const int32_t* pidArr,
+ const int64_t numRows,
+ const int32_t vectorIndex,
+ std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap)
override;
};
} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/core/shuffle/rss/RssClient.h
index 9209430b0..dddccfa1a 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/core/shuffle/rss/RssClient.h
@@ -21,7 +21,7 @@ class RssClient {
public:
virtual ~RssClient() = default;
- virtual int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t
size) = 0;
+ virtual int32_t pushPartitionData(int32_t partitionId, const char* bytes,
int64_t size) = 0;
virtual void stop() = 0;
};
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 15981bf8d..015129e26 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -73,4 +73,11 @@ arrow::Status RssPartitionWriter::evict(
partitionId,
reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
return arrow::Status::OK();
}
+
+arrow::Status RssPartitionWriter::evict(uint32_t partitionId, int64_t rawSize,
const char* data, int64_t length) {
+ rawPartitionLengths_[partitionId] += rawSize;
+ ScopedTimer timer(&spillTime_);
+ bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId,
data, length);
+ return arrow::Status::OK();
+}
} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index ef43017fc..b8cc1551c 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -44,6 +44,8 @@ class RssPartitionWriter final : public RemotePartitionWriter
{
bool reuseBuffers,
bool hasComplexType) override;
+ arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data,
int64_t length) override;
+
arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
arrow::Status stop(ShuffleWriterMetrics* metrics) override;
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 9e5f08b1c..c058883b6 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -310,7 +310,8 @@ set(VELOX_SRCS
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxParquetDatasource.cc
shuffle/VeloxShuffleReader.cc
- shuffle/VeloxShuffleWriter.cc
+ shuffle/VeloxHashBasedShuffleWriter.cc
+ shuffle/VeloxSortBasedShuffleWriter.cc
substrait/SubstraitParser.cc
substrait/SubstraitToVeloxExpr.cc
substrait/SubstraitToVeloxPlan.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 14593c8df..71d3d96b5 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -31,6 +31,7 @@
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/StringUtil.h"
@@ -111,7 +112,7 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
- VeloxShuffleWriter::create(
+ VeloxHashBasedShuffleWriter::create(
FLAGS_shuffle_partitions,
std::move(partitionWriter),
std::move(options),
@@ -191,7 +192,7 @@ auto BM_Generic = [](::benchmark::State& state,
GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs,
isFromEnv));
const auto& shuffleWriter = createShuffleWriter(memoryManager.get(),
dataFile, localDirs);
while (resultIter->hasNext()) {
- GLUTEN_THROW_NOT_OK(shuffleWriter->split(resultIter->next(),
ShuffleWriter::kMinMemLimit));
+ GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(),
ShuffleWriter::kMinMemLimit));
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
TIME_NANO_END(shuffleWriteTime);
diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
index 0109de603..4a4bb69b8 100644
--- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
+++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
@@ -31,6 +31,7 @@
#include "benchmarks/common/BenchmarkUtils.h"
#include "memory/ColumnarBatch.h"
#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
@@ -259,7 +260,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public
BenchmarkShuffleSplit {
numPartitions, PartitionWriterOptions{},
defaultArrowMemoryPool().get(), dataFile, localDirs);
GLUTEN_ASSIGN_OR_THROW(
shuffleWriter,
- VeloxShuffleWriter::create(
+ VeloxHashBasedShuffleWriter::create(
numPartitions,
std::move(partitionWriter),
std::move(options),
@@ -294,7 +295,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public
BenchmarkShuffleSplit {
[&shuffleWriter, &splitTime](const
std::shared_ptr<arrow::RecordBatch>& recordBatch) {
std::shared_ptr<ColumnarBatch> cb;
ARROW_ASSIGN_OR_THROW(cb,
recordBatch2VeloxColumnarBatch(*recordBatch));
- TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb,
ShuffleWriter::kMinMemLimit));
+ TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb,
ShuffleWriter::kMinMemLimit));
});
// LOG(INFO) << " split done memory allocated = " <<
// options.memoryPool->bytes_allocated();
@@ -327,7 +328,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public
BenchmarkShuffleSplit {
numPartitions, PartitionWriterOptions{},
defaultArrowMemoryPool().get(), dataFile, localDirs);
GLUTEN_ASSIGN_OR_THROW(
shuffleWriter,
- VeloxShuffleWriter::create(
+ VeloxHashBasedShuffleWriter::create(
numPartitions,
std::move(partitionWriter),
std::move(options),
@@ -350,7 +351,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public
BenchmarkShuffleSplit {
numRows += recordBatch->num_rows();
std::shared_ptr<ColumnarBatch> cb;
ARROW_ASSIGN_OR_THROW(cb,
recordBatch2VeloxColumnarBatch(*recordBatch));
- TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb,
ShuffleWriter::kMinMemLimit));
+ TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb,
ShuffleWriter::kMinMemLimit));
TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
}
}
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index a3e8c159c..15c84b41c 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -28,8 +28,9 @@
#include "compute/VeloxPlanConverter.h"
#include "config/VeloxConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
#include "shuffle/VeloxShuffleReader.h"
-#include "shuffle/VeloxShuffleWriter.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
@@ -187,10 +188,22 @@ std::shared_ptr<ShuffleWriter>
VeloxRuntime::createShuffleWriter(
MemoryManager* memoryManager) {
auto ctxPool = getLeafVeloxPool(memoryManager);
auto arrowPool = memoryManager->getArrowMemoryPool();
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffle_writer,
- VeloxShuffleWriter::create(numPartitions, std::move(partitionWriter),
std::move(options), ctxPool, arrowPool));
- return shuffle_writer;
+ std::shared_ptr<ShuffleWriter> shuffleWriter;
+ if (options.shuffleWriterType == kHashShuffle) {
+ GLUTEN_ASSIGN_OR_THROW(
+ shuffleWriter,
+ VeloxHashBasedShuffleWriter::create(
+ numPartitions, std::move(partitionWriter), std::move(options),
ctxPool, arrowPool));
+ } else if (options.shuffleWriterType == kSortShuffle) {
+ GLUTEN_ASSIGN_OR_THROW(
+ shuffleWriter,
+ VeloxSortBasedShuffleWriter::create(
+ numPartitions, std::move(partitionWriter), std::move(options),
ctxPool, arrowPool));
+ } else {
+ // Fail fast rather than handing a null writer back to the caller.
+ throw gluten::GlutenException("Unsupported shuffle writer type: " + std::to_string(options.shuffleWriterType));
+ }
+ return shuffleWriter;
}
std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
@@ -242,9 +255,18 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
auto codec = gluten::createArrowIpcCodec(options.compressionType,
options.codecBackend);
auto ctxVeloxPool = getLeafVeloxPool(memoryManager);
+ auto veloxCompressionType =
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
auto deserializerFactory =
std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
- schema, std::move(codec), rowType, options.batchSize, pool,
ctxVeloxPool);
- return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+ schema,
+ std::move(codec),
+ veloxCompressionType,
+ rowType,
+ options.batchSize,
+ pool,
+ ctxVeloxPool,
+ options.shuffleWriterType);
+ auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+ return reader;
}
std::unique_ptr<ColumnarBatchSerializer>
VeloxRuntime::createColumnarBatchSerializer(
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
similarity index 90%
rename from cpp/velox/shuffle/VeloxShuffleWriter.cc
rename to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index b304565e5..daff13703 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "VeloxShuffleWriter.h"
+#include "VeloxHashBasedShuffleWriter.h"
#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
@@ -70,58 +70,6 @@ bool vectorHasNull(const facebook::velox::VectorPtr& vp) {
return vp->countNulls(vp->nulls(), vp->size()) != 0;
}
-facebook::velox::RowVectorPtr getStrippedRowVector(const
facebook::velox::RowVector& rv) {
- // get new row type
- auto rowType = rv.type()->asRow();
- auto typeChildren = rowType.children();
- typeChildren.erase(typeChildren.begin());
- auto newRowType = facebook::velox::ROW(std::move(typeChildren));
-
- // get length
- auto length = rv.size();
-
- // get children
- auto children = rv.children();
- children.erase(children.begin());
-
- return std::make_shared<facebook::velox::RowVector>(
- rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length,
std::move(children));
-}
-
-const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
- VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");
-
- auto& firstChild = rv.childAt(0);
- VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not
flat encoding.");
- VELOX_CHECK(
- firstChild->type()->isInteger(),
- "Partition id (field 0) should be integer, but got {}",
- firstChild->type()->toString());
-
- // first column is partition key hash value or pid
- return firstChild->asFlatVector<int32_t>()->rawValues();
-}
-
-class EvictGuard {
- public:
- explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
- evictState_ = EvictState::kUnevictable;
- }
-
- ~EvictGuard() {
- evictState_ = EvictState::kEvictable;
- }
-
- // For safety and clarity.
- EvictGuard(const EvictGuard&) = delete;
- EvictGuard& operator=(const EvictGuard&) = delete;
- EvictGuard(EvictGuard&&) = delete;
- EvictGuard& operator=(EvictGuard&&) = delete;
-
- private:
- EvictState& evictState_;
-};
-
class BinaryArrayResizeGuard {
public:
explicit BinaryArrayResizeGuard(BinaryArrayResizeState& state) :
state_(state) {
@@ -199,19 +147,19 @@ arrow::Status
collectFlatVectorBuffer<facebook::velox::TypeKind::VARBINARY>(
} // namespace
-arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxShuffleWriter::create(
+arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxHashBasedShuffleWriter::create(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
arrow::MemoryPool* arrowPool) {
- std::shared_ptr<VeloxShuffleWriter> res(
- new VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), veloxPool, arrowPool));
+ std::shared_ptr<VeloxHashBasedShuffleWriter> res(new
VeloxHashBasedShuffleWriter(
+ numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool));
RETURN_NOT_OK(res->init());
return res;
} // namespace gluten
-arrow::Status VeloxShuffleWriter::init() {
+arrow::Status VeloxHashBasedShuffleWriter::init() {
#if defined(__x86_64__)
supportAvx512_ = __builtin_cpu_supports("avx512bw");
#else
@@ -235,7 +183,7 @@ arrow::Status VeloxShuffleWriter::init() {
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::initPartitions() {
+arrow::Status VeloxHashBasedShuffleWriter::initPartitions() {
auto simpleColumnCount = simpleColumnIndices_.size();
partitionValidityAddrs_.resize(simpleColumnCount);
@@ -260,15 +208,11 @@ arrow::Status VeloxShuffleWriter::initPartitions() {
return arrow::Status::OK();
}
-int64_t VeloxShuffleWriter::rawPartitionBytes() const {
- return std::accumulate(metrics_.rawPartitionLengths.begin(),
metrics_.rawPartitionLengths.end(), 0LL);
-}
-
-void VeloxShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
+void VeloxHashBasedShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
options_.bufferSize = newSize;
}
-arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxShuffleWriter::generateComplexTypeBuffers(
+arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxHashBasedShuffleWriter::generateComplexTypeBuffers(
facebook::velox::RowVectorPtr vector) {
auto arena =
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
auto serializer =
@@ -291,7 +235,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxShuffleWriter::generateComple
return valueBuffer;
}
-arrow::Status VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
+arrow::Status
VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t
memLimit) {
if (options_.partitioning == Partitioning::kSingle) {
auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -357,7 +301,7 @@ arrow::Status
VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb, int64
return arrow::Status::OK();
}
-arrow::Status
VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit) {
+arrow::Status
VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr
rv, int64_t memLimit) {
if (partitioner_->hasPid()) {
auto pidArr = getFirstColumn(*rv);
START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
@@ -376,7 +320,7 @@ arrow::Status
VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVec
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::stop() {
+arrow::Status VeloxHashBasedShuffleWriter::stop() {
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
@@ -394,7 +338,7 @@ arrow::Status VeloxShuffleWriter::stop() {
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) {
+arrow::Status VeloxHashBasedShuffleWriter::buildPartition2Row(uint32_t rowNum)
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingBuildPartition]);
// calc partition2RowOffsetBase_
@@ -427,7 +371,7 @@ arrow::Status
VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) {
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::updateInputHasNull(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::updateInputHasNull(const
facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingHasNull]);
for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
@@ -444,11 +388,11 @@ arrow::Status
VeloxShuffleWriter::updateInputHasNull(const facebook::velox::RowV
return arrow::Status::OK();
}
-void VeloxShuffleWriter::setSplitState(SplitState state) {
+void VeloxHashBasedShuffleWriter::setSplitState(SplitState state) {
splitState_ = state;
}
-arrow::Status VeloxShuffleWriter::doSplit(const facebook::velox::RowVector&
rv, int64_t memLimit) {
+arrow::Status VeloxHashBasedShuffleWriter::doSplit(const
facebook::velox::RowVector& rv, int64_t memLimit) {
auto rowNum = rv.size();
RETURN_NOT_OK(buildPartition2Row(rowNum));
RETURN_NOT_OK(updateInputHasNull(rv));
@@ -472,7 +416,7 @@ arrow::Status VeloxShuffleWriter::doSplit(const
facebook::velox::RowVector& rv,
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitRowVector(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitRowVector(const
facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);
// now start to split the RowVector
@@ -489,7 +433,7 @@ arrow::Status VeloxShuffleWriter::splitRowVector(const
facebook::velox::RowVecto
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitFixedWidthValueBuffer(const
facebook::velox::RowVector& rv) {
for (auto col = 0; col < fixedWidthColumnCount_; ++col) {
auto colIdx = simpleColumnIndices_[col];
auto& column = rv.childAt(colIdx);
@@ -543,7 +487,9 @@ arrow::Status
VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitBoolType(const uint8_t* srcAddr, const
std::vector<uint8_t*>& dstAddrs) {
+arrow::Status VeloxHashBasedShuffleWriter::splitBoolType(
+ const uint8_t* srcAddr,
+ const std::vector<uint8_t*>& dstAddrs) {
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto& pid : partitionUsed_) {
// set the last byte
@@ -632,7 +578,7 @@ arrow::Status VeloxShuffleWriter::splitBoolType(const
uint8_t* srcAddr, const st
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitValidityBuffer(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitValidityBuffer(const
facebook::velox::RowVector& rv) {
for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
auto colIdx = simpleColumnIndices_[col];
auto& column = rv.childAt(colIdx);
@@ -660,7 +606,7 @@ arrow::Status VeloxShuffleWriter::splitValidityBuffer(const
facebook::velox::Row
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitBinaryType(
+arrow::Status VeloxHashBasedShuffleWriter::splitBinaryType(
uint32_t binaryIdx,
const facebook::velox::FlatVector<facebook::velox::StringView>& src,
std::vector<BinaryBuf>& dst) {
@@ -723,7 +669,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryType(
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitBinaryArray(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitBinaryArray(const
facebook::velox::RowVector& rv) {
for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size();
++col) {
auto binaryIdx = col - fixedWidthColumnCount_;
auto& dstAddrs = partitionBinaryAddrs_[binaryIdx];
@@ -734,7 +680,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryArray(const
facebook::velox::RowVec
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::splitComplexType(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitComplexType(const
facebook::velox::RowVector& rv) {
if (complexColumnIndices_.size() == 0) {
return arrow::Status::OK();
}
@@ -773,7 +719,7 @@ arrow::Status VeloxShuffleWriter::splitComplexType(const
facebook::velox::RowVec
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::initColumnTypes(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::initColumnTypes(const
facebook::velox::RowVector& rv) {
schema_ = toArrowSchema(rv.type(), veloxPool_.get());
for (size_t i = 0; i < rv.childrenSize(); ++i) {
veloxColumnTypes_.push_back(rv.childAt(i)->type());
@@ -837,7 +783,7 @@ arrow::Status VeloxShuffleWriter::initColumnTypes(const
facebook::velox::RowVect
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::initFromRowVector(const
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::initFromRowVector(const
facebook::velox::RowVector& rv) {
if (veloxColumnTypes_.empty()) {
RETURN_NOT_OK(initColumnTypes(rv));
RETURN_NOT_OK(initPartitions());
@@ -846,13 +792,13 @@ arrow::Status VeloxShuffleWriter::initFromRowVector(const
facebook::velox::RowVe
return arrow::Status::OK();
}
-inline bool VeloxShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t
newSize) {
+inline bool VeloxHashBasedShuffleWriter::beyondThreshold(uint32_t partitionId,
uint32_t newSize) {
auto currentBufferSize = partitionBufferSize_[partitionId];
return newSize > (1 + options_.bufferReallocThreshold) * currentBufferSize ||
newSize < (1 - options_.bufferReallocThreshold) * currentBufferSize;
}
-void VeloxShuffleWriter::calculateSimpleColumnBytes() {
+void VeloxHashBasedShuffleWriter::calculateSimpleColumnBytes() {
fixedWidthBufferBytes_ = 0;
for (size_t col = 0; col < fixedWidthColumnCount_; ++col) {
auto colIdx = simpleColumnIndices_[col];
@@ -862,7 +808,9 @@ void VeloxShuffleWriter::calculateSimpleColumnBytes() {
fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer *
binaryColumnIndices_.size();
}
-uint32_t VeloxShuffleWriter::calculatePartitionBufferSize(const
facebook::velox::RowVector& rv, int64_t memLimit) {
+uint32_t VeloxHashBasedShuffleWriter::calculatePartitionBufferSize(
+ const facebook::velox::RowVector& rv,
+ int64_t memLimit) {
auto bytesPerRow = fixedWidthBufferBytes_;
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCalculateBufferSize]);
@@ -915,7 +863,7 @@ uint32_t
VeloxShuffleWriter::calculatePartitionBufferSize(const facebook::velox:
}
arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
-VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId,
uint32_t newSize) {
+VeloxHashBasedShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t
partitionId, uint32_t newSize) {
if (inputHasNull_[col]) {
ARROW_ASSIGN_OR_RAISE(
auto validityBuffer,
@@ -929,7 +877,7 @@ VeloxShuffleWriter::allocateValidityBuffer(uint32_t col,
uint32_t partitionId, u
return nullptr;
}
-arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId,
uint32_t newSize) {
+arrow::Status VeloxHashBasedShuffleWriter::updateValidityBuffers(uint32_t
partitionId, uint32_t newSize) {
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
// If the validity buffer is not yet allocated, allocate and fill 0xff
based on inputHasNull_.
if (partitionValidityAddrs_[i][partitionId] == nullptr) {
@@ -940,7 +888,7 @@ arrow::Status
VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, ui
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t
partitionId, uint32_t newSize) {
+arrow::Status VeloxHashBasedShuffleWriter::allocatePartitionBuffer(uint32_t
partitionId, uint32_t newSize) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
@@ -987,7 +935,7 @@ arrow::Status
VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId,
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::evictBuffers(
+arrow::Status VeloxHashBasedShuffleWriter::evictBuffers(
uint32_t partitionId,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
@@ -1000,7 +948,7 @@ arrow::Status VeloxShuffleWriter::evictBuffers(
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId,
bool reuseBuffers) {
+arrow::Status VeloxHashBasedShuffleWriter::evictPartitionBuffers(uint32_t
partitionId, bool reuseBuffers) {
auto numRows = partitionBufferBase_[partitionId];
if (numRows > 0) {
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId,
reuseBuffers));
@@ -1009,7 +957,7 @@ arrow::Status
VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bo
return arrow::Status::OK();
}
-arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
VeloxShuffleWriter::assembleBuffers(
+arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
VeloxHashBasedShuffleWriter::assembleBuffers(
uint32_t partitionId,
bool reuseBuffers) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);
@@ -1150,7 +1098,7 @@
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxShuffleWriter::a
return allBuffers;
}
-arrow::Status VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t*
actual) {
+arrow::Status VeloxHashBasedShuffleWriter::reclaimFixedSize(int64_t size,
int64_t* actual) {
if (evictState_ == EvictState::kUnevictable) {
*actual = 0;
return arrow::Status::OK();
@@ -1174,7 +1122,7 @@ arrow::Status
VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual
return arrow::Status::OK();
}
-arrow::Result<int64_t> VeloxShuffleWriter::evictCachedPayload(int64_t size) {
+arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictCachedPayload(int64_t
size) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingEvictPartition]);
int64_t actual;
auto before = partitionBufferPool_->bytes_allocated();
@@ -1188,7 +1136,7 @@ arrow::Result<int64_t>
VeloxShuffleWriter::evictCachedPayload(int64_t size) {
return actual;
}
-arrow::Status VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::resetValidityBuffer(uint32_t
partitionId) {
std::for_each(partitionBuffers_.begin(), partitionBuffers_.end(),
[partitionId](auto& bufs) {
if (bufs[partitionId].size() != 0 &&
bufs[partitionId][kValidityBufferIndex] != nullptr) {
// initialize all true once allocated
@@ -1199,7 +1147,8 @@ arrow::Status
VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId,
uint32_t newSize, bool preserveData) {
+arrow::Status
+VeloxHashBasedShuffleWriter::resizePartitionBuffer(uint32_t partitionId,
uint32_t newSize, bool preserveData) {
for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
auto& buffers = partitionBuffers_[i][partitionId];
@@ -1278,7 +1227,7 @@ arrow::Status
VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, ui
return arrow::Status::OK();
}
-arrow::Status VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::shrinkPartitionBuffer(uint32_t
partitionId) {
auto bufferSize = partitionBufferSize_[partitionId];
if (bufferSize == 0) {
return arrow::Status::OK();
@@ -1301,11 +1250,11 @@ arrow::Status
VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
}
-uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx,
uint32_t newSize) {
+uint64_t VeloxHashBasedShuffleWriter::valueBufferSizeForBinaryArray(uint32_t
binaryIdx, uint32_t newSize) {
return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) /
totalInputNumRows_ * newSize + 1024;
}
-uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t
fixedWidthIndex, uint32_t newSize) {
+uint64_t
VeloxHashBasedShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t
fixedWidthIndex, uint32_t newSize) {
uint64_t valueBufferSize = 0;
auto columnIdx = simpleColumnIndices_[fixedWidthIndex];
if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) {
@@ -1320,7 +1269,7 @@ uint64_t
VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWid
return valueBufferSize;
}
-void VeloxShuffleWriter::stat() const {
+void VeloxHashBasedShuffleWriter::stat() const {
#if VELOX_SHUFFLE_WRITER_LOG_FLAG
for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
std::ostringstream oss;
@@ -1336,7 +1285,7 @@ void VeloxShuffleWriter::stat() const {
#endif
}
-arrow::Status VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::resetPartitionBuffer(uint32_t
partitionId) {
// Reset fixed-width partition buffers
for (auto i = 0; i < fixedWidthColumnCount_; ++i) {
partitionValidityAddrs_[i][partitionId] = nullptr;
@@ -1356,11 +1305,11 @@ arrow::Status
VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) {
return arrow::Status::OK();
}
-const uint64_t VeloxShuffleWriter::cachedPayloadSize() const {
+const uint64_t VeloxHashBasedShuffleWriter::cachedPayloadSize() const {
return partitionWriter_->cachedPayloadSize();
}
-arrow::Result<int64_t>
VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) {
+arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) {
// Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
for (auto pid = 0; pid < numPartitions_; ++pid) {
@@ -1388,7 +1337,7 @@ arrow::Result<int64_t>
VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t
return shrunken;
}
-arrow::Result<int64_t>
VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
+arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when splitState_ == SplitState::kInit, and
space freed from
// shrinking is not enough. In this case partitionBufferSize_ ==
partitionBufferBase_
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
@@ -1415,7 +1364,7 @@ arrow::Result<int64_t>
VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t
return evicted;
}
-bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
+bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
// If OOM happens during SplitState::kSplit, it is triggered by binary
buffers resize.
// Or during SplitState::kInit, it is triggered by other operators.
// The reclaim order is spill->shrink, because the partition buffers can be
reused.
@@ -1424,13 +1373,13 @@ bool
VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
}
-bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const {
+bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
// If OOM triggered by other operators, the splitState_ is SplitState::kInit.
// The last resort is to evict the partition buffers to reclaim more space.
return options_.partitioning != Partitioning::kSingle && splitState_ ==
SplitState::kInit;
}
-arrow::Result<uint32_t>
VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const {
+arrow::Result<uint32_t>
VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShrink(uint32_t
partitionId) const {
if (splitState_ == SplitState::kSplit) {
return partitionBufferBase_[partitionId] +
partition2RowCount_[partitionId];
}
@@ -1440,7 +1389,7 @@ arrow::Result<uint32_t>
VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint3
return arrow::Status::Invalid("Cannot shrink partition buffers in
SplitState: " + std::to_string(splitState_));
}
-arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t
preAllocBufferSize) {
+arrow::Status VeloxHashBasedShuffleWriter::preAllocPartitionBuffers(uint32_t
preAllocBufferSize) {
for (auto& pid : partitionUsed_) {
auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
VLOG_IF(9, partitionBufferSize_[pid] != newSize)
@@ -1494,7 +1443,7 @@ arrow::Status
VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBuff
return arrow::Status::OK();
}
-bool VeloxShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr&
rv) const {
+bool
VeloxHashBasedShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr&
rv) const {
return (rv->size() > maxBatchSize_ && maxBatchSize_ > 0);
}
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
similarity index 83%
copy from cpp/velox/shuffle/VeloxShuffleWriter.h
copy to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index e699a323b..a11f84e95 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -36,10 +36,10 @@
#include <arrow/result.h>
#include <arrow/type.h>
+#include "VeloxShuffleWriter.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
-#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
#include "utils/Print.h"
@@ -88,7 +88,6 @@ namespace gluten {
#endif // end of VELOX_SHUFFLE_WRITER_PRINT
enum SplitState { kInit, kPreAlloc, kSplit, kStop };
-enum EvictState { kEvictable, kUnevictable };
struct BinaryArrayResizeState {
bool inResize;
@@ -100,7 +99,7 @@ struct BinaryArrayResizeState {
: inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
};
-class VeloxShuffleWriter final : public ShuffleWriter {
+class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
enum {
kValidityBufferIndex = 0,
kFixedWidthValueBufferIndex = 1,
@@ -130,7 +129,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
arrow::MemoryPool* arrowPool);
- arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
+ arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
arrow::Status stop() override;
@@ -138,12 +137,10 @@ class VeloxShuffleWriter final : public ShuffleWriter {
const uint64_t cachedPayloadSize() const override;
- arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers);
-
- int64_t rawPartitionBytes() const;
+ arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers)
override;
// For test only.
- void setPartitionBufferSize(uint32_t newSize);
+ void setPartitionBufferSize(uint32_t newSize) override;
// for debugging
void printColumnsInfo() const {
@@ -192,22 +189,14 @@ class VeloxShuffleWriter final : public ShuffleWriter {
VS_PRINT_CONTAINER(input_has_null_);
}
- int32_t maxBatchSize() const {
- return maxBatchSize_;
- }
-
private:
- VeloxShuffleWriter(
+ VeloxHashBasedShuffleWriter(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
arrow::MemoryPool* pool)
- : ShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), pool),
- veloxPool_(std::move(veloxPool)) {
- arenas_.resize(numPartitions);
- serdeOptions_.useLosslessTimestamp = true;
- }
+ : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool) {}
arrow::Status init();
@@ -314,14 +303,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
- SplitState splitState_{kInit};
-
- EvictState evictState_{kEvictable};
-
BinaryArrayResizeState binaryArrayResizeState_{};
- bool supportAvx512_ = false;
-
bool hasComplexType_ = false;
std::vector<bool> isValidityBuffer_;
@@ -415,66 +398,9 @@ class VeloxShuffleWriter final : public ShuffleWriter {
std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
std::shared_ptr<const facebook::velox::RowType> complexWriteType_;
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
- std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
facebook::velox::serializer::presto::PrestoVectorSerde serde_;
- facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
serdeOptions_;
-
- // stat
- enum CpuWallTimingType {
- CpuWallTimingBegin = 0,
- CpuWallTimingCompute = CpuWallTimingBegin,
- CpuWallTimingBuildPartition,
- CpuWallTimingEvictPartition,
- CpuWallTimingHasNull,
- CpuWallTimingCalculateBufferSize,
- CpuWallTimingAllocateBuffer,
- CpuWallTimingCreateRbFromBuffer,
- CpuWallTimingMakeRB,
- CpuWallTimingCacheRB,
- CpuWallTimingFlattenRV,
- CpuWallTimingSplitRV,
- CpuWallTimingIteratePartitions,
- CpuWallTimingStop,
- CpuWallTimingEnd,
- CpuWallTimingNum = CpuWallTimingEnd - CpuWallTimingBegin
- };
-
- static std::string CpuWallTimingName(CpuWallTimingType type) {
- switch (type) {
- case CpuWallTimingCompute:
- return "CpuWallTimingCompute";
- case CpuWallTimingBuildPartition:
- return "CpuWallTimingBuildPartition";
- case CpuWallTimingEvictPartition:
- return "CpuWallTimingEvictPartition";
- case CpuWallTimingHasNull:
- return "CpuWallTimingHasNull";
- case CpuWallTimingCalculateBufferSize:
- return "CpuWallTimingCalculateBufferSize";
- case CpuWallTimingAllocateBuffer:
- return "CpuWallTimingAllocateBuffer";
- case CpuWallTimingCreateRbFromBuffer:
- return "CpuWallTimingCreateRbFromBuffer";
- case CpuWallTimingMakeRB:
- return "CpuWallTimingMakeRB";
- case CpuWallTimingCacheRB:
- return "CpuWallTimingCacheRB";
- case CpuWallTimingFlattenRV:
- return "CpuWallTimingFlattenRV";
- case CpuWallTimingSplitRV:
- return "CpuWallTimingSplitRV";
- case CpuWallTimingIteratePartitions:
- return "CpuWallTimingIteratePartitions";
- case CpuWallTimingStop:
- return "CpuWallTimingStop";
- default:
- return "CpuWallTimingUnknown";
- }
- }
- facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];
- int32_t maxBatchSize_{0};
-}; // class VeloxShuffleWriter
+ SplitState splitState_{kInit};
+}; // class VeloxHashBasedShuffleWriter
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 5c4e01936..22298ef91 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -370,39 +370,104 @@ std::shared_ptr<ColumnarBatch>
VeloxColumnarBatchDeserializer::next() {
VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
+ const facebook::velox::common::CompressionKind veloxCompressionType,
const RowTypePtr& rowType,
int32_t batchSize,
arrow::MemoryPool* memoryPool,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool)
+ std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+ ShuffleWriterType shuffleWriterType)
: schema_(schema),
codec_(codec),
+ veloxCompressionType_(veloxCompressionType),
rowType_(rowType),
batchSize_(batchSize),
memoryPool_(memoryPool),
- veloxPool_(veloxPool) {
+ veloxPool_(veloxPool),
+ shuffleWriterType_(shuffleWriterType) {
initFromSchema();
}
std::unique_ptr<ColumnarBatchIterator>
VeloxColumnarBatchDeserializerFactory::createDeserializer(
std::shared_ptr<arrow::io::InputStream> in) {
- return std::make_unique<VeloxColumnarBatchDeserializer>(
- std::move(in),
- schema_,
- codec_,
+ if (shuffleWriterType_ == kHashShuffle) {
+ return std::make_unique<VeloxColumnarBatchDeserializer>(
+ std::move(in),
+ schema_,
+ codec_,
+ rowType_,
+ batchSize_,
+ memoryPool_,
+ veloxPool_.get(),
+ &isValidityBuffer_,
+ hasComplexType_,
+ deserializeTime_,
+ decompressTime_);
+ }
+ return std::make_unique<VeloxShuffleReaderOutStreamWrapper>(
+ veloxPool_,
rowType_,
batchSize_,
- memoryPool_,
- veloxPool_.get(),
- &isValidityBuffer_,
- hasComplexType_,
- deserializeTime_,
- decompressTime_);
+ veloxCompressionType_,
+ [this](int64_t decompressionTime) { this->decompressTime_ +=
decompressionTime; },
+ [this](int64_t deserializeTime) { this->deserializeTime_ +=
deserializeTime; },
+ in);
+}
+
+// Sets up a sort-shuffle reader: wraps `in` in a chunked VeloxInputStream and configures Presto serde options
+// with the given compression kind. The two accumulators receive timing contributions from next().
+VeloxShuffleReaderOutStreamWrapper::VeloxShuffleReaderOutStreamWrapper(
+    const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
+    const RowTypePtr& rowType,
+    int32_t batchSize,
+    facebook::velox::common::CompressionKind veloxCompressionType,
+    const std::function<void(int64_t)> decompressionTimeAccumulator,
+    const std::function<void(int64_t)> deserializeTimeAccumulator,
+    const std::shared_ptr<arrow::io::InputStream> in)
+    : veloxPool_(veloxPool),
+      rowType_(rowType),
+      batchSize_(batchSize),
+      veloxCompressionType_(veloxCompressionType),
+      decompressionTimeAccumulator_(decompressionTimeAccumulator),
+      deserializeTimeAccumulator_(deserializeTimeAccumulator) {
+  // Read chunk size: 1 MiB minus AlignedBuffer padding (presumably so the padded allocation stays within 1 MiB).
+  constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
+  auto buffer = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_.get());
+  // `in` is a const parameter, so std::move would copy anyway; pass it plainly.
+  in_ = std::make_unique<VeloxInputStream>(in, std::move(buffer));
+  // Presto serde options, presumably (useLosslessTimestamp = false, compressionKind = veloxCompressionType_).
+  serdeOptions_ = {false, veloxCompressionType_};
+}
+
+// Returns the next batch, or nullptr once the input is exhausted. Consecutive serialized vectors are
+// appended together until the batch holds at least batchSize_ rows or the input ends.
+std::shared_ptr<ColumnarBatch> VeloxShuffleReaderOutStreamWrapper::next() {
+  const auto readStart = std::chrono::steady_clock::now();
+  if (!in_->hasNext()) {
+    return nullptr;
+  }
+
+  RowVectorPtr rowVector;
+  VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, &rowVector, &serdeOptions_);
+  while (rowVector->size() < batchSize_ && in_->hasNext()) {
+    RowVectorPtr rowVectorTemp;
+    VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, &rowVectorTemp, &serdeOptions_);
+    rowVector->append(rowVectorTemp.get());
+  }
+
+  // Decompression presumably happens inside VectorStreamGroup::read (serdeOptions_ carries the compression kind)
+  // and cannot be timed separately here, so the whole read is reported as deserialize time and nothing is added
+  // to the decompression time.
+  // NOTE(review): reported in nanoseconds; confirm this matches the unit of the hash-shuffle deserializer's timers.
+  const auto elapsedNanos =
+      std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - readStart).count();
+  deserializeTimeAccumulator_(elapsedNanos);
+  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() {
return memoryPool_;
}
+ShuffleWriterType
VeloxColumnarBatchDeserializerFactory::getShuffleWriterType() {
+ return shuffleWriterType_;
+}
+
int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() {
return decompressTime_;
}
@@ -440,4 +505,30 @@ void
VeloxColumnarBatchDeserializerFactory::initFromSchema() {
}
}
}
+
+// Wraps `input` as a Velox ByteInputStream that reads through the reusable `buffer`.
+// The first chunk is fetched immediately; an empty input leaves offset_ at 0.
+VeloxInputStream::VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer)
+    : in_{std::move(input)}, buffer_{std::move(buffer)} {
+  this->next(/*throwIfPastEnd=*/true);
+}
+
+// Returns true while unread bytes remain. Once the current range is fully consumed, the next
+// chunk is pulled from the underlying stream; a zero-byte read means end of stream.
+bool VeloxInputStream::hasNext() {
+  if (offset_ == 0) {
+    return false;  // the previous read already hit end of stream
+  }
+  const auto& current = ranges()[0];
+  if (current.position < current.size) {
+    return true;
+  }
+  // `current` must not be used past this point: next() installs a new range.
+  next(true);
+  return offset_ != 0;
+}
+
+// Refills buffer_ with up to buffer_->capacity() bytes from in_ and makes them the current read
+// range. offset_ records how many bytes were read; 0 means end of stream, and in that case the
+// range is left untouched.
+// NOTE(review): throwIfPastEnd is not honored; callers detect EOF through offset_ instead.
+void VeloxInputStream::next(bool throwIfPastEnd) {
+  const uint32_t readBytes = buffer_->capacity();
+  auto bytesRead = in_->Read(readBytes, buffer_->asMutable<char>());
+  // Surface read failures instead of treating them as end of stream, which would silently
+  // truncate the shuffle data.
+  VELOX_CHECK(bytesRead.ok(), "Failed to read shuffle input stream: {}", bytesRead.status().ToString());
+  offset_ = *bytesRead;
+  if (offset_ > 0) {
+    // offset_ is bounded by the buffer capacity, so it fits ByteRange's int32 size.
+    setRange({buffer_->asMutable<uint8_t>(), static_cast<int32_t>(offset_), 0});
+  }
+}
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 18df38006..3a0d8f9ff 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -22,6 +22,8 @@
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
+#include <velox/serializers/PrestoSerializer.h>
+
namespace gluten {
class VeloxColumnarBatchDeserializer final : public ColumnarBatchIterator {
@@ -59,15 +61,57 @@ class VeloxColumnarBatchDeserializer final : public
ColumnarBatchIterator {
bool reachEos_{false};
};
+// Adapts an arrow::io::InputStream to Velox's ByteInputStream so VectorStreamGroup::read can
+// deserialize straight from it. Bytes are pulled into buffer_ in chunks of its capacity.
+class VeloxInputStream : public facebook::velox::ByteInputStream {
+ public:
+ // Reads the first chunk eagerly; `buffer` is reused for every subsequent chunk.
+ VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input,
facebook::velox::BufferPtr buffer);
+
+ // True while unread bytes remain; fetches the next chunk once the current range is consumed.
+ bool hasNext();
+
+ // Refills buffer_ from in_ and makes it the current read range.
+ void next(bool throwIfPastEnd) override;
+
+ std::shared_ptr<arrow::io::InputStream> in_;
+ const facebook::velox::BufferPtr buffer_;
+ // Bytes returned by the most recent read of in_; 0 signals end of stream. Starts as -1
+ // (i.e. UINT64_MAX) until the constructor performs the first read.
+ uint64_t offset_ = -1;
+};
+
+// Iterator that deserializes Presto-serialized RowVectors from a VeloxInputStream and coalesces
+// consecutive vectors until batchSize rows are reached or the stream ends.
+class VeloxShuffleReaderOutStreamWrapper : public ColumnarBatchIterator {
+ public:
+  VeloxShuffleReaderOutStreamWrapper(
+      const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
+      const facebook::velox::RowTypePtr& rowType,
+      int32_t batchSize,
+      const facebook::velox::common::CompressionKind veloxCompressionType,
+      const std::function<void(int64_t)> decompressionTimeAccumulator,
+      const std::function<void(int64_t)> deserializeTimeAccumulator,
+      const std::shared_ptr<arrow::io::InputStream> in);
+
+  // Returns the next coalesced batch, or nullptr once the input stream is exhausted.
+  std::shared_ptr<ColumnarBatch> next() override;
+
+ private:
+  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
+  facebook::velox::RowTypePtr rowType_;
+  // NOTE(review): batches_, reachEos_ and rowCount_ are not used by the current
+  // implementation; they are kept (and initialized) to avoid changing the class layout.
+  std::vector<facebook::velox::RowVectorPtr> batches_;
+  bool reachEos_{false};
+  int32_t rowCount_{0};
+  int32_t batchSize_;
+  facebook::velox::common::CompressionKind veloxCompressionType_;
+  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;
+  std::function<void(int64_t)> decompressionTimeAccumulator_;
+  std::function<void(int64_t)> deserializeTimeAccumulator_;
+  std::shared_ptr<VeloxInputStream> in_;
+};
+
class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory {
public:
VeloxColumnarBatchDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
+ const facebook::velox::common::CompressionKind veloxCompressionType,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
arrow::MemoryPool* memoryPool,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool);
+ std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+ ShuffleWriterType shuffleWriterType);
std::unique_ptr<ColumnarBatchIterator>
createDeserializer(std::shared_ptr<arrow::io::InputStream> in) override;
@@ -77,9 +121,12 @@ class VeloxColumnarBatchDeserializerFactory : public
DeserializerFactory {
int64_t getDeserializeTime() override;
+ ShuffleWriterType getShuffleWriterType() override;
+
private:
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
+ facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::RowTypePtr rowType_;
int32_t batchSize_;
arrow::MemoryPool* memoryPool_;
@@ -88,6 +135,8 @@ class VeloxColumnarBatchDeserializerFactory : public
DeserializerFactory {
std::vector<bool> isValidityBuffer_;
bool hasComplexType_{false};
+ ShuffleWriterType shuffleWriterType_;
+
int64_t deserializeTime_{0};
int64_t decompressTime_{0};
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index e699a323b..104b87616 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -46,157 +46,60 @@
namespace gluten {
-// set 1 to open print
-#define VELOX_SHUFFLE_WRITER_PRINT 0
-
-#if VELOX_SHUFFLE_WRITER_PRINT
-
-#define VsPrint Print
-#define VsPrintLF PrintLF
-#define VsPrintSplit PrintSplit
-#define VsPrintSplitLF PrintSplitLF
-#define VsPrintVectorRange PrintVectorRange
-#define VS_PRINT PRINT
-#define VS_PRINTLF PRINTLF
-#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
-#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
-#define VS_PRINT_CONTAINER PRINT_CONTAINER
-#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
-#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
-#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
-#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
-#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING
-
-#else // VELOX_SHUFFLE_WRITER_PRINT
-
-#define VsPrint(...) // NOLINT
-#define VsPrintLF(...) // NOLINT
-#define VsPrintSplit(...) // NOLINT
-#define VsPrintSplitLF(...) // NOLINT
-#define VsPrintVectorRange(...) // NOLINT
-#define VS_PRINT(a)
-#define VS_PRINTLF(a)
-#define VS_PRINT_FUNCTION_NAME()
-#define VS_PRINT_FUNCTION_SPLIT_LINE()
-#define VS_PRINT_CONTAINER(c)
-#define VS_PRINT_CONTAINER_TO_STRING(c)
-#define VS_PRINT_CONTAINER_2_STRING(c)
-#define VS_PRINT_VECTOR_TO_STRING(v)
-#define VS_PRINT_VECTOR_2_STRING(v)
-#define VS_PRINT_VECTOR_MAPPING(v)
-
-#endif // end of VELOX_SHUFFLE_WRITER_PRINT
-
-enum SplitState { kInit, kPreAlloc, kSplit, kStop };
-enum EvictState { kEvictable, kUnevictable };
-
-struct BinaryArrayResizeState {
- bool inResize;
- uint32_t partitionId;
- uint32_t binaryIdx;
-
- BinaryArrayResizeState() : inResize(false) {}
- BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx)
- : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
-};
-
-class VeloxShuffleWriter final : public ShuffleWriter {
- enum {
- kValidityBufferIndex = 0,
- kFixedWidthValueBufferIndex = 1,
- kBinaryValueBufferIndex = 2,
- kBinaryLengthBufferIndex = kFixedWidthValueBufferIndex
- };
-
+class VeloxShuffleWriter : public ShuffleWriter {
public:
- struct BinaryBuf {
- BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn,
uint64_t valueOffsetIn)
- : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn),
valueOffset(valueOffsetIn) {}
-
- BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) :
BinaryBuf(value, length, valueCapacity, 0) {}
-
- BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}
-
- uint8_t* valuePtr;
- uint8_t* lengthPtr;
- uint64_t valueCapacity;
- uint64_t valueOffset;
- };
-
- static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
- uint32_t numPartitions,
- std::unique_ptr<PartitionWriter> partitionWriter,
- ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool);
-
- arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
-
- arrow::Status stop() override;
-
- arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
-
- const uint64_t cachedPayloadSize() const override;
-
- arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers);
-
- int64_t rawPartitionBytes() const;
-
- // For test only.
- void setPartitionBufferSize(uint32_t newSize);
+  // Returns a RowVector sharing every column of `rv` except column 0 (the partition id column),
+  // together with the matching row type. Column buffers are shared, not copied.
+  facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) {
+    // Erasing the first child of an empty vector would be undefined behavior.
+    VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");
+
+    // Bind by reference: asRow() returns a reference, and plain `auto` would copy the RowType.
+    const auto& rowType = rv.type()->asRow();
+    auto typeChildren = rowType.children();
+    typeChildren.erase(typeChildren.begin());
+    auto newRowType = facebook::velox::ROW(std::move(typeChildren));
+
+    auto children = rv.children();
+    children.erase(children.begin());
+
+    return std::make_shared<facebook::velox::RowVector>(
+        rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), rv.size(), std::move(children));
+  }
- // for debugging
- void printColumnsInfo() const {
- VS_PRINT_FUNCTION_SPLIT_LINE();
- VS_PRINTLF(fixed_width_column_count_);
+ // Returns a pointer to the raw int32 values of column 0 of `rv`. That column must be a flat
+ // integer column of partition ids (or partition key hash values); VELOX_CHECK fails otherwise.
+ // The pointer refers into rv's child vector, so presumably it is valid only while rv lives.
+ const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
+ VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id
column.");
- VS_PRINT_CONTAINER(simple_column_indices_);
- VS_PRINT_CONTAINER(binary_column_indices_);
- VS_PRINT_CONTAINER(complex_column_indices_);
+ auto& firstChild = rv.childAt(0);
+ VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not
flat encoding.");
+ VELOX_CHECK(
+ firstChild->type()->isInteger(),
+ "Partition id (field 0) should be integer, but got {}",
+ firstChild->type()->toString());
- VS_PRINT_VECTOR_2_STRING(velox_column_types_);
- VS_PRINT_VECTOR_TO_STRING(arrow_column_types_);
+ // first column is partition key hash value or pid
+ return firstChild->asFlatVector<int32_t>()->rawValues();
}
- void printPartition() const {
- VS_PRINT_FUNCTION_SPLIT_LINE();
- // row ID -> partition ID
- VS_PRINT_VECTOR_MAPPING(row_2_partition_);
-
- // partition -> row count
- VS_PRINT_VECTOR_MAPPING(partition_2_row_count_);
- }
+ // Default implementation ignores newSize.
+ // For test only.
+ virtual void setPartitionBufferSize(uint32_t newSize) {}
- void printPartitionBuffer() const {
- VS_PRINT_FUNCTION_SPLIT_LINE();
- VS_PRINT_VECTOR_MAPPING(partition_2_buffer_size_);
- VS_PRINT_VECTOR_MAPPING(partitionBufferBase_);
+ // Default is a no-op returning OK; writers that keep per-partition buffers may override it.
+ virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool
reuseBuffers) {
+ return arrow::Status::OK();
}
- void printPartition2Row() const {
- VS_PRINT_FUNCTION_SPLIT_LINE();
- VS_PRINT_VECTOR_MAPPING(partition2RowOffsetBase_);
-
-#if VELOX_SHUFFLE_WRITER_PRINT
- for (auto pid = 0; pid < numPartitions_; ++pid) {
- auto begin = partition2RowOffsetBase_[pid];
- auto end = partition2RowOffsetBase_[pid + 1];
- VsPrint("partition", pid);
- VsPrintVectorRange(rowOffset2RowId_, begin, end);
- }
-#endif
+ // Default is a no-op returning OK; the sort-based writer overrides it to flush the
+ // buffered rows of one partition.
+ virtual arrow::Status evictRowVector(uint32_t partitionId) {
+ return arrow::Status::OK();
}
- void printInputHasNull() const {
- VS_PRINT_FUNCTION_SPLIT_LINE();
- VS_PRINT_CONTAINER(input_has_null_);
+ // Size of the payload cached by the writer; the base implementation reports 0.
+ virtual const uint64_t cachedPayloadSize() const {
+ return 0;
}
+ // Returns maxBatchSize_.
int32_t maxBatchSize() const {
return maxBatchSize_;
}
- private:
+ protected:
VeloxShuffleWriter(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
@@ -209,216 +112,19 @@ class VeloxShuffleWriter final : public ShuffleWriter {
serdeOptions_.useLosslessTimestamp = true;
}
- arrow::Status init();
-
- arrow::Status initPartitions();
-
- arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);
-
- arrow::Status splitRowVector(const facebook::velox::RowVector& rv);
-
- arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
-
- arrow::Status buildPartition2Row(uint32_t rowNum);
-
- arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv);
-
- void setSplitState(SplitState state);
-
- arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t
memLimit);
-
- bool beyondThreshold(uint32_t partitionId, uint32_t newSize);
-
- uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv,
int64_t memLimit);
-
- arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize);
-
- arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize);
-
- arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
- allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);
-
- arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t
newSize);
-
- arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector&
rv);
-
- arrow::Status splitBoolType(const uint8_t* srcAddr, const
std::vector<uint8_t*>& dstAddrs);
-
- arrow::Status splitValidityBuffer(const facebook::velox::RowVector& rv);
+ virtual ~VeloxShuffleWriter() = default;
- arrow::Status splitBinaryArray(const facebook::velox::RowVector& rv);
-
- arrow::Status splitComplexType(const facebook::velox::RowVector& rv);
-
- arrow::Status evictBuffers(
- uint32_t partitionId,
- uint32_t numRows,
- std::vector<std::shared_ptr<arrow::Buffer>> buffers,
- bool reuseBuffers);
-
- arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
assembleBuffers(uint32_t partitionId, bool reuseBuffers);
-
- template <typename T>
- arrow::Status splitFixedType(const uint8_t* srcAddr, const
std::vector<uint8_t*>& dstAddrs) {
- for (auto& pid : partitionUsed_) {
- auto dstPidBase = (T*)(dstAddrs[pid] + partitionBufferBase_[pid] *
sizeof(T));
- auto pos = partition2RowOffsetBase_[pid];
- auto end = partition2RowOffsetBase_[pid + 1];
- for (; pos < end; ++pos) {
- auto rowId = rowOffset2RowId_[pos];
- *dstPidBase++ = reinterpret_cast<const T*>(srcAddr)[rowId]; // copy
- }
- }
- return arrow::Status::OK();
- }
-
- arrow::Status splitBinaryType(
- uint32_t binaryIdx,
- const facebook::velox::FlatVector<facebook::velox::StringView>& src,
- std::vector<BinaryBuf>& dst);
-
- arrow::Result<int64_t> evictCachedPayload(int64_t size);
-
- arrow::Result<std::shared_ptr<arrow::Buffer>>
generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector);
-
- arrow::Status resetValidityBuffer(uint32_t partitionId);
-
- arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size);
-
- arrow::Result<int64_t> evictPartitionBuffersMinSize(int64_t size);
-
- arrow::Status shrinkPartitionBuffer(uint32_t partitionId);
-
- arrow::Status resetPartitionBuffer(uint32_t partitionId);
-
- // Resize the partition buffer to newSize. If preserveData is true, it will
keep the data in buffer.
- // Note when preserveData is false, and newSize is larger, this function can
introduce unnecessary memory copy.
- // In this case, use allocatePartitionBuffer to free current buffers and
allocate new buffers instead.
- arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize,
bool preserveData);
-
- uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize);
-
- uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex,
uint32_t newSize);
-
- void calculateSimpleColumnBytes();
-
- void stat() const;
-
- bool shrinkPartitionBuffersAfterSpill() const;
-
- bool evictPartitionBuffersAfterSpill() const;
-
- arrow::Result<uint32_t> partitionBufferSizeAfterShrink(uint32_t partitionId)
const;
-
- bool isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const;
-
- arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
-
- SplitState splitState_{kInit};
+ std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
- EvictState evictState_{kEvictable};
+ facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
serdeOptions_;
- BinaryArrayResizeState binaryArrayResizeState_{};
+ std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
bool supportAvx512_ = false;
- bool hasComplexType_ = false;
- std::vector<bool> isValidityBuffer_;
-
- // Store arrow column types. Calculated once.
- std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_;
-
- // Store velox column types. Calculated once.
- std::vector<std::shared_ptr<const facebook::velox::Type>> veloxColumnTypes_;
-
- // How many fixed-width columns in the schema. Calculated once.
- uint32_t fixedWidthColumnCount_ = 0;
-
- // The column indices of all binary types in the schema.
- std::vector<uint32_t> binaryColumnIndices_;
-
- // The column indices of all fixed-width and binary columns in the schema.
- std::vector<uint32_t> simpleColumnIndices_;
-
- // The column indices of all complex types in the schema, including Struct,
Map, List columns.
- std::vector<uint32_t> complexColumnIndices_;
-
- // Total bytes of fixed-width buffers of all simple columns. Including
validity buffers, value buffers of
- // fixed-width types and length buffers of binary types.
- // Used for estimating pre-allocated partition buffer size. Calculated once.
- uint32_t fixedWidthBufferBytes_ = 0;
-
- // Used for calculating the average binary length.
- // Updated for each input RowVector.
- uint64_t totalInputNumRows_ = 0;
- std::vector<uint64_t> binaryArrayTotalSizeBytes_;
-
- // True if input column has null in any processed input RowVector.
- // In the order of fixed-width columns + binary columns.
- std::vector<bool> inputHasNull_;
-
- // Records which partitions are actually occurred in the current input
RowVector.
- // Most of the loops can loop on this array to avoid visiting unused
partition id.
- std::vector<uint32_t> partitionUsed_;
-
- // Row ID -> Partition ID
- // subscript: The index of row in the current input RowVector
- // value: Partition ID
- // Updated for each input RowVector.
- std::vector<uint32_t> row2Partition_;
-
- // Partition ID -> Row Count
- // subscript: Partition ID
- // value: How many rows does this partition have in the current input
RowVector
- // Updated for each input RowVector.
- std::vector<uint32_t> partition2RowCount_;
-
- // Note: partition2RowOffsetBase_ and rowOffset2RowId_ are the optimization
of flattening the 2-dimensional vector
- // into single dimension.
- // The first dimension is the partition id. The second dimension is the ith
occurrence of this partition in the
- // input RowVector. The value is the index of the row in the input RowVector.
- // partition2RowOffsetBase_ records the offset of the first dimension.
- //
- // The index of the ith occurrence of a give partition `pid` in the input
RowVector can be calculated via
- // rowOffset2RowId_[partition2RowOffsetBase_[pid] + i]
- // i is in the range of [0, partition2RowCount_[pid])
-
- // Partition ID -> Row offset, elements num: Partition num + 1
- // subscript: Partition ID
- // value: The base row offset of this Partition
- // Updated for each input RowVector.
- std::vector<uint32_t> partition2RowOffsetBase_;
-
- // Row offset -> Source row ID, elements num: input RowVector row num
- // subscript: Row offset
- // value: The index of row in the current input RowVector
- // Updated for each input RowVector.
- std::vector<uint32_t> rowOffset2RowId_;
-
- // Partition buffers are used for holding the intermediate data during split.
- // Partition ID -> Partition buffer size(unit is row)
- std::vector<uint32_t> partitionBufferSize_;
-
- // The write position of partition buffer. Updated after split. Reset when
partition buffers are reallocated.
- std::vector<uint32_t> partitionBufferBase_;
-
- // Used by all simple types. Stores raw pointers of partition buffers.
- std::vector<std::vector<uint8_t*>> partitionValidityAddrs_;
- // Used by fixed-width types. Stores raw pointers of partition buffers.
- std::vector<std::vector<uint8_t*>> partitionFixedWidthValueAddrs_;
- // Used by binary types. Stores raw pointers and metadata of partition
buffers.
- std::vector<std::vector<BinaryBuf>> partitionBinaryAddrs_;
-
- // Used by complex types.
- // Partition id -> Serialized complex data.
- std::vector<std::unique_ptr<facebook::velox::IterativeVectorSerializer>>
complexTypeData_;
- std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
- std::shared_ptr<const facebook::velox::RowType> complexWriteType_;
+ int32_t maxBatchSize_{0};
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
- std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
- facebook::velox::serializer::presto::PrestoVectorSerde serde_;
- facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
serdeOptions_;
+ enum EvictState { kEvictable, kUnevictable };
// stat
enum CpuWallTimingType {
@@ -474,7 +180,28 @@ class VeloxShuffleWriter final : public ShuffleWriter {
}
facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];
- int32_t maxBatchSize_{0};
-}; // class VeloxShuffleWriter
+
+ EvictState evictState_{kEvictable};
+
+  // Scope guard: sets the referenced state to kUnevictable on construction and sets it back to
+  // kEvictable on destruction. Neither copyable nor movable.
+  class EvictGuard {
+   public:
+    explicit EvictGuard(EvictState& state) : evictState_{state} {
+      evictState_ = EvictState::kUnevictable;
+    }
+
+    EvictGuard(const EvictGuard&) = delete;
+    EvictGuard(EvictGuard&&) = delete;
+    EvictGuard& operator=(const EvictGuard&) = delete;
+    EvictGuard& operator=(EvictGuard&&) = delete;
+
+    ~EvictGuard() {
+      evictState_ = EvictState::kEvictable;
+    }
+
+   private:
+    EvictState& evictState_;
+  };
+};
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
new file mode 100644
index 000000000..b0c2cc8ad
--- /dev/null
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxSortBasedShuffleWriter.h"
+#include "memory/ArrowMemory.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/ShuffleSchema.h"
+#include "utils/Common.h"
+#include "utils/VeloxArrowUtils.h"
+#include "utils/macros.h"
+#include "velox/common/base/Nulls.h"
+#include "velox/type/Type.h"
+#include "velox/vector/ComplexVector.h"
+
+#if defined(__x86_64__)
+#include <immintrin.h>
+#include <x86intrin.h>
+#elif defined(__aarch64__)
+#include <arm_neon.h>
+#endif
+
+namespace gluten {
+
+// Set to a non-zero value to make stat() log per-stage CPU/wall timing statistics.
+// (The rotateLeft/PREFETCHT* macros copied from the hash-based writer were unused here.)
+#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0
+
+// Factory: constructs a sort-based shuffle writer and runs init(), propagating any init error.
+arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxSortBasedShuffleWriter::create(
+    uint32_t numPartitions,
+    std::unique_ptr<PartitionWriter> partitionWriter,
+    ShuffleWriterOptions options,
+    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+    arrow::MemoryPool* arrowPool) {
+  // Uses `new` rather than std::make_shared, presumably because the constructor is not public.
+  std::shared_ptr<VeloxSortBasedShuffleWriter> writer(new VeloxSortBasedShuffleWriter(
+      numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), arrowPool));
+  RETURN_NOT_OK(writer->init());
+  return writer;
+}
+
+// One-time setup: detects AVX-512 support, creates the partitioner, and pre-sizes the
+// per-partition bookkeeping (row counts and row-index lists).
+arrow::Status VeloxSortBasedShuffleWriter::init() {
+#if defined(__x86_64__)
+  supportAvx512_ = __builtin_cpu_supports("avx512bw");
+#else
+  supportAvx512_ = false;
+#endif
+
+  ARROW_ASSIGN_OR_RAISE(
+      partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId));
+  DLOG(INFO) << "Create partitioning type: " << std::to_string(options_.partitioning);
+
+  partition2RowCount_.resize(numPartitions_);
+
+  // Register an index list for every partition, each with room for bufferSize entries.
+  rowVectorIndexMap_.reserve(numPartitions_);
+  for (int partitionId = 0; partitionId < numPartitions_; ++partitionId) {
+    auto& rowIndices = rowVectorIndexMap_[partitionId];
+    rowIndices.reserve(options_.bufferSize);
+  }
+
+  return arrow::Status::OK();
+}
+
+// Buffers `rv` in batches_. Once the accumulated estimated flat size exceeds `memLimit`, every
+// partition is evicted and the buffer is reset. Leaves the writer in kSortInit.
+arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit) {
+  currentInputColumnBytes_ += rv->estimateFlatSize();
+  batches_.push_back(std::move(rv));
+
+  const bool overLimit = currentInputColumnBytes_ > memLimit;
+  if (overLimit) {
+    for (auto partitionId = 0; partitionId < numPartitions(); ++partitionId) {
+      RETURN_NOT_OK(evictRowVector(partitionId));
+      partition2RowCount_[partitionId] = 0;
+    }
+    batches_.clear();
+    currentInputColumnBytes_ = 0;
+  }
+
+  setSortState(SortState::kSortInit);
+  return arrow::Status::OK();
+}
+
+// Buffers one input batch for sort-based shuffle.
+//
+// Depending on the partitioning mode, the batch is handled in one of three ways:
+//   - single: buffered as-is;
+//   - range: a composite of a partition-id batch and a data batch;
+//   - otherwise: flattened, with a leading partition-id column stripped when the partitioner
+//     provides pids.
+// The partitioner records row -> partition assignments in rowVectorIndexMap_, keyed by the index
+// (batches_.size()) the batch will get once doSort() buffers it.
+// NOTE(review): memLimit is unused; buffering is bounded by the partition writer's
+// sortBufferMaxSize option instead.
+arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
+  if (options_.partitioning == Partitioning::kSingle) {
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    auto rv = veloxColumnBatch->getFlattenedRowVector();
+    RETURN_NOT_OK(initFromRowVector(*rv));
+    RETURN_NOT_OK(doSort(rv, partitionWriter_->options().sortBufferMaxSize));
+  } else if (options_.partitioning == Partitioning::kRange) {
+    // Range partitioning delivers a composite batch: [0] holds partition ids, [1] holds the data.
+    auto compositeBatch = std::dynamic_pointer_cast<CompositeColumnarBatch>(cb);
+    VELOX_CHECK_NOT_NULL(compositeBatch);
+    auto batches = compositeBatch->getBatches();
+    VELOX_CHECK_EQ(batches.size(), 2);
+    auto pidBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[0]);
+    VELOX_CHECK_NOT_NULL(pidBatch);
+    auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
+    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+    setSortState(SortState::kSort);
+    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), batches_.size(), rowVectorIndexMap_));
+    END_TIMING();
+    auto rvBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[1]);
+    VELOX_CHECK_NOT_NULL(rvBatch);
+    auto rv = rvBatch->getFlattenedRowVector();
+    RETURN_NOT_OK(initFromRowVector(*rv));
+    RETURN_NOT_OK(doSort(rv, partitionWriter_->options().sortBufferMaxSize));
+  } else {
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    facebook::velox::RowVectorPtr rv;
+    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
+    rv = veloxColumnBatch->getFlattenedRowVector();
+    END_TIMING();
+    if (partitioner_->hasPid()) {
+      // Column 0 carries the partition id; strip it before buffering the data columns.
+      auto pidArr = getFirstColumn(*rv);
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      setSortState(SortState::kSort);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      END_TIMING();
+      auto strippedRv = getStrippedRowVector(*rv);
+      RETURN_NOT_OK(initFromRowVector(*strippedRv));
+      RETURN_NOT_OK(doSort(strippedRv, partitionWriter_->options().sortBufferMaxSize));
+    } else {
+      RETURN_NOT_OK(initFromRowVector(*rv));
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      setSortState(SortState::kSort);
+      RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      END_TIMING();
+      RETURN_NOT_OK(doSort(rv, partitionWriter_->options().sortBufferMaxSize));
+    }
+  }
+  return arrow::Status::OK();
+}
+
+// Serializes the rows pending in batch_ into *output, hands them to the partition writer for
+// `partitionId`, then clears the output buffer and starts a fresh VectorStreamGroup with the
+// same row type and serde options.
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(
+    uint32_t partitionId,
+    std::ostringstream* output,
+    facebook::velox::OStreamOutputStream* out,
+    facebook::velox::RowTypePtr* rowTypePtr) {
+  const int64_t rawSize = batch_->size();
+  batch_->flush(out);
+  const std::string& serialized = output->str();
+  RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, serialized.c_str(), serialized.size()));
+
+  // Start over with an empty stream group and an empty output buffer.
+  batch_.reset();
+  output->clear();
+  output->str("");
+  batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
+  batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
+  return arrow::Status::OK();
+}
+
+// Serializes and evicts all buffered rows of `partitionId`.
+//
+// For non-single partitioning, rowVectorIndexMap_[partitionId] holds packed 64-bit row
+// references:
+//   - high 32 bits: index into batches_;
+//   - low 32 bits: row within that batch.
+// Runs of consecutive rows in the same batch are coalesced into IndexRanges and grouped per
+// batch. They are appended to batch_ and flushed whenever at least options_.bufferSize rows are
+// pending; the partition's entry is then removed from the map.
+// For single partitioning, every buffered batch is appended whole.
+arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) {
+  if (!rowType_.has_value()) {
+    // initFromRowVector() has not run yet: nothing was buffered and batch_ does not exist.
+    // Bail out instead of letting rowType_.value() throw std::bad_optional_access.
+    return arrow::Status::OK();
+  }
+
+  int32_t rowNum = 0;
+  const int32_t maxBatchNum = options_.bufferSize;
+  auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
+  std::ostringstream output;
+  facebook::velox::OStreamOutputStream out(&output);
+
+  if (options_.partitioning != Partitioning::kSingle) {
+    if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
+      // Bind by reference; the entry is erased once its rows are appended.
+      const auto& rowVectorIndex = it->second;
+
+      // init() pre-registers every partition with an empty list. Without this guard an empty
+      // list would emit a bogus range for batch -1 and index batches_[-1].
+      if (!rowVectorIndex.empty()) {
+        std::map<int32_t, std::vector<facebook::velox::IndexRange>> groupedIndices;
+        std::map<int32_t, int64_t> groupedSize;
+
+        int32_t runVectorIndex = -1;
+        int32_t runStart = -1;
+        int32_t runEnd = -1; // last row of the current run (inclusive)
+        int32_t runSize = 0;
+        for (const int64_t packed : rowVectorIndex) {
+          const auto vectorIndex = static_cast<int32_t>(packed >> 32);
+          const auto rowIndex = static_cast<int32_t>(packed & 0xFFFFFFFFLL);
+          if (runSize > 0 && vectorIndex == runVectorIndex && rowIndex == runEnd + 1) {
+            ++runSize;
+            runEnd = rowIndex;
+            continue;
+          }
+          if (runSize > 0) {
+            groupedIndices[runVectorIndex].push_back({runStart, runSize});
+            groupedSize[runVectorIndex] += runSize;
+          }
+          runVectorIndex = vectorIndex;
+          runStart = rowIndex;
+          runEnd = rowIndex;
+          runSize = 1;
+        }
+        groupedIndices[runVectorIndex].push_back({runStart, runSize});
+        groupedSize[runVectorIndex] += runSize;
+
+        for (const auto& [vectorIndex, ranges] : groupedIndices) {
+          batch_->append(batches_[vectorIndex], ranges);
+          rowNum += groupedSize[vectorIndex];
+          if (rowNum >= maxBatchNum) {
+            rowNum = 0;
+            RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+          }
+        }
+      }
+      rowVectorIndexMap_.erase(partitionId);
+    }
+  } else {
+    for (const auto& rowVectorPtr : batches_) {
+      rowNum += rowVectorPtr->size();
+      batch_->append(rowVectorPtr);
+      if (rowNum >= maxBatchNum) {
+        RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+        rowNum = 0;
+      }
+    }
+  }
+
+  // Flush whatever is still pending below the bufferSize threshold.
+  if (rowNum > 0) {
+    RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+  }
+  return arrow::Status::OK();
+}
+
+// Final flush, in three steps:
+//   1. Evict every partition's buffered rows and drop the buffered batches.
+//   2. Stop the partition writer, passing &metrics_ (timed under CpuWallTimingStop).
+//   3. Log the collected statistics via stat().
+arrow::Status VeloxSortBasedShuffleWriter::stop() {
+  for (auto partitionId = 0; partitionId < numPartitions(); ++partitionId) {
+    RETURN_NOT_OK(evictRowVector(partitionId));
+    partition2RowCount_[partitionId] = 0;
+  }
+  batches_.clear();
+  currentInputColumnBytes_ = 0;
+
+  {
+    // Scope limits the timer to the partition-writer shutdown.
+    SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
+    setSortState(SortState::kSortStop);
+    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    partitionBuffers_.clear();
+  }
+
+  stat();
+  return arrow::Status::OK();
+}
+
+// Lazily initializes serialization state from the first RowVector seen. It captures the row
+// type, builds serde options from the partition writer's compression setting, and creates the
+// stream group. Later calls are no-ops.
+arrow::Status VeloxSortBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) {
+  if (rowType_.has_value()) {
+    return arrow::Status::OK(); // already initialized by an earlier batch
+  }
+
+  rowType_ = rv.type();
+  const auto compressionKind =
+      facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr);
+  serdeOptions_ = {false, compressionKind};
+  batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
+  batch_->createStreamTree(
+      std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()), options_.bufferSize, &serdeOptions_);
+  return arrow::Status::OK();
+}
+
+// Memory-pressure callback: evicts all buffered rows when the writer is between batches
+// (kSortInit), and reports the estimated bytes released through *actual.
+// `size` is not used: everything buffered is evicted.
+arrow::Status VeloxSortBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
+  // Always set *actual. Previously it was left unset whenever sortState_ != kSortInit, so callers
+  // could read an uninitialized value.
+  *actual = 0;
+  if (evictState_ == EvictState::kUnevictable) {
+    return arrow::Status::OK();
+  }
+  EvictGuard evictGuard{evictState_};
+
+  // write() sets kSort while partitioning a batch, and doSort() resets it to kSortInit
+  // afterwards. Only spill in the latter state.
+  if (sortState_ == SortState::kSortInit) {
+    for (auto pid = 0; pid < numPartitions(); ++pid) {
+      RETURN_NOT_OK(evictRowVector(pid));
+      partition2RowCount_[pid] = 0;
+    }
+    batches_.clear();
+    *actual = currentInputColumnBytes_;
+    currentInputColumnBytes_ = 0;
+  }
+  return arrow::Status::OK();
+}
+
+// Logs each CPU/wall timing bucket in cpuWallTimingList_, with per-call averages when the bucket
+// was hit. Compiles to a no-op unless VELOX_SHUFFLE_WRITER_LOG_FLAG is non-zero.
+void VeloxSortBasedShuffleWriter::stat() const {
+#if VELOX_SHUFFLE_WRITER_LOG_FLAG
+  for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
+    const auto& timing = cpuWallTimingList_[i];
+    std::ostringstream oss;
+    oss << "Velox shuffle writer stat:" << CpuWallTimingName(static_cast<CpuWallTimingType>(i)) << " "
+        << timing.toString();
+    if (timing.count > 0) {
+      oss << " wallNanos-avg:" << timing.wallNanos / timing.count << " cpuNanos-avg:"
+          << timing.cpuNanos / timing.count;
+    }
+    LOG(INFO) << oss.str();
+  }
+#endif
+}
+
+// Records the writer's current phase (kSortInit / kSort / kSortStop) in sortState_.
+void VeloxSortBasedShuffleWriter::setSortState(SortState state) {
+  this->sortState_ = state;
+}
+
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
new file mode 100644
index 000000000..e3ac07dfc
--- /dev/null
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "velox/common/time/CpuWallTimer.h"
+#include "velox/serializers/PrestoSerializer.h"
+#include "velox/type/Type.h"
+#include "velox/vector/ComplexVector.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/VectorStream.h"
+
+#include <arrow/array/util.h>
+#include <arrow/ipc/writer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <arrow/type.h>
+
+#include "VeloxShuffleWriter.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/PartitionWriter.h"
+#include "shuffle/Partitioner.h"
+#include "shuffle/Utils.h"
+
+#include "utils/Print.h"
+
+namespace gluten {
+
+// Phase of the sort-based writer; reclaimFixedSize() only spills buffered input in kSortInit.
+enum SortState { kSortInit, kSort, kSortStop };
+
+/// Sort-based variant of VeloxShuffleWriter. Input RowVectors are buffered and later evicted
+/// per partition (see evictRowVector / evictBatch). Instances are created via create().
+class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
+ public:
+  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
+      uint32_t numPartitions,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions options,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      arrow::MemoryPool* arrowPool);
+
+  arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;
+
+  arrow::Status stop() override;
+
+  /// Spills buffered input; `*actual` receives the bytes released.
+  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
+
+  arrow::Status evictRowVector(uint32_t partitionId) override;
+
+  arrow::Status evictBatch(
+      uint32_t partitionId,
+      std::ostringstream* output,
+      facebook::velox::OStreamOutputStream* out,
+      facebook::velox::RowTypePtr* rowTypePtr);
+
+ private:
+  VeloxSortBasedShuffleWriter(
+      uint32_t numPartitions,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions options,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      arrow::MemoryPool* pool)
+      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {}
+
+  arrow::Status init();
+
+  /// Lazily initializes rowType_, serde options and batch_ from the first input vector.
+  arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
+
+  void setSortState(SortState state);
+
+  arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+
+  void stat() const;
+
+  // Schema of the first input RowVector; empty until initFromRowVector() runs.
+  std::optional<facebook::velox::TypePtr> rowType_;
+
+  // Serialization stream group created by initFromRowVector().
+  std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
+
+  // Partition ID -> Row Count
+  // subscript: Partition ID
+  // value: How many rows does this partition have in the current input RowVector
+  // Updated for each input RowVector; reset to 0 per partition after eviction.
+  std::vector<uint32_t> partition2RowCount_;
+
+  std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_ =
+      std::make_unique<facebook::velox::serializer::presto::PrestoVectorSerde>();
+
+  // Buffered input RowVectors; cleared once every partition has been evicted.
+  std::vector<facebook::velox::RowVectorPtr> batches_;
+
+  std::unordered_map<int32_t, std::vector<int64_t>> rowVectorIndexMap_;
+
+  std::unordered_map<int32_t, std::vector<int64_t>> rowVectorPartitionMap_;
+
+  // Bytes of buffered input, reported through reclaimFixedSize()'s int64_t* out-param.
+  // 64-bit so that more than 4 GiB of buffered input cannot wrap around.
+  int64_t currentInputColumnBytes_ = 0;
+
+  SortState sortState_{kSortInit};
+}; // class VeloxSortBasedShuffleWriter
+
+} // namespace gluten
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index ffda945b1..fdf3e4491 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -19,7 +19,9 @@
#include <arrow/io/api.h>
#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
@@ -68,12 +70,18 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
for (const auto& compression : compressions) {
+ params.push_back(ShuffleTestParams{ShuffleWriterType::kSortShuffle,
PartitionWriterType::kRss, compression, 0, 0});
for (const auto compressionThreshold : compressionThresholds) {
for (const auto mergeBufferSize : mergeBufferSizes) {
- params.push_back(
- ShuffleTestParams{PartitionWriterType::kLocal, compression,
compressionThreshold, mergeBufferSize});
+ params.push_back(ShuffleTestParams{
+ ShuffleWriterType::kHashShuffle,
+ PartitionWriterType::kLocal,
+ compression,
+ compressionThreshold,
+ mergeBufferSize});
}
- params.push_back(ShuffleTestParams{PartitionWriterType::kRss,
compression, compressionThreshold, 0});
+ params.push_back(ShuffleTestParams{
+ ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss,
compression, compressionThreshold, 0});
}
}
@@ -264,7 +272,9 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
// calculate maxBatchSize_
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
- VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
+ if (GetParam().shuffleWriterType == kHashShuffle) {
+ VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
+ }
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
@@ -305,6 +315,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
+ if (GetParam().shuffleWriterType == kSortShuffle) {
+ return;
+ }
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 0; // Force re-alloc on
buffer size changed.
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
@@ -392,6 +405,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
preAllocForceReuse) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
+ if (GetParam().shuffleWriterType == kSortShuffle) {
+ return;
+ }
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
diff --git a/cpp/velox/utils/tests/LocalRssClient.h
b/cpp/velox/utils/tests/LocalRssClient.h
index 0033526bb..c5c1b5d2c 100644
--- a/cpp/velox/utils/tests/LocalRssClient.h
+++ b/cpp/velox/utils/tests/LocalRssClient.h
@@ -30,7 +30,7 @@ class LocalRssClient : public RssClient {
public:
LocalRssClient(std::string dataFile) : dataFile_(dataFile) {}
- int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) {
+ int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t
size) {
auto idx = -1;
auto maybeIdx = partitionIdx_.find(partitionId);
if (maybeIdx == partitionIdx_.end()) {
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 972c0cb25..66732c97a 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -62,6 +62,7 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
} // namespace
struct ShuffleTestParams {
+ ShuffleWriterType shuffleWriterType;
PartitionWriterType partitionWriterType;
arrow::Compression::type compressionType;
int32_t compressionThreshold;
@@ -69,8 +70,9 @@ struct ShuffleTestParams {
std::string toString() const {
std::ostringstream out;
- out << "partitionWriterType = " << partitionWriterType << ",
compressionType = " << compressionType
- << ", compressionThreshold = " << compressionThreshold << ",
mergeBufferSize = " << mergeBufferSize;
+ out << "shuffleWriterType = " << shuffleWriterType << ",
partitionWriterType = " << partitionWriterType
+ << ", compressionType = " << compressionType << ",
compressionThreshold = " << compressionThreshold
+ << ", mergeBufferSize = " << mergeBufferSize;
return out.str();
}
};
@@ -179,7 +181,7 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
+  // Test helper: wraps `vector` in a VeloxColumnarBatch and writes it with ShuffleWriter::kMinMemLimit.
+  // (The helper keeps its old name although it now calls write() instead of split().)
arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter,
facebook::velox::RowVectorPtr vector) {
std::shared_ptr<ColumnarBatch> cb =
std::make_shared<VeloxColumnarBatch>(vector);
- return shuffleWriter.split(cb, ShuffleWriter::kMinMemLimit);
+ return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
}
// Create multiple local dirs and join with comma.
@@ -231,11 +233,47 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
ShuffleTestParams params = GetParam();
partitionWriterOptions_.compressionType = params.compressionType;
+ switch (partitionWriterOptions_.compressionType) {
+ case arrow::Compression::UNCOMPRESSED:
+ partitionWriterOptions_.compressionTypeStr = "none";
+ break;
+ case arrow::Compression::LZ4_FRAME:
+ partitionWriterOptions_.compressionTypeStr = "lz4";
+ break;
+ case arrow::Compression::ZSTD:
+ partitionWriterOptions_.compressionTypeStr = "zstd";
+ break;
+ default:
+ break;
+ };
partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
return arrow::Status::OK();
}
+  /// Creates the shuffle writer selected by the current test parameters.
+  /// `bufferSize` is applied only to the hash-based writer. The sort-based writer is only
+  /// supported together with the RSS partition writer; any other combination fails loudly
+  /// instead of returning a null writer that the test would dereference later.
+  std::shared_ptr<VeloxShuffleWriter> createSpecificShuffleWriter(
+      arrow::MemoryPool* arrowPool,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions shuffleWriterOptions,
+      uint32_t numPartitions,
+      int32_t bufferSize) {
+    std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
+    if (GetParam().shuffleWriterType == kHashShuffle) {
+      shuffleWriterOptions.bufferSize = bufferSize;
+      GLUTEN_ASSIGN_OR_THROW(
+          shuffleWriter,
+          VeloxHashBasedShuffleWriter::create(
+              numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool));
+    } else if (
+        GetParam().shuffleWriterType == kSortShuffle && GetParam().partitionWriterType == PartitionWriterType::kRss) {
+      GLUTEN_ASSIGN_OR_THROW(
+          shuffleWriter,
+          VeloxSortBasedShuffleWriter::create(
+              numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool));
+    } else {
+      VELOX_FAIL("Unsupported shuffle writer test parameters: {}", GetParam().toString());
+    }
+    return shuffleWriter;
+  }
+
protected:
static void SetUpTestCase() {
facebook::velox::memory::MemoryManager::testingSetInstance({});
@@ -276,9 +314,33 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
options.compressionType = compressionType;
auto codec = createArrowIpcCodec(options.compressionType,
CodecBackend::NONE);
auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+ switch (options.compressionType) {
+ case arrow::Compression::type::UNCOMPRESSED:
+ options.compressionTypeStr = "none";
+ break;
+ case arrow::Compression::type::LZ4_FRAME:
+ options.compressionTypeStr = "lz4";
+ break;
+ case arrow::Compression::type::ZSTD:
+ options.compressionTypeStr = "zstd";
+ break;
+ default:
+ break;
+ };
+ auto veloxCompressionType =
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
+ if (!facebook::velox::isRegisteredVectorSerde()) {
+
facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
+ }
// Set batchSize to a large value to make all batches are merged by reader.
auto deserializerFactory =
std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
- schema, std::move(codec), rowType,
std::numeric_limits<int32_t>::max(), defaultArrowMemoryPool().get(), pool_);
+ schema,
+ std::move(codec),
+ veloxCompressionType,
+ rowType,
+ std::numeric_limits<int32_t>::max(),
+ defaultArrowMemoryPool().get(),
+ pool_,
+ GetParam().shuffleWriterType);
auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
auto iter = reader->readStream(in);
while (iter->hasNext()) {
@@ -316,15 +378,12 @@ class SinglePartitioningShuffleWriter : public
VeloxShuffleWriterTest {
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.bufferSize = 10;
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
static const uint32_t kNumPartitions = 1;
auto partitionWriter = createPartitionWriter(
GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- kNumPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    // bufferSize 10 is forwarded to createSpecificShuffleWriter, which applies it only to the hash-based writer.
+ std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
+ arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 10);
return shuffleWriter;
}
};
@@ -387,15 +446,12 @@ class HashPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.bufferSize = 4;
shuffleWriterOptions_.partitioning = Partitioning::kHash;
static const uint32_t kNumPartitions = 2;
auto partitionWriter = createPartitionWriter(
GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- kNumPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    // bufferSize 4 is forwarded to createSpecificShuffleWriter, which applies it only to the hash-based writer.
+ std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
+ arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
return shuffleWriter;
}
@@ -422,15 +478,12 @@ class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.bufferSize = 4;
shuffleWriterOptions_.partitioning = Partitioning::kRange;
static const uint32_t kNumPartitions = 2;
auto partitionWriter = createPartitionWriter(
GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- kNumPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    // bufferSize 4 is forwarded to createSpecificShuffleWriter, which applies it only to the hash-based writer.
+ std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
+ arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
return shuffleWriter;
}
@@ -441,7 +494,7 @@ class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter
facebook::velox::TypePtr dataType,
std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
for (auto& batch : batches) {
- ASSERT_NOT_OK(shuffleWriter.split(batch, ShuffleWriter::kMinMemLimit));
+ ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
}
shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
}
@@ -453,14 +506,11 @@ class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter
+// Two-partition fixture; unlike its sibling fixtures, createShuffleWriter() does not set
+// shuffleWriterOptions_.partitioning itself.
class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.bufferSize = 4;
static const uint32_t kNumPartitions = 2;
auto partitionWriter = createPartitionWriter(
GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- kNumPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    // bufferSize 4 is forwarded to createSpecificShuffleWriter, which applies it only to the hash-based writer.
+ std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
+ arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
return shuffleWriter;
}
};
@@ -479,7 +529,7 @@ class VeloxShuffleWriterMemoryTest : public
VeloxShuffleWriterTestBase, public t
PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
- VeloxShuffleWriter::create(
+ VeloxHashBasedShuffleWriter::create(
numPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
return shuffleWriter;
}
diff --git
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index a1a41f973..f454cf00c 100644
---
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterators;
import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.spark.*;
import org.apache.spark.shuffle.*;
import org.apache.spark.shuffle.celeborn.*;
@@ -291,10 +290,6 @@ public class CelebornShuffleManager implements
ShuffleManager {
shuffleId = h.dependency().shuffleId();
}
- if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
- throw new UnsupportedOperationException(
- "Unrecognized shuffle write mode!" +
celebornConf.shuffleWriterMode());
- }
if (h.dependency() instanceof ColumnarShuffleDependency) {
// columnar-based shuffle
return writerFactory.createShuffleWriterInstance(
diff --git
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
index 292ff3cc1..efd891498 100644
---
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
+++
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
@@ -29,6 +29,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import java.io.IOException
+import java.util.Locale
abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
shuffleId: Int,
@@ -53,6 +54,13 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
protected val clientPushBufferMaxSize: Int =
celebornConf.clientPushBufferMaxSize
+  /** Celeborn sort-push memory threshold; the Velox writer forwards it as sortBufferMaxSize. */
+  protected val clientPushSortMemoryThreshold: Long = celebornConf.clientPushSortMemoryThreshold
+
+  /**
+   * Sort memory cap used when computing the native writer's memory limit in "sort" mode.
+   * NOTE(review): reads the same Celeborn setting as clientPushSortMemoryThreshold — confirm that
+   * no separate max-size setting was intended.
+   */
+  protected val clientSortMemoryMaxSize: Long = celebornConf.clientPushSortMemoryThreshold
+
+  /** Celeborn shuffle writer mode (compared against "hash" / "sort"), lower-cased with Locale.ROOT. */
+  protected val shuffleWriterType: String =
+    celebornConf.shuffleWriterMode.name().toLowerCase(Locale.ROOT)
+
protected val celebornPartitionPusher = new CelebornPartitionPusher(
shuffleId,
numMappers,
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index d72977f59..699626db1 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -39,6 +39,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
import java.io._
import java.nio.ByteBuffer
+import java.util.Locale
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
@@ -83,6 +84,8 @@ private class CelebornColumnarBatchSerializerInstance(
}
val compressionCodecBackend =
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
+ val shuffleWriterType =
+ conf.get("spark.celeborn.client.spark.shuffle.writer",
"hash").toLowerCase(Locale.ROOT)
val jniWrapper = ShuffleReaderJniWrapper.create()
val batchSize = GlutenConfig.getConf.maxBatchSize
val handle = jniWrapper
@@ -91,7 +94,8 @@ private class CelebornColumnarBatchSerializerInstance(
nmm.getNativeInstanceHandle,
compressionCodec,
compressionCodecBackend,
- batchSize
+ batchSize,
+ shuffleWriterType
)
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
index 75d5148cd..37ea11a73 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
@@ -69,6 +69,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
}
}
+  // Memory limit for the native writer. In "sort" mode it is the smaller of the sort memory cap
+  // and one push buffer per partition; otherwise it is the per-task share of off-heap memory.
+  // The product is computed in Long: clientPushBufferMaxSize is an Int (and numPartitions
+  // presumably too), so an Int product overflows — possibly to a negative limit — for large
+  // partition counts.
+  // NOTE(review): the write call visible in this patch passes availableOffHeapPerTask() rather
+  // than memoryLimit to the native writer — confirm which limit is intended.
+  private val memoryLimit: Long = if ("sort".equals(shuffleWriterType)) {
+    Math.min(clientSortMemoryMaxSize, clientPushBufferMaxSize.toLong * numPartitions)
+  } else {
+    availableOffHeapPerTask()
+  }
+
private def availableOffHeapPerTask(): Long = {
val perTask =
SparkMemoryUtil.getCurrentAvailableOffHeapMemory /
SparkResourceUtil.getTaskSlots(conf)
@@ -97,6 +103,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
clientPushBufferMaxSize,
+ clientPushSortMemoryThreshold,
celebornPartitionPusher,
NativeMemoryManagers
.create(
@@ -127,11 +134,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
context.taskAttemptId(),
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning,
context.partitionId),
"celeborn",
+ shuffleWriterType,
GlutenConfig.getConf.columnarShuffleReallocThreshold
)
}
val startTime = System.nanoTime()
- jniWrapper.split(nativeShuffleWriter, cb.numRows, handle,
availableOffHeapPerTask())
+ jniWrapper.write(nativeShuffleWriter, cb.numRows, handle,
availableOffHeapPerTask())
dep.metrics("splitTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 411907ae3..24425ccf7 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -41,7 +41,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
long memoryManagerHandle,
String compressionType,
String compressionCodecBackend,
- int batchSize);
+ int batchSize,
+ String shuffleWriterType);
public native long readStream(long shuffleReaderHandle, JniByteInputStream
jniIn);
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 243c90599..ed312fa14 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -90,8 +90,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware
{
taskAttemptId,
startPartitionId,
0,
+ 0,
null,
- "local");
+ "local",
+ "hash");
}
/**
@@ -110,12 +112,14 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
int bufferCompressThreshold,
String compressionMode,
int pushBufferMaxSize,
+ long sortBufferMaxSize,
Object pusher,
long memoryManagerHandle,
long handle,
long taskAttemptId,
int startPartitionId,
String partitionWriterType,
+ String shuffleWriterType,
double reallocThreshold) {
return nativeMake(
part.getShortName(),
@@ -137,8 +141,10 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
taskAttemptId,
startPartitionId,
pushBufferMaxSize,
+ sortBufferMaxSize,
pusher,
- partitionWriterType);
+ partitionWriterType,
+ shuffleWriterType);
}
public native long nativeMake(
@@ -161,8 +167,10 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
long taskAttemptId,
int startPartitionId,
int pushBufferMaxSize,
+ long sortBufferMaxSize,
Object pusher,
- String partitionWriterType);
+ String partitionWriterType,
+ String shuffleWriterType);
/**
* Evict partition data.
@@ -187,7 +195,7 @@ public class ShuffleWriterJniWrapper implements
RuntimeAware {
* allocator instead
* @return batch bytes.
*/
- public native long split(long shuffleWriterHandle, int numRows, long
handler, long memLimit);
+ public native long write(long shuffleWriterHandle, int numRows, long
handler, long memLimit);
/**
* Write the data remained in the buffers hold by native shuffle writer to
each partition's
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index e632700e3..69e9aa9c9 100644
---
a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -102,7 +102,9 @@ private class ColumnarBatchSerializerInstance(
nmm.getNativeInstanceHandle,
compressionCodec,
compressionCodecBackend,
- batchSize)
+ batchSize,
+ "hash"
+ )
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
// was used to create all buffers read from shuffle reader. The pool
diff --git
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index fb933866c..c797257f1 100644
---
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -181,7 +181,7 @@ class ColumnarShuffleWriter[K, V](
)
}
val startTime = System.nanoTime()
- jniWrapper.split(nativeShuffleWriter, rows, handle,
availableOffHeapPerTask())
+ jniWrapper.write(nativeShuffleWriter, rows, handle,
availableOffHeapPerTask())
dep.metrics("splitTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(rows)
dep.metrics("inputBatches").add(1)
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 17cfce1c0..c0063c6f4 100644
---
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -145,6 +145,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
compressThreshold,
GlutenConfig.getConf().columnarShuffleCompressionMode(),
bufferSize,
+ bufferSize,
partitionPusher,
NativeMemoryManagers.create(
"UniffleShuffleWriter",
@@ -180,12 +181,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
GlutenShuffleUtils.getStartPartitionId(
columnarDep.nativePartitioning(), partitionId),
"uniffle",
+ "hash",
reallocThreshold);
}
long startTime = System.nanoTime();
long bytes =
- jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle,
availableOffHeapPerTask());
- LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(),
bytes);
+ jniWrapper.write(nativeShuffleWriter, cb.numRows(), handle,
availableOffHeapPerTask());
+ LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(),
bytes);
columnarDep.metrics().get("dataSize").get().add(bytes);
// this metric replace part of uniffle shuffle write time
columnarDep.metrics().get("splitTime").get().add(System.nanoTime() -
startTime);
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ca8a9dce1..02c6bf7fe 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -134,6 +134,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
.getConfString("spark.shuffle.manager", "sort")
.contains("UniffleShuffleManager")
+  // True when Celeborn's writer mode is "sort". Compared case-insensitively: the other readers of
+  // this setting in Gluten lower-case it with Locale.ROOT before comparing, so "SORT" must match too.
+  def isSortBasedCelebornShuffle: Boolean =
+    conf
+      .getConfString("spark.celeborn.client.spark.shuffle.writer", "hash")
+      .equalsIgnoreCase("sort")
+
def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]