This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0120d6af9d [GLUTEN-8855][VL] Columnar shuffle code cleanup (#9693)
0120d6af9d is described below
commit 0120d6af9d7a84d0353bbae104c7f8ddcb710862
Author: Rong Ma <[email protected]>
AuthorDate: Thu May 22 15:31:23 2025 +0800
[GLUTEN-8855][VL] Columnar shuffle code cleanup (#9693)
---
cpp/core/jni/JniWrapper.cc | 17 +-
cpp/core/shuffle/LocalPartitionWriter.cc | 346 ++++++++++++-------------
cpp/core/shuffle/LocalPartitionWriter.h | 4 +-
cpp/core/shuffle/Options.h | 5 -
cpp/core/shuffle/PartitionWriter.h | 16 +-
cpp/core/shuffle/Payload.cc | 125 ++++-----
cpp/core/shuffle/Payload.h | 36 ++-
cpp/core/shuffle/Spill.cc | 1 -
cpp/core/shuffle/Utils.cc | 7 +
cpp/core/shuffle/Utils.h | 21 +-
cpp/core/shuffle/rss/RssPartitionWriter.cc | 3 +-
cpp/core/shuffle/rss/RssPartitionWriter.h | 4 +-
cpp/velox/benchmarks/GenericBenchmark.cc | 21 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 32 +--
cpp/velox/shuffle/VeloxHashShuffleWriter.h | 15 --
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 3 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 12 +-
cpp/velox/shuffle/VeloxShuffleReader.h | 2 -
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 5 +-
cpp/velox/tests/VeloxShuffleWriterSpillTest.cc | 4 +-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 277 +++++++++-----------
cpp/velox/tests/VeloxShuffleWriterTestBase.h | 21 +-
22 files changed, 466 insertions(+), 511 deletions(-)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index c4ba4d865c..1321b6041f 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -832,7 +832,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.compressionThreshold = compressionThreshold,
.compressionType = getCompressionType(env, codecJstr),
.compressionLevel = compressionLevel,
- .bufferedWrite = true,
.numSubDirs = numSubDirs,
.pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize :
kDefaultPushMemoryThreshold,
.sortBufferMaxSize = sortBufferMaxSize > 0 ? sortBufferMaxSize :
kDefaultSortBufferThreshold};
@@ -870,11 +869,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
env->ReleaseStringUTFChars(localDirsJstr, localDirsC);
partitionWriter = std::make_unique<LocalPartitionWriter>(
- numPartitions,
- std::move(partitionWriterOptions),
- ctx->memoryManager()->getArrowMemoryPool(),
- dataFile,
- configuredDirs);
+ numPartitions, std::move(partitionWriterOptions),
ctx->memoryManager(), dataFile, configuredDirs);
} else if (partitionWriterType == "celeborn") {
jclass celebornPartitionPusherClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/shuffle/CelebornPartitionPusher;");
@@ -887,10 +882,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
std::shared_ptr<JavaRssClient> celebornClient =
std::make_shared<JavaRssClient>(vm, partitionPusher,
celebornPushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
- numPartitions,
- std::move(partitionWriterOptions),
- ctx->memoryManager()->getArrowMemoryPool(),
- std::move(celebornClient));
+ numPartitions, std::move(partitionWriterOptions),
ctx->memoryManager(), std::move(celebornClient));
} else if (partitionWriterType == "uniffle") {
jclass unifflePartitionPusherClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/shuffle/writer/PartitionPusher;");
@@ -903,10 +895,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
std::shared_ptr<JavaRssClient> uniffleClient =
std::make_shared<JavaRssClient>(vm, partitionPusher,
unifflePushPartitionDataMethod);
partitionWriter = std::make_unique<RssPartitionWriter>(
- numPartitions,
- std::move(partitionWriterOptions),
- ctx->memoryManager()->getArrowMemoryPool(),
- std::move(uniffleClient));
+ numPartitions, std::move(partitionWriterOptions),
ctx->memoryManager(), std::move(uniffleClient));
} else {
throw GlutenException("Unrecognizable partition writer type: " +
partitionWriterType);
}
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 8cd328b13f..a8f6629af8 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -31,6 +31,22 @@
#include <thread>
namespace gluten {
+
+namespace {
+arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const
std::string& file, int64_t bufferSize) {
+ std::shared_ptr<arrow::io::FileOutputStream> out;
+ const auto fd = open(file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0000);
+ // Set the shuffle file permissions to 0644 to keep it consistent with the
permissions of
+ // the built-in shuffle manager in Spark.
+ fchmod(fd, 0644);
+ ARROW_ASSIGN_OR_RAISE(out, arrow::io::FileOutputStream::Open(fd));
+
+ // The `bufferSize` bytes are a temporary allocation and will be
freed with file close.
+ // Use default memory pool and treat the memory as executor memory
overhead to avoid unnecessary spill.
+ return arrow::io::BufferedOutputStream::Create(bufferSize,
arrow::default_memory_pool(), out);
+}
+} // namespace
+
class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
@@ -38,13 +54,11 @@ class LocalPartitionWriter::LocalSpiller {
std::shared_ptr<arrow::io::OutputStream> os,
std::string spillFile,
int32_t compressionBufferSize,
- int32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: isFinal_(isFinal),
os_(os),
spillFile_(std::move(spillFile)),
- compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec),
diskSpill_(std::make_unique<Spill>()) {
@@ -67,8 +81,10 @@ class LocalPartitionWriter::LocalSpiller {
ARROW_ASSIGN_OR_RAISE(const auto pos, os_->Tell());
diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos -
writePos_, pool_, nullptr);
+
DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file
start: " << writePos_
<< ", file end: " << pos << ", file: " << spillFile_;
+
return arrow::Status::OK();
}
@@ -89,24 +105,20 @@ class LocalPartitionWriter::LocalSpiller {
}
arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload>
payload) {
- // Check spill Type.
- ARROW_RETURN_IF(
- payload->type() == Payload::kToBeCompressed,
- arrow::Status::Invalid("Cannot spill payload of type: " +
payload->toString()));
ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
RETURN_NOT_OK(payload->serialize(os_.get()));
- compressTime_ += payload->getCompressTime();
- spillTime_ += payload->getWriteTime();
+
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
+
DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file
start: " << start << ", file end: " << end
<< ", file: " << spillFile_;
- auto payloadType = payload->type();
- if (payloadType == Payload::kUncompressed && codec_ != nullptr &&
payload->numRows() >= compressionThreshold_) {
- payloadType = Payload::kToBeCompressed;
- }
+ compressTime_ += payload->getCompressTime();
+ spillTime_ += payload->getWriteTime();
+
diskSpill_->insertPayload(
- partitionId, payloadType, payload->numRows(),
payload->isValidityBuffer(), end - start, pool_, codec_);
+ partitionId, payload->type(), payload->numRows(),
payload->isValidityBuffer(), end - start, pool_, codec_);
+
return arrow::Status::OK();
}
@@ -153,7 +165,6 @@ class LocalPartitionWriter::LocalSpiller {
int64_t writePos_{0};
std::string spillFile_;
- int32_t compressionThreshold_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
@@ -175,18 +186,21 @@ class LocalPartitionWriter::PayloadMerger {
mergeBufferSize_(options.mergeBufferSize),
mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold)
{}
- arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
+ arrow::Result<std::vector<std::unique_ptr<InMemoryPayload>>>
merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool
reuseBuffers) {
- std::vector<std::unique_ptr<BlockPayload>> merged{};
+ PartitionScopeGuard mergeGuard(partitionInUse_, partitionId);
+
+ std::vector<std::unique_ptr<InMemoryPayload>> merged{};
if (!append->mergeable()) {
// TODO: Merging complex type is currently not supported.
- merged.emplace_back();
- ARROW_ASSIGN_OR_RAISE(merged.back(),
createBlockPayload(std::move(append), reuseBuffers));
+ bool shouldCompress = codec_ != nullptr && append->numRows() >=
compressionThreshold_;
+ if (reuseBuffers && !shouldCompress) {
+ RETURN_NOT_OK(append->copyBuffers(pool_));
+ }
+ merged.emplace_back(std::move(append));
return merged;
}
- MergeGuard mergeGuard(partitionInMerge_, partitionId);
-
auto cacheOrFinish = [&]() {
if (append->numRows() <= mergeBufferMinSize_) {
// Save for merge.
@@ -197,9 +211,12 @@ class LocalPartitionWriter::PayloadMerger {
partitionMergePayload_[partitionId] = std::move(append);
return arrow::Status::OK();
}
- merged.emplace_back();
- // If current buffer rows reaches merging threshold, create BlockPayload.
- ARROW_ASSIGN_OR_RAISE(merged.back(),
createBlockPayload(std::move(append), reuseBuffers));
+ // Commit if current buffer rows reaches merging threshold.
+ bool shouldCompress = codec_ != nullptr && append->numRows() >=
compressionThreshold_;
+ if (reuseBuffers && !shouldCompress) {
+ RETURN_NOT_OK(append->copyBuffers(pool_));
+ }
+ merged.emplace_back(std::move(append));
return arrow::Status::OK();
};
@@ -211,14 +228,7 @@ class LocalPartitionWriter::PayloadMerger {
auto lastPayload = std::move(partitionMergePayload_[partitionId]);
auto mergedRows = append->numRows() + lastPayload->numRows();
if (mergedRows > mergeBufferSize_ || append->numRows() >
mergeBufferMinSize_) {
- merged.emplace_back();
- ARROW_ASSIGN_OR_RAISE(
- merged.back(),
- lastPayload->toBlockPayload(
- codec_ != nullptr && lastPayload->numRows() >=
compressionThreshold_ ? Payload::kCompressed
-
: Payload::kUncompressed,
- pool_,
- codec_));
+ merged.emplace_back(std::move(lastPayload));
RETURN_NOT_OK(cacheOrFinish());
return merged;
}
@@ -232,40 +242,26 @@ class LocalPartitionWriter::PayloadMerger {
partitionMergePayload_[partitionId] = std::move(payload);
return merged;
}
+
// mergedRows == mergeBufferSize_
- merged.emplace_back();
- ARROW_ASSIGN_OR_RAISE(
- merged.back(),
- payload->toBlockPayload(
- codec_ != nullptr && payload->numRows() >= compressionThreshold_ ?
Payload::kCompressed
- :
Payload::kUncompressed,
- pool_,
- codec_));
+ merged.emplace_back(std::move(payload));
return merged;
}
- arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(
- uint32_t partitionId,
- int64_t& totalBytesToEvict) {
+ arrow::Result<std::optional<std::unique_ptr<InMemoryPayload>>>
finish(uint32_t partitionId, bool fromSpill) {
// We need to check whether the spill source is from compressing/copying
the merged buffers.
- if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId)
|| !hasMerged(partitionId)) {
+ if ((fromSpill && (partitionInUse_.has_value() && partitionInUse_.value()
== partitionId)) ||
+ !hasMerged(partitionId)) {
return std::nullopt;
}
- auto payload = std::move(partitionMergePayload_[partitionId]);
- totalBytesToEvict += payload->rawSize();
- return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
- }
- arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finish(uint32_t
partitionId) {
- if (!hasMerged(partitionId)) {
- return std::nullopt;
+ if (!fromSpill) {
+ GLUTEN_DCHECK(
+ !partitionInUse_.has_value(),
+ "Invalid status: partitionInUse_ is set when not in spilling: " +
std::to_string(partitionInUse_.value()));
}
- auto numRows = partitionMergePayload_[partitionId]->numRows();
- // Because this is the last BlockPayload, delay the compression before
writing to the final data file.
- auto payloadType =
- (codec_ != nullptr && numRows >= compressionThreshold_) ?
Payload::kToBeCompressed : Payload::kUncompressed;
- auto payload = std::move(partitionMergePayload_[partitionId]);
- return payload->toBlockPayload(payloadType, pool_, codec_);
+
+ return std::move(partitionMergePayload_[partitionId]);
}
bool hasMerged(uint32_t partitionId) {
@@ -280,81 +276,45 @@ class LocalPartitionWriter::PayloadMerger {
int32_t mergeBufferSize_;
int32_t mergeBufferMinSize_;
std::unordered_map<uint32_t, std::unique_ptr<InMemoryPayload>>
partitionMergePayload_;
- std::optional<uint32_t> partitionInMerge_;
-
- class MergeGuard {
- public:
- MergeGuard(std::optional<uint32_t>& partitionInMerge, uint32_t
partitionId) : partitionInMerge_(partitionInMerge) {
- partitionInMerge_ = partitionId;
- }
-
- ~MergeGuard() {
- partitionInMerge_ = std::nullopt;
- }
-
- private:
- std::optional<uint32_t>& partitionInMerge_;
- };
-
- arrow::Status copyBuffers(std::vector<std::shared_ptr<arrow::Buffer>>&
buffers) {
- // Copy.
- std::vector<std::shared_ptr<arrow::Buffer>> copies;
- for (auto& buffer : buffers) {
- if (!buffer) {
- continue;
- }
- if (buffer->size() == 0) {
- buffer = zeroLengthNullBuffer();
- continue;
- }
- ARROW_ASSIGN_OR_RAISE(auto copy,
arrow::AllocateResizableBuffer(buffer->size(), pool_));
- memcpy(copy->mutable_data(), buffer->data(), buffer->size());
- buffer = std::move(copy);
- }
- return arrow::Status::OK();
- }
-
- arrow::Result<std::unique_ptr<BlockPayload>> createBlockPayload(
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- bool reuseBuffers) {
- auto createCompressed = codec_ != nullptr && inMemoryPayload->numRows() >=
compressionThreshold_;
- if (reuseBuffers && !createCompressed) {
- // For uncompressed buffers, need to copy before caching.
- RETURN_NOT_OK(inMemoryPayload->copyBuffers(pool_));
- }
- ARROW_ASSIGN_OR_RAISE(
- auto payload,
- inMemoryPayload->toBlockPayload(
- createCompressed ? Payload::kCompressed : Payload::kUncompressed,
pool_, codec_));
- return payload;
- }
+ std::optional<uint32_t> partitionInUse_{std::nullopt};
};
class LocalPartitionWriter::PayloadCache {
public:
- PayloadCache(uint32_t numPartitions) : numPartitions_(numPartitions) {}
+ PayloadCache(uint32_t numPartitions, arrow::util::Codec* codec, int32_t
compressionThreshold, arrow::MemoryPool* pool)
+ : numPartitions_(numPartitions), codec_(codec),
compressionThreshold_(compressionThreshold), pool_(pool) {}
+
+ arrow::Status cache(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
payload) {
+ PartitionScopeGuard cacheGuard(partitionInUse_, partitionId);
- arrow::Status cache(uint32_t partitionId, std::unique_ptr<BlockPayload>
payload) {
if (partitionCachedPayload_.find(partitionId) ==
partitionCachedPayload_.end()) {
partitionCachedPayload_[partitionId] =
std::list<std::unique_ptr<BlockPayload>>{};
}
- partitionCachedPayload_[partitionId].push_back(std::move(payload));
- return arrow::Status::OK();
- }
- bool hasCachedPayloads(uint32_t partitionId) {
- return partitionCachedPayload_.find(partitionId) !=
partitionCachedPayload_.end() &&
- !partitionCachedPayload_[partitionId].empty();
+ bool shouldCompress = codec_ != nullptr && payload->numRows() >=
compressionThreshold_;
+ ARROW_ASSIGN_OR_RAISE(
+ auto block,
+ payload->toBlockPayload(shouldCompress ? Payload::kCompressed :
Payload::kUncompressed, pool_, codec_));
+
+ partitionCachedPayload_[partitionId].push_back(std::move(block));
+
+ return arrow::Status::OK();
}
arrow::Status write(uint32_t partitionId, arrow::io::OutputStream* os) {
+ GLUTEN_DCHECK(
+ !partitionInUse_.has_value(),
+ "Invalid status: partitionInUse_ is set: " +
std::to_string(partitionInUse_.value()));
+
if (hasCachedPayloads(partitionId)) {
auto& payloads = partitionCachedPayload_[partitionId];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
+
// Write the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os));
+
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
@@ -364,6 +324,9 @@ class LocalPartitionWriter::PayloadCache {
bool canSpill() {
for (auto pid = 0; pid < numPartitions_; ++pid) {
+ if (partitionInUse_.has_value() && partitionInUse_.value() == pid) {
+ continue;
+ }
if (hasCachedPayloads(pid)) {
return true;
}
@@ -371,40 +334,49 @@ class LocalPartitionWriter::PayloadCache {
return false;
}
- arrow::Result<std::shared_ptr<Spill>> spillAndClose(
- std::shared_ptr<arrow::io::OutputStream> os,
+ arrow::Result<std::shared_ptr<Spill>> spill(
const std::string& spillFile,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
+ const int64_t bufferSize,
int64_t& totalBytesToEvict) {
- std::shared_ptr<Spill> diskSpill = nullptr;
- ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
+ ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile, bufferSize));
+
+ int64_t start = 0;
+ auto diskSpill = std::make_shared<Spill>();
+
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
+ if (partitionInUse_.has_value() && partitionInUse_.value() == pid) {
+ continue;
+ }
+
if (hasCachedPayloads(pid)) {
auto& payloads = partitionCachedPayload_[pid];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
totalBytesToEvict += payload->rawSize();
+
// Spill the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os.get()));
compressTime_ += payload->getCompressTime();
spillTime_ += payload->getWriteTime();
-
- if (UNLIKELY(!diskSpill)) {
- diskSpill = std::make_unique<Spill>();
- }
- ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
- DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file
start: " << start << ", file end: " << end
- << ", file: " << spillFile;
- diskSpill->insertPayload(
- pid, payload->type(), payload->numRows(),
payload->isValidityBuffer(), end - start, pool, codec);
- start = end;
}
+
+ ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
+
+ diskSpill->insertPayload(pid, Payload::kRaw, 0, nullptr, end - start,
pool, codec);
+
+ DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file
start: " << start << ", file end: " << end
+ << ", file: " << spillFile;
+
+ start = end;
}
}
+
RETURN_NOT_OK(os->Close());
diskSpill->setSpillFile(spillFile);
+
return diskSpill;
}
@@ -421,20 +393,31 @@ class LocalPartitionWriter::PayloadCache {
}
private:
+ bool hasCachedPayloads(uint32_t partitionId) {
+ return partitionCachedPayload_.find(partitionId) !=
partitionCachedPayload_.end() &&
+ !partitionCachedPayload_[partitionId].empty();
+ }
+
uint32_t numPartitions_;
+ arrow::util::Codec* codec_;
+ int32_t compressionThreshold_;
+ arrow::MemoryPool* pool_;
+
int64_t compressTime_{0};
int64_t spillTime_{0};
int64_t writeTime_{0};
std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>>
partitionCachedPayload_;
+
+ std::optional<uint32_t> partitionInUse_{std::nullopt};
};
LocalPartitionWriter::LocalPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
- arrow::MemoryPool* pool,
+ MemoryManager* memoryManager,
const std::string& dataFile,
const std::vector<std::string>& localDirs)
- : PartitionWriter(numPartitions, std::move(options), pool),
dataFile_(dataFile), localDirs_(localDirs) {
+ : PartitionWriter(numPartitions, std::move(options), memoryManager),
dataFile_(dataFile), localDirs_(localDirs) {
init();
}
@@ -445,21 +428,6 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
return spilledFileDir;
}
-arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
LocalPartitionWriter::openFile(const std::string& file) {
- std::shared_ptr<arrow::io::FileOutputStream> fout;
- auto fd = open(file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0000);
- // Set the shuffle file permissions to 0644 to keep it consistent with the
permissions of
- // the built-in shuffler manager in Spark.
- fchmod(fd, 0644);
- ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(fd));
- if (options_.bufferedWrite) {
- // The `shuffleFileBufferSize` bytes is a temporary allocation and will be
freed with file close.
- // Use default memory pool and count treat the memory as executor memory
overhead to avoid unnecessary spill.
- return
arrow::io::BufferedOutputStream::Create(options_.shuffleFileBufferSize,
arrow::default_memory_pool(), fout);
- }
- return fout;
-}
-
arrow::Status LocalPartitionWriter::clearResource() {
RETURN_NOT_OK(dataFileOs_->Close());
// When bufferedWrite = true, dataFileOs_->Close doesn't release underlying
buffer.
@@ -490,7 +458,6 @@ arrow::Result<int64_t>
LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
// Read if partition exists in the spilled file. Then write to the final
data file.
while (auto payload = spill->nextPayload(partitionId)) {
- // May trigger spill during compression.
RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
@@ -535,7 +502,24 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill());
- ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
+
+ if (merger_) {
+ for (auto pid = 0; pid < numPartitions_; ++pid) {
+ ARROW_ASSIGN_OR_RAISE(auto maybeMerged, merger_->finish(pid, false));
+ if (maybeMerged.has_value()) {
+ if (!payloadCache_) {
+ payloadCache_ = std::make_shared<PayloadCache>(
+ numPartitions_, codec_.get(), options_.compressionThreshold,
payloadPool_.get());
+ }
+ // Spill can be triggered by compressing or building dictionaries.
+ RETURN_NOT_OK(payloadCache_->cache(pid,
std::move(maybeMerged.value())));
+ }
+ }
+
+ merger_.reset();
+ }
+
+ ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_,
options_.shuffleFileBufferSize));
int64_t endInFinalFile = 0;
DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " <<
spills_.size();
@@ -544,20 +528,13 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
// Record start offset.
auto startInFinalFile = endInFinalFile;
// Iterator over all spilled files.
- // Reading and compressing toBeCompressed payload can trigger spill.
+ // May trigger spill during compression.
RETURN_NOT_OK(mergeSpills(pid));
- if (payloadCache_ && payloadCache_->hasCachedPayloads(pid)) {
+
+ if (payloadCache_) {
RETURN_NOT_OK(payloadCache_->write(pid, dataFileOs_.get()));
}
- if (merger_) {
- ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finish(pid));
- if (merged) {
- // Compressing merged payload can trigger spill.
- RETURN_NOT_OK((*merged)->serialize(dataFileOs_.get()));
- compressTime_ += (*merged)->getCompressTime();
- writeTime_ += (*merged)->getWriteTime();
- }
- }
+
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
partitionLengths_[pid] = endInFinalFile - startInFinalFile;
}
@@ -593,22 +570,16 @@ arrow::Status LocalPartitionWriter::requestSpill(bool
isFinal) {
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
// If `spill()` is requested after `stop()`, open the final data file
for writing.
- ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
+ ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_,
options_.shuffleFileBufferSize));
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile,
createTempShuffleFile(nextSpilledFileDir()));
- ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
+ ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile,
options_.shuffleFileBufferSize));
}
spiller_ = std::make_unique<LocalSpiller>(
- isFinal,
- os,
- std::move(spillFile),
- options_.compressionBufferSize,
- options_.compressionThreshold,
- payloadPool_.get(),
- codec_.get());
+ isFinal, os, std::move(spillFile), options_.compressionBufferSize,
payloadPool_.get(), codec_.get());
}
return arrow::Status::OK();
}
@@ -631,19 +602,25 @@ arrow::Status LocalPartitionWriter::hashEvict(
if (evictType == Evict::kSpill) {
RETURN_NOT_OK(requestSpill(false));
+
+ auto shouldCompress = codec_ != nullptr && inMemoryPayload->numRows() >=
options_.compressionThreshold;
ARROW_ASSIGN_OR_RAISE(
- auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed,
payloadPool_.get(), nullptr));
+ auto payload,
+ inMemoryPayload->toBlockPayload(
+ shouldCompress ? Payload::kToBeCompressed :
Payload::kUncompressed, payloadPool_.get(), codec_.get()));
+
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
return arrow::Status::OK();
}
if (!merger_) {
- merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(),
codec_ ? codec_.get() : nullptr);
+ merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(),
codec_.get());
}
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId,
std::move(inMemoryPayload), reuseBuffers));
if (!merged.empty()) {
if (UNLIKELY(!payloadCache_)) {
- payloadCache_ = std::make_shared<PayloadCache>(numPartitions_);
+ payloadCache_ = std::make_shared<PayloadCache>(
+ numPartitions_, codec_.get(), options_.compressionThreshold,
payloadPool_.get());
}
for (auto& payload : merged) {
RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
@@ -690,33 +667,48 @@ arrow::Status
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
// Reclaim memory from payloadCache.
if (payloadCache_ && payloadCache_->canSpill()) {
auto beforeSpill = payloadPool_->bytes_allocated();
+
ARROW_ASSIGN_OR_RAISE(auto spillFile,
createTempShuffleFile(nextSpilledFileDir()));
- ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
+
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(
spills_.back(),
- payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(),
codec_.get(), totalBytesToEvict_));
+ payloadCache_->spill(
+ spillFile, payloadPool_.get(), codec_.get(),
options_.shuffleFileBufferSize, totalBytesToEvict_));
+
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
+
if (reclaimed >= size) {
*actual = reclaimed;
return arrow::Status::OK();
}
}
+
// Then spill payloads from merger. Create uncompressed payloads.
if (merger_) {
- auto beforeSpill = payloadPool_->bytes_allocated();
for (auto pid = 0; pid < numPartitions_; ++pid) {
- ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid,
totalBytesToEvict_));
- if (merged.has_value()) {
+ ARROW_ASSIGN_OR_RAISE(auto maybeMerged, merger_->finish(pid, true));
+
+ if (maybeMerged.has_value()) {
+ const auto& merged = maybeMerged.value();
+
+ totalBytesToEvict_ += merged->rawSize();
+ reclaimed += merged->rawCapacity();
+
RETURN_NOT_OK(requestSpill(false));
- RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
+
+ bool shouldCompress = codec_ != nullptr && merged->numRows() >=
options_.compressionThreshold;
+ ARROW_ASSIGN_OR_RAISE(
+ auto payload,
+ merged->toBlockPayload(
+ shouldCompress ? Payload::kToBeCompressed :
Payload::kUncompressed, payloadPool_.get(), codec_.get()));
+
+ RETURN_NOT_OK(spiller_->spill(pid, std::move(payload)));
}
}
- // This is not accurate. When the evicted partition buffers are not
copied, the merged ones
- // are resized from the original buffers thus allocated from
partitionBufferPool.
- reclaimed += beforeSpill - payloadPool_->bytes_allocated();
RETURN_NOT_OK(finishSpill());
}
+
*actual = reclaimed;
return arrow::Status::OK();
}
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index 02cc16c565..594a58f89b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -30,7 +30,7 @@ class LocalPartitionWriter : public PartitionWriter {
explicit LocalPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
- arrow::MemoryPool* pool,
+ MemoryManager* memoryManager,
const std::string& dataFile,
const std::vector<std::string>& localDirs);
@@ -90,8 +90,6 @@ class LocalPartitionWriter : public PartitionWriter {
std::string nextSpilledFileDir();
- arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const
std::string& file);
-
arrow::Result<int64_t> mergeSpills(uint32_t partitionId);
arrow::Status clearResource();
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 55d82cafd9..898bb5b9b8 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -34,11 +34,8 @@ static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
static constexpr int32_t kDefaultDiskWriteBufferSize = 1024 * 1024;
-static const std::string kDefaultCompressionTypeStr = "lz4";
-static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
-static constexpr bool kEnableBufferedWrite = true;
static constexpr bool kDefaultUseRadixSort = true;
static constexpr int32_t kDefaultSortBufferSize = 4096;
static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
@@ -93,8 +90,6 @@ struct PartitionWriterOptions {
int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
CompressionMode compressionMode = CompressionMode::BUFFER;
- bool bufferedWrite = kEnableBufferedWrite;
-
int32_t numSubDirs = kDefaultNumSubDirs;
int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index 4d86b7fdad..ece16c95a9 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -18,6 +18,7 @@
#pragma once
#include "ShuffleMemoryPool.h"
+#include "memory/MemoryManager.h"
#include "memory/Reclaimable.h"
#include "shuffle/Options.h"
#include "shuffle/Payload.h"
@@ -31,9 +32,9 @@ struct Evict {
class PartitionWriter : public Reclaimable {
public:
- PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options,
arrow::MemoryPool* pool)
- : numPartitions_(numPartitions), options_(std::move(options)),
pool_(pool) {
- payloadPool_ = std::make_unique<ShuffleMemoryPool>(pool);
+ PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options,
MemoryManager* memoryManager)
+ : numPartitions_(numPartitions), options_(std::move(options)),
memoryManager_(memoryManager) {
+ payloadPool_ =
std::make_unique<ShuffleMemoryPool>(memoryManager->getArrowMemoryPool());
codec_ = createArrowIpcCodec(options_.compressionType,
options_.codecBackend, options_.compressionLevel);
}
@@ -61,13 +62,6 @@ class PartitionWriter : public Reclaimable {
virtual arrow::Status
sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload, bool isFinal) = 0;
- std::optional<int64_t> getCompressedBufferLength(const
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
- if (!codec_) {
- return std::nullopt;
- }
- return BlockPayload::maxCompressedLength(buffers, codec_.get());
- }
-
virtual arrow::Status evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
uint64_t cachedPayloadSize() {
@@ -81,7 +75,7 @@ class PartitionWriter : public Reclaimable {
protected:
uint32_t numPartitions_;
PartitionWriterOptions options_;
- arrow::MemoryPool* pool_;
+ MemoryManager* memoryManager_;
// Memory Pool used to track memory allocation of partition payloads.
// The actual allocation is delegated to options_.memoryPool.
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 602b49b39e..352f46ffb6 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -188,25 +188,18 @@ arrow::Result<std::unique_ptr<BlockPayload>>
BlockPayload::fromBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
- arrow::util::Codec* codec,
- std::shared_ptr<arrow::Buffer> compressed) {
+ arrow::util::Codec* codec) {
+ const uint32_t numBuffers = buffers.size();
+
if (payloadType == Payload::Type::kCompressed) {
Timer compressionTime;
compressionTime.start();
// Compress.
auto maxLength = maxCompressedLength(buffers, codec);
std::shared_ptr<arrow::Buffer> compressedBuffer;
- uint8_t* output;
- if (compressed) {
- ARROW_RETURN_IF(
- compressed->size() < maxLength,
- arrow::Status::Invalid(
- "Compressed buffer length < maxCompressedLength. (",
compressed->size(), " vs ", maxLength, ")"));
- output = const_cast<uint8_t*>(compressed->data());
- } else {
- ARROW_ASSIGN_OR_RAISE(compressedBuffer,
arrow::AllocateResizableBuffer(maxLength, pool));
- output = compressedBuffer->mutable_data();
- }
+
+ ARROW_ASSIGN_OR_RAISE(compressedBuffer,
arrow::AllocateResizableBuffer(maxLength, pool));
+ auto* output = compressedBuffer->mutable_data();
int64_t actualLength = 0;
// Compress buffers one by one.
@@ -219,19 +212,18 @@ arrow::Result<std::unique_ptr<BlockPayload>>
BlockPayload::fromBuffers(
}
ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing
compressed buffer out of bound."));
- if (compressed) {
- compressedBuffer = std::make_shared<arrow::Buffer>(compressed->data(),
actualLength);
- } else {
-
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
- }
+
+
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
+
compressionTime.stop();
auto payload = std::unique_ptr<BlockPayload>(
- new BlockPayload(Type::kCompressed, numRows, {compressedBuffer},
isValidityBuffer, pool, codec));
+ new BlockPayload(Type::kCompressed, numRows, numBuffers,
{compressedBuffer}, isValidityBuffer));
payload->setCompressionTime(compressionTime.realTimeUsed());
+
return payload;
}
return std::unique_ptr<BlockPayload>(
- new BlockPayload(payloadType, numRows, std::move(buffers),
isValidityBuffer, pool, codec));
+ new BlockPayload(payloadType, numRows, numBuffers, std::move(buffers),
isValidityBuffer));
}
arrow::Status BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
@@ -240,8 +232,7 @@ arrow::Status
BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
ScopedTimer timer(&writeTime_);
RETURN_NOT_OK(outputStream->Write(&kUncompressedType, sizeof(Type)));
RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
- uint32_t numBuffers = buffers_.size();
- RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
+ RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
for (auto& buffer : buffers_) {
if (!buffer) {
RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
@@ -255,23 +246,29 @@ arrow::Status
BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
}
} break;
case Type::kToBeCompressed: {
- {
- ScopedTimer timer(&writeTime_);
- RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
- RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
- uint32_t numBuffers = buffers_.size();
- RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
- }
+ ScopedTimer timer(&writeTime_);
+
+ // No type and rows metadata for kToBeCompressed payload.
+ RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
+
for (auto& buffer : buffers_) {
- RETURN_NOT_OK(compressAndFlush(std::move(buffer), outputStream,
codec_, pool_, compressTime_, writeTime_));
+ if (!buffer) {
+ RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
+ continue;
+ }
+
+ int64_t bufferSize = buffer->size();
+ RETURN_NOT_OK(outputStream->Write(&bufferSize, sizeof(int64_t)));
+ if (bufferSize > 0) {
+ RETURN_NOT_OK(outputStream->Write(std::move(buffer)));
+ }
}
} break;
case Type::kCompressed: {
ScopedTimer timer(&writeTime_);
RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
- uint32_t buffers = numBuffers();
- RETURN_NOT_OK(outputStream->Write(&buffers, sizeof(uint32_t)));
+ RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
RETURN_NOT_OK(outputStream->Write(std::move(buffers_[0])));
} break;
case Type::kRaw: {
@@ -423,16 +420,12 @@ arrow::Result<std::unique_ptr<InMemoryPayload>>
InMemoryPayload::merge(
}
}
}
- return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer,
std::move(merged));
+ return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer,
source->schema(), std::move(merged));
}
-arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload(
- Payload::Type payloadType,
- arrow::MemoryPool* pool,
- arrow::util::Codec* codec,
- std::shared_ptr<arrow::Buffer> compressed) {
- return BlockPayload::fromBuffers(
- payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool,
codec, std::move(compressed));
+arrow::Result<std::unique_ptr<BlockPayload>>
+InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool*
pool, arrow::util::Codec* codec) {
+ return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_),
isValidityBuffer_, pool, codec);
}
arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream*
outputStream) {
@@ -472,10 +465,22 @@ int64_t InMemoryPayload::rawSize() {
return getBufferSize(buffers_);
}
+uint32_t InMemoryPayload::numBuffers() const {
+ return buffers_.size();
+}
+
+int64_t InMemoryPayload::rawCapacity() const {
+ return getBufferCapacity(buffers_);
+}
+
bool InMemoryPayload::mergeable() const {
return !hasComplexType_;
}
+std::shared_ptr<arrow::Schema> InMemoryPayload::schema() const {
+ return schema_;
+}
+
UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
Type type,
uint32_t numRows,
@@ -494,45 +499,47 @@ arrow::Status
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
ARROW_RETURN_IF(
inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is
uninitialized before calling serialize()."));
- if (codec_ == nullptr || type_ == Payload::kUncompressed) {
+ if (type_ == Payload::kUncompressed) {
ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
RETURN_NOT_OK(outputStream->Write(block));
return arrow::Status::OK();
}
- ARROW_RETURN_IF(
- type_ != Payload::kToBeCompressed,
- arrow::Status::Invalid(
- "Invalid payload type: " + std::to_string(type_) +
- ", should be either Payload::kUncompressed or
Payload::kToBeCompressed"));
+ GLUTEN_DCHECK(
+ type_ == Payload::kToBeCompressed,
+ "Invalid payload type: " + std::to_string(type_) +
+ ", should be either Payload::kUncompressed or
Payload::kToBeCompressed");
+
+ GLUTEN_CHECK(codec_ != nullptr, "Codec is null when serializing
Payload::kToBeCompressed.");
+
RETURN_NOT_OK(outputStream->Write(&kCompressedType,
sizeof(kCompressedType)));
RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
- ARROW_ASSIGN_OR_RAISE(auto startPos, inputStream_->Tell());
-
- // Discard original type and rows.
- Payload::Type type;
- uint32_t numRows;
- ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(Payload::Type),
&type));
- ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t), &numRows));
uint32_t numBuffers = 0;
- ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t),
&numBuffers));
+ ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(uint32_t),
&numBuffers));
ARROW_RETURN_IF(bytes == 0 || numBuffers == 0,
arrow::Status::Invalid("Cannot serialize payload with 0 buffers."));
+
RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
- // Advance Payload::Type, rows and numBuffers.
- auto readPos = startPos + sizeof(Payload::Type) + sizeof(uint32_t) +
sizeof(uint32_t);
- while (readPos - startPos < rawSize_) {
+ ARROW_ASSIGN_OR_RAISE(auto start, inputStream_->Tell());
+
+ auto pos = start;
+ auto rawBufferSize = rawSize_ - sizeof(numBuffers);
+
+ while (pos - start < rawBufferSize) {
ARROW_ASSIGN_OR_RAISE(auto uncompressed, readUncompressedBuffer());
- ARROW_ASSIGN_OR_RAISE(readPos, inputStream_->Tell());
+ ARROW_ASSIGN_OR_RAISE(pos, inputStream_->Tell());
RETURN_NOT_OK(compressAndFlush(std::move(uncompressed), outputStream,
codec_, pool_, compressTime_, writeTime_));
}
+
+ GLUTEN_CHECK(pos - start == rawBufferSize, "Not all data is read from input
stream.");
+
return arrow::Status::OK();
}
arrow::Result<std::shared_ptr<arrow::Buffer>>
UncompressedDiskBlockPayload::readUncompressedBuffer() {
ScopedTimer timer(&writeTime_);
- readPos_++;
+
int64_t bufferLength;
RETURN_NOT_OK(inputStream_->Read(sizeof(int64_t), &bufferLength));
if (bufferLength == kNullBuffer) {
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index efc70b3fbe..2f481c5197 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -54,10 +54,6 @@ class Payload {
return numRows_;
}
- uint32_t numBuffers() {
- return isValidityBuffer_ ? isValidityBuffer_->size() : 1;
- }
-
const std::vector<bool>* isValidityBuffer() const {
return isValidityBuffer_;
}
@@ -82,8 +78,7 @@ class BlockPayload final : public Payload {
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
- arrow::util::Codec* codec,
- std::shared_ptr<arrow::Buffer> compressed);
+ arrow::util::Codec* codec);
static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
deserialize(
arrow::io::InputStream* inputStream,
@@ -103,21 +98,19 @@ class BlockPayload final : public Payload {
int64_t rawSize() override;
- protected:
+ private:
BlockPayload(
Type type,
uint32_t numRows,
+ uint32_t numBuffers,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
- const std::vector<bool>* isValidityBuffer,
- arrow::MemoryPool* pool,
- arrow::util::Codec* codec)
- : Payload(type, numRows, isValidityBuffer),
buffers_(std::move(buffers)), pool_(pool), codec_(codec) {}
+ const std::vector<bool>* isValidityBuffer)
+ : Payload(type, numRows, isValidityBuffer), numBuffers_(numBuffers),
buffers_(std::move(buffers)) {}
void setCompressionTime(int64_t compressionTime);
+ uint32_t numBuffers_;
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
- arrow::MemoryPool* pool_;
- arrow::util::Codec* codec_;
};
class InMemoryPayload final : public Payload {
@@ -125,9 +118,11 @@ class InMemoryPayload final : public Payload {
InMemoryPayload(
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
+ const std::shared_ptr<arrow::Schema>& schema,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
bool hasComplexType = false)
: Payload(Type::kUncompressed, numRows, isValidityBuffer),
+ schema_(schema),
buffers_(std::move(buffers)),
hasComplexType_(hasComplexType) {}
@@ -138,19 +133,23 @@ class InMemoryPayload final : public Payload {
arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index);
- arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
- Payload::Type payloadType,
- arrow::MemoryPool* pool,
- arrow::util::Codec* codec,
- std::shared_ptr<arrow::Buffer> compressed = nullptr);
+ arrow::Result<std::unique_ptr<BlockPayload>>
+ toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool,
arrow::util::Codec* codec);
arrow::Status copyBuffers(arrow::MemoryPool* pool);
int64_t rawSize() override;
+ uint32_t numBuffers() const;
+
+ int64_t rawCapacity() const;
+
bool mergeable() const;
+ std::shared_ptr<arrow::Schema> schema() const;
+
private:
+ std::shared_ptr<arrow::Schema> schema_;
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
bool hasComplexType_;
};
@@ -175,7 +174,6 @@ class UncompressedDiskBlockPayload final : public Payload {
int64_t rawSize_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
- uint32_t readPos_{0};
arrow::Result<std::shared_ptr<arrow::Buffer>> readUncompressedBuffer();
};
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 1a88a0b11f..92434db3ec 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -48,7 +48,6 @@ void Spill::insertPayload(
int64_t rawSize,
arrow::MemoryPool* pool,
arrow::util::Codec* codec) {
- // TODO: Add compression threshold.
switch (payloadType) {
case Payload::Type::kUncompressed:
case Payload::Type::kToBeCompressed:
diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 457702c0c9..4e4dcb5d7e 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -420,6 +420,13 @@ int64_t gluten::getBufferSize(const
std::vector<std::shared_ptr<arrow::Buffer>>&
});
}
+int64_t gluten::getBufferCapacity(const
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+ return std::accumulate(
+ std::cbegin(buffers), std::cend(buffers), 0LL, [](int64_t sum, const
std::shared_ptr<arrow::Buffer>& buf) {
+ return buf == nullptr ? sum : sum + buf->capacity();
+ });
+}
+
int64_t gluten::getMaxCompressedBufferSize(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec) {
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index fafb1fbfd5..b08e319c50 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -32,13 +32,28 @@
namespace gluten {
-using BinaryArrayLengthBufferType = uint32_t;
+using StringLengthType = uint32_t;
using IpcOffsetBufferType = arrow::LargeStringType::offset_type;
-static const size_t kSizeOfBinaryArrayLengthBuffer =
sizeof(BinaryArrayLengthBufferType);
+static const size_t kSizeOfStringLength = sizeof(StringLengthType);
static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType);
static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS";
+class PartitionScopeGuard {
+ public:
+ PartitionScopeGuard(std::optional<uint32_t>& partitionInUse, uint32_t
partitionId) : partitionInUse_(partitionInUse) {
+ GLUTEN_DCHECK(!partitionInUse_.has_value(), "Partition id is already
set.");
+ partitionInUse_ = partitionId;
+ }
+
+ ~PartitionScopeGuard() {
+ partitionInUse_ = std::nullopt;
+ }
+
+ private:
+ std::optional<uint32_t>& partitionInUse_;
+};
+
std::string getShuffleSpillDir(const std::string& configuredDir, int32_t
subDirId);
arrow::Result<std::string> createTempShuffleFile(const std::string& dir);
@@ -50,6 +65,8 @@ int64_t getBufferSize(const std::shared_ptr<arrow::Array>&
array);
int64_t getBufferSize(const std::vector<std::shared_ptr<arrow::Buffer>>&
buffers);
+int64_t getBufferCapacity(const std::vector<std::shared_ptr<arrow::Buffer>>&
buffers);
+
int64_t getMaxCompressedBufferSize(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec);
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 1161d06fc5..3e8880e3b8 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -115,8 +115,7 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t
partitionId, std::unique_ptr<
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
auto payloadType = codec_ ? Payload::Type::kCompressed :
Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
- auto payload,
- inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_
? codec_.get() : nullptr, nullptr));
+ auto payload, inMemoryPayload->toBlockPayload(payloadType,
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(auto rssBufferOs,
arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize));
RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 040c7546b9..6ad896425d 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -33,9 +33,9 @@ class RssPartitionWriter final : public PartitionWriter {
RssPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
- arrow::MemoryPool* pool,
+ MemoryManager* memoryManager,
std::shared_ptr<RssClient> rssClient)
- : PartitionWriter(numPartitions, std::move(options), pool),
rssClient_(rssClient) {
+ : PartitionWriter(numPartitions, std::move(options), memoryManager),
rssClient_(rssClient) {
init();
}
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index a54a6292d4..b0ec44802f 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -61,7 +61,7 @@ DEFINE_bool(rss, false, "Mocking rss.");
DEFINE_string(
compression,
"lz4",
- "Specify the compression codec. Valid options are lz4, zstd, qat_gzip,
qat_zstd, iaa_gzip");
+ "Specify the compression codec. Valid options are none, lz4, zstd,
qat_gzip, qat_zstd, iaa_gzip");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer)
partitions");
DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
@@ -165,15 +165,13 @@ void cleanupLocalDirs(const std::vector<std::string>&
localDirs) {
PartitionWriterOptions createPartitionWriterOptions() {
PartitionWriterOptions partitionWriterOptions{};
- // Disable writer's merge.
- partitionWriterOptions.mergeThreshold = 0;
// Configure compression.
- if (FLAGS_compression == "lz4") {
- partitionWriterOptions.codecBackend = CodecBackend::NONE;
+ if (FLAGS_compression == "none") {
+ partitionWriterOptions.compressionType = arrow::Compression::UNCOMPRESSED;
+ } else if (FLAGS_compression == "lz4") {
partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME;
} else if (FLAGS_compression == "zstd") {
- partitionWriterOptions.codecBackend = CodecBackend::NONE;
partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
} else if (FLAGS_compression == "qat_gzip") {
partitionWriterOptions.codecBackend = CodecBackend::QAT;
@@ -197,17 +195,10 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
if (FLAGS_rss) {
auto rssClient = std::make_unique<LocalRssClient>(dataFile);
partitionWriter = std::make_unique<RssPartitionWriter>(
- FLAGS_shuffle_partitions,
- std::move(options),
- runtime->memoryManager()->getArrowMemoryPool(),
- std::move(rssClient));
+ FLAGS_shuffle_partitions, std::move(options),
runtime->memoryManager(), std::move(rssClient));
} else {
partitionWriter = std::make_unique<LocalPartitionWriter>(
- FLAGS_shuffle_partitions,
- std::move(options),
- runtime->memoryManager()->getArrowMemoryPool(),
- dataFile,
- localDirs);
+ FLAGS_shuffle_partitions, std::move(options),
runtime->memoryManager(), dataFile, localDirs);
}
return partitionWriter;
}
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index ae2127a135..cdc0a7653c 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -15,11 +15,9 @@
* limitations under the License.
*/
-#include "VeloxHashShuffleWriter.h"
+#include "shuffle/VeloxHashShuffleWriter.h"
#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
-#include "shuffle/ShuffleSchema.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
#include "utils/Macros.h"
@@ -108,9 +106,9 @@ arrow::Status collectFlatVectorBufferStringView(
auto rawValues = flatVector->rawValues();
// last offset is the totalStringSize
- auto lengthBufferSize = sizeof(gluten::BinaryArrayLengthBufferType) *
flatVector->size();
+ auto lengthBufferSize = sizeof(gluten::StringLengthType) *
flatVector->size();
ARROW_ASSIGN_OR_RAISE(auto lengthBuffer,
arrow::AllocateResizableBuffer(lengthBufferSize, pool));
- auto* rawLength =
reinterpret_cast<gluten::BinaryArrayLengthBufferType*>(lengthBuffer->mutable_data());
+ auto* rawLength =
reinterpret_cast<gluten::StringLengthType*>(lengthBuffer->mutable_data());
uint64_t offset = 0;
for (int32_t i = 0; i < flatVector->size(); i++) {
auto length = rawValues[i].size();
@@ -337,7 +335,7 @@ arrow::Status VeloxHashShuffleWriter::stop() {
setSplitState(SplitState::kStopEvict);
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
- PartitionBufferGuard guard(partitionBufferInUse_, pid);
+ PartitionScopeGuard stopEvictGuard(partitionBufferInUse_, pid);
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
}
}
@@ -630,7 +628,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType(
auto& binaryBuf = dst[pid];
// use 32bit offset
- auto dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) +
partitionBufferBase_[pid];
+ auto dstLengthBase = (StringLengthType*)(binaryBuf.lengthPtr) +
partitionBufferBase_[pid];
auto valueOffset = binaryBuf.valueOffset;
auto dstValuePtr = binaryBuf.valuePtr + valueOffset;
@@ -668,7 +666,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType(
binaryBuf.valueCapacity = capacity;
dstValuePtr = binaryBuf.valuePtr + valueOffset - stringLen;
// Need to update dstLengthBase because lengthPtr can be updated if
Reserve triggers spill.
- dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) +
partitionBufferBase_[pid];
+ dstLengthBase = (StringLengthType*)(binaryBuf.lengthPtr) +
partitionBufferBase_[pid];
}
// 2. copy value
@@ -824,7 +822,7 @@ void VeloxHashShuffleWriter::calculateSimpleColumnBytes() {
// `bool(1) >> 3` gets 0, so +7
fixedWidthBufferBytes_ +=
((arrow::bit_width(arrowColumnTypes_[colIdx]->id()) + 7) >> 3);
}
- fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer *
binaryColumnIndices_.size();
+ fixedWidthBufferBytes_ += kSizeOfStringLength * binaryColumnIndices_.size();
}
uint32_t VeloxHashShuffleWriter::calculatePartitionBufferSize(const
facebook::velox::RowVector& rv, int64_t memLimit) {
@@ -861,7 +859,10 @@ uint32_t
VeloxHashShuffleWriter::calculatePartitionBufferSize(const facebook::ve
VS_PRINTLF(bytesPerRow);
+ // Remove the cache memory size since it can be spilled.
+ // The logic here is to keep the split buffer as large as possible, to get
max batch size for reducer.
memLimit += cachedPayloadSize();
+
// make sure split buffer uses 128M memory at least, let's hardcode it here
for now
if (memLimit < kMinMemLimit) {
memLimit = kMinMemLimit;
@@ -925,7 +926,7 @@ arrow::Status
VeloxHashShuffleWriter::allocatePartitionBuffer(uint32_t partition
auto binaryIdx = i - fixedWidthColumnCount_;
std::shared_ptr<arrow::ResizableBuffer> lengthBuffer{};
- auto lengthBufferSize = newSize * kSizeOfBinaryArrayLengthBuffer;
+ auto lengthBufferSize = newSize * kSizeOfStringLength;
ARROW_ASSIGN_OR_RAISE(
lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize,
partitionBufferPool_.get()));
@@ -962,7 +963,8 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
bool reuseBuffers) {
if (!buffers.empty()) {
- auto payload = std::make_unique<InMemoryPayload>(numRows,
&isValidityBuffer_, std::move(buffers), hasComplexType_);
+ auto payload =
+ std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_,
schema_, std::move(buffers), hasComplexType_);
RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload),
Evict::kCache, reuseBuffers));
}
return arrow::Status::OK();
@@ -1011,7 +1013,7 @@
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashShuffleWrite
allBuffers.push_back(nullptr);
}
// Length buffer.
- auto lengthBufferSize = numRows * kSizeOfBinaryArrayLengthBuffer;
+ auto lengthBufferSize = numRows * kSizeOfStringLength;
ARROW_RETURN_IF(
!buffers[kBinaryLengthBufferIndex], arrow::Status::Invalid("Offset
buffer of binary array is null."));
if (reuseBuffers) {
@@ -1194,7 +1196,7 @@ arrow::Status
VeloxHashShuffleWriter::resizePartitionBuffer(uint32_t partitionId
auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer
of binary array is null."));
- RETURN_NOT_OK(lengthBuffer->Resize(newSize *
kSizeOfBinaryArrayLengthBuffer));
+ RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfStringLength));
// Skip Resize value buffer if the spill is triggered by resizing this
split binary buffer.
// Only update length buffer ptr.
@@ -1372,8 +1374,8 @@ arrow::Result<int64_t>
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
for (auto& item : pidToSize) {
auto pid = item.first;
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
- auto payload =
- std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_,
std::move(buffers), hasComplexType_);
+ auto payload = std::make_unique<InMemoryPayload>(
+ item.second, &isValidityBuffer_, schema_, std::move(buffers),
hasComplexType_);
metrics_.totalBytesToEvict += payload->rawSize();
RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload),
Evict::kSpill, false));
evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 4ee12a1550..5f45c9065c 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -303,21 +303,6 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
- class PartitionBufferGuard {
- public:
- PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t
partitionId)
- : partitionBufferInUse_(partitionInUse) {
- partitionBufferInUse_ = partitionId;
- }
-
- ~PartitionBufferGuard() {
- partitionBufferInUse_ = std::nullopt;
- }
-
- private:
- std::optional<uint32_t>& partitionBufferInUse_;
- };
-
std::shared_ptr<arrow::Schema> schema_;
// Column index, partition id, buffers.
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e3075872e5..66d02aa68e 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -122,8 +122,7 @@ arrow::Status
VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
auto buffer = bufferOutputStream_->getBuffer();
auto arrowBuffer = std::make_shared<arrow::Buffer>(buffer->as<uint8_t>(),
buffer->size());
ARROW_ASSIGN_OR_RAISE(
- auto payload,
- BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)},
nullptr, nullptr, nullptr, nullptr));
+ auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0,
{std::move(arrowBuffer)}, nullptr, nullptr, nullptr));
RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload),
stopped_));
batch_ =
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(),
serde_.get());
batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index dc0af91dc0..ae956aa49b 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -19,7 +19,6 @@
#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>
-#include <arrow/io/compressed.h>
#include <velox/common/caching/AsyncDataCache.h>
#include "memory/VeloxColumnarBatch.h"
@@ -27,7 +26,6 @@
#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
-#include "utils/Compression.h"
#include "utils/Macros.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
@@ -135,7 +133,7 @@ VectorPtr readFlatVectorStringView(
auto nulls = buffers[bufferIdx++];
auto lengthBuffer = buffers[bufferIdx++];
auto valueBuffer = buffers[bufferIdx++];
- const auto* rawLength = lengthBuffer->as<BinaryArrayLengthBufferType>();
+ const auto* rawLength = lengthBuffer->as<StringLengthType>();
std::vector<BufferPtr> stringBuffers;
auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length,
pool);
@@ -341,16 +339,17 @@ std::shared_ptr<ColumnarBatch>
VeloxHashShuffleReaderDeserializer::next() {
break;
}
if (!merged_) {
- merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_,
std::move(arrowBuffers));
+ merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_,
schema_, std::move(arrowBuffers));
arrowBuffers.clear();
continue;
}
+
auto mergedRows = merged_->numRows() + numRows;
if (mergedRows > batchSize_) {
break;
}
- auto append = std::make_unique<InMemoryPayload>(numRows,
isValidityBuffer_, std::move(arrowBuffers));
+ auto append = std::make_unique<InMemoryPayload>(numRows,
isValidityBuffer_, schema_, std::move(arrowBuffers));
GLUTEN_ASSIGN_OR_THROW(merged_, InMemoryPayload::merge(std::move(merged_),
std::move(append), memoryPool_));
arrowBuffers.clear();
}
@@ -364,8 +363,9 @@ std::shared_ptr<ColumnarBatch>
VeloxHashShuffleReaderDeserializer::next() {
// Save remaining rows.
if (!arrowBuffers.empty()) {
- merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_,
std::move(arrowBuffers));
+ merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_,
schema_, std::move(arrowBuffers));
}
+
return columnarBatch;
}
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 71f31618cf..ea9e78e181 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -130,8 +130,6 @@ class VeloxRssSortShuffleReaderDeserializer : public
ColumnarBatchIterator {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
facebook::velox::RowTypePtr rowType_;
std::vector<facebook::velox::RowVectorPtr> batches_;
- bool reachEos_{false};
- int32_t rowCount_;
int32_t batchSize_;
facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::VectorSerde* const serde_;
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 6c78752664..3bf39683a7 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -325,7 +325,10 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t
partitionId, uint8_t* buffer, int64_t rawLength) {
VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
- 0, nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
+ 0,
+ nullptr,
+ nullptr,
+
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
updateSpillMetrics(payload);
RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload),
stopped_));
return arrow::Status::OK();
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index bdbdb604ca..156196fc2f 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -54,12 +54,12 @@ class VeloxHashShuffleWriterSpillTest : public
VeloxShuffleWriterTestBase, publi
auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
auto partitionWriter = createPartitionWriter(
- PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+ PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxHashShuffleWriter::create(
- numPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), veloxPool, arrowPool));
+ numPartitions, std::move(partitionWriter), shuffleWriterOptions_,
veloxPool, arrowPool));
return shuffleWriter;
}
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 405e702cd3..53ef82a021 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -55,21 +55,36 @@ struct ShuffleTestParams {
}
};
-RowVectorPtr takeRows(const std::vector<RowVectorPtr>& sources, const
std::vector<std::vector<int32_t>>& indices) {
- auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+std::vector<RowVectorPtr> takeRows(
+ const std::vector<RowVectorPtr>& sources,
+ const std::vector<std::vector<int32_t>>& indices) {
+ std::vector<RowVectorPtr> result;
+
for (size_t i = 0; i < sources.size(); ++i) {
if (indices[i].empty()) {
- copy->append(sources[i].get());
+ result.push_back(sources[i]);
continue;
}
- for (int32_t idx : indices[i]) {
- if (idx >= sources[i]->size()) {
- throw GlutenException("Index out of bound: " + std::to_string(idx));
- }
- copy->append(sources[i]->slice(idx, 1).get());
+
+ auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+ for (const auto row : indices[i]) {
+ GLUTEN_CHECK(row < sources[i]->size(), fmt::format("Index out of bound:
{}", row));
+ copy->append(sources[i]->slice(row, 1).get());
}
+ result.push_back(std::move(copy));
}
- return copy;
+
+ return result;
+}
+
+RowVectorPtr mergeRowVectors(const std::vector<RowVectorPtr>& sources) {
+ RowVectorPtr result = RowVector::createEmpty(sources[0]->type(),
sources[0]->pool());
+
+ for (const auto& source : sources) {
+ result->append(source.get());
+ }
+
+ return result;
}
std::vector<ShuffleTestParams> getTestParams() {
@@ -88,7 +103,7 @@ std::vector<ShuffleTestParams> getTestParams() {
// Sort-based shuffle.
for (const auto partitionWriterType : {PartitionWriterType::kLocal,
PartitionWriterType::kRss}) {
for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
- for (const auto useRadixSort : {true, false}) {
+ for (const bool useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
.shuffleWriterType = ShuffleWriterType::kSortShuffle,
.partitionWriterType = partitionWriterType,
@@ -172,8 +187,8 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)),
true);
}
- std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr&
rowVector) {
- return toArrowSchema(rowVector->type(), pool());
+ std::shared_ptr<arrow::Schema> getArrowSchema(const
facebook::velox::RowVectorPtr& rowVector) const {
+ return toArrowSchema(rowVector->type(),
getDefaultMemoryManager()->getLeafMemoryPool().get());
}
void setReadableFile(const std::string& fileName) {
@@ -185,11 +200,11 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
void getRowVectors(
arrow::Compression::type compressionType,
- std::shared_ptr<arrow::Schema> schema,
+ const RowTypePtr& rowType,
std::vector<facebook::velox::RowVectorPtr>& vectors,
std::shared_ptr<arrow::io::InputStream> in) {
- const auto rowType =
facebook::velox::asRowType(gluten::fromArrowSchema(schema));
const auto veloxCompressionType =
arrowCompressionTypeToVelox(compressionType);
+ const auto schema = toArrowSchema(rowType,
getDefaultMemoryManager()->getLeafMemoryPool().get());
auto codec = createArrowIpcCodec(compressionType, CodecBackend::NONE);
@@ -215,78 +230,19 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
}
}
- inline static TestAllocationListener* listener_{nullptr};
-
- std::shared_ptr<arrow::io::ReadableFile> file_;
-};
-
-class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
- void testShuffleWrite(VeloxShuffleWriter& shuffleWriter,
std::vector<facebook::velox::RowVectorPtr> inputs) {
- for (auto& vector : inputs) {
- ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
- }
- ASSERT_NOT_OK(shuffleWriter.stop());
-
- // Verify data file exists.
- checkFileExists(dataFile_);
-
- // Verify number of output partitions.
- const auto& lengths = shuffleWriter.partitionLengths();
- ASSERT_EQ(lengths.size(), 1);
-
- const auto schema = getArrowSchema(inputs[0]);
-
- std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
- setReadableFile(dataFile_);
- GLUTEN_ASSIGN_OR_THROW(auto in,
arrow::io::RandomAccessFile::GetStream(file_, 0, lengths[0]));
- getRowVectors(partitionWriterOptions_.compressionType, schema,
deserializedVectors, in);
-
- ASSERT_EQ(deserializedVectors.size(), inputs.size());
- for (int32_t i = 0; i < deserializedVectors.size(); i++) {
- facebook::velox::test::assertEqualVectors(inputs[i],
deserializedVectors[i]);
- }
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
- shuffleWriterOptions_.partitioning = Partitioning::kSingle;
- shuffleWriterOptions_.bufferSize = 10;
-
- auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, 1, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
-
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- GetParam().shuffleWriterType,
- 1,
- std::move(partitionWriter),
- std::move(shuffleWriterOptions_),
- veloxPool,
- arrowPool));
-
- return shuffleWriter;
- }
-};
-
-class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
void shuffleWriteReadMultiBlocks(
VeloxShuffleWriter& shuffleWriter,
int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
- std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
+ const std::vector<std::vector<facebook::velox::RowVectorPtr>>&
expectedVectors) {
ASSERT_NOT_OK(shuffleWriter.stop());
- // verify data file
+
checkFileExists(dataFile_);
- // verify output temporary files
+
const auto& lengths = shuffleWriter.partitionLengths();
ASSERT_EQ(lengths.size(), expectPartitionLength);
+
int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), 0);
- auto schema = toArrowSchema(dataType, pool());
+
setReadableFile(dataFile_);
ASSERT_EQ(*file_->GetSize(), lengthSum);
for (int32_t i = 0; i < expectPartitionLength; i++) {
@@ -296,32 +252,69 @@ class MultiplePartitioningShuffleWriter : public
VeloxShuffleWriterTest {
std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
GLUTEN_ASSIGN_OR_THROW(
auto in, arrow::io::RandomAccessFile::GetStream(file_, i == 0 ? 0
: lengths[i - 1], lengths[i]));
- getRowVectors(partitionWriterOptions_.compressionType, schema,
deserializedVectors, in);
- ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
- for (int32_t j = 0; j < expectedVectors[i].size(); j++) {
- facebook::velox::test::assertEqualVectors(expectedVectors[i][j],
deserializedVectors[j]);
- }
+ getRowVectors(
+ partitionWriterOptions_.compressionType,
asRowType(expectedVectors[i][0]->type()), deserializedVectors, in);
+
+ const auto expectedVector = mergeRowVectors(expectedVectors[i]);
+ const auto deserializedVector = mergeRowVectors(deserializedVectors);
+ facebook::velox::test::assertEqualVectors(expectedVector,
deserializedVector);
}
}
}
- void testShuffleWriteMultiBlocks(
+ void testShuffleRoundTrip(
+ VeloxShuffleWriter& shuffleWriter,
+ std::vector<std::shared_ptr<ColumnarBatch>> batches,
+ int32_t expectPartitionLength,
+ std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{
+ for (const auto& batch : batches) {
+ ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
+ }
+ shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
expectedVectors);
+ }
+
+ void testShuffleRoundTrip(
VeloxShuffleWriter& shuffleWriter,
- std::vector<facebook::velox::RowVectorPtr> vectors,
+ std::vector<RowVectorPtr> inputs,
int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{
- for (auto& vector : vectors) {
- ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+ std::vector<std::shared_ptr<ColumnarBatch>> batches;
+ for (const auto& input : inputs) {
+ batches.emplace_back(std::make_shared<VeloxColumnarBatch>(input));
}
- shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
+ testShuffleRoundTrip(shuffleWriter, batches, expectPartitionLength,
expectedVectors);
}
+
+ inline static TestAllocationListener* listener_{nullptr};
+
+ std::shared_ptr<arrow::io::ReadableFile> file_;
};
-class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter
{
+class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
+ auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+ shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+ shuffleWriterOptions_.bufferSize = 10;
+
+ auto partitionWriter =
+ createPartitionWriter(GetParam().partitionWriterType, 1, dataFile_,
localDirs_, partitionWriterOptions_);
+
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxShuffleWriter::create(
+ GetParam().shuffleWriterType, 1, std::move(partitionWriter),
shuffleWriterOptions_, veloxPool, arrowPool));
+
+ return shuffleWriter;
+ }
+};
+
+class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest {
protected:
void SetUp() override {
- MultiplePartitioningShuffleWriter::SetUp();
+ VeloxShuffleWriterTest::SetUp();
children1_.insert((children1_.begin()), makeFlatVector<int32_t>({1, 2, 2,
2, 2, 1, 1, 1, 2, 1}));
hashInputVector1_ = makeRowVector(children1_);
@@ -337,7 +330,7 @@ class HashPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
shuffleWriterOptions_.bufferSize = 4;
auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
@@ -345,7 +338,7 @@ class HashPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
GetParam().shuffleWriterType,
numPartitions,
std::move(partitionWriter),
- std::move(shuffleWriterOptions_),
+ shuffleWriterOptions_,
veloxPool,
arrowPool));
@@ -358,10 +351,10 @@ class HashPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
facebook::velox::RowVectorPtr hashInputVector2_;
};
-class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
+class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest {
protected:
void SetUp() override {
- MultiplePartitioningShuffleWriter::SetUp();
+ VeloxShuffleWriterTest::SetUp();
auto pid1 = makeRowVector({makeFlatVector<int32_t>({0, 1, 0, 1, 0, 1, 0,
1, 0, 1})});
auto rangeVector1 = makeRowVector(inputVector1_->children());
@@ -382,7 +375,7 @@ class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter
shuffleWriterOptions_.bufferSize = 4;
auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
@@ -390,40 +383,28 @@ class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter
GetParam().shuffleWriterType,
numPartitions,
std::move(partitionWriter),
- std::move(shuffleWriterOptions_),
+ shuffleWriterOptions_,
veloxPool,
arrowPool));
return shuffleWriter;
}
- void testShuffleWriteMultiBlocks(
- VeloxShuffleWriter& shuffleWriter,
- std::vector<std::shared_ptr<ColumnarBatch>> batches,
- int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
- std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
- for (auto& batch : batches) {
- ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
- }
- shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
- }
-
std::shared_ptr<ColumnarBatch> compositeBatch1_;
std::shared_ptr<ColumnarBatch> compositeBatch2_;
};
-class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
+class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
- shuffleWriterOptions_.bufferSize = 4;
+ shuffleWriterOptions_.bufferSize = 4096;
auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
@@ -431,7 +412,7 @@ class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWr
GetParam().shuffleWriterType,
numPartitions,
std::move(partitionWriter),
- std::move(shuffleWriterOptions_),
+ shuffleWriterOptions_,
veloxPool,
arrowPool));
@@ -443,32 +424,34 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
+
+ ASSERT_NOT_OK(initShuffleWriterOptions());
+
// Split 1 RowVector.
{
- ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(1);
- testShuffleWrite(*shuffleWriter, {inputVector1_});
+ testShuffleRoundTrip(*shuffleWriter, {inputVector1_}, 1,
{{inputVector1_}});
}
// Split > 1 RowVector.
{
- ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(1);
- auto resultBlock = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{}, {}, {}});
- testShuffleWrite(*shuffleWriter, {resultBlock});
+ testShuffleRoundTrip(
+ *shuffleWriter,
+ {inputVector1_, inputVector2_, inputVector1_},
+ 1,
+ {{inputVector1_, inputVector2_, inputVector1_}});
}
// Split null RowVector.
{
- ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(1);
auto vector = makeRowVector({
makeNullableFlatVector<int32_t>({std::nullopt}),
makeNullableFlatVector<StringView>({std::nullopt}),
});
- testShuffleWrite(*shuffleWriter, {vector});
+ testShuffleRoundTrip(*shuffleWriter, {vector}, 1, {{vector}});
}
// Other types.
{
- ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(1);
auto vector = makeRowVector({
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
@@ -489,7 +472,7 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
}),
makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}},
{{3, "str3000"}, {4, "str4000"}}}),
});
- testShuffleWrite(*shuffleWriter, {vector});
+ testShuffleRoundTrip(*shuffleWriter, {vector}, 1, {{vector}});
}
}
@@ -540,7 +523,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
makeNullableFlatVector<Timestamp>({std::nullopt, Timestamp(0, 0)}),
});
- testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataType,
{{firstBlock}, {secondBlock}});
+ testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {{firstBlock},
{secondBlock}});
}
TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
@@ -552,7 +535,7 @@ TEST_P(HashPartitioningShuffleWriter,
hashPart1VectorComplexType) {
auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
auto secondBlock = takeRows({inputVectorComplex_}, {{0}});
- testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2,
inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
+ testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {firstBlock, secondBlock});
}
TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
@@ -560,32 +543,28 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
auto shuffleWriter = createShuffleWriter(2);
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
- auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
-
- testShuffleWriteMultiBlocks(
- *shuffleWriter,
- {hashInputVector1_, hashInputVector2_, hashInputVector1_},
- 2,
- inputVector1_->type(),
- {{blockPid2}, {blockPid1}});
+ auto blockPid1 = takeRows({inputVector1_, inputVector1_}, {{0, 5, 6, 7, 9},
{0, 5, 6, 7, 9}});
+
+ testShuffleRoundTrip(
+ *shuffleWriter, {hashInputVector1_, hashInputVector2_,
hashInputVector1_}, 2, {blockPid2, blockPid1});
}
TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
const int32_t expectedMaxBatchSize = 8;
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);
+
// calculate maxBatchSize_
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
if (GetParam().shuffleWriterType == ShuffleWriterType::kHashShuffle) {
VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
}
- auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
- auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
+ auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 2, 3, 4, 8}, {}, {1, 2, 3, 4, 8}});
+ auto blockPid1 = takeRows({inputVector1_, inputVector1_}, {{0, 5, 6, 7, 9},
{0, 5, 6, 7, 9}});
VELOX_CHECK(hashInputVector1_->size() > expectedMaxBatchSize);
- testShuffleWriteMultiBlocks(
- *shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2,
inputVector1_->type(), {{blockPid2}, {blockPid1}});
+ testShuffleRoundTrip(*shuffleWriter, {hashInputVector2_, hashInputVector1_},
2, {blockPid2, blockPid1});
}
TEST_P(RangePartitioningShuffleWriter, range) {
@@ -595,12 +574,8 @@ TEST_P(RangePartitioningShuffleWriter, range) {
auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
- testShuffleWriteMultiBlocks(
- *shuffleWriter,
- {compositeBatch1_, compositeBatch2_, compositeBatch1_},
- 2,
- inputVector1_->type(),
- {{blockPid1}, {blockPid2}});
+ testShuffleRoundTrip(
+ *shuffleWriter, {compositeBatch1_, compositeBatch2_, compositeBatch1_},
2, {blockPid1, blockPid2});
}
TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
@@ -610,12 +585,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
- testShuffleWriteMultiBlocks(
- *shuffleWriter,
- {inputVector1_, inputVector2_, inputVector1_},
- 2,
- inputVector1_->type(),
- {{blockPid1}, {blockPid2}});
+ testShuffleRoundTrip(*shuffleWriter, {inputVector1_, inputVector2_,
inputVector1_}, 2, {blockPid1, blockPid2});
}
TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
@@ -715,6 +685,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
spillVerifyResult) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
+
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);
@@ -741,11 +712,13 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
spillVerifyResult) {
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8, 0, 2, 4, 6, 8,
0, 2, 4, 6, 8}});
- auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9, 1, 3, 5, 7, 9,
1, 3, 5, 7, 9}});
+ auto blockPid1 =
+ takeRows({inputVector1_, inputVector1_, inputVector1_}, {{0, 2, 4, 6,
8}, {0, 2, 4, 6, 8}, {0, 2, 4, 6, 8}});
+ auto blockPid2 =
+ takeRows({inputVector1_, inputVector1_, inputVector1_}, {{1, 3, 5, 7,
9}, {1, 3, 5, 7, 9}, {1, 3, 5, 7, 9}});
// Stop and verify.
- shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(),
{{blockPid1}, {blockPid2}});
+ shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
}
TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
@@ -760,7 +733,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
- shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(),
{{blockPid1}, {blockPid2}});
+ shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
}
INSTANTIATE_TEST_SUITE_P(
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 913bd8e487..278da23c27 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -57,13 +57,13 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
uint32_t numPartitions,
const std::string& dataFile,
const std::vector<std::string>& localDirs,
- const PartitionWriterOptions& options,
- arrow::MemoryPool* pool) {
+ const PartitionWriterOptions& options) {
if (partitionWriterType == PartitionWriterType::kRss) {
auto rssClient = std::make_unique<LocalRssClient>(dataFile);
- return std::make_unique<RssPartitionWriter>(numPartitions, options, pool,
std::move(rssClient));
+ return std::make_unique<RssPartitionWriter>(
+ numPartitions, options, getDefaultMemoryManager(),
std::move(rssClient));
}
- return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool,
dataFile, localDirs);
+ return std::make_unique<LocalPartitionWriter>(numPartitions, options,
getDefaultMemoryManager(), dataFile, localDirs);
}
} // namespace
@@ -75,7 +75,7 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
auto listener = std::make_unique<TestAllocationListener>();
listener_ = listener.get();
- std::unordered_map<std::string, std::string>
conf{{kMemoryReservationBlockSize, "1"}};
+ std::unordered_map<std::string, std::string>
conf{{kMemoryReservationBlockSize, "1"}, {kDebugModeEnabled, "true"}};
VeloxBackend::create(std::move(listener), conf);
}
@@ -120,7 +120,16 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
makeFlatVector<facebook::velox::StringView>(
{"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6",
"boB7", "ALICE8", "BOB9"}),
makeNullableFlatVector<facebook::velox::StringView>(
- {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob",
std::nullopt, "alicE", std::nullopt, "boB"}),
+ {"alice_0",
+ "bob_1",
+ std::nullopt,
+ std::nullopt,
+ "Alice_4",
+ "Bob_5",
+ std::nullopt,
+ "alicE_7",
+ std::nullopt,
+ "boB_9"}),
facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10,
pool())};
children2_ = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]