This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a3a0dd60ca [GLUTEN-9457][VL] Shuffle test code cleanup (#9458)
a3a0dd60ca is described below
commit a3a0dd60ca63fc4ca1dff2a2977f60c86654ad5d
Author: Rong Ma <[email protected]>
AuthorDate: Wed Apr 30 11:48:52 2025 +0100
[GLUTEN-9457][VL] Shuffle test code cleanup (#9458)
---
cpp/core/jni/JniCommon.h | 14 -
cpp/core/jni/JniWrapper.cc | 2 -
cpp/core/shuffle/Options.h | 26 +-
cpp/core/shuffle/PartitionWriter.h | 13 +-
cpp/core/shuffle/ShuffleWriter.cc | 22 +-
cpp/core/shuffle/ShuffleWriter.h | 13 +-
cpp/core/shuffle/rss/RssClient.h | 2 +
cpp/core/utils/Macros.h | 8 +
cpp/velox/CMakeLists.txt | 6 +-
cpp/velox/benchmarks/ColumnarToRowBenchmark.cc | 3 +-
cpp/velox/benchmarks/GenericBenchmark.cc | 27 +-
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 42 +-
cpp/velox/benchmarks/common/BenchmarkUtils.h | 30 +-
cpp/velox/compute/VeloxRuntime.cc | 14 +-
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 3 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +-
cpp/velox/shuffle/VeloxShuffleReader.h | 2 +-
cpp/velox/shuffle/VeloxShuffleWriter.cc | 2 +-
cpp/velox/tests/CMakeLists.txt | 4 +
cpp/velox/tests/VeloxShuffleWriterSpillTest.cc | 375 ++++++++++
cpp/velox/tests/VeloxShuffleWriterTest.cc | 762 +++++++++++----------
cpp/velox/tests/VeloxShuffleWriterTestBase.h | 257 +++++++
cpp/velox/utils/LocalRssClient.cc | 62 ++
.../RssClient.h => velox/utils/LocalRssClient.h} | 25 +-
cpp/velox/utils/TestAllocationListener.cc | 72 ++
cpp/velox/utils/TestAllocationListener.h | 66 ++
cpp/velox/utils/VeloxArrowUtils.cc | 19 +
cpp/velox/utils/VeloxArrowUtils.h | 9 +-
cpp/velox/utils/tests/LocalRssClient.h | 75 --
cpp/velox/utils/tests/MemoryPoolUtils.cc | 156 -----
cpp/velox/utils/tests/MemoryPoolUtils.h | 96 ---
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 592 ----------------
32 files changed, 1389 insertions(+), 1415 deletions(-)
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 0436c986a1..38839b31ae 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -323,20 +323,6 @@ static inline arrow::Compression::type
getCompressionType(JNIEnv* env, jstring c
return compressionType;
}
-static inline const std::string getCompressionTypeStr(JNIEnv* env, jstring codecJstr) {
- if (codecJstr == NULL) {
- return "none";
- }
- auto codec = env->GetStringUTFChars(codecJstr, JNI_FALSE);
-
- // Convert codec string into lowercase.
- std::string codecLower;
- std::transform(codec, codec + std::strlen(codec), std::back_inserter(codecLower), ::tolower);
-
- env->ReleaseStringUTFChars(codecJstr, codec);
- return codecLower;
-}
-
static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring
codecJstr) {
if (codecJstr == nullptr) {
return gluten::CodecBackend::NONE;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index ecc816a79b..f794516f02 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -837,7 +837,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.compressionBufferSize = compressionBufferSize,
.compressionThreshold = compressionThreshold,
.compressionType = getCompressionType(env, codecJstr),
- .compressionTypeStr = getCompressionTypeStr(env, codecJstr),
.compressionLevel = compressionLevel,
.bufferedWrite = true,
.numSubDirs = numSubDirs,
@@ -1042,7 +1041,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
ShuffleReaderOptions options = ShuffleReaderOptions{};
options.compressionType = getCompressionType(env, compressionType);
- options.compressionTypeStr = getCompressionTypeStr(env, compressionType);
if (compressionType != nullptr) {
options.codecBackend = getCodecBackend(env, compressionBackend);
}
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index cb6fec7ae8..55d82cafd9 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -17,10 +17,12 @@
#pragma once
-#include <arrow/ipc/options.h>
-#include <arrow/util/compression.h>
#include "shuffle/Partitioning.h"
#include "utils/Compression.h"
+#include "utils/Macros.h"
+
+#include <arrow/ipc/options.h>
+#include <arrow/util/compression.h>
namespace gluten {
@@ -43,17 +45,24 @@ static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
-enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
-enum PartitionWriterType { kLocal, kRss };
-enum SortAlgorithm { kRadixSort, kQuickSort };
+enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
+
+enum class PartitionWriterType { kLocal, kRss };
struct ShuffleReaderOptions {
+ ShuffleWriterType shuffleWriterType = ShuffleWriterType::kHashShuffle;
+
+ // Compression options.
arrow::Compression::type compressionType =
arrow::Compression::type::LZ4_FRAME;
- std::string compressionTypeStr = "lz4";
- ShuffleWriterType shuffleWriterType = kHashShuffle;
CodecBackend codecBackend = CodecBackend::NONE;
+
+ // Output batch size.
int32_t batchSize = kDefaultBatchSize;
+
+ // Buffer size when reading data from the input stream.
int64_t readerBufferSize = kDefaultReadBufferSize;
+
+ // Buffer size when deserializing rows into columnar batches. Only used for sort-based shuffle.
int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
};
@@ -65,7 +74,7 @@ struct ShuffleWriterOptions {
int64_t taskAttemptId = -1;
int32_t startPartitionId = 0;
int64_t threadId = -1;
- ShuffleWriterType shuffleWriterType = kHashShuffle;
+ ShuffleWriterType shuffleWriterType = ShuffleWriterType::kHashShuffle;
// Sort shuffle writer.
int32_t initialSortBufferSize = kDefaultSortBufferSize; //
spark.shuffle.sort.initialBufferSize
@@ -80,7 +89,6 @@ struct PartitionWriterOptions {
kDefaultCompressionBufferSize; //
spark.io.compression.lz4.blockSize,spark.io.compression.zstd.bufferSize
int32_t compressionThreshold = kDefaultCompressionThreshold;
arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
- std::string compressionTypeStr = kDefaultCompressionTypeStr;
CodecBackend codecBackend = CodecBackend::NONE;
int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
CompressionMode compressionMode = CompressionMode::BUFFER;
diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h
index 171efed0a3..4d86b7fdad 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -37,7 +37,17 @@ class PartitionWriter : public Reclaimable {
codec_ = createArrowIpcCodec(options_.compressionType,
options_.codecBackend, options_.compressionLevel);
}
- virtual ~PartitionWriter() = default;
+ static inline std::string typeToString(PartitionWriterType type) {
+ switch (type) {
+ case PartitionWriterType::kLocal:
+ return "LocalPartitionWriter";
+ case PartitionWriterType::kRss:
+ return "RssPartitionWriter";
+ }
+ GLUTEN_UNREACHABLE();
+ }
+
+ ~PartitionWriter() override = default;
virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
@@ -83,4 +93,5 @@ class PartitionWriter : public Reclaimable {
int64_t spillTime_{0};
int64_t writeTime_{0};
};
+
} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc
index eff5523fe3..13d7c30fec 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -26,17 +26,29 @@ const std::string kSortShuffleName = "sort";
const std::string kRssSortShuffleName = "rss_sort";
} // namespace
-ShuffleWriterType ShuffleWriter::stringToType(const std::string& type) {
- if (type == kHashShuffleName) {
+ShuffleWriterType ShuffleWriter::stringToType(const std::string& typeString) {
+ if (typeString == kHashShuffleName) {
return ShuffleWriterType::kHashShuffle;
}
- if (type == kSortShuffleName) {
+ if (typeString == kSortShuffleName) {
return ShuffleWriterType::kSortShuffle;
}
- if (type == kRssSortShuffleName) {
+ if (typeString == kRssSortShuffleName) {
return ShuffleWriterType::kRssSortShuffle;
}
- throw GlutenException("Unrecognized shuffle writer type: " + type);
+ throw GlutenException("Unrecognized shuffle writer type: " + typeString);
+}
+
+std::string ShuffleWriter::typeToString(ShuffleWriterType type) {
+ switch (type) {
+ case ShuffleWriterType::kHashShuffle:
+ return kHashShuffleName;
+ case ShuffleWriterType::kSortShuffle:
+ return kSortShuffleName;
+ case ShuffleWriterType::kRssSortShuffle:
+ return kRssSortShuffleName;
+ }
+ GLUTEN_UNREACHABLE();
}
int32_t ShuffleWriter::numPartitions() const {
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 8c79829e00..8852f0527b 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -17,18 +17,11 @@
#pragma once
-#include <numeric>
-#include <utility>
-
-#include "memory/ArrowMemoryPool.h"
#include "memory/ColumnarBatch.h"
#include "memory/Reclaimable.h"
#include "shuffle/Options.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
-#include "shuffle/Partitioning.h"
-#include "shuffle/ShuffleMemoryPool.h"
-#include "utils/Compression.h"
namespace gluten {
@@ -38,7 +31,9 @@ class ShuffleWriter : public Reclaimable {
static constexpr int64_t kMaxMemLimit = 1LL * 1024 * 1024 * 1024;
- static ShuffleWriterType stringToType(const std::string& type);
+ static ShuffleWriterType stringToType(const std::string& typeString);
+
+ static std::string typeToString(ShuffleWriterType type);
virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t
memLimit) = 0;
@@ -73,7 +68,7 @@ class ShuffleWriter : public Reclaimable {
protected:
ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options,
arrow::MemoryPool* pool);
- virtual ~ShuffleWriter() = default;
+ ~ShuffleWriter() override = default;
int32_t numPartitions_;
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/core/shuffle/rss/RssClient.h
index dddccfa1ad..ba7380ae8d 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/core/shuffle/rss/RssClient.h
@@ -17,6 +17,8 @@
#pragma once
+#include <cstdint>
+
class RssClient {
public:
virtual ~RssClient() = default;
diff --git a/cpp/core/utils/Macros.h b/cpp/core/utils/Macros.h
index 8c6807c40b..74128cca35 100644
--- a/cpp/core/utils/Macros.h
+++ b/cpp/core/utils/Macros.h
@@ -106,3 +106,11 @@
(time > 1e7 ? time / 1e6 : ((time > 1e4) ? time / 1e3 : time)) << (time >
1e7 ? "ms" : (time > 1e4 ? "us" : "ns"))
#define ROUND_TO_LINE(n, round) (((n) + (round)-1) & ~((round)-1))
+
+#if defined(__GNUC__) || __has_builtin(__builtin_unreachable)
+#define GLUTEN_UNREACHABLE() __builtin_unreachable()
+#elif defined(_MSC_VER)
+#define GLUTEN_UNREACHABLE() __assume(false)
+#else
+#define GLUTEN_UNREACHABLE() ((void)0)
+#endif
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 7f902e8078..71e30922d2 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -166,6 +166,8 @@ set(VELOX_SRCS
udf/UdfLoader.cc
utils/Common.cc
utils/ConfigExtractor.cc
+ utils/LocalRssClient.cc
+ utils/TestAllocationListener.cc
utils/VeloxArrowUtils.cc
utils/VeloxBatchResizer.cc)
@@ -173,10 +175,6 @@ if(ENABLE_S3)
find_package(ZLIB)
endif()
-if(BUILD_TESTS OR BUILD_BENCHMARKS)
- list(APPEND VELOX_SRCS utils/tests/MemoryPoolUtils.cc)
-endif()
-
add_library(velox SHARED ${VELOX_SRCS})
if(ENABLE_GLUTEN_VCPKG AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
index f693bb8858..e32e6d7513 100644
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
@@ -261,8 +261,7 @@ int main(int argc, char** argv) {
LOG(INFO) << "datafile = " << datafile;
LOG(INFO) << "cpu = " << cpu;
- auto backendConf = gluten::defaultConf();
- gluten::initVeloxBackend(backendConf);
+ gluten::initVeloxBackend();
memory::MemoryManager::testingSetInstance({});
gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index b73840fbaf..e9c85987c6 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -20,9 +20,10 @@
#include <arrow/c/bridge.h>
#include <arrow/util/range.h>
+
#include <benchmark/benchmark.h>
+
#include <gflags/gflags.h>
-#include <operators/writer/ArrowWriter.h>
#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
@@ -35,10 +36,11 @@
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/Exception.h"
+#include "utils/LocalRssClient.h"
#include "utils/StringUtil.h"
+#include "utils/TestAllocationListener.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
-#include "utils/tests/LocalRssClient.h"
#include "velox/exec/PlanNodeStats.h"
using namespace gluten;
@@ -170,11 +172,9 @@ PartitionWriterOptions createPartitionWriterOptions() {
if (FLAGS_compression == "lz4") {
partitionWriterOptions.codecBackend = CodecBackend::NONE;
partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME;
- partitionWriterOptions.compressionTypeStr = "lz4";
} else if (FLAGS_compression == "zstd") {
partitionWriterOptions.codecBackend = CodecBackend::NONE;
partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
- partitionWriterOptions.compressionTypeStr = "zstd";
} else if (FLAGS_compression == "qat_gzip") {
partitionWriterOptions.codecBackend = CodecBackend::QAT;
partitionWriterOptions.compressionType = arrow::Compression::GZIP;
@@ -218,9 +218,9 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
auto options = ShuffleWriterOptions{};
options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") {
- options.shuffleWriterType = gluten::kRssSortShuffle;
+ options.shuffleWriterType = gluten::ShuffleWriterType::kRssSortShuffle;
} else if (FLAGS_shuffle_writer == "sort") {
- options.shuffleWriterType = gluten::kSortShuffle;
+ options.shuffleWriterType = gluten::ShuffleWriterType::kSortShuffle;
}
auto shuffleWriter =
runtime->createShuffleWriter(FLAGS_shuffle_partitions,
std::move(partitionWriter), std::move(options));
@@ -257,7 +257,7 @@ void setCpu(::benchmark::State& state) {
void runShuffle(
Runtime* runtime,
- BenchmarkAllocationListener* listener,
+ TestAllocationListener* listener,
const std::shared_ptr<gluten::ResultIterator>& resultIter,
WriterMetrics& writerMetrics,
ReaderMetrics& readerMetrics,
@@ -292,7 +292,6 @@ void runShuffle(
readerOptions.shuffleWriterType =
shuffleWriter->options().shuffleWriterType;
readerOptions.compressionType = partitionWriterOptions.compressionType;
readerOptions.codecBackend = partitionWriterOptions.codecBackend;
- readerOptions.compressionTypeStr = partitionWriterOptions.compressionTypeStr;
std::shared_ptr<arrow::Schema> schema =
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct
ArrowSchema*>(cSchema.get())));
@@ -389,7 +388,9 @@ auto BM_Generic = [](::benchmark::State& state,
FileReaderType readerType) {
setCpu(state);
- auto listener = std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
+ auto listener = std::make_unique<TestAllocationListener>();
+ listener->updateLimit(FLAGS_memory_limit);
+
auto* listenerPtr = listener.get();
auto* memoryManager = MemoryManager::create(kVeloxBackendKind,
std::move(listener));
auto runtime = runtimeFactory(memoryManager);
@@ -513,7 +514,9 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
FileReaderType readerType) {
setCpu(state);
- auto listener = std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
+ auto listener = std::make_unique<TestAllocationListener>();
+ listener->updateLimit(FLAGS_memory_limit);
+
auto* listenerPtr = listener.get();
auto* memoryManager = MemoryManager::create(kVeloxBackendKind,
std::move(listener));
auto runtime = runtimeFactory(memoryManager);
@@ -570,8 +573,8 @@ int main(int argc, char** argv) {
::benchmark::Initialize(&argc, argv);
// Init Velox backend.
- auto backendConf = gluten::defaultConf();
- auto sessionConf = gluten::defaultConf();
+ std::unordered_map<std::string, std::string> backendConf{};
+ std::unordered_map<std::string, std::string> sessionConf{};
backendConf.insert({gluten::kDebugModeEnabled,
std::to_string(FLAGS_debug_mode)});
backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
backendConf.insert({gluten::kGlogSeverityLevel,
std::to_string(FLAGS_minloglevel)});
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index 6790e9235f..f31cbc0db5 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -15,7 +15,8 @@
* limitations under the License.
*/
-#include "BenchmarkUtils.h"
+#include "benchmarks/common/BenchmarkUtils.h"
+
#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
@@ -23,30 +24,19 @@
#include "utils/StringUtil.h"
#include "velox/dwio/common/Options.h"
+#include <benchmark/benchmark.h>
+
DEFINE_int64(batch_size, 4096, "To set
velox::core::QueryConfig::kPreferredOutputBatchSize.");
DEFINE_int32(cpu, -1, "Run benchmark on specific CPU");
DEFINE_int32(threads, 1, "The number of threads to run this benchmark");
DEFINE_int32(iterations, 1, "The number of iterations to run this benchmark");
namespace gluten {
-namespace {
-std::unordered_map<std::string, std::string> bmConfMap = defaultConf();
-} // namespace
-
-std::unordered_map<std::string, std::string> defaultConf() {
- return {
- {gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)},
- };
-}
-void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
+void initVeloxBackend(const std::unordered_map<std::string, std::string>& conf) {
gluten::VeloxBackend::create(AllocationListener::noop(), conf);
}
-void initVeloxBackend() {
- initVeloxBackend(bmConfMap);
-}
-
std::string getPlanFromFile(const std::string& type, const std::string&
filePath) {
// Read json file and resume the binary data.
std::ifstream msgJson(filePath);
@@ -137,26 +127,4 @@ void setCpu(uint32_t cpuIndex) {
#endif
}
-void BenchmarkAllocationListener::allocationChanged(int64_t diff) {
- if (diff > 0 && usedBytes_ + diff >= limit_) {
- LOG(INFO) << fmt::format(
- "reach hard limit {} when need {}, current used {}.",
- facebook::velox::succinctBytes(limit_),
- facebook::velox::succinctBytes(diff),
- facebook::velox::succinctBytes(usedBytes_));
- auto neededBytes = usedBytes_ + diff - limit_;
- int64_t spilledBytes = 0;
- if (iterator_) {
- spilledBytes += iterator_->spillFixedSize(neededBytes);
- }
- if (spilledBytes < neededBytes && shuffleWriter_) {
- int64_t reclaimed = 0;
- GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(neededBytes - spilledBytes, &reclaimed));
- spilledBytes += reclaimed;
- }
- LOG(INFO) << fmt::format("spill finish, got {}.", facebook::velox::succinctBytes(spilledBytes));
- } else {
- usedBytes_ += diff;
- }
-}
} // namespace gluten
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h
index de3df96f89..3d603a38fc 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.h
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h
@@ -17,7 +17,6 @@
#pragma once
-#include <arrow/c/abi.h>
#include <glog/logging.h>
#include <cstdlib>
#include <filesystem>
@@ -25,16 +24,12 @@
#include <thread>
#include <utility>
-#include "benchmark/benchmark.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "compute/ProtobufUtils.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
-#include "shuffle/Options.h"
-#include "shuffle/ShuffleWriter.h"
#include "utils/Exception.h"
-#include "utils/VeloxArrowUtils.h"
#include "velox/common/memory/Memory.h"
DECLARE_int64(batch_size);
@@ -46,11 +41,8 @@ namespace gluten {
std::unordered_map<std::string, std::string> defaultConf();
-/// Initialize the Velox backend with default value.
-void initVeloxBackend();
-
/// Initialize the Velox backend.
-void initVeloxBackend(std::unordered_map<std::string, std::string>& conf);
+void initVeloxBackend(const std::unordered_map<std::string, std::string>& conf = {});
// Get the location of a file generated by Java unittest.
inline std::string getGeneratedFilePath(const std::string& fileName) {
@@ -97,24 +89,4 @@ bool endsWith(const std::string& data, const std::string&
suffix);
void setCpu(uint32_t cpuIndex);
-class BenchmarkAllocationListener final : public gluten::AllocationListener {
- public:
- BenchmarkAllocationListener(uint64_t limit) : limit_(limit) {}
-
- void setIterator(gluten::ResultIterator* iterator) {
- iterator_ = iterator;
- }
-
- void setShuffleWriter(gluten::ShuffleWriter* shuffleWriter) {
- shuffleWriter_ = shuffleWriter;
- }
-
- void allocationChanged(int64_t diff) override;
-
- private:
- uint64_t usedBytes_{0L};
- const uint64_t limit_{0L};
- gluten::ResultIterator* iterator_{nullptr};
- gluten::ShuffleWriter* shuffleWriter_{nullptr};
-};
} // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index a328e9b22d..e7bd7af585 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -295,23 +295,23 @@ std::shared_ptr<VeloxDataSource>
VeloxRuntime::createDataSource(
std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options) {
- auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
auto codec = gluten::createArrowIpcCodec(options.compressionType,
options.codecBackend);
- auto ctxVeloxPool = memoryManager()->getLeafMemoryPool();
- auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
+ const auto veloxCompressionKind = arrowCompressionTypeToVelox(options.compressionType);
+ const auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+
auto deserializerFactory =
std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
schema,
std::move(codec),
- veloxCompressionType,
+ veloxCompressionKind,
rowType,
options.batchSize,
options.readerBufferSize,
options.deserializerBufferSize,
memoryManager()->getArrowMemoryPool(),
- ctxVeloxPool,
+ memoryManager()->getLeafMemoryPool(),
options.shuffleWriterType);
- auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
- return reader;
+
+ return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
}
std::unique_ptr<ColumnarBatchSerializer>
VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e17ad5e2f7..e3075872e5 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -209,8 +209,7 @@ arrow::Status VeloxRssSortShuffleWriter::stop() {
arrow::Status VeloxRssSortShuffleWriter::initFromRowVector(const
facebook::velox::RowVector& rv) {
if (!rowType_) {
rowType_ = facebook::velox::asRowType(rv.type());
- serdeOptions_ = {
- false, facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr)};
+ serdeOptions_ = {false, arrowCompressionTypeToVelox(partitionWriter_->options().compressionType)};
batch_ =
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(),
serde_.get());
batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
}
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 588a1aa808..dc0af91dc0 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -561,7 +561,7 @@ size_t
VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::remainingSize()
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
- const facebook::velox::common::CompressionKind veloxCompressionType,
+ facebook::velox::common::CompressionKind veloxCompressionType,
const RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
@@ -615,9 +615,8 @@ std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::cr
case ShuffleWriterType::kRssSortShuffle:
return std::make_unique<VeloxRssSortShuffleReaderDeserializer>(
veloxPool_, rowType_, batchSize_, veloxCompressionType_,
deserializeTime_, std::move(in));
- default:
- throw gluten::GlutenException("Unsupported shuffle writer type: " + std::to_string(shuffleWriterType_));
}
+ GLUTEN_UNREACHABLE();
}
arrow::MemoryPool* VeloxShuffleReaderDeserializerFactory::getPool() {
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h
index de486d5991..71f31618cf 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -146,7 +146,7 @@ class VeloxShuffleReaderDeserializerFactory {
VeloxShuffleReaderDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
- const facebook::velox::common::CompressionKind veloxCompressionType,
+ facebook::velox::common::CompressionKind veloxCompressionType,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 06b59cf3eb..a01615dc35 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -41,7 +41,7 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxShuffleWriter::create(
return VeloxRssSortShuffleWriter::create(
numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool);
default:
- return arrow::Status::Invalid("Unsupported shuffle writer type: ", std::to_string(type));
+ return arrow::Status::Invalid("Unsupported shuffle writer type: ", typeToString(type));
}
}
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index ba90e45076..a6d04822b6 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -35,6 +35,10 @@ endfunction()
set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
+
+add_velox_test(velox_shuffle_writer_spill_test SOURCES
+ VeloxShuffleWriterSpillTest.cc)
+
# TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
add_velox_test(
velox_operators_test
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
new file mode 100644
index 0000000000..bdbdb604ca
--- /dev/null
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config/GlutenConfig.h"
+#include "shuffle/VeloxHashShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/TestUtils.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+
+namespace {
+void assertSpill(TestAllocationListener* listener, std::function<void()> block) {
+ const auto beforeSpill = listener->reclaimedBytes();
+ block();
+ const auto afterSpill = listener->reclaimedBytes();
+
+ ASSERT_GT(afterSpill, beforeSpill);
+}
+} // namespace
+
+class VeloxHashShuffleWriterSpillTest : public VeloxShuffleWriterTestBase, public testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ setUpVeloxBackend();
+ }
+
+ static void TearDownTestSuite() {
+ tearDownVeloxBackend();
+ }
+
+ void SetUp() override {
+ setUpTestData();
+ }
+
+ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
+ auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+ auto partitionWriter = createPartitionWriter(
+ PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxHashShuffleWriter::create(
+ numPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), veloxPool, arrowPool));
+
+ return shuffleWriter;
+ }
+
+ int64_t splitRowVectorAndSpill(
+ VeloxShuffleWriter& shuffleWriter,
+ std::vector<facebook::velox::RowVectorPtr> vectors,
+ bool shrink) {
+ for (auto vector : vectors) {
+ ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+ }
+
+ auto targetEvicted = shuffleWriter.cachedPayloadSize();
+ if (shrink) {
+ targetEvicted += shuffleWriter.partitionBufferSize();
+ }
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter.reclaimFixedSize(targetEvicted, &evicted));
+
+ return evicted;
+ };
+};
+
+// Splits three vectors, stops the writer and checks that the default Arrow
+// memory pool reports zero allocated bytes both before and after the writer is
+// destroyed.
+TEST_F(VeloxHashShuffleWriterSpillTest, memoryLeak) {
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+
+ const auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+
+ // Nothing may remain allocated once the writer has been stopped.
+ ASSERT_EQ(arrowPool->bytes_allocated(), 0);
+
+ // Destroying the writer must leave the pool at zero as well.
+ shuffleWriter.reset();
+ ASSERT_EQ(arrowPool->bytes_allocated(), 0);
+}
+
+// Sets the listener limit to 0 with throw-on-OOM enabled and expects the first
+// split to throw GlutenException.
+TEST_F(VeloxHashShuffleWriterSpillTest, spillFailWithOutOfMemory) {
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ listener_->updateLimit(0L);
+ listener_->setShuffleWriter(shuffleWriter.get());
+ listener_->setThrowIfOOM(true);
+
+ ASSERT_THROW([&] { auto status = splitRowVector(*shuffleWriter,
inputVector1_); }(), GlutenException);
+
+ // The split above is asserted to throw rather than return a status;
+ // presumably this is because there is no partition buffer to spill yet
+ // (TODO confirm).
+
+ listener_->reset();
+}
+
+// Exercises reclaimFixedSize() on a two-partition writer in three scenarios:
+// reclaiming payload plus all partition buffers, reclaiming payload plus one
+// byte, and reclaiming after the partition buffers were explicitly evicted.
+TEST_F(VeloxHashShuffleWriterSpillTest, kInit) {
+ shuffleWriterOptions_.bufferSize = 4;
+
+ const int32_t numPartitions = 2;
+ auto shuffleWriter = createShuffleWriter(numPartitions);
+
+ // Test spill all partition buffers.
+ {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ auto bufferSize = shuffleWriter->partitionBufferSize();
+ auto payloadSize = shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + bufferSize,
&evicted));
+ ASSERT_EQ(evicted, payloadSize + bufferSize);
+ // No cached payload after evict.
+ ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+ // All partition buffers should be evicted.
+ ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+ }
+
+ // Test spill minimum-size partition buffers.
+ {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ auto bufferSize = shuffleWriter->partitionBufferSize();
+ auto payloadSize = shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ // Request one byte more than the cached payload so that some buffer space
+ // has to be reclaimed too.
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
+ ASSERT_GT(evicted, payloadSize);
+ // No cached payload after evict.
+ ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+ ASSERT_LE(shuffleWriter->partitionBufferSize(), bufferSize);
+ // Not all partition buffers were evicted.
+ ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
+ }
+
+ // Test spill empty partition buffers.
+ {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // Clear buffers then the size after shrink will be 0.
+ for (auto pid = 0; pid < numPartitions; ++pid) {
+ ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true));
+ }
+
+ auto bufferSize = shuffleWriter->partitionBufferSize();
+ auto payloadSize = shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ // Evict payload and shrink min-size buffer.
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
+ ASSERT_GT(evicted, payloadSize);
+ ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
+ // Evict empty partition buffers.
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(bufferSize, &evicted));
+ ASSERT_GT(evicted, 0);
+ ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+ // Evict again. No reclaimable space.
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
+ ASSERT_EQ(evicted, 0);
+ }
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+}
+
+// Single-partitioning variant: reclaims partition buffer size plus cached
+// payload size and expects at least that much to be evicted, leaving no cached
+// payload.
+TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) {
+ shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // Despite its name, payloadSize also includes the partition buffer size.
+ auto payloadSize = shuffleWriter->partitionBufferSize() +
shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+ ASSERT_GE(evicted, payloadSize);
+ // No cached payload after evict.
+ ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+ }
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+}
+
+// Caps the listener limit at its current byte count after three splits, then
+// asserts that three further splits increase the listener's reclaimed bytes.
+TEST_F(VeloxHashShuffleWriterSpillTest, kSplit) {
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ listener_->setShuffleWriter(shuffleWriter.get());
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // Set the limit to the bytes currently tracked by the listener.
+ listener_->updateLimit(listener_->currentBytes());
+
+ assertSpill(listener_, [&]() {
+ // Trigger spill for the next split.
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ });
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+
+ listener_->reset();
+}
+
+// Single-partitioning variant of kSplit: after one split the listener limit is
+// capped at its current byte count, and the next split must increase the
+// listener's reclaimed bytes.
+TEST_F(VeloxHashShuffleWriterSpillTest, kSplitSingle) {
+ shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+
+ auto shuffleWriter = createShuffleWriter(1);
+
+ listener_->setShuffleWriter(shuffleWriter.get());
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // Trigger spill for the next split.
+ listener_->updateLimit(listener_->currentBytes());
+
+ assertSpill(listener_, [&]() { ASSERT_NOT_OK(splitRowVector(*shuffleWriter,
inputVector1_)); });
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+
+ listener_->reset();
+}
+
+// Splits ten vectors, reclaims at least 2000 bytes, caps the listener limit at
+// its current byte count and asserts that stop() increases the listener's
+// reclaimed bytes.
+TEST_F(VeloxHashShuffleWriterSpillTest, kStop) {
+ shuffleWriterOptions_.bufferSize = 4096;
+ // Force compression.
+ partitionWriterOptions_.compressionThreshold = 0;
+ partitionWriterOptions_.mergeThreshold = 0;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+ }
+ // Reclaim bytes to shrink partition buffer.
+ int64_t reclaimed = 0;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+ ASSERT_GE(reclaimed, 2000);
+
+ listener_->setShuffleWriter(shuffleWriter.get());
+ listener_->updateLimit(listener_->currentBytes());
+
+ // Trigger spill during stop.
+ assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
+
+ listener_->reset();
+}
+
+// Complex-type variant of kStop: splits the complex input three times,
+// reclaims buffer bytes and then the cached payload, caps the listener limit
+// and asserts that stop() increases the listener's reclaimed bytes.
+TEST_F(VeloxHashShuffleWriterSpillTest, kStopComplex) {
+ shuffleWriterOptions_.bufferSize = 4096;
+
+ // Force compression.
+ partitionWriterOptions_.compressionThreshold = 0;
+ partitionWriterOptions_.mergeThreshold = 0;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
+ }
+
+ // Reclaim bytes to shrink partition buffer.
+ int64_t reclaimed = 0;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+ ASSERT_GE(reclaimed, 2000);
+
+ // Reclaim from PartitionWriter to free cached bytes.
+ auto payloadSize = shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+ ASSERT_EQ(evicted, payloadSize);
+
+ listener_->setShuffleWriter(shuffleWriter.get());
+ listener_->updateLimit(listener_->currentBytes());
+
+ // When evicting partitioning buffers in stop, spill will be triggered by
+ // complex types allocating extra memory.
+ assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
+
+ listener_->reset();
+}
+
+// Reclaims in two steps: first the cached payload size (partition buffers must
+// remain non-empty), then the partition buffer size (buffers must drop to 0).
+TEST_F(VeloxHashShuffleWriterSpillTest, evictPartitionBuffers) {
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto shuffleWriter = createShuffleWriter(2);
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // First evict cached payloads.
+ int64_t evicted;
+
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
&evicted));
+ ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+ ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
+
+ // Evict again. Because there is no cached payload to evict, it will try to
+ // evict all partition buffers.
+
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(),
&evicted));
+ ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+}
+
+// Single-partitioning writer: after reclaiming buffer size plus cached payload
+// size, a further reclaim request of one byte must report 0 evicted bytes.
+TEST_F(VeloxHashShuffleWriterSpillTest, kUnevictableSingle) {
+ shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+
+ auto shuffleWriter = createShuffleWriter(1);
+
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+ // First evict cached payloads.
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(
+ shuffleWriter->partitionBufferSize() +
shuffleWriter->cachedPayloadSize(), &evicted));
+ ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+ ASSERT_GE(evicted, 0);
+
+ // Evict again. The evicted size should be 0.
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
+ ASSERT_EQ(evicted, 0);
+}
+
+// Splits one large-binary vector, caps the listener limit at its current byte
+// count, then asserts that splitting a second large-binary vector increases the
+// listener's reclaimed bytes. Compression is disabled for this test.
+TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) {
+ shuffleWriterOptions_.bufferReallocThreshold = 1;
+ partitionWriterOptions_.compressionType =
arrow::Compression::type::UNCOMPRESSED;
+
+ auto shuffleWriter = createShuffleWriter(1);
+
+ // Split first input vector. Large average string length.
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));
+
+ listener_->setShuffleWriter(shuffleWriter.get());
+ listener_->updateLimit(listener_->currentBytes());
+
+ assertSpill(listener_, [&] {
+ // Split second input vector. Large average string length.
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
+ });
+
+ ASSERT_NOT_OK(shuffleWriter->stop());
+
+ listener_->reset();
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index fac503ca70..405e702cd3 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,40 +18,53 @@
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>
-#include "shuffle/LocalPartitionWriter.h"
+#include "config/GlutenConfig.h"
#include "shuffle/VeloxHashShuffleWriter.h"
#include "shuffle/VeloxRssSortShuffleWriter.h"
-#include "shuffle/VeloxShuffleWriter.h"
-#include "shuffle/rss/RssPartitionWriter.h"
+#include "shuffle/VeloxSortShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
-#include "utils/tests/MemoryPoolUtils.h"
-#include "utils/tests/VeloxShuffleWriterTestBase.h"
+
#include "velox/vector/tests/utils/VectorTestBase.h"
-using namespace facebook;
using namespace facebook::velox;
-using namespace arrow;
-using namespace arrow::ipc;
namespace gluten {
+
namespace {
+// Parameter set for the value-parameterized shuffle writer tests: which shuffle writer and
+// partition writer to use, plus the compression and buffer settings applied to them.
+struct ShuffleTestParams {
+  ShuffleWriterType shuffleWriterType;
+  PartitionWriterType partitionWriterType;
+  arrow::Compression::type compressionType;
+  int32_t compressionThreshold{0};
+  int32_t mergeBufferSize{0};
+  int32_t diskWriteBufferSize{0};
+  bool useRadixSort{false};
+
+  // Renders every field as "name = value" pairs for test logging. Each label matches the
+  // field it prints (diskWriteBufferSize was previously labelled "compressionBufferSize").
+  std::string toString() const {
+    std::ostringstream out;
+    out << "shuffleWriterType = " << ShuffleWriter::typeToString(shuffleWriterType)
+        << ", partitionWriterType = " << PartitionWriter::typeToString(partitionWriterType)
+        << ", compressionType = " << arrow::util::Codec::GetCodecAsString(compressionType)
+        << ", compressionThreshold = " << compressionThreshold << ", mergeBufferSize = " << mergeBufferSize
+        << ", diskWriteBufferSize = " << diskWriteBufferSize
+        << ", useRadixSort = " << (useRadixSort ? "true" : "false");
+    return out.str();
+  }
+};
-facebook::velox::RowVectorPtr takeRows(
- const std::vector<facebook::velox::RowVectorPtr>& sources,
- const std::vector<std::vector<int32_t>>& indices) {
- facebook::velox::RowVectorPtr copy =
facebook::velox::RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+RowVectorPtr takeRows(const std::vector<RowVectorPtr>& sources, const
std::vector<std::vector<int32_t>>& indices) {
+ auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
for (size_t i = 0; i < sources.size(); ++i) {
if (indices[i].empty()) {
- // Take all rows;
copy->append(sources[i].get());
continue;
}
for (int32_t idx : indices[i]) {
if (idx >= sources[i]->size()) {
- throw GlutenException(
- "Index out of bound: " + std::to_string(idx) + ". Source RowVector
" + std::to_string(i) +
- " num rows: " + std::to_string(sources[i]->size()));
+ throw GlutenException("Index out of bound: " + std::to_string(idx));
}
copy->append(sources[i]->slice(idx, 1).get());
}
@@ -59,19 +72,23 @@ facebook::velox::RowVectorPtr takeRows(
return copy;
}
-std::vector<ShuffleTestParams> createShuffleTestParams() {
- std::vector<ShuffleTestParams> params;
+std::vector<ShuffleTestParams> getTestParams() {
+ static std::vector<ShuffleTestParams> params{};
- std::vector<arrow::Compression::type> compressions = {
- arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME,
arrow::Compression::ZSTD};
+ if (!params.empty()) {
+ return params;
+ }
- std::vector<int32_t> compressionThresholds = {-1, 0, 3, 4, 10, 4096};
- std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
+ const std::vector<arrow::Compression::type> compressions = {
+ arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME,
arrow::Compression::ZSTD};
+ const std::vector<int32_t> compressionThresholds = {-1, 0, 3, 4, 10, 4096};
+ const std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
for (const auto& compression : compressions) {
+ // Sort-based shuffle.
for (const auto partitionWriterType : {PartitionWriterType::kLocal,
PartitionWriterType::kRss}) {
for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
- for (auto useRadixSort : {true, false}) {
+ for (const auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
.shuffleWriterType = ShuffleWriterType::kSortShuffle,
.partitionWriterType = partitionWriterType,
@@ -81,28 +98,346 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
}
}
}
- params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle,
PartitionWriterType::kRss, compression});
+ // Rss sort-based shuffle.
+ params.push_back(ShuffleTestParams{
+ .shuffleWriterType = ShuffleWriterType::kRssSortShuffle,
+ .partitionWriterType = PartitionWriterType::kRss,
+ .compressionType = compression});
+
+ // Hash-based shuffle.
for (const auto compressionThreshold : compressionThresholds) {
+ // Local.
for (const auto mergeBufferSize : mergeBufferSizes) {
params.push_back(ShuffleTestParams{
- ShuffleWriterType::kHashShuffle,
- PartitionWriterType::kLocal,
- compression,
- compressionThreshold,
- mergeBufferSize});
+ .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+ .partitionWriterType = PartitionWriterType::kLocal,
+ .compressionType = compression,
+ .compressionThreshold = compressionThreshold,
+ .mergeBufferSize = mergeBufferSize});
}
+
+ // Rss.
params.push_back(ShuffleTestParams{
- ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss,
compression, compressionThreshold});
+ .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+ .partitionWriterType = PartitionWriterType::kRss,
+ .compressionType = compression,
+ .compressionThreshold = compressionThreshold});
}
}
return params;
}
+} // namespace
-static const auto kShuffleWriteTestParams = createShuffleTestParams();
+// gtest Environment whose SetUp/TearDown call the Velox backend set-up and
+// tear-down helpers of VeloxShuffleWriterTestBase.
+class VeloxShuffleWriterTestEnvironment : public ::testing::Environment {
+ public:
+ void SetUp() override {
+ VeloxShuffleWriterTestBase::setUpVeloxBackend();
+ }
-} // namespace
+ void TearDown() override {
+ VeloxShuffleWriterTestBase::tearDownVeloxBackend();
+ }
+};
+
+// Common base for the value-parameterized shuffle writer tests. Copies the
+// current ShuffleTestParams into the writer options and provides helpers to
+// open the written data file and read its contents back as RowVectors.
+class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
+ public:
+ // Applies the current test parameter to shuffleWriterOptions_ and
+ // partitionWriterOptions_. Always returns OK.
+ arrow::Status initShuffleWriterOptions() {
+ ShuffleTestParams params = GetParam();
+
+ shuffleWriterOptions_.useRadixSort = params.useRadixSort;
+ shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
+
+ partitionWriterOptions_.compressionType = params.compressionType;
+ partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
+ partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
+
+ return arrow::Status::OK();
+ }
+
+ protected:
+ // Logs the active parameter set and rebuilds the shared test data.
+ void SetUp() override {
+ std::cout << "Running test with param: " << GetParam().toString() <<
std::endl;
+ VeloxShuffleWriterTestBase::setUpTestData();
+ }
+
+ // Closes the data file if a test left it open.
+ void TearDown() override {
+ if (file_ != nullptr && !file_->closed()) {
+ GLUTEN_THROW_NOT_OK(file_->Close());
+ }
+ }
+
+ // Asserts that `fileName` exists on the local file system.
+ static void checkFileExists(const std::string& fileName) {
+
ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)),
true);
+ }
+
+ // Converts the row vector's Velox type into an Arrow schema.
+ std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr&
rowVector) {
+ return toArrowSchema(rowVector->type(), pool());
+ }
+
+ // Closes any previously opened file, then opens `fileName` for reading into
+ // file_. Throws if closing or opening fails.
+ void setReadableFile(const std::string& fileName) {
+ if (file_ != nullptr && !file_->closed()) {
+ GLUTEN_THROW_NOT_OK(file_->Close());
+ }
+ GLUTEN_ASSIGN_OR_THROW(file_, arrow::io::ReadableFile::Open(fileName))
+ }
+
+ // Reads every batch from `in` with a VeloxShuffleReader configured for the
+ // given schema and compression type, appending each RowVector to `vectors`.
+ void getRowVectors(
+ arrow::Compression::type compressionType,
+ std::shared_ptr<arrow::Schema> schema,
+ std::vector<facebook::velox::RowVectorPtr>& vectors,
+ std::shared_ptr<arrow::io::InputStream> in) {
+ const auto rowType =
facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+ const auto veloxCompressionType =
arrowCompressionTypeToVelox(compressionType);
+
+ auto codec = createArrowIpcCodec(compressionType, CodecBackend::NONE);
+
+ // Set batchSize to a large value so that all batches are merged by the
+ // reader.
+ // NOTE(review): the value passed below is kDefaultBatchSize; confirm it is
+ // large enough for that purpose.
+ auto deserializerFactory =
std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
+ schema,
+ std::move(codec),
+ veloxCompressionType,
+ rowType,
+ kDefaultBatchSize,
+ kDefaultReadBufferSize,
+ kDefaultDeserializerBufferSize,
+ getDefaultMemoryManager()->getArrowMemoryPool(),
+ pool_,
+ GetParam().shuffleWriterType);
+
+ const auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+
+ const auto iter = reader->readStream(in);
+ while (iter->hasNext()) {
+ auto vector =
std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector();
+ vectors.emplace_back(vector);
+ }
+ }
+
+ // NOTE(review): if VeloxShuffleWriterTestBase also declares listener_, this
+ // member hides it - confirm which one is intended.
+ inline static TestAllocationListener* listener_{nullptr};
+
+ // Currently opened data file; managed by setReadableFile() and TearDown().
+ std::shared_ptr<arrow::io::ReadableFile> file_;
+};
+
+// Fixture for Partitioning::kSingle: the writer always has exactly one output partition.
+class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+  // Splits every vector in `inputs`, stops the writer, then reads the single partition back
+  // from dataFile_ and checks that it deserializes to vectors equal to `inputs`, in order.
+  void testShuffleWrite(VeloxShuffleWriter& shuffleWriter, std::vector<facebook::velox::RowVectorPtr> inputs) {
+    for (auto& vector : inputs) {
+      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+    }
+    ASSERT_NOT_OK(shuffleWriter.stop());
+
+    // Verify data file exists.
+    checkFileExists(dataFile_);
+
+    // Verify number of output partitions.
+    const auto& lengths = shuffleWriter.partitionLengths();
+    ASSERT_EQ(lengths.size(), 1);
+
+    const auto schema = getArrowSchema(inputs[0]);
+
+    std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
+    setReadableFile(dataFile_);
+    GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::RandomAccessFile::GetStream(file_, 0, lengths[0]));
+    getRowVectors(partitionWriterOptions_.compressionType, schema, deserializedVectors, in);
+
+    ASSERT_EQ(deserializedVectors.size(), inputs.size());
+    // size_t index: avoids comparing a signed counter against the unsigned size().
+    for (size_t i = 0; i < deserializedVectors.size(); ++i) {
+      facebook::velox::test::assertEqualVectors(inputs[i], deserializedVectors[i]);
+    }
+  }
+
+  // Creates a single-partition shuffle writer of the parameterized writer type. The
+  // partition-count argument is ignored; one partition is always used.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
+    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+    shuffleWriterOptions_.bufferSize = 10;
+
+    auto partitionWriter = createPartitionWriter(
+        GetParam().partitionWriterType, 1, dataFile_, localDirs_, partitionWriterOptions_, arrowPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxShuffleWriter::create(
+            GetParam().shuffleWriterType,
+            1,
+            std::move(partitionWriter),
+            std::move(shuffleWriterOptions_),
+            veloxPool,
+            arrowPool));
+
+    return shuffleWriter;
+  }
+};
+
+// Shared helpers for fixtures that write more than one partition (hash, range, round-robin).
+class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+  // Stops the writer, then verifies the data file partition by partition.
+  //   expectPartitionLength: expected number of partitions reported by the writer.
+  //   dataType: row type used to build the Arrow schema for deserialization.
+  //   expectedVectors: indexed by partition id; an empty entry means the partition
+  //     must have length 0, otherwise the deserialized vectors must equal the entry.
+  void shuffleWriteReadMultiBlocks(
+      VeloxShuffleWriter& shuffleWriter,
+      int32_t expectPartitionLength,
+      facebook::velox::TypePtr dataType,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) { /* blockId = pid, rowVector in block */
+    ASSERT_NOT_OK(shuffleWriter.stop());
+    // verify data file
+    checkFileExists(dataFile_);
+    // verify output temporary files
+    const auto& lengths = shuffleWriter.partitionLengths();
+    ASSERT_EQ(lengths.size(), expectPartitionLength);
+    // Use a 64-bit initial value so the sum is not accumulated in (and truncated to) int.
+    int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), int64_t{0});
+    auto schema = toArrowSchema(dataType, pool());
+    setReadableFile(dataFile_);
+    ASSERT_EQ(*file_->GetSize(), lengthSum);
+    // Start of partition i in the data file is the sum of all preceding partition lengths
+    // (not just lengths[i - 1], which is only correct for the first two partitions).
+    int64_t offset = 0;
+    for (int32_t i = 0; i < expectPartitionLength; i++) {
+      if (expectedVectors[i].empty()) {
+        ASSERT_EQ(lengths[i], 0);
+      } else {
+        std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
+        GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::RandomAccessFile::GetStream(file_, offset, lengths[i]));
+        getRowVectors(partitionWriterOptions_.compressionType, schema, deserializedVectors, in);
+        ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
+        for (size_t j = 0; j < expectedVectors[i].size(); j++) {
+          facebook::velox::test::assertEqualVectors(expectedVectors[i][j], deserializedVectors[j]);
+        }
+      }
+      offset += lengths[i];
+    }
+  }
+
+  // Splits every vector in `vectors` into the writer, then delegates to
+  // shuffleWriteReadMultiBlocks() to stop the writer and verify the output.
+  void testShuffleWriteMultiBlocks(
+      VeloxShuffleWriter& shuffleWriter,
+      std::vector<facebook::velox::RowVectorPtr> vectors,
+      int32_t expectPartitionLength,
+      facebook::velox::TypePtr dataType,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) {
+    for (auto& vector : vectors) {
+      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+    }
+    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, dataType, expectedVectors);
+  }
+};
+
+// Fixture for Partitioning::kHash. SetUp() builds the hash input vectors by
+// inserting an extra int32 column at the front of children1_ / children2_.
+class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter
{
+ protected:
+ void SetUp() override {
+ MultiplePartitioningShuffleWriter::SetUp();
+
+ // The inserted first column holds only the values 1 and 2; presumably it is
+ // the hash/partition key column (TODO confirm).
+ children1_.insert((children1_.begin()), makeFlatVector<int32_t>({1, 2, 2,
2, 2, 1, 1, 1, 2, 1}));
+ hashInputVector1_ = makeRowVector(children1_);
+ children2_.insert((children2_.begin()), makeFlatVector<int32_t>({2, 2}));
+ hashInputVector2_ = makeRowVector(children2_);
+ }
+
+ // Creates a shuffle writer of the parameterized type with hash partitioning,
+ // bufferSize 4 and `numPartitions` partitions. Throws if creation fails.
+ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
+ auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+ shuffleWriterOptions_.partitioning = Partitioning::kHash;
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto partitionWriter = createPartitionWriter(
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxShuffleWriter::create(
+ GetParam().shuffleWriterType,
+ numPartitions,
+ std::move(partitionWriter),
+ std::move(shuffleWriterOptions_),
+ veloxPool,
+ arrowPool));
+
+ return shuffleWriter;
+ }
+
+ std::vector<uint32_t> hashPartitionIds_{1, 2};
+
+ // Inputs built in SetUp(): the base children with the extra leading column.
+ facebook::velox::RowVectorPtr hashInputVector1_;
+ facebook::velox::RowVectorPtr hashInputVector2_;
+};
+
+// Fixture for Partitioning::kRange. SetUp() builds two composite batches, each
+// composed from a one-column int32 batch of 0/1 values and a batch holding the
+// children of inputVector1_ / inputVector2_.
+class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
+ protected:
+ void SetUp() override {
+ MultiplePartitioningShuffleWriter::SetUp();
+
+ // pid1 / pid2 are presumably the per-row partition ids (TODO confirm).
+ auto pid1 = makeRowVector({makeFlatVector<int32_t>({0, 1, 0, 1, 0, 1, 0,
1, 0, 1})});
+ auto rangeVector1 = makeRowVector(inputVector1_->children());
+ compositeBatch1_ = VeloxColumnarBatch::compose(
+ pool(), {std::make_shared<VeloxColumnarBatch>(pid1),
std::make_shared<VeloxColumnarBatch>(rangeVector1)});
+
+ auto pid2 = makeRowVector({makeFlatVector<int32_t>({0, 1})});
+ auto rangeVector2 = makeRowVector(inputVector2_->children());
+ compositeBatch2_ = VeloxColumnarBatch::compose(
+ pool(), {std::make_shared<VeloxColumnarBatch>(pid2),
std::make_shared<VeloxColumnarBatch>(rangeVector2)});
+ }
+
+ // Creates a shuffle writer of the parameterized type with range partitioning,
+ // bufferSize 4 and `numPartitions` partitions. Throws if creation fails.
+ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
+ auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+ shuffleWriterOptions_.partitioning = Partitioning::kRange;
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto partitionWriter = createPartitionWriter(
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxShuffleWriter::create(
+ GetParam().shuffleWriterType,
+ numPartitions,
+ std::move(partitionWriter),
+ std::move(shuffleWriterOptions_),
+ veloxPool,
+ arrowPool));
+
+ return shuffleWriter;
+ }
+
+ // Batch-based variant of the base helper: writes each ColumnarBatch through
+ // shuffleWriter.write() with ShuffleWriter::kMinMemLimit, then verifies the
+ // output with shuffleWriteReadMultiBlocks().
+ void testShuffleWriteMultiBlocks(
+ VeloxShuffleWriter& shuffleWriter,
+ std::vector<std::shared_ptr<ColumnarBatch>> batches,
+ int32_t expectPartitionLength,
+ facebook::velox::TypePtr dataType,
+ std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
+ for (auto& batch : batches) {
+ ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
+ }
+ shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
+ }
+
+ // Composite batches built in SetUp().
+ std::shared_ptr<ColumnarBatch> compositeBatch1_;
+ std::shared_ptr<ColumnarBatch> compositeBatch2_;
+};
+
+// Fixture for Partitioning::kRoundRobin.
+class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
+ protected:
+ // Creates a shuffle writer of the parameterized type with round-robin
+ // partitioning, bufferSize 4 and `numPartitions` partitions. Throws if
+ // creation fails.
+ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
+ auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+ shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
+ shuffleWriterOptions_.bufferSize = 4;
+
+ auto partitionWriter = createPartitionWriter(
+ GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
+
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxShuffleWriter::create(
+ GetParam().shuffleWriterType,
+ numPartitions,
+ std::move(partitionWriter),
+ std::move(shuffleWriterOptions_),
+ veloxPool,
+ arrowPool));
+
+ return shuffleWriter;
+ }
+};
TEST_P(SinglePartitioningShuffleWriter, single) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
@@ -111,30 +446,30 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
// Split 1 RowVector.
{
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(1);
testShuffleWrite(*shuffleWriter, {inputVector1_});
}
// Split > 1 RowVector.
{
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(1);
auto resultBlock = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{}, {}, {}});
testShuffleWrite(*shuffleWriter, {resultBlock});
}
// Split null RowVector.
{
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(1);
auto vector = makeRowVector({
makeNullableFlatVector<int32_t>({std::nullopt}),
- makeNullableFlatVector<velox::StringView>({std::nullopt}),
+ makeNullableFlatVector<StringView>({std::nullopt}),
});
testShuffleWrite(*shuffleWriter, {vector});
}
// Other types.
{
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(1);
auto vector = makeRowVector({
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeNullableFlatVector<StringView>({std::nullopt, "10"}),
@@ -145,7 +480,7 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
makeNullableFlatVector<int32_t>({std::nullopt, 1}),
makeRowVector({
makeFlatVector<int32_t>({1, 3}),
- makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
+ makeNullableFlatVector<StringView>({std::nullopt, "de"}),
}),
makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline
string"}),
makeArrayVector<int64_t>({
@@ -160,12 +495,12 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
auto vector = makeRowVector({
makeFlatVector<int32_t>({1, 2, 1, 2}),
makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt}),
makeFlatVector<int64_t>({1, 2, 3, 4}),
- makeFlatVector<velox::StringView>({"nn", "re", "fr", "juiu"}),
+ makeFlatVector<StringView>({"nn", "re", "fr", "juiu"}),
makeFlatVector<int64_t>({232, 34567235, 1212, 4567}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({232, 34567235, 1212, 4567}, DECIMAL(20, 4)),
makeFlatVector<int32_t>(
@@ -188,7 +523,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
auto firstBlock = makeRowVector({
makeNullableFlatVector<int8_t>({2, std::nullopt}),
makeFlatVector<int64_t>({2, 4}),
- makeFlatVector<velox::StringView>({"re", "juiu"}),
+ makeFlatVector<StringView>({"re", "juiu"}),
makeFlatVector<int64_t>({34567235, 4567}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({34567235, 4567}, DECIMAL(20, 4)),
makeFlatVector<int32_t>({1, 1}, DATE()),
@@ -198,7 +533,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
auto secondBlock = makeRowVector({
makeNullableFlatVector<int8_t>({1, 3}),
makeFlatVector<int64_t>({1, 3}),
- makeFlatVector<velox::StringView>({"nn", "fr"}),
+ makeFlatVector<StringView>({"nn", "fr"}),
makeFlatVector<int64_t>({232, 1212}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({232, 1212}, DECIMAL(20, 4)),
makeNullableFlatVector<int32_t>({std::nullopt, 0}, DATE()),
@@ -210,7 +545,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
auto children = childrenComplex_;
children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
auto vector = makeRowVector(children);
@@ -222,7 +557,7 @@ TEST_P(HashPartitioningShuffleWriter,
hashPart1VectorComplexType) {
TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
@@ -238,10 +573,10 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
const int32_t expectedMaxBatchSize = 8;
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
// calculate maxBatchSize_
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
- if (GetParam().shuffleWriterType == kHashShuffle) {
+ if (GetParam().shuffleWriterType == ShuffleWriterType::kHashShuffle) {
VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
}
@@ -253,9 +588,9 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
*shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2,
inputVector1_->type(), {{blockPid2}, {blockPid1}});
}
-TEST_P(RangePartitioningShuffleWriter, rangePartition) {
+TEST_P(RangePartitioningShuffleWriter, range) {
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
@@ -270,7 +605,7 @@ TEST_P(RangePartitioningShuffleWriter, rangePartition) {
TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_},
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
@@ -284,12 +619,12 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
- if (GetParam().shuffleWriterType != kHashShuffle) {
+ if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 0; // Force re-alloc on
buffer size changed.
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
// First spilt no null.
auto inputNoNull = inputVectorNoNull_;
@@ -302,8 +637,8 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
preAllocForceRealloc) {
makeNullableFlatVector<int64_t>({0, 1}),
makeNullableFlatVector<float>({0, 0.142857}),
makeNullableFlatVector<bool>({false, true}),
- makeNullableFlatVector<velox::StringView>({"", "alice"}),
- makeNullableFlatVector<velox::StringView>({"alice", ""}),
+ makeNullableFlatVector<StringView>({"", "alice"}),
+ makeNullableFlatVector<StringView>({"alice", ""}),
};
auto inputHasNull = makeRowVector(intHasNull);
@@ -342,12 +677,12 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
preAllocForceRealloc) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) {
- if (GetParam().shuffleWriterType != kHashShuffle) {
+ if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 1; // Force re-alloc on
buffer size changed.
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
// First spilt no null.
auto inputNoNull = inputVectorNoNull_;
@@ -361,8 +696,8 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
preAllocForceReuse) {
makeNullableFlatVector<int64_t>({0, 1}),
makeNullableFlatVector<float>({0, 0.142857}),
makeNullableFlatVector<bool>({false, true}),
- makeNullableFlatVector<velox::StringView>({std::nullopt, std::nullopt}),
- makeNullableFlatVector<velox::StringView>({std::nullopt, std::nullopt}),
+ makeNullableFlatVector<StringView>({std::nullopt, std::nullopt}),
+ makeNullableFlatVector<StringView>({std::nullopt, std::nullopt}),
};
auto inputStringHasNull = makeRowVector(stringHasNull);
@@ -377,11 +712,11 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
preAllocForceReuse) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
- if (GetParam().shuffleWriterType != kHashShuffle) {
+ if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
@@ -414,11 +749,11 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
spillVerifyResult) {
}
TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
- if (GetParam().shuffleWriterType != kSortShuffle) {
+ if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) {
return;
}
ASSERT_NOT_OK(initShuffleWriterOptions());
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+ auto shuffleWriter = createShuffleWriter(2);
// Set memLimit to 0 to force allocate a new buffer for each row.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
@@ -428,307 +763,30 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows)
{
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(),
{{blockPid1}, {blockPid2}});
}
-TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- std::shared_ptr<arrow::MemoryPool> pool =
std::make_shared<LimitedMemoryPool>();
- shuffleWriterOptions_.bufferSize = 4;
-
- auto shuffleWriter = createShuffleWriter(pool.get());
-
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- ASSERT_NOT_OK(shuffleWriter->stop());
-
- ASSERT_TRUE(pool->bytes_allocated() == 0);
- shuffleWriter.reset();
- ASSERT_TRUE(pool->bytes_allocated() == 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- std::shared_ptr<arrow::MemoryPool> pool =
std::make_shared<LimitedMemoryPool>(0);
- shuffleWriterOptions_.bufferSize = 4;
-
- auto shuffleWriter = createShuffleWriter(pool.get());
-
- auto status = splitRowVector(*shuffleWriter, inputVector1_);
-
- // Should return OOM status because there's no partition buffer to spill.
- ASSERT_TRUE(status.IsOutOfMemory());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferSize = 4;
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
-
- // Test spill all partition buffers.
- {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- auto bufferSize = shuffleWriter->partitionBufferSize();
- auto payloadSize = shuffleWriter->cachedPayloadSize();
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + bufferSize,
&evicted));
- ASSERT_EQ(evicted, payloadSize + bufferSize);
- // No cached payload after evict.
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- // All partition buffers should be evicted.
- ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
- }
-
- // Test spill minimum-size partition buffers.
- {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- auto bufferSize = shuffleWriter->partitionBufferSize();
- auto payloadSize = shuffleWriter->cachedPayloadSize();
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
- ASSERT_GT(evicted, payloadSize);
- // No cached payload after evict.
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- ASSERT_LE(shuffleWriter->partitionBufferSize(), bufferSize);
- // Not all partition buffers was evicted.
- ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
- }
-
- // Test spill empty partition buffers.
- {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- // Clear buffers then the size after shrink will be 0.
- for (auto pid = 0; pid < kDefaultShufflePartitions; ++pid) {
- ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true));
- }
-
- auto bufferSize = shuffleWriter->partitionBufferSize();
- auto payloadSize = shuffleWriter->cachedPayloadSize();
- int64_t evicted;
- // Evict payload and shrink min-size buffer.
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
- ASSERT_GT(evicted, payloadSize);
- ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
- // Evict empty partition buffers.
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(bufferSize, &evicted));
- ASSERT_GT(evicted, 0);
- ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
- // Evict again. No reclaimable space.
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
- ASSERT_EQ(evicted, 0);
- }
-
- ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.partitioning = Partitioning::kSingle;
- shuffleWriterOptions_.bufferSize = 4;
- auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
-
- {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- auto payloadSize = shuffleWriter->partitionBufferSize() +
shuffleWriter->cachedPayloadSize();
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
- ASSERT_GE(evicted, payloadSize);
- // No cached payload after evict.
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- }
-
- ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferSize = 4;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- // Trigger spill for the next split.
- ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- }));
-
- ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.partitioning = Partitioning::kSingle;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-
- auto shuffleWriter = createShuffleWriter(1, &pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- // Trigger spill for the next split.
- ASSERT_TRUE(pool.checkEvict(
- shuffleWriter->cachedPayloadSize(), [&] {
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); }));
-
- ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
- for (const auto partitioning : {Partitioning::kSingle,
Partitioning::kRoundRobin}) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferSize = 4096;
- // Force compression.
- partitionWriterOptions_.compressionThreshold = 0;
- partitionWriterOptions_.mergeThreshold = 0;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- for (int i = 0; i < 10; ++i) {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
- }
- // Reclaim bytes to shrink partition buffer.
- int64_t reclaimed = 0;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
- ASSERT_TRUE(reclaimed >= 2000);
-
- // Trigger spill during stop.
- ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
ASSERT_NOT_OK(shuffleWriter->stop()); }));
- }
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferSize = 4096;
- // Force compression.
- partitionWriterOptions_.compressionThreshold = 0;
- partitionWriterOptions_.mergeThreshold = 0;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
- for (int i = 0; i < 3; ++i) {
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
- }
- // Reclaim bytes to shrink partition buffer.
- int64_t reclaimed = 0;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
- ASSERT_TRUE(reclaimed >= 2000);
-
- // Reclaim from PartitionWriter to free cached bytes.
- auto payloadSize = shuffleWriter->cachedPayloadSize();
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
- ASSERT_EQ(evicted, payloadSize);
-
- // When evicting partitioning buffers in stop, spill will be triggered by
complex types allocating extra memory.
- ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
ASSERT_NOT_OK(shuffleWriter->stop()); }));
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferSize = 4;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- // First evict cached payloads.
- int64_t evicted;
-
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
&evicted));
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
- // Set limited capacity.
- pool.setCapacity(0);
- // Evict again. Because no cached payload to evict, it will try to evict all
partition buffers.
-
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(),
&evicted));
- ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.partitioning = Partitioning::kSingle;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
- // First evict cached payloads.
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(
- shuffleWriter->partitionBufferSize() +
shuffleWriter->cachedPayloadSize(), &evicted));
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- ASSERT_GE(evicted, 0);
-
- // Evict again. The evicted size should be 0.
- ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
- ASSERT_EQ(evicted, 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
- ASSERT_NOT_OK(initShuffleWriterOptions());
- shuffleWriterOptions_.bufferReallocThreshold = 1;
- auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
- auto shuffleWriter = createShuffleWriter(&pool);
-
- pool.setEvictable(shuffleWriter.get());
-
- // Split first input vector. Large average string length.
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));
-
- // Evict cached payloads.
- int64_t evicted;
-
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
&evicted));
- ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
- // Set limited capacity.
- ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
- // Split second input vector. Large average string length.
- ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
- }));
- ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
INSTANTIATE_TEST_SUITE_P(
- VeloxShuffleWriteParam,
+ SinglePartitioningShuffleWriterGroup,
SinglePartitioningShuffleWriter,
- ::testing::ValuesIn(kShuffleWriteTestParams));
+ ::testing::ValuesIn(getTestParams()));
INSTANTIATE_TEST_SUITE_P(
- VeloxShuffleWriteParam,
+ RoundRobinPartitioningShuffleWriterGroup,
RoundRobinPartitioningShuffleWriter,
- ::testing::ValuesIn(kShuffleWriteTestParams));
+ ::testing::ValuesIn(getTestParams()));
INSTANTIATE_TEST_SUITE_P(
- VeloxShuffleWriteParam,
+ HashPartitioningShuffleWriterGroup,
HashPartitioningShuffleWriter,
- ::testing::ValuesIn(kShuffleWriteTestParams));
+ ::testing::ValuesIn(getTestParams()));
INSTANTIATE_TEST_SUITE_P(
- VeloxShuffleWriteParam,
+ RangePartitioningShuffleWriterGroup,
RangePartitioningShuffleWriter,
- ::testing::ValuesIn(kShuffleWriteTestParams));
+ ::testing::ValuesIn(getTestParams()));
} // namespace gluten
+
+// Test entry point: registers the shuffle-writer global environment before
+// running every test in this binary.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  // The environment is handed to googletest as a raw pointer and is not
+  // deleted here.
+  auto* shuffleTestEnvironment = new gluten::VeloxShuffleWriterTestEnvironment;
+  ::testing::AddGlobalTestEnvironment(shuffleTestEnvironment);
+  return RUN_ALL_TESTS();
+}
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
new file mode 100644
index 0000000000..913bd8e487
--- /dev/null
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <arrow/result.h>
+
+#include <gtest/gtest.h>
+
+#include <compute/VeloxBackend.h>
+#include "memory/VeloxColumnarBatch.h"
+#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/PartitionWriter.h"
+#include "shuffle/VeloxShuffleReader.h"
+#include "shuffle/rss/RssPartitionWriter.h"
+#include "utils/LocalRssClient.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/VeloxArrowUtils.h"
+#include "velox/type/Type.h"
+
+#include "velox/vector/tests/VectorTestUtils.h"
+
+namespace gluten {
+
+namespace {
+/// Builds a string of exactly `length` characters by repeating a fixed seed
+/// pattern: as many whole copies as fit, followed by a prefix of the pattern
+/// for the remainder. Returns an empty string when `length` is 0.
+std::string makeString(uint32_t length) {
+  static const std::string kPattern =
+      "thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
+  const auto patternLength = kPattern.length();
+
+  std::string result;
+  // Whole copies of the pattern.
+  for (auto fullCopies = length / patternLength; fullCopies > 0; --fullCopies) {
+    result.append(kPattern);
+  }
+  // Leading part of the pattern for whatever is left.
+  const auto tailLength = length % patternLength;
+  if (tailLength != 0) {
+    result.append(kPattern, 0, tailLength);
+  }
+  return result;
+}
+
+/// Creates the partition writer selected by `partitionWriterType`.
+///
+/// PartitionWriterType::kRss yields an RssPartitionWriter fed by a
+/// LocalRssClient bound to `dataFile`; every other value yields a
+/// LocalPartitionWriter over `dataFile` and `localDirs`.
+std::unique_ptr<PartitionWriter> createPartitionWriter(
+    PartitionWriterType partitionWriterType,
+    uint32_t numPartitions,
+    const std::string& dataFile,
+    const std::vector<std::string>& localDirs,
+    const PartitionWriterOptions& options,
+    arrow::MemoryPool* pool) {
+  if (partitionWriterType != PartitionWriterType::kRss) {
+    return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, dataFile, localDirs);
+  }
+  return std::make_unique<RssPartitionWriter>(
+      numPartitions, options, pool, std::make_unique<LocalRssClient>(dataFile));
+}
+} // namespace
+
+class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase {
+ public:
+ virtual ~VeloxShuffleWriterTestBase() = default;
+
+  /// Creates the VeloxBackend with a TestAllocationListener installed and
+  /// remembers a raw pointer to that listener in `listener_`.
+  static void setUpVeloxBackend() {
+    auto allocationListener = std::make_unique<TestAllocationListener>();
+    // Grab the observer pointer before the unique_ptr is moved away below.
+    listener_ = allocationListener.get();
+
+    // NOTE(review): block size "1" presumably makes every allocation change
+    // visible to the listener - confirm against the backend configuration.
+    std::unordered_map<std::string, std::string> backendConf{{kMemoryReservationBlockSize, "1"}};
+
+    VeloxBackend::create(std::move(allocationListener), backendConf);
+  }
+
+  /// Tears down the VeloxBackend instance returned by VeloxBackend::get().
+  static void tearDownVeloxBackend() {
+    auto&& backend = VeloxBackend::get();
+    backend->tearDown();
+  }
+
+ protected:
+  /// Populates every fixture vector used by the shuffle writer tests.
+  ///
+  /// First creates the temporary local dirs and the data file (throwing if
+  /// that fails), then builds the per-column child vectors and wraps each set
+  /// into a RowVector:
+  ///  - children1_ / inputVector1_: 10 rows with nulls, last column UNKNOWN.
+  ///  - children2_ / inputVector2_: 2 rows, including one long string value.
+  ///  - childrenNoNull_ / inputVectorNoNull_: 2 rows built from non-nullable literals.
+  ///  - childrenLargeBinary1_ / inputVectorLargeBinary1_: 1024 rows of a 1024-char string.
+  ///  - childrenLargeBinary2_ / inputVectorLargeBinary2_: 2048 rows of a 4096-char string.
+  ///  - childrenComplex_ / inputVectorComplex_: 2 rows with row, array and map columns.
+  void setUpTestData() {
+    GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFile());
+
+    // Set up test data.
+    children1_ = {
+        makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}),
+        makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2, 2, std::nullopt, std::nullopt, 3, -3}),
+        makeNullableFlatVector<int32_t>({1, 2, 3, 4, std::nullopt, 5, 6, 7, 8, std::nullopt}),
+        makeNullableFlatVector<int64_t>(
+            {std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt}),
+        makeNullableFlatVector<float>(
+            {-0.1234567,
+             std::nullopt,
+             0.1234567,
+             std::nullopt,
+             -0.142857,
+             std::nullopt,
+             0.142857,
+             0.285714,
+             0.428617,
+             std::nullopt}),
+        makeNullableFlatVector<bool>(
+            {std::nullopt, true, false, std::nullopt, true, true, false, true, std::nullopt, std::nullopt}),
+        makeFlatVector<facebook::velox::StringView>(
+            {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", "boB7", "ALICE8", "BOB9"}),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", std::nullopt, "alicE", std::nullopt, "boB"}),
+        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10, pool())};
+
+    children2_ = {
+        makeNullableFlatVector<int8_t>({std::nullopt, std::nullopt}),
+        makeFlatVector<int8_t>({1, -1}),
+        makeNullableFlatVector<int32_t>({100, std::nullopt}),
+        makeFlatVector<int64_t>({1, 1}),
+        makeFlatVector<float>({0.142857, -0.142857}),
+        makeFlatVector<bool>({true, false}),
+        makeFlatVector<facebook::velox::StringView>(
+            {"bob",
+             "alicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealice"}),
+        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, std::nullopt}),
+        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 2, pool())};
+
+    childrenNoNull_ = {
+        makeFlatVector<int8_t>({0, 1}),
+        makeFlatVector<int8_t>({0, -1}),
+        makeFlatVector<int32_t>({0, 100}),
+        makeFlatVector<int64_t>({0, 1}),
+        makeFlatVector<float>({0, 0.142857}),
+        makeFlatVector<bool>({false, true}),
+        makeFlatVector<facebook::velox::StringView>({"", "alice"}),
+        makeFlatVector<facebook::velox::StringView>({"alice", ""}),
+    };
+
+    largeString1_ = makeString(1024);
+    int32_t numRows = 1024;
+    childrenLargeBinary1_ = {
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int32_t>(std::vector<int32_t>(numRows, 0)),
+        makeFlatVector<int64_t>(std::vector<int64_t>(numRows, 0)),
+        makeFlatVector<float>(std::vector<float>(numRows, 0)),
+        makeFlatVector<bool>(std::vector<bool>(numRows, true)),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, largeString1_.c_str())),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, std::nullopt)),
+    };
+
+    largeString2_ = makeString(4096);
+    numRows = 2048;
+    // Assigned directly: the former `vectorToSpill` local was an unused copy.
+    childrenLargeBinary2_ = {
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int32_t>(std::vector<int32_t>(numRows, 0)),
+        makeFlatVector<int64_t>(std::vector<int64_t>(numRows, 0)),
+        makeFlatVector<float>(std::vector<float>(numRows, 0)),
+        makeFlatVector<bool>(std::vector<bool>(numRows, true)),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, largeString2_.c_str())),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, std::nullopt)),
+    };
+
+    childrenComplex_ = {
+        makeNullableFlatVector<int32_t>({std::nullopt, 1}),
+        makeRowVector({
+            makeFlatVector<int32_t>({1, 3}),
+            makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "de"}),
+        }),
+        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 I'm not inline string"}),
+        makeArrayVector<int64_t>({
+            {1, 2, 3, 4, 5},
+            {1, 2, 3},
+        }),
+        makeMapVector<int32_t, facebook::velox::StringView>(
+            {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
+    };
+
+    inputVector1_ = makeRowVector(children1_);
+    inputVector2_ = makeRowVector(children2_);
+    inputVectorNoNull_ = makeRowVector(childrenNoNull_);
+    inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
+    inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
+    inputVectorComplex_ = makeRowVector(childrenComplex_);
+  }
+
+  /// Wraps `vector` in a VeloxColumnarBatch and writes it through
+  /// `shuffleWriter` with the given `memLimit`, returning the writer's status.
+  arrow::Status splitRowVector(
+      VeloxShuffleWriter& shuffleWriter,
+      facebook::velox::RowVectorPtr vector,
+      int64_t memLimit = ShuffleWriter::kMinMemLimit) {
+    std::shared_ptr<ColumnarBatch> batch = std::make_shared<VeloxColumnarBatch>(vector);
+    return shuffleWriter.write(batch, memLimit);
+  }
+
+  /// Creates two temporary directories, records both paths in `localDirs_`,
+  /// and creates the shuffle data file (`dataFile_`) inside the first one.
+  /// Returns the first failing status, or OK.
+  arrow::Status setLocalDirsAndDataFile() {
+    static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-";
+
+    // First tmp dir: also hosts the data file.
+    // The TemporaryDir handles are stored in tmpDirs_ so they are not
+    // destroyed (and the directories deleted) when this function returns.
+    tmpDirs_.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix));
+    const std::string firstDirPath = tmpDirs_.back()->path().ToString();
+    ARROW_ASSIGN_OR_RAISE(dataFile_, createTempShuffleFile(firstDirPath));
+    localDirs_.push_back(firstDirPath);
+
+    // Second tmp dir: only registered as a local dir.
+    tmpDirs_.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix));
+    localDirs_.push_back(tmpDirs_.back()->path().ToString());
+
+    return arrow::Status::OK();
+  }
+
+ virtual std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) = 0;
+
+ inline static TestAllocationListener* listener_{nullptr};
+
+ ShuffleWriterOptions shuffleWriterOptions_{};
+ PartitionWriterOptions partitionWriterOptions_{};
+
+ std::vector<std::unique_ptr<arrow::internal::TemporaryDir>> tmpDirs_;
+ std::string dataFile_;
+ std::vector<std::string> localDirs_;
+
+ std::vector<facebook::velox::VectorPtr> children1_;
+ std::vector<facebook::velox::VectorPtr> children2_;
+ std::vector<facebook::velox::VectorPtr> childrenNoNull_;
+ std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
+ std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
+ std::vector<facebook::velox::VectorPtr> childrenComplex_;
+
+ facebook::velox::RowVectorPtr inputVector1_;
+ facebook::velox::RowVectorPtr inputVector2_;
+ facebook::velox::RowVectorPtr inputVectorNoNull_;
+ std::string largeString1_;
+ std::string largeString2_;
+ facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
+ facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
+ facebook::velox::RowVectorPtr inputVectorComplex_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/utils/LocalRssClient.cc
b/cpp/velox/utils/LocalRssClient.cc
new file mode 100644
index 0000000000..3ef3c61444
--- /dev/null
+++ b/cpp/velox/utils/LocalRssClient.cc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LocalRssClient.h"
+#include "utils/Common.h"
+
+#include <arrow/io/file.h>
+
+namespace gluten {
+
+/// Appends `size` bytes starting at `bytes` to the in-memory buffer kept for
+/// `partitionId`, creating that buffer on first use.
+///
+/// @return `size`, converted to int32_t.
+/// @throws whatever the GLUTEN_* macros raise when an Arrow allocation,
+///         Reserve or Resize fails.
+int32_t LocalRssClient::pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) {
+  auto iter = partitionBufferMap_.find(partitionId);
+
+  // Allocate new buffer if it's a new partition. The partition is registered
+  // only after the allocation succeeded, so a throwing allocation cannot leave
+  // a map entry that points at a null buffer.
+  if (iter == partitionBufferMap_.end()) {
+    buffers_.emplace_back();
+    GLUTEN_ASSIGN_OR_THROW(buffers_.back(), arrow::AllocateResizableBuffer(0));
+    iter = partitionBufferMap_.try_emplace(partitionId, buffers_.size() - 1).first;
+  }
+
+  auto& buffer = buffers_[iter->second];
+  const auto newSize = buffer->size() + size;
+
+  if (buffer->capacity() < newSize) {
+    GLUTEN_THROW_NOT_OK(buffer->Reserve(newSize));
+  }
+
+  // Copy into the reserved tail first, then publish the new logical size.
+  fastCopy(buffer->mutable_data() + buffer->size(), bytes, size);
+  GLUTEN_THROW_NOT_OK(buffer->Resize(newSize));
+
+  return static_cast<int32_t>(size);
+}
+
+/// Writes every buffered partition to `dataFile_`, in the iteration order of
+/// `partitionBufferMap_`, flushing after each one; then closes the file and
+/// drops all buffered state.
+void LocalRssClient::stop() {
+  std::shared_ptr<arrow::io::FileOutputStream> out;
+  GLUTEN_ASSIGN_OR_THROW(out, arrow::io::FileOutputStream::Open(dataFile_));
+
+  for (const auto& entry : partitionBufferMap_) {
+    const auto& partitionBuffer = buffers_[entry.second];
+    GLUTEN_THROW_NOT_OK(out->Write(partitionBuffer->data(), partitionBuffer->size()));
+    GLUTEN_THROW_NOT_OK(out->Flush());
+  }
+  GLUTEN_THROW_NOT_OK(out->Close());
+
+  buffers_.clear();
+  partitionBufferMap_.clear();
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/velox/utils/LocalRssClient.h
similarity index 57%
copy from cpp/core/shuffle/rss/RssClient.h
copy to cpp/velox/utils/LocalRssClient.h
index dddccfa1ad..e14c9fd01f 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/velox/utils/LocalRssClient.h
@@ -17,11 +17,28 @@
#pragma once
-class RssClient {
+#include "shuffle/rss/RssClient.h"
+#include "utils/Common.h"
+#include "utils/Macros.h"
+
+#include <arrow/buffer.h>
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace gluten {
+
+/// A local implementation of the RssClient interface for testing purposes.
+class LocalRssClient : public RssClient {
  public:
-  virtual ~RssClient() = default;
+  /// @param dataFile Path of the file that stop() writes the buffered data to.
+  explicit LocalRssClient(std::string dataFile) : dataFile_(std::move(dataFile)) {}
+
+  /// Appends `size` bytes to the in-memory buffer kept for `partitionId`.
+  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) override;
 
-  virtual int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) = 0;
+  /// Writes all buffered partitions to the data file and clears the buffers.
+  void stop() override;
 
-  virtual void stop() = 0;
+ private:
+  std::string dataFile_;
+  // One buffer per partition seen so far, indexed through partitionBufferMap_.
+  std::vector<std::unique_ptr<arrow::ResizableBuffer>> buffers_;
+  // Partition id -> index into buffers_.
+  std::map<uint32_t, uint32_t> partitionBufferMap_;
 };
+} // namespace gluten
diff --git a/cpp/velox/utils/TestAllocationListener.cc
b/cpp/velox/utils/TestAllocationListener.cc
new file mode 100644
index 0000000000..835ec263eb
--- /dev/null
+++ b/cpp/velox/utils/TestAllocationListener.cc
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TestAllocationListener.h"
+#include "velox/common/base/SuccinctPrinter.h"
+
+#include <fmt/format.h>
+
+namespace gluten {
+
+// Applies an allocation delta of `diff` bytes to the tracked usage.
+//
+// If `diff` is positive and would bring usage to or past limit_, the listener
+// first tries to free the excess: it spills from iterator_ (if set) and then
+// asks shuffleWriter_ (if set) to reclaim whatever is still missing. The freed
+// amount is accumulated into reclaimedBytes_. When less than the excess was
+// freed and throwIfOOM_ is set, a GlutenException is thrown and the delta is
+// not applied.
+void TestAllocationListener::allocationChanged(int64_t diff) {
+  const bool hitsLimit = diff > 0 && usedBytes_ + diff >= limit_;
+  if (!hitsLimit) {
+    usedBytes_ += diff;
+    return;
+  }
+
+  LOG(INFO) << fmt::format(
+      "reach hard limit {} when need {}, current used {}.",
+      facebook::velox::succinctBytes(limit_),
+      facebook::velox::succinctBytes(diff),
+      facebook::velox::succinctBytes(usedBytes_));
+
+  // Excess over the limit that has to be released for this allocation.
+  auto neededBytes = usedBytes_ + diff - limit_;
+  int64_t spilledBytes = 0;
+  if (iterator_) {
+    spilledBytes += iterator_->spillFixedSize(neededBytes);
+  }
+  // Only ask the shuffle writer for the part the iterator did not free.
+  if (spilledBytes < neededBytes && shuffleWriter_) {
+    int64_t reclaimed = 0;
+    GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(neededBytes - spilledBytes, &reclaimed));
+    spilledBytes += reclaimed;
+  }
+  reclaimedBytes_ += spilledBytes;
+  LOG(INFO) << fmt::format("spill finish, got {}.", facebook::velox::succinctBytes(spilledBytes));
+
+  if (spilledBytes < neededBytes && throwIfOOM_) {
+    throw GlutenException(fmt::format(
+        "Failed to reclaim {} bytes. Actual bytes reclaimed: {}",
+        facebook::velox::succinctBytes(neededBytes),
+        facebook::velox::succinctBytes(spilledBytes)));
+  }
+
+  usedBytes_ += diff;
+}
+
+// Returns usedBytes_, the running sum of the deltas applied by allocationChanged().
+int64_t TestAllocationListener::currentBytes() {
+ return usedBytes_;
+}
+
+// Returns the total number of bytes freed by the spill/reclaim attempts made in
+// allocationChanged() since construction or the last reset().
+int64_t TestAllocationListener::reclaimedBytes() const {
+ return reclaimedBytes_;
+}
+
+// Puts the listener back into its default-constructed state so that it can be
+// reused: no registered iterator or shuffle writer, no limit, zeroed counters.
+void TestAllocationListener::reset() {
+  // Detach the spill targets and stop throwing on failed reclaim.
+  iterator_ = nullptr;
+  shuffleWriter_ = nullptr;
+  throwIfOOM_ = false;
+
+  // Lift the cap and clear the accounting.
+  limit_ = std::numeric_limits<uint64_t>::max();
+  usedBytes_ = 0;
+  reclaimedBytes_ = 0;
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/TestAllocationListener.h
b/cpp/velox/utils/TestAllocationListener.h
new file mode 100644
index 0000000000..9a94709aa3
--- /dev/null
+++ b/cpp/velox/utils/TestAllocationListener.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/ResultIterator.h"
+#include "memory/AllocationListener.h"
+#include "shuffle/ShuffleWriter.h"
+
+namespace gluten {
+
+// AllocationListener with a configurable byte limit, used by tests and benchmarks.
+// It tracks the bytes reported through allocationChanged() and, when a positive
+// allocation would reach the limit, tries to free memory through the registered
+// ResultIterator and ShuffleWriter (see TestAllocationListener.cc).
+class TestAllocationListener final : public AllocationListener {
+ public:
+ TestAllocationListener() = default;
+
+ // When true, allocationChanged() throws if it cannot free enough bytes.
+ void setThrowIfOOM(bool throwIfOOM) {
+ throwIfOOM_ = throwIfOOM;
+ }
+
+ // Sets the byte limit that triggers spilling in allocationChanged().
+ void updateLimit(uint64_t limit) {
+ limit_ = limit;
+ }
+
+ // Registers the iterator asked to spill first; may be nullptr.
+ // NOTE(review): stored as a raw pointer, presumably non-owning — the caller
+ // has to keep it alive while it is registered; confirm.
+ void setIterator(ResultIterator* iterator) {
+ iterator_ = iterator;
+ }
+
+ // Registers the shuffle writer asked to reclaim what the iterator could not
+ // free; may be nullptr. Same lifetime caveat as setIterator().
+ void setShuffleWriter(ShuffleWriter* shuffleWriter) {
+ shuffleWriter_ = shuffleWriter;
+ }
+
+ // Applies an allocation delta (in bytes), spilling first if the limit is hit.
+ void allocationChanged(int64_t diff) override;
+
+ // Bytes currently tracked as in use.
+ int64_t currentBytes() override;
+
+ // Total bytes freed by spill/reclaim attempts so far.
+ int64_t reclaimedBytes() const;
+
+ // Restores all members to their default values.
+ void reset();
+
+ private:
+ // Whether a failed reclaim raises an exception instead of being ignored.
+ bool throwIfOOM_{false};
+
+ // Running totals maintained by allocationChanged().
+ uint64_t usedBytes_{0L};
+ uint64_t reclaimedBytes_{0L};
+
+ // Defaults to "no limit"; the spill targets default to "none registered".
+ uint64_t limit_{std::numeric_limits<uint64_t>::max()};
+ ResultIterator* iterator_{nullptr};
+ ShuffleWriter* shuffleWriter_{nullptr};
+};
+
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxArrowUtils.cc
b/cpp/velox/utils/VeloxArrowUtils.cc
index f26b49a476..7c95835625 100644
--- a/cpp/velox/utils/VeloxArrowUtils.cc
+++ b/cpp/velox/utils/VeloxArrowUtils.cc
@@ -57,6 +57,25 @@ arrow::Result<std::shared_ptr<ColumnarBatch>>
recordBatch2VeloxColumnarBatch(con
return
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
}
+// Translates an Arrow compression type into the matching Velox CompressionKind.
+// Any Arrow type that has no case below is rejected through VELOX_UNSUPPORTED,
+// with the codec name included in the message.
+facebook::velox::common::CompressionKind arrowCompressionTypeToVelox(arrow::Compression::type type) {
+  using Kind = facebook::velox::common::CompressionKind;
+  switch (type) {
+    case arrow::Compression::UNCOMPRESSED:
+      return Kind::CompressionKind_NONE;
+    case arrow::Compression::LZ4_FRAME:
+      return Kind::CompressionKind_LZ4;
+    case arrow::Compression::ZSTD:
+      return Kind::CompressionKind_ZSTD;
+    case arrow::Compression::GZIP:
+      return Kind::CompressionKind_GZIP;
+    case arrow::Compression::SNAPPY:
+      return Kind::CompressionKind_SNAPPY;
+    case arrow::Compression::LZO:
+      return Kind::CompressionKind_LZO;
+    default:
+      VELOX_UNSUPPORTED("Unsupported arrow compression type {}", arrow::util::Codec::GetCodecAsString(type));
+  }
+}
+
arrow::Result<std::shared_ptr<arrow::Buffer>> toArrowBuffer(
facebook::velox::BufferPtr buffer,
arrow::MemoryPool* pool) {
diff --git a/cpp/velox/utils/VeloxArrowUtils.h
b/cpp/velox/utils/VeloxArrowUtils.h
index 3fb350266d..bacfbbd2b8 100644
--- a/cpp/velox/utils/VeloxArrowUtils.h
+++ b/cpp/velox/utils/VeloxArrowUtils.h
@@ -19,14 +19,18 @@
#pragma once
-#include <arrow/memory_pool.h>
-#include <arrow/type.h>
#include "memory/ColumnarBatch.h"
+
#include "velox/buffer/Buffer.h"
+#include "velox/common/compression/Compression.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/type/Type.h"
#include "velox/vector/arrow/Bridge.h"
+#include <arrow/memory_pool.h>
+#include <arrow/type.h>
+#include <arrow/util/compression.h>
+
namespace gluten {
class ArrowUtils {
@@ -56,4 +60,5 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
toArrowBuffer(facebook::velox::Buf
*/
arrow::Result<std::shared_ptr<ColumnarBatch>>
recordBatch2VeloxColumnarBatch(const arrow::RecordBatch& rb);
+facebook::velox::common::CompressionKind
arrowCompressionTypeToVelox(arrow::Compression::type type);
} // namespace gluten
diff --git a/cpp/velox/utils/tests/LocalRssClient.h
b/cpp/velox/utils/tests/LocalRssClient.h
deleted file mode 100644
index ff2f89ef7f..0000000000
--- a/cpp/velox/utils/tests/LocalRssClient.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/buffer.h>
-#include <arrow/util/io_util.h>
-#include <map>
-#include "shuffle/rss/RssClient.h"
-#include "utils/Common.h"
-#include "utils/Macros.h"
-
-namespace gluten {
-
-class LocalRssClient : public RssClient {
- public:
- LocalRssClient(std::string dataFile) : dataFile_(dataFile) {}
-
- int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t
size) {
- auto idx = -1;
- auto maybeIdx = partitionIdx_.find(partitionId);
- if (maybeIdx == partitionIdx_.end()) {
- idx = partitionIdx_.size();
- partitionIdx_[partitionId] = idx;
- auto buffer = arrow::AllocateResizableBuffer(0).ValueOrDie();
- partitionBuffers_.push_back(std::move(buffer));
- } else {
- idx = maybeIdx->second;
- }
-
- auto& buffer = partitionBuffers_[idx];
- auto newSize = buffer->size() + size;
- if (buffer->capacity() < newSize) {
- GLUTEN_THROW_NOT_OK(buffer->Reserve(newSize));
- }
- memcpy(buffer->mutable_data() + buffer->size(), bytes, size);
- GLUTEN_THROW_NOT_OK(buffer->Resize(newSize));
- return size;
- }
-
- void stop() {
- std::shared_ptr<arrow::io::FileOutputStream> fout;
- GLUTEN_ASSIGN_OR_THROW(fout, arrow::io::FileOutputStream::Open(dataFile_));
-
- for (auto item : partitionIdx_) {
- auto idx = item.second;
- GLUTEN_THROW_NOT_OK(fout->Write(partitionBuffers_[idx]->data(),
partitionBuffers_[idx]->size()));
- GLUTEN_THROW_NOT_OK(fout->Flush());
- }
- GLUTEN_THROW_NOT_OK(fout->Close());
- partitionBuffers_.clear();
- partitionIdx_.clear();
- }
-
- private:
- std::string dataFile_;
- std::vector<std::unique_ptr<arrow::ResizableBuffer>> partitionBuffers_;
- std::map<uint32_t, uint32_t> partitionIdx_;
-};
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc
b/cpp/velox/utils/tests/MemoryPoolUtils.cc
deleted file mode 100644
index 2d4e19511a..0000000000
--- a/cpp/velox/utils/tests/MemoryPoolUtils.cc
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "utils/tests/MemoryPoolUtils.h"
-
-namespace gluten {
-
-arrow::Status LimitedMemoryPool::Allocate(int64_t size, int64_t alignment,
uint8_t** out) {
- if (bytes_allocated() + size > capacity_) {
- return arrow::Status::OutOfMemory("malloc of size ", size, " failed");
- }
- RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
- stats_.UpdateAllocatedBytes(size);
- return arrow::Status::OK();
-}
-
-arrow::Status LimitedMemoryPool::Reallocate(int64_t oldSize, int64_t newSize,
int64_t alignment, uint8_t** ptr) {
- if (newSize > capacity_) {
- return arrow::Status::OutOfMemory("malloc of size ", newSize, " failed");
- }
- RETURN_NOT_OK(pool_->Reallocate(oldSize, newSize, alignment, ptr));
- stats_.UpdateAllocatedBytes(newSize - oldSize);
- return arrow::Status::OK();
-}
-
-void LimitedMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment)
{
- pool_->Free(buffer, size, alignment);
- stats_.UpdateAllocatedBytes(-size);
-}
-
-int64_t LimitedMemoryPool::bytes_allocated() const {
- return stats_.bytes_allocated();
-}
-
-int64_t LimitedMemoryPool::max_memory() const {
- return pool_->max_memory();
-}
-
-int64_t LimitedMemoryPool::total_bytes_allocated() const {
- return pool_->total_bytes_allocated();
-}
-
-int64_t LimitedMemoryPool::num_allocations() const {
- throw pool_->num_allocations();
-}
-
-std::string LimitedMemoryPool::backend_name() const {
- return pool_->backend_name();
-}
-
-bool SelfEvictedMemoryPool::checkEvict(int64_t newCapacity,
std::function<void()> block) {
- bytesEvicted_ = 0;
- auto capacity = capacity_;
- // Limit the capacity to trigger evict.
- setCapacity(newCapacity);
-
- block();
-
- capacity_ = capacity;
- return bytesEvicted_ > 0;
-}
-
-void SelfEvictedMemoryPool::setCapacity(int64_t capacity) {
- if (capacity < bytes_allocated()) {
- capacity_ = bytes_allocated();
- } else {
- capacity_ = capacity;
- }
-}
-
-int64_t SelfEvictedMemoryPool::capacity() const {
- return capacity_;
-}
-
-void SelfEvictedMemoryPool::setEvictable(Reclaimable* evictable) {
- evictable_ = evictable;
-}
-
-arrow::Status SelfEvictedMemoryPool::Allocate(int64_t size, int64_t alignment,
uint8_t** out) {
- RETURN_NOT_OK(ensureCapacity(size));
- return pool_->Allocate(size, alignment, out);
-}
-
-arrow::Status SelfEvictedMemoryPool::Reallocate(int64_t oldSize, int64_t
newSize, int64_t alignment, uint8_t** ptr) {
- if (newSize > oldSize) {
- RETURN_NOT_OK(ensureCapacity(newSize - oldSize));
- }
- return pool_->Reallocate(oldSize, newSize, alignment, ptr);
-}
-
-void SelfEvictedMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t
alignment) {
- return pool_->Free(buffer, size, alignment);
-}
-
-int64_t SelfEvictedMemoryPool::bytes_allocated() const {
- return pool_->bytes_allocated();
-}
-
-int64_t SelfEvictedMemoryPool::max_memory() const {
- return pool_->max_memory();
-}
-
-std::string SelfEvictedMemoryPool::backend_name() const {
- return pool_->backend_name();
-}
-
-int64_t SelfEvictedMemoryPool::total_bytes_allocated() const {
- return pool_->total_bytes_allocated();
-}
-
-int64_t SelfEvictedMemoryPool::num_allocations() const {
- throw pool_->num_allocations();
-}
-
-arrow::Status SelfEvictedMemoryPool::ensureCapacity(int64_t size) {
- VELOX_CHECK_NOT_NULL(evictable_);
- DLOG(INFO) << "Size: " << size << ", capacity_: " << capacity_ << ", bytes
allocated: " << pool_->bytes_allocated();
- if (size > capacity_ - pool_->bytes_allocated()) {
- // Self evict.
- int64_t actual;
- RETURN_NOT_OK(evictable_->reclaimFixedSize(size, &actual));
- if (size > capacity_ - pool_->bytes_allocated()) {
- if (failIfOOM_) {
- return arrow::Status::OutOfMemory(
- "Failed to allocate after evict. Capacity: ",
- capacity_,
- ", Requested: ",
- size,
- ", Evicted: ",
- actual,
- ", Allocated: ",
- pool_->bytes_allocated());
- } else {
- capacity_ = size + pool_->bytes_allocated();
- }
- }
- bytesEvicted_ += actual;
- }
- return arrow::Status::OK();
-}
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.h
b/cpp/velox/utils/tests/MemoryPoolUtils.h
deleted file mode 100644
index 5fdf880be6..0000000000
--- a/cpp/velox/utils/tests/MemoryPoolUtils.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/memory_pool.h>
-#include "memory/Reclaimable.h"
-#include "utils/Exception.h"
-#include "velox/common/base/Exceptions.h"
-
-namespace gluten {
-
-// arrow::MemoryPool instance with limited capacity, used by tests and
benchmarks.
-class LimitedMemoryPool final : public arrow::MemoryPool {
- public:
- explicit LimitedMemoryPool() :
capacity_(std::numeric_limits<int64_t>::max()) {}
- explicit LimitedMemoryPool(int64_t capacity) : capacity_(capacity) {}
-
- arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out)
override;
-
- arrow::Status Reallocate(int64_t oldSize, int64_t newSize, int64_t
alignment, uint8_t** ptr) override;
-
- void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
- int64_t bytes_allocated() const override;
-
- int64_t max_memory() const override;
-
- int64_t total_bytes_allocated() const override;
-
- int64_t num_allocations() const override;
-
- std::string backend_name() const override;
-
- private:
- arrow::MemoryPool* pool_ = arrow::default_memory_pool();
- int64_t capacity_;
- arrow::internal::MemoryPoolStats stats_;
-};
-
-// arrow::MemoryPool instance with limited capacity and can be evictable on
OOM, used by tests and benchmarks.
-class SelfEvictedMemoryPool : public arrow::MemoryPool {
- public:
- explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool, bool failIfOOM =
true) : pool_(pool), failIfOOM_(failIfOOM) {}
-
- bool checkEvict(int64_t newCapacity, std::function<void()> block);
-
- void setCapacity(int64_t capacity);
-
- int64_t capacity() const;
-
- void setEvictable(Reclaimable* evictable);
-
- arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out)
override;
-
- arrow::Status Reallocate(int64_t oldSize, int64_t newSize, int64_t
alignment, uint8_t** ptr) override;
-
- void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
- int64_t bytes_allocated() const override;
-
- int64_t max_memory() const override;
-
- std::string backend_name() const override;
-
- int64_t total_bytes_allocated() const override;
-
- int64_t num_allocations() const override;
-
- private:
- arrow::Status ensureCapacity(int64_t size);
-
- arrow::MemoryPool* pool_;
- bool failIfOOM_;
-
- Reclaimable* evictable_;
- int64_t capacity_{std::numeric_limits<int64_t>::max()};
-
- int64_t bytesEvicted_{0};
-};
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
deleted file mode 100644
index 4a69c9a7e1..0000000000
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/record_batch.h>
-#include <arrow/result.h>
-#include <arrow/util/compression.h>
-#include <gtest/gtest.h>
-#include "LocalRssClient.h"
-#include "memory/VeloxColumnarBatch.h"
-#include "shuffle/PartitionWriter.h"
-#include "shuffle/VeloxShuffleReader.h"
-#include "utils/Compression.h"
-#include "velox/type/Type.h"
-#include "velox/vector/tests/VectorTestUtils.h"
-
-namespace gluten {
-
-namespace {
-std::string makeString(uint32_t length) {
- static const std::string kLargeStringOf128Bytes =
- "thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
- std::string res{};
- auto repeats = length / kLargeStringOf128Bytes.length();
- while (repeats--) {
- res.append(kLargeStringOf128Bytes);
- }
- if (auto remains = length % kLargeStringOf128Bytes.length()) {
- res.append(kLargeStringOf128Bytes.substr(0, remains));
- }
- return res;
-}
-
-std::unique_ptr<PartitionWriter> createPartitionWriter(
- PartitionWriterType partitionWriterType,
- uint32_t numPartitions,
- const std::string& dataFile,
- const std::vector<std::string>& localDirs,
- const PartitionWriterOptions& options,
- arrow::MemoryPool* pool) {
- if (partitionWriterType == PartitionWriterType::kRss) {
- auto rssClient = std::make_unique<LocalRssClient>(dataFile);
- return std::make_unique<RssPartitionWriter>(numPartitions, options, pool,
std::move(rssClient));
- }
- return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool,
dataFile, localDirs);
-}
-} // namespace
-
-struct ShuffleTestParams {
- ShuffleWriterType shuffleWriterType;
- PartitionWriterType partitionWriterType;
- arrow::Compression::type compressionType;
- int32_t compressionThreshold{0};
- int32_t mergeBufferSize{0};
- int32_t diskWriteBufferSize{0};
- bool useRadixSort{false};
-
- std::string toString() const {
- std::ostringstream out;
- out << "shuffleWriterType = " << shuffleWriterType << ",
partitionWriterType = " << partitionWriterType
- << ", compressionType = " << compressionType << ",
compressionThreshold = " << compressionThreshold
- << ", mergeBufferSize = " << mergeBufferSize << ",
compressionBufferSize = " << diskWriteBufferSize
- << ", useRadixSort = " << (useRadixSort ? "true" : "false");
- return out.str();
- }
-};
-
-class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase {
- public:
- virtual arrow::Status initShuffleWriterOptions() {
- RETURN_NOT_OK(setLocalDirsAndDataFile());
- return arrow::Status::OK();
- }
-
- protected:
- void setUp() {
- if
(!isRegisteredNamedVectorSerde(facebook::velox::VectorSerde::Kind::kPresto)) {
- // RSS shuffle serde.
-
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
- }
- // Set up test data.
- children1_ = {
- makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4,
std::nullopt, 5, 6, std::nullopt, 7}),
- makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2,
2, std::nullopt, std::nullopt, 3, -3}),
- makeNullableFlatVector<int32_t>({1, 2, 3, 4, std::nullopt, 5, 6, 7, 8,
std::nullopt}),
- makeNullableFlatVector<int64_t>(
- {std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt,
- std::nullopt}),
- makeNullableFlatVector<float>(
- {-0.1234567,
- std::nullopt,
- 0.1234567,
- std::nullopt,
- -0.142857,
- std::nullopt,
- 0.142857,
- 0.285714,
- 0.428617,
- std::nullopt}),
- makeNullableFlatVector<bool>(
- {std::nullopt, true, false, std::nullopt, true, true, false, true,
std::nullopt, std::nullopt}),
- makeFlatVector<facebook::velox::StringView>(
- {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6",
"boB7", "ALICE8", "BOB9"}),
- makeNullableFlatVector<facebook::velox::StringView>(
- {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob",
std::nullopt, "alicE", std::nullopt, "boB"}),
- facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10,
pool())};
-
- children2_ = {
- makeNullableFlatVector<int8_t>({std::nullopt, std::nullopt}),
- makeFlatVector<int8_t>({1, -1}),
- makeNullableFlatVector<int32_t>({100, std::nullopt}),
- makeFlatVector<int64_t>({1, 1}),
- makeFlatVector<float>({0.142857, -0.142857}),
- makeFlatVector<bool>({true, false}),
- makeFlatVector<facebook::velox::StringView>(
- {"bob",
-
"alicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealice"}),
- makeNullableFlatVector<facebook::velox::StringView>({std::nullopt,
std::nullopt}),
- facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 2,
pool())};
-
- childrenNoNull_ = {
- makeFlatVector<int8_t>({0, 1}),
- makeFlatVector<int8_t>({0, -1}),
- makeFlatVector<int32_t>({0, 100}),
- makeFlatVector<int64_t>({0, 1}),
- makeFlatVector<float>({0, 0.142857}),
- makeFlatVector<bool>({false, true}),
- makeFlatVector<facebook::velox::StringView>({"", "alice"}),
- makeFlatVector<facebook::velox::StringView>({"alice", ""}),
- };
-
- largeString1_ = makeString(1024);
- childrenLargeBinary1_ = {
- makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
- makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
- makeFlatVector<int32_t>(std::vector<int32_t>(4096, 0)),
- makeFlatVector<int64_t>(std::vector<int64_t>(4096, 0)),
- makeFlatVector<float>(std::vector<float>(4096, 0)),
- makeFlatVector<bool>(std::vector<bool>(4096, true)),
- makeNullableFlatVector<facebook::velox::StringView>(
- std::vector<std::optional<facebook::velox::StringView>>(4096,
largeString1_.c_str())),
- makeNullableFlatVector<facebook::velox::StringView>(
- std::vector<std::optional<facebook::velox::StringView>>(4096,
std::nullopt)),
- };
- largeString2_ = makeString(4096);
- auto vectorToSpill = childrenLargeBinary2_ = {
- makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
- makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
- makeFlatVector<int32_t>(std::vector<int32_t>(2048, 0)),
- makeFlatVector<int64_t>(std::vector<int64_t>(2048, 0)),
- makeFlatVector<float>(std::vector<float>(2048, 0)),
- makeFlatVector<bool>(std::vector<bool>(2048, true)),
- makeNullableFlatVector<facebook::velox::StringView>(
- std::vector<std::optional<facebook::velox::StringView>>(2048,
largeString2_.c_str())),
- makeNullableFlatVector<facebook::velox::StringView>(
- std::vector<std::optional<facebook::velox::StringView>>(2048,
std::nullopt)),
- };
- childrenComplex_ = {
- makeNullableFlatVector<int32_t>({std::nullopt, 1}),
- makeRowVector({
- makeFlatVector<int32_t>({1, 3}),
- makeNullableFlatVector<facebook::velox::StringView>({std::nullopt,
"de"}),
- }),
- makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10
I'm not inline string"}),
- makeArrayVector<int64_t>({
- {1, 2, 3, 4, 5},
- {1, 2, 3},
- }),
- makeMapVector<int32_t, facebook::velox::StringView>(
- {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4,
"str4000"}}}),
- };
-
- inputVector1_ = makeRowVector(children1_);
- inputVector2_ = makeRowVector(children2_);
- inputVectorNoNull_ = makeRowVector(childrenNoNull_);
- inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
- inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
- inputVectorComplex_ = makeRowVector(childrenComplex_);
- }
-
- arrow::Status splitRowVector(
- VeloxShuffleWriter& shuffleWriter,
- facebook::velox::RowVectorPtr vector,
- int64_t memLimit = ShuffleWriter::kMinMemLimit) {
- std::shared_ptr<ColumnarBatch> cb =
std::make_shared<VeloxColumnarBatch>(vector);
- return shuffleWriter.write(cb, memLimit);
- }
-
- // Create multiple local dirs and join with comma.
- arrow::Status setLocalDirsAndDataFile() {
- static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-";
-
- // Create first tmp dir and create data file.
- // To prevent tmpDirs from being deleted in the dtor, we need to store
them.
- tmpDirs_.emplace_back();
- ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(),
arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
- ARROW_ASSIGN_OR_RAISE(dataFile_,
createTempShuffleFile(tmpDirs_.back()->path().ToString()));
- localDirs_.push_back(tmpDirs_.back()->path().ToString());
-
- // Create second tmp dir.
- tmpDirs_.emplace_back();
- ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(),
arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
- localDirs_.push_back(tmpDirs_.back()->path().ToString());
- return arrow::Status::OK();
- }
-
- virtual std::shared_ptr<VeloxShuffleWriter>
createShuffleWriter(arrow::MemoryPool* arrowPool) = 0;
-
- ShuffleWriterOptions shuffleWriterOptions_{};
- PartitionWriterOptions partitionWriterOptions_{};
-
- std::vector<std::unique_ptr<arrow::internal::TemporaryDir>> tmpDirs_;
- std::string dataFile_;
- std::vector<std::string> localDirs_;
-
- std::vector<facebook::velox::VectorPtr> children1_;
- std::vector<facebook::velox::VectorPtr> children2_;
- std::vector<facebook::velox::VectorPtr> childrenNoNull_;
- std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
- std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
- std::vector<facebook::velox::VectorPtr> childrenComplex_;
-
- facebook::velox::RowVectorPtr inputVector1_;
- facebook::velox::RowVectorPtr inputVector2_;
- facebook::velox::RowVectorPtr inputVectorNoNull_;
- std::string largeString1_;
- std::string largeString2_;
- facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
- facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
- facebook::velox::RowVectorPtr inputVectorComplex_;
-};
-
-class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
- public:
- arrow::Status initShuffleWriterOptions() override {
- RETURN_NOT_OK(VeloxShuffleWriterTestBase::initShuffleWriterOptions());
-
- ShuffleTestParams params = GetParam();
- shuffleWriterOptions_.useRadixSort = params.useRadixSort;
- shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
- partitionWriterOptions_.compressionType = params.compressionType;
- switch (partitionWriterOptions_.compressionType) {
- case arrow::Compression::UNCOMPRESSED:
- partitionWriterOptions_.compressionTypeStr = "none";
- break;
- case arrow::Compression::LZ4_FRAME:
- partitionWriterOptions_.compressionTypeStr = "lz4";
- break;
- case arrow::Compression::ZSTD:
- partitionWriterOptions_.compressionTypeStr = "zstd";
- break;
- default:
- break;
- };
- partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
- partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
- return arrow::Status::OK();
- }
-
- std::shared_ptr<VeloxShuffleWriter> createSpecificShuffleWriter(
- arrow::MemoryPool* arrowPool,
- std::unique_ptr<PartitionWriter> partitionWriter,
- ShuffleWriterOptions shuffleWriterOptions,
- uint32_t numPartitions,
- int32_t bufferSize) {
- if (shuffleWriterOptions.shuffleWriterType ==
ShuffleWriterType::kHashShuffle) {
- shuffleWriterOptions.bufferSize = bufferSize;
- }
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxShuffleWriter::create(
- GetParam().shuffleWriterType,
- numPartitions,
- std::move(partitionWriter),
- std::move(shuffleWriterOptions),
- pool_,
- arrowPool));
- return shuffleWriter;
- }
-
- protected:
- static void SetUpTestCase() {
- facebook::velox::memory::MemoryManager::testingSetInstance({});
- }
-
- virtual void SetUp() override {
- std::cout << "Running test with param: " << GetParam().toString() <<
std::endl;
- VeloxShuffleWriterTestBase::setUp();
- }
-
- void TearDown() override {
- if (file_ != nullptr && !file_->closed()) {
- GLUTEN_THROW_NOT_OK(file_->Close());
- }
- }
-
- static void checkFileExists(const std::string& fileName) {
-
ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)),
true);
- }
-
- std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr&
rowVector) {
- return toArrowSchema(rowVector->type(), pool());
- }
-
- void setReadableFile(const std::string& fileName) {
- if (file_ != nullptr && !file_->closed()) {
- GLUTEN_THROW_NOT_OK(file_->Close());
- }
- GLUTEN_ASSIGN_OR_THROW(file_, arrow::io::ReadableFile::Open(fileName))
- }
-
- void getRowVectors(
- arrow::Compression::type compressionType,
- std::shared_ptr<arrow::Schema> schema,
- std::vector<facebook::velox::RowVectorPtr>& vectors,
- std::shared_ptr<arrow::io::InputStream> in) {
- ShuffleReaderOptions options;
- options.compressionType = compressionType;
- auto codec = createArrowIpcCodec(options.compressionType,
CodecBackend::NONE);
- auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
- switch (options.compressionType) {
- case arrow::Compression::type::UNCOMPRESSED:
- options.compressionTypeStr = "none";
- break;
- case arrow::Compression::type::LZ4_FRAME:
- options.compressionTypeStr = "lz4";
- break;
- case arrow::Compression::type::ZSTD:
- options.compressionTypeStr = "zstd";
- break;
- default:
- break;
- };
- auto veloxCompressionType =
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
- if (!facebook::velox::isRegisteredVectorSerde()) {
-
facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
- }
- // Set batchSize to a large value to make all batches are merged by reader.
- auto deserializerFactory =
std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
- schema,
- std::move(codec),
- veloxCompressionType,
- rowType,
- kDefaultBatchSize,
- kDefaultReadBufferSize,
- kDefaultDeserializerBufferSize,
- defaultArrowMemoryPool().get(),
- pool_,
- GetParam().shuffleWriterType);
- auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
- auto iter = reader->readStream(in);
- while (iter->hasNext()) {
- auto vector =
std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector();
- vectors.emplace_back(vector);
- }
- }
-
- std::shared_ptr<arrow::io::ReadableFile> file_;
-};
-
-class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
- void testShuffleWrite(VeloxShuffleWriter& shuffleWriter,
std::vector<facebook::velox::RowVectorPtr> vectors) {
- for (auto& vector : vectors) {
- ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
- }
- ASSERT_NOT_OK(shuffleWriter.stop());
- // verify data file
- checkFileExists(dataFile_);
- // verify output temporary files
- const auto& lengths = shuffleWriter.partitionLengths();
- ASSERT_EQ(lengths.size(), 1);
-
- auto schema = getArrowSchema(vectors[0]);
- std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
- setReadableFile(dataFile_);
- GLUTEN_ASSIGN_OR_THROW(auto in,
arrow::io::RandomAccessFile::GetStream(file_, 0, lengths[0]));
- getRowVectors(partitionWriterOptions_.compressionType, schema,
deserializedVectors, in);
-
- ASSERT_EQ(deserializedVectors.size(), vectors.size());
- for (int32_t i = 0; i < deserializedVectors.size(); i++) {
- facebook::velox::test::assertEqualVectors(vectors[i],
deserializedVectors[i]);
- }
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.partitioning = Partitioning::kSingle;
- static const uint32_t kNumPartitions = 1;
- auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
- arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 10);
- return shuffleWriter;
- }
-};
-
-class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
- void shuffleWriteReadMultiBlocks(
- VeloxShuffleWriter& shuffleWriter,
- int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
- std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
- ASSERT_NOT_OK(shuffleWriter.stop());
- // verify data file
- checkFileExists(dataFile_);
- // verify output temporary files
- const auto& lengths = shuffleWriter.partitionLengths();
- ASSERT_EQ(lengths.size(), expectPartitionLength);
- int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), 0);
- auto schema = toArrowSchema(dataType, pool());
- setReadableFile(dataFile_);
- ASSERT_EQ(*file_->GetSize(), lengthSum);
- for (int32_t i = 0; i < expectPartitionLength; i++) {
- if (expectedVectors[i].size() == 0) {
- ASSERT_EQ(lengths[i], 0);
- } else {
- std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
- GLUTEN_ASSIGN_OR_THROW(
- auto in, arrow::io::RandomAccessFile::GetStream(file_, i == 0 ? 0
: lengths[i - 1], lengths[i]));
- getRowVectors(partitionWriterOptions_.compressionType, schema,
deserializedVectors, in);
- ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
- for (int32_t j = 0; j < expectedVectors[i].size(); j++) {
- facebook::velox::test::assertEqualVectors(expectedVectors[i][j],
deserializedVectors[j]);
- }
- }
- }
- }
-
- void testShuffleWriteMultiBlocks(
- VeloxShuffleWriter& shuffleWriter,
- std::vector<facebook::velox::RowVectorPtr> vectors,
- int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
- std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{
- for (auto& vector : vectors) {
- ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
- }
- shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
- }
-};
-
-class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter
{
- protected:
- void SetUp() override {
- MultiplePartitioningShuffleWriter::SetUp();
-
- children1_.insert((children1_.begin()), makeFlatVector<int32_t>({1, 2, 2,
2, 2, 1, 1, 1, 2, 1}));
- hashInputVector1_ = makeRowVector(children1_);
- children2_.insert((children2_.begin()), makeFlatVector<int32_t>({2, 2}));
- hashInputVector2_ = makeRowVector(children2_);
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.partitioning = Partitioning::kHash;
- static const uint32_t kNumPartitions = 2;
- auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
- arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
- return shuffleWriter;
- }
-
- std::vector<uint32_t> hashPartitionIds_{1, 2};
-
- facebook::velox::RowVectorPtr hashInputVector1_;
- facebook::velox::RowVectorPtr hashInputVector2_;
-};
-
-class RangePartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
- protected:
- void SetUp() override {
- MultiplePartitioningShuffleWriter::SetUp();
-
- auto pid1 = makeRowVector({makeFlatVector<int32_t>({0, 1, 0, 1, 0, 1, 0,
1, 0, 1})});
- auto rangeVector1 = makeRowVector(inputVector1_->children());
- compositeBatch1_ = VeloxColumnarBatch::compose(
- pool(), {std::make_shared<VeloxColumnarBatch>(pid1),
std::make_shared<VeloxColumnarBatch>(rangeVector1)});
-
- auto pid2 = makeRowVector({makeFlatVector<int32_t>({0, 1})});
- auto rangeVector2 = makeRowVector(inputVector2_->children());
- compositeBatch2_ = VeloxColumnarBatch::compose(
- pool(), {std::make_shared<VeloxColumnarBatch>(pid2),
std::make_shared<VeloxColumnarBatch>(rangeVector2)});
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- shuffleWriterOptions_.partitioning = Partitioning::kRange;
- static const uint32_t kNumPartitions = 2;
- auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
- arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
- return shuffleWriter;
- }
-
- void testShuffleWriteMultiBlocks(
- VeloxShuffleWriter& shuffleWriter,
- std::vector<std::shared_ptr<ColumnarBatch>> batches,
- int32_t expectPartitionLength,
- facebook::velox::TypePtr dataType,
- std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors)
{ /* blockId = pid, rowVector in block */
- for (auto& batch : batches) {
- ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
- }
- shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength,
dataType, expectedVectors);
- }
-
- std::shared_ptr<ColumnarBatch> compositeBatch1_;
- std::shared_ptr<ColumnarBatch> compositeBatch2_;
-};
-
-class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWriter {
- protected:
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- static const uint32_t kNumPartitions = 2;
- auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- std::shared_ptr<VeloxShuffleWriter> shuffleWriter =
createSpecificShuffleWriter(
- arrowPool, std::move(partitionWriter),
std::move(shuffleWriterOptions_), kNumPartitions, 4);
- return shuffleWriter;
- }
-};
-
-class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase,
public testing::Test {
- protected:
- static void SetUpTestCase() {
- facebook::velox::memory::MemoryManager::testingSetInstance({});
- }
- void SetUp() override {
- VeloxShuffleWriterTestBase::setUp();
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions, arrow::MemoryPool* arrowPool) {
- auto partitionWriter = createPartitionWriter(
- PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_, arrowPool);
- GLUTEN_ASSIGN_OR_THROW(
- auto shuffleWriter,
- VeloxHashShuffleWriter::create(
- numPartitions, std::move(partitionWriter),
std::move(shuffleWriterOptions_), pool_, arrowPool));
- return shuffleWriter;
- }
-
- std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool*
arrowPool) override {
- return createShuffleWriter(kDefaultShufflePartitions, arrowPool);
- }
-
- int64_t splitRowVectorAndSpill(
- VeloxShuffleWriter& shuffleWriter,
- std::vector<facebook::velox::RowVectorPtr> vectors,
- bool shrink) {
- for (auto vector : vectors) {
- ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
- }
-
- auto targetEvicted = shuffleWriter.cachedPayloadSize();
- if (shrink) {
- targetEvicted += shuffleWriter.partitionBufferSize();
- }
- int64_t evicted;
- ASSERT_NOT_OK(shuffleWriter.reclaimFixedSize(targetEvicted, &evicted));
-
- return evicted;
- };
-
- static constexpr uint32_t kDefaultShufflePartitions = 2;
-};
-
-} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]