This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new de4af5c5e0 [GLUTEN-8855][VL] Support dictionary in hash based shuffle
(#9727)
de4af5c5e0 is described below
commit de4af5c5e0825501abb2b0c368235484ef62ec5b
Author: Rong Ma <[email protected]>
AuthorDate: Wed Jun 25 09:11:02 2025 +0100
[GLUTEN-8855][VL] Support dictionary in hash based shuffle (#9727)
---
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 5 +-
.../spark/shuffle/ColumnarShuffleWriter.scala | 5 +-
cpp/core/CMakeLists.txt | 1 +
cpp/core/jni/JniWrapper.cc | 17 +-
cpp/core/memory/MemoryManager.h | 6 +-
cpp/core/shuffle/Dictionary.cc | 41 ++
cpp/core/shuffle/Dictionary.h | 60 +++
cpp/core/shuffle/LocalPartitionWriter.cc | 107 ++++-
cpp/core/shuffle/Options.h | 11 +-
cpp/core/shuffle/PartitionWriter.h | 2 +-
cpp/core/shuffle/Payload.cc | 32 +-
cpp/core/shuffle/Payload.h | 3 +
cpp/core/shuffle/ShuffleWriter.cc | 8 +
cpp/core/shuffle/ShuffleWriter.h | 4 +
cpp/core/shuffle/rss/RssPartitionWriter.cc | 4 +
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/benchmarks/GenericBenchmark.cc | 3 +
cpp/velox/compute/VeloxBackend.cc | 6 +
cpp/velox/memory/VeloxMemoryManager.cc | 12 +-
cpp/velox/memory/VeloxMemoryManager.h | 2 +-
cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc | 483 +++++++++++++++++++++
cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h | 71 +++
cpp/velox/shuffle/VeloxShuffleReader.cc | 354 ++++++++++++---
cpp/velox/shuffle/VeloxShuffleReader.h | 8 +
cpp/velox/shuffle/VeloxShuffleWriter.h | 2 +-
cpp/velox/tests/RuntimeTest.cc | 2 +-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 39 +-
docs/Configuration.md | 1 +
.../gluten/vectorized/GlutenSplitResult.java | 14 +
.../vectorized/LocalPartitionWriterJniWrapper.java | 3 +-
.../org/apache/gluten/config/GlutenConfig.scala | 10 +
31 files changed, 1197 insertions(+), 120 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 304f764aa0..14d4059679 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -321,7 +321,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
)
} else {
baseMetrics ++ Map(
- "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time
to split")
+ "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time
to split"),
+ "avgDictionaryFields" -> SQLMetrics
+ .createAverageMetric(sparkContext, "avg dictionary fields"),
+ "dictionarySize" -> SQLMetrics.createSizeMetric(sparkContext,
"dictionary size")
)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 1597f45b88..6f5348ac0b 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -145,7 +145,8 @@ class ColumnarShuffleWriter[K, V](
blockManager.subDirsPerLocalDir,
conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt,
tempDataFile.getAbsolutePath,
- localDirs
+ localDirs,
+ GlutenConfig.get.columnarShuffleEnableDictionary
)
nativeShuffleWriter = if (isSort) {
@@ -223,6 +224,8 @@ class ColumnarShuffleWriter[K, V](
dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime
-
splitResult.getTotalWriteTime -
splitResult.getTotalCompressTime)
+
dep.metrics("avgDictionaryFields").set(splitResult.getAvgDictionaryFields)
+ dep.metrics("dictionarySize").add(splitResult.getDictionarySize)
} else {
dep.metrics("sortTime").add(splitResult.getSortTime)
dep.metrics("c2rTime").add(splitResult.getC2RTime)
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index b164f2a953..5264eeb63e 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -128,6 +128,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
memory/MemoryManager.cc
memory/ArrowMemoryPool.cc
memory/ColumnarBatch.cc
+ shuffle/Dictionary.cc
shuffle/FallbackRangePartitioner.cc
shuffle/HashPartitioner.cc
shuffle/LocalPartitionWriter.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 59c0fa5078..1d2bddfcff 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -147,7 +147,7 @@ class InternalMemoryManager : public MemoryManager {
throw GlutenException("Not implemented");
}
- std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override {
+ std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const
std::string& name) override {
throw GlutenException("Not yet implemented");
}
@@ -215,7 +215,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass,
"close", "()V");
splitResultClass = createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/GlutenSplitResult;");
- splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>",
"(JJJJJJJJJJ[J[J)V");
+ splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>",
"(JJJJJJJJJJDJ[J[J)V");
metricsBuilderClass = createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/metrics/Metrics;");
@@ -794,7 +794,8 @@
Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
jint numSubDirs,
jint shuffleFileBufferSize,
jstring dataFileJstr,
- jstring localDirsJstr) {
+ jstring localDirsJstr,
+ jboolean enableDictionary) {
JNI_METHOD_START
const auto ctx = getRuntime(env, wrapper);
@@ -803,7 +804,13 @@
Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
auto localDirs = splitPaths(jStringToCString(env, localDirsJstr));
auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>(
- shuffleFileBufferSize, compressionBufferSize, compressionThreshold,
mergeBufferSize, mergeThreshold, numSubDirs);
+ shuffleFileBufferSize,
+ compressionBufferSize,
+ compressionThreshold,
+ mergeBufferSize,
+ mergeThreshold,
+ numSubDirs,
+ enableDictionary);
auto partitionWriter = std::make_shared<LocalPartitionWriter>(
numPartitions,
@@ -980,6 +987,8 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
shuffleWriter->totalBytesEvicted(),
shuffleWriter->totalBytesToEvict(),
shuffleWriter->peakBytesAllocated(),
+ shuffleWriter->avgDictionaryFields(),
+ shuffleWriter->dictionarySize(),
partitionLengthArr,
rawPartitionLengthArr);
diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h
index cb7512d23c..93d875ad1e 100644
--- a/cpp/core/memory/MemoryManager.h
+++ b/cpp/core/memory/MemoryManager.h
@@ -42,10 +42,10 @@ class MemoryManager {
// Get the default Arrow memory pool for this memory manager. This memory
pool is held by the memory manager.
virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0;
- // Create a new Arrow memory pool with the given name. The caller is
responsible for managing the lifetime of the
+ // Return the Arrow memory pool with the given name. The caller is
responsible for managing the lifetime of the
// returned memory pool. Memory manager only holds the weak reference to the
memory pool for collecting memory usage.
- // If the name is already used by an existing memory pool, the creation will
fail.
- virtual std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const
std::string& name) = 0;
+ // If the memory pool with the given name does not exist, it will create a
new one and return it.
+ virtual std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const
std::string& name) = 0;
virtual const MemoryUsageStats collectMemoryUsageStats() const = 0;
diff --git a/cpp/core/shuffle/Dictionary.cc b/cpp/core/shuffle/Dictionary.cc
new file mode 100644
index 0000000000..05bfb4131a
--- /dev/null
+++ b/cpp/core/shuffle/Dictionary.cc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/Dictionary.h"
+#include "utils/Exception.h"
+
+namespace gluten {
+
+static ShuffleDictionaryWriterFactory dictionaryWriterFactory;
+
+void registerShuffleDictionaryWriterFactory(ShuffleDictionaryWriterFactory
factory) {
+ if (dictionaryWriterFactory) {
+ throw GlutenException("DictionaryWriter factory already registered.");
+ }
+ dictionaryWriterFactory = std::move(factory);
+}
+
+std::unique_ptr<ShuffleDictionaryWriter> createDictionaryWriter(
+ MemoryManager* memoryManager,
+ arrow::util::Codec* codec) {
+ if (!dictionaryWriterFactory) {
+ throw GlutenException("DictionaryWriter factory not registered.");
+ }
+ return dictionaryWriterFactory(memoryManager, codec);
+}
+
+} // namespace gluten
diff --git a/cpp/core/shuffle/Dictionary.h b/cpp/core/shuffle/Dictionary.h
new file mode 100644
index 0000000000..246406d5ae
--- /dev/null
+++ b/cpp/core/shuffle/Dictionary.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <arrow/io/api.h>
+
+#include "memory/MemoryManager.h"
+
+namespace gluten {
+
+enum class BlockType : uint8_t { kEndOfStream = 0, kPlainPayload = 1,
kDictionary = 2, kDictionaryPayload = 3 };
+
+class ShuffleDictionaryStorage {
+ public:
+ virtual ~ShuffleDictionaryStorage() = default;
+
+ virtual arrow::Status serialize(arrow::io::OutputStream* out) = 0;
+};
+
+class ShuffleDictionaryWriter {
+ public:
+ virtual ~ShuffleDictionaryWriter() = default;
+
+ virtual arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
updateAndGet(
+ const std::shared_ptr<arrow::Schema>& schema,
+ int32_t numRows,
+ const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) = 0;
+
+ virtual arrow::Status serialize(arrow::io::OutputStream* out) = 0;
+
+ virtual int64_t numDictionaryFields() = 0;
+
+ virtual int64_t getDictionarySize() = 0;
+};
+
+using ShuffleDictionaryWriterFactory =
+ std::function<std::unique_ptr<ShuffleDictionaryWriter>(MemoryManager*
memoryManager, arrow::util::Codec* codec)>;
+
+void registerShuffleDictionaryWriterFactory(ShuffleDictionaryWriterFactory
factory);
+
+std::unique_ptr<ShuffleDictionaryWriter> createDictionaryWriter(
+ MemoryManager* memoryManager,
+ arrow::util::Codec* codec);
+
+} // namespace gluten
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index eb7668185e..457db60a5c 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -17,6 +17,7 @@
#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/Dictionary.h"
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
@@ -71,6 +72,10 @@ class LocalPartitionWriter::LocalSpiller {
arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload>
payload) {
ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
+
+ static constexpr uint8_t kSpillBlockType =
static_cast<uint8_t>(BlockType::kPlainPayload);
+
+ RETURN_NOT_OK(os_->Write(&kSpillBlockType, sizeof(kSpillBlockType)));
RETURN_NOT_OK(payload->serialize(os_.get()));
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
@@ -283,8 +288,19 @@ class LocalPartitionWriter::PayloadMerger {
class LocalPartitionWriter::PayloadCache {
public:
- PayloadCache(uint32_t numPartitions, arrow::util::Codec* codec, int32_t
compressionThreshold, arrow::MemoryPool* pool)
- : numPartitions_(numPartitions), codec_(codec),
compressionThreshold_(compressionThreshold), pool_(pool) {}
+ PayloadCache(
+ uint32_t numPartitions,
+ arrow::util::Codec* codec,
+ int32_t compressionThreshold,
+ bool enableDictionary,
+ arrow::MemoryPool* pool,
+ MemoryManager* memoryManager)
+ : numPartitions_(numPartitions),
+ codec_(codec),
+ compressionThreshold_(compressionThreshold),
+ enableDictionary_(enableDictionary),
+ pool_(pool),
+ memoryManager_(memoryManager) {}
arrow::Status cache(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
payload) {
PartitionScopeGuard cacheGuard(partitionInUse_, partitionId);
@@ -293,6 +309,13 @@ class LocalPartitionWriter::PayloadCache {
partitionCachedPayload_[partitionId] =
std::list<std::unique_ptr<BlockPayload>>{};
}
+ if (enableDictionary_) {
+ if (partitionDictionaries_.find(partitionId) ==
partitionDictionaries_.end()) {
+ partitionDictionaries_[partitionId] =
createDictionaryWriter(memoryManager_, codec_);
+ }
+
RETURN_NOT_OK(payload->createDictionaries(partitionDictionaries_[partitionId]));
+ }
+
bool shouldCompress = codec_ != nullptr && payload->numRows() >=
compressionThreshold_;
ARROW_ASSIGN_OR_RAISE(
auto block,
@@ -309,12 +332,17 @@ class LocalPartitionWriter::PayloadCache {
"Invalid status: partitionInUse_ is set: " +
std::to_string(partitionInUse_.value()));
if (hasCachedPayloads(partitionId)) {
+ ARROW_ASSIGN_OR_RAISE(const bool hasDictionaries,
writeDictionaries(partitionId, os));
+
auto& payloads = partitionCachedPayload_[partitionId];
while (!payloads.empty()) {
- auto payload = std::move(payloads.front());
+ const auto payload = std::move(payloads.front());
payloads.pop_front();
// Write the cached payload to disk.
+ uint8_t blockType =
+ static_cast<uint8_t>(hasDictionaries ?
BlockType::kDictionaryPayload : BlockType::kPlainPayload);
+ RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
RETURN_NOT_OK(payload->serialize(os));
compressTime_ += payload->getCompressTime();
@@ -342,7 +370,7 @@ class LocalPartitionWriter::PayloadCache {
arrow::util::Codec* codec,
const int64_t bufferSize,
int64_t& totalBytesToEvict) {
- ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile, bufferSize));
+ ARROW_ASSIGN_OR_RAISE(const auto os, openFile(spillFile, bufferSize));
int64_t start = 0;
auto diskSpill = std::make_shared<Spill>();
@@ -353,6 +381,8 @@ class LocalPartitionWriter::PayloadCache {
}
if (hasCachedPayloads(pid)) {
+ ARROW_ASSIGN_OR_RAISE(const bool hasDictionaries,
writeDictionaries(pid, os.get()));
+
auto& payloads = partitionCachedPayload_[pid];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
@@ -360,7 +390,11 @@ class LocalPartitionWriter::PayloadCache {
totalBytesToEvict += payload->rawSize();
// Spill the cached payload to disk.
+ uint8_t blockType =
+ static_cast<uint8_t>(hasDictionaries ?
BlockType::kDictionaryPayload : BlockType::kPlainPayload);
+ RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
RETURN_NOT_OK(payload->serialize(os.get()));
+
compressTime_ += payload->getCompressTime();
spillTime_ += payload->getWriteTime();
}
@@ -394,22 +428,71 @@ class LocalPartitionWriter::PayloadCache {
return writeTime_;
}
+ double getAvgDictionaryFields() const {
+ if (numDictionaryPayloads_ == 0 || dictionaryFieldCount_ == 0) {
+ return 0.0;
+ }
+ return dictionaryFieldCount_ / static_cast<double>(numDictionaryPayloads_);
+ }
+
+ int64_t getDictionarySize() const {
+ return dictionarySize_;
+ }
+
private:
bool hasCachedPayloads(uint32_t partitionId) {
return partitionCachedPayload_.find(partitionId) !=
partitionCachedPayload_.end() &&
!partitionCachedPayload_[partitionId].empty();
}
+ arrow::Result<bool> writeDictionaries(uint32_t partitionId,
arrow::io::OutputStream* os) {
+ if (!enableDictionary_) {
+ return false;
+ }
+
+ const auto& dict = partitionDictionaries_.find(partitionId);
+ GLUTEN_DCHECK(
+ dict != partitionDictionaries_.end(),
+ "Dictionary for partition " + std::to_string(partitionId) + " not
found.");
+
+ const auto numDictionaryFields = dict->second->numDictionaryFields();
+ dictionaryFieldCount_ += numDictionaryFields;
+ ++numDictionaryPayloads_;
+
+ if (numDictionaryFields == 0) {
+ // No dictionary fields, no need to write.
+ return false;
+ }
+
+ dictionarySize_ +=
partitionDictionaries_[partitionId]->getDictionarySize();
+
+ static constexpr uint8_t kDictionaryBlock =
static_cast<uint8_t>(BlockType::kDictionary);
+ RETURN_NOT_OK(os->Write(&kDictionaryBlock, sizeof(kDictionaryBlock)));
+ RETURN_NOT_OK(partitionDictionaries_[partitionId]->serialize(os));
+
+ partitionDictionaries_.erase(partitionId);
+
+ return true;
+ }
+
uint32_t numPartitions_;
arrow::util::Codec* codec_;
int32_t compressionThreshold_;
+ bool enableDictionary_;
arrow::MemoryPool* pool_;
+ MemoryManager* memoryManager_;
int64_t compressTime_{0};
int64_t spillTime_{0};
int64_t writeTime_{0};
std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>>
partitionCachedPayload_;
+ std::unordered_map<uint32_t, std::shared_ptr<ShuffleDictionaryWriter>>
partitionDictionaries_;
+
+ int64_t dictionaryFieldCount_{0};
+ int64_t numDictionaryPayloads_{0};
+ int64_t dictionarySize_{0};
+
std::optional<uint32_t> partitionInUse_{std::nullopt};
};
@@ -603,7 +686,12 @@ arrow::Status LocalPartitionWriter::finishMerger() {
if (maybeMerged.has_value()) {
if (payloadCache_ == nullptr) {
payloadCache_ = std::make_shared<PayloadCache>(
- numPartitions_, codec_.get(), options_->compressionThreshold,
payloadPool_.get());
+ numPartitions_,
+ codec_.get(),
+ options_->compressionThreshold,
+ options_->enableDictionary,
+ payloadPool_.get(),
+ memoryManager_);
}
// Spill can be triggered by compressing or building dictionaries.
RETURN_NOT_OK(payloadCache_->cache(pid,
std::move(maybeMerged.value())));
@@ -646,7 +734,12 @@ arrow::Status LocalPartitionWriter::hashEvict(
if (!merged.empty()) {
if (UNLIKELY(!payloadCache_)) {
payloadCache_ = std::make_shared<PayloadCache>(
- numPartitions_, codec_.get(), options_->compressionThreshold,
payloadPool_.get());
+ numPartitions_,
+ codec_.get(),
+ options_->compressionThreshold,
+ options_->enableDictionary,
+ payloadPool_.get(),
+ memoryManager_);
}
for (auto& payload : merged) {
RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
@@ -744,6 +837,8 @@ arrow::Status
LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
spillTime_ += payloadCache_->getSpillTime();
writeTime_ += payloadCache_->getWriteTime();
compressTime_ += payloadCache_->getCompressTime();
+ metrics->avgDictionaryFields = payloadCache_->getAvgDictionaryFields();
+ metrics->dictionarySize = payloadCache_->getDictionarySize();
}
metrics->totalCompressTime += compressTime_;
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index a5f051848c..717f75dea5 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -41,6 +41,7 @@ static constexpr int32_t kDefaultSortBufferSize = 4096;
static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
+static constexpr bool kDefaultEnableDictionary = false;
enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
@@ -141,6 +142,8 @@ struct LocalPartitionWriterOptions {
int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories
+ bool enableDictionary = kDefaultEnableDictionary;
+
LocalPartitionWriterOptions() = default;
LocalPartitionWriterOptions(
@@ -149,13 +152,15 @@ struct LocalPartitionWriterOptions {
int64_t compressionThreshold,
int32_t mergeBufferSize,
double mergeThreshold,
- int32_t numSubDirs)
+ int32_t numSubDirs,
+ bool enableDictionary)
: shuffleFileBufferSize(shuffleFileBufferSize),
compressionBufferSize(compressionBufferSize),
compressionThreshold(compressionThreshold),
mergeBufferSize(mergeBufferSize),
mergeThreshold(mergeThreshold),
- numSubDirs(numSubDirs) {}
+ numSubDirs(numSubDirs),
+ enableDictionary(enableDictionary) {}
};
struct RssPartitionWriterOptions {
@@ -179,6 +184,8 @@ struct ShuffleWriterMetrics {
int64_t totalWriteTime{0};
int64_t totalEvictTime{0};
int64_t totalCompressTime{0};
+ double avgDictionaryFields{0};
+ int64_t dictionarySize{0};
std::vector<int64_t> partitionLengths{};
std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
};
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index 7fbcd73f58..cedae6d2fd 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -33,7 +33,7 @@ class PartitionWriter : public Reclaimable {
public:
PartitionWriter(uint32_t numPartitions, std::unique_ptr<arrow::util::Codec>
codec, MemoryManager* memoryManager)
: numPartitions_(numPartitions), codec_(std::move(codec)),
memoryManager_(memoryManager) {
- payloadPool_ =
memoryManager->createArrowMemoryPool("PartitionWriter.cached_payload");
+ payloadPool_ =
memoryManager->getOrCreateArrowMemoryPool("PartitionWriter.cached_payload");
}
static inline std::string typeToString(PartitionWriterType type) {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 352f46ffb6..27e7d9c85c 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -50,13 +50,10 @@ T* advance(uint8_t** dst) {
return ptr;
}
-arrow::Result<uint8_t> readType(arrow::io::InputStream* inputStream) {
+arrow::Result<uint8_t> readPayloadType(arrow::io::InputStream* is) {
uint8_t type;
- ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(Payload::Type),
&type));
- if (bytes == 0) {
- // Reach EOS.
- return 0;
- }
+ ARROW_ASSIGN_OR_RAISE(auto bytes, is->Read(sizeof(Payload::Type), &type));
+ ARROW_RETURN_IF(bytes == 0, arrow::Status::IOError("Failed to read bytes.
Reached EOS."));
return type;
}
@@ -298,12 +295,8 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
BlockPayload::deseria
int64_t& deserializeTime,
int64_t& decompressTime) {
auto timer = std::make_unique<ScopedTimer>(&deserializeTime);
- static const std::vector<std::shared_ptr<arrow::Buffer>> kEmptyBuffers{};
- ARROW_ASSIGN_OR_RAISE(auto type, readType(inputStream));
- if (type == 0) {
- numRows = 0;
- return kEmptyBuffers;
- }
+ ARROW_ASSIGN_OR_RAISE(auto type, readPayloadType(inputStream));
+
RETURN_NOT_OK(inputStream->Read(sizeof(uint32_t), &numRows));
uint32_t numBuffers;
RETURN_NOT_OK(inputStream->Read(sizeof(uint32_t), &numBuffers));
@@ -481,6 +474,11 @@ std::shared_ptr<arrow::Schema> InMemoryPayload::schema()
const {
return schema_;
}
+arrow::Status InMemoryPayload::createDictionaries(const
std::shared_ptr<ShuffleDictionaryWriter>& dictionaryWriter) {
+ ARROW_ASSIGN_OR_RAISE(buffers_, dictionaryWriter->updateAndGet(schema_,
numRows_, buffers_));
+ return arrow::Status::OK();
+}
+
UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
Type type,
uint32_t numRows,
@@ -512,11 +510,17 @@ arrow::Status
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
GLUTEN_CHECK(codec_ != nullptr, "Codec is null when serializing
Payload::kToBeCompressed.");
+ uint8_t blockType;
+ ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(blockType),
&blockType));
+ ARROW_RETURN_IF(bytes == 0, arrow::Status::Invalid("Cannot serialize
payload. Reached EOS."));
+
+ RETURN_NOT_OK(outputStream->Write(&blockType, sizeof(blockType)));
+
RETURN_NOT_OK(outputStream->Write(&kCompressedType,
sizeof(kCompressedType)));
RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
uint32_t numBuffers = 0;
- ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(uint32_t),
&numBuffers));
+ ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t),
&numBuffers));
ARROW_RETURN_IF(bytes == 0 || numBuffers == 0,
arrow::Status::Invalid("Cannot serialize payload with 0 buffers."));
RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
@@ -524,7 +528,7 @@ arrow::Status
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
ARROW_ASSIGN_OR_RAISE(auto start, inputStream_->Tell());
auto pos = start;
- auto rawBufferSize = rawSize_ - sizeof(numBuffers);
+ auto rawBufferSize = rawSize_ - sizeof(blockType) - sizeof(numBuffers);
while (pos - start < rawBufferSize) {
ARROW_ASSIGN_OR_RAISE(auto uncompressed, readUncompressedBuffer());
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 2f481c5197..dfe98cafd5 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -21,6 +21,7 @@
#include <arrow/io/interfaces.h>
#include <arrow/memory_pool.h>
+#include "shuffle/Dictionary.h"
#include "shuffle/Options.h"
#include "shuffle/Utils.h"
@@ -148,6 +149,8 @@ class InMemoryPayload final : public Payload {
std::shared_ptr<arrow::Schema> schema() const;
+ arrow::Status createDictionaries(const
std::shared_ptr<ShuffleDictionaryWriter>& dictionaryWriter);
+
private:
std::shared_ptr<arrow::Schema> schema_;
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
diff --git a/cpp/core/shuffle/ShuffleWriter.cc
b/cpp/core/shuffle/ShuffleWriter.cc
index 16e8b32624..099794ddb7 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -87,6 +87,14 @@ int64_t ShuffleWriter::totalC2RTime() const {
return 0;
}
+double ShuffleWriter::avgDictionaryFields() const {
+ return metrics_.avgDictionaryFields;
+}
+
+int64_t ShuffleWriter::dictionarySize() const {
+ return metrics_.dictionarySize;
+}
+
const std::vector<int64_t>& ShuffleWriter::partitionLengths() const {
return metrics_.partitionLengths;
}
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 224c2726e4..1028b0a318 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -59,6 +59,10 @@ class ShuffleWriter : public Reclaimable {
virtual int64_t totalC2RTime() const;
+ double avgDictionaryFields() const;
+
+ int64_t dictionarySize() const;
+
const std::vector<int64_t>& partitionLengths() const;
const std::vector<int64_t>& rawPartitionLengths() const;
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 83a31cac1e..d52bd5dbf3 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -118,6 +118,10 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t
partitionId, std::unique_ptr<
auto payload, inMemoryPayload->toBlockPayload(payloadType,
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(auto rssBufferOs,
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize));
+
+ static constexpr uint8_t kRssBlock =
static_cast<uint8_t>(BlockType::kPlainPayload);
+ RETURN_NOT_OK(rssBufferOs->Write(&kRssBlock, sizeof(kRssBlock)));
+
RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
payload = nullptr; // Invalidate payload immediately.
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 4deb062174..213a33dc64 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -148,6 +148,7 @@ set(VELOX_SRCS
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxColumnarBatchWriter.cc
operators/writer/VeloxParquetDataSource.cc
+ shuffle/ArrowShuffleDictionaryWriter.cc
shuffle/VeloxHashShuffleWriter.cc
shuffle/VeloxRssSortShuffleWriter.cc
shuffle/VeloxShuffleReader.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index bc811c19cf..ad591c91fb 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -63,6 +63,7 @@ DEFINE_string(
"lz4",
"Specify the compression codec. Valid options are none, lz4, zstd,
qat_gzip, qat_zstd, iaa_gzip");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer)
partitions");
+DEFINE_bool(shuffle_dictionary, false, "Whether to enable dictionary encoding
for shuffle write.");
DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
DEFINE_string(
@@ -208,7 +209,9 @@ createPartitionWriter(Runtime* runtime, const std::string&
dataFile, const std::
return std::make_shared<RssPartitionWriter>(
FLAGS_shuffle_partitions, createCodec(), runtime->memoryManager(),
options, std::move(rssClient));
}
+
auto options = std::make_shared<LocalPartitionWriterOptions>();
+ options->enableDictionary = FLAGS_shuffle_dictionary;
return std::make_unique<LocalPartitionWriter>(
FLAGS_shuffle_partitions, createCodec(), runtime->memoryManager(),
options, dataFile, localDirs);
}
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index ada113f946..7ba2392f9b 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -33,10 +33,12 @@
#ifdef GLUTEN_ENABLE_GPU
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif
+
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
+#include "shuffle/ArrowShuffleDictionaryWriter.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
#include "velox/common/caching/SsdCache.h"
@@ -210,6 +212,10 @@ void VeloxBackend::init(
// local cache persistent relies on the cache pool from root memory pool so
we need to init this
// after the memory manager instanced
initCache();
+
+ registerShuffleDictionaryWriterFactory([](MemoryManager* memoryManager,
arrow::util::Codec* codec) {
+ return std::make_unique<ArrowShuffleDictionaryWriter>(memoryManager,
codec);
+ });
}
facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache()
const {
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index 669a72eb62..b0e3a799b3 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -172,8 +172,8 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
try {
listener_->allocationChanged(neededBytes);
} catch (const std::exception&) {
- VLOG(2) << "ListenableArbitrator growCapacityInternal failed,
stacktrace: "
- << velox::process::StackTrace().toString();
+ VLOG(2) << "ListenableArbitrator growCapacityInternal failed,
stacktrace: "
+ << velox::process::StackTrace().toString();
// if allocationChanged failed, we need to free the reclaimed bytes
listener_->allocationChanged(-reclaimedFreeBytes);
std::rethrow_exception(std::current_exception());
@@ -324,9 +324,13 @@ int64_t
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
}
} // namespace
-std::shared_ptr<arrow::MemoryPool>
VeloxMemoryManager::createArrowMemoryPool(const std::string& name) {
+std::shared_ptr<arrow::MemoryPool>
VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
std::lock_guard<std::mutex> l(mutex_);
- VELOX_CHECK_EQ(arrowPools_.count(name), 0, "Arrow memory pool {} already
exists", name);
+ if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
+ auto pool = it->second.lock();
+ VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed",
name);
+ return pool;
+ }
auto pool = std::make_shared<ArrowMemoryPool>(
blockListener_.get(), [this, name](arrow::MemoryPool* pool) {
this->dropMemoryPool(name); });
arrowPools_.emplace(name, pool);
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index 28c0f79614..15e298629e 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -83,7 +83,7 @@ class VeloxMemoryManager final : public MemoryManager {
return defaultArrowPool_.get();
}
- std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override;
+ std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const
std::string& name) override;
const MemoryUsageStats collectMemoryUsageStats() const override;
diff --git a/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc
b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc
new file mode 100644
index 0000000000..fcb43de106
--- /dev/null
+++ b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/ArrowShuffleDictionaryWriter.h"
+#include "shuffle/Utils.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <arrow/array/builder_dict.h>
+
+namespace gluten {
+
+static constexpr double kDictionaryFactor = 0.5;
+
+using ArrowDictionaryIndexType = int32_t;
+
+template <typename T>
+using is_dictionary_binary_type =
+ std::integral_constant<bool, std::is_same_v<arrow::BinaryType, T> ||
std::is_same_v<arrow::StringType, T>>;
+
+template <typename T>
+using is_dictionary_primitive_type = std::integral_constant<
+ bool,
+ std::is_same_v<arrow::Int64Type, T> || std::is_same_v<arrow::DoubleType,
T> ||
+ std::is_same_v<arrow::TimestampType, T>>;
+
+template <typename T, typename R = void>
+using enable_if_dictionary_type =
+ std::enable_if_t<is_dictionary_primitive_type<T>::value ||
is_dictionary_binary_type<T>::value, R>;
+
+namespace {
+arrow::Status writeDictionaryBuffer(
+ const uint8_t* data,
+ size_t size,
+ arrow::MemoryPool* pool,
+ arrow::util::Codec* codec,
+ arrow::io::OutputStream* out) {
+ ARROW_RETURN_NOT_OK(out->Write(&size, sizeof(size_t)));
+
+ if (size == 0) {
+ return arrow::Status::OK();
+ }
+
+ GLUTEN_DCHECK(data != nullptr, "Cannot write null data");
+
+ if (codec != nullptr) {
+ size_t compressedLength = codec->MaxCompressedLen(size, data);
+ ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateBuffer(compressedLength,
pool));
+ ARROW_ASSIGN_OR_RAISE(
+ compressedLength, codec->Compress(size, data, compressedLength,
buffer->mutable_data_as<uint8_t>()));
+
+ ARROW_RETURN_NOT_OK(out->Write(&compressedLength, sizeof(size_t)));
+ ARROW_RETURN_NOT_OK(out->Write(buffer->data_as<void>(), compressedLength));
+ } else {
+ ARROW_RETURN_NOT_OK(out->Write(data, size));
+ }
+
+ return arrow::Status::OK();
+}
+
+} // namespace
+
+template <typename ArrowType>
+class DictionaryStorageImpl : public ShuffleDictionaryStorage {
+ public:
+ using ValueType = typename ArrowType::c_type;
+
+ DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec) :
pool_(pool), codec_(codec) {
+ table_ = std::make_shared<arrow::internal::DictionaryMemoTable>(pool,
std::make_shared<ArrowType>());
+ }
+
+ arrow::Result<ArrowDictionaryIndexType> getOrUpdate(const ValueType& value) {
+ ArrowDictionaryIndexType memoIndex;
+ ARROW_RETURN_NOT_OK(table_->GetOrInsert<ArrowType>(value, &memoIndex));
+ return memoIndex;
+ }
+
+ arrow::Status serialize(arrow::io::OutputStream* out) override {
+ std::shared_ptr<arrow::ArrayData> data;
+ ARROW_RETURN_NOT_OK(table_->GetArrayData(0, &data));
+
+ DLOG(INFO) << "ShuffleDictionaryStorage::serialize num elements: " <<
data->length;
+
+ ARROW_RETURN_IF(
+ data->buffers.size() != 2, arrow::Status::Invalid("Invalid dictionary
for type: ", data->type->ToString()));
+
+ const auto& values = data->buffers[1];
+ ARROW_RETURN_NOT_OK(writeDictionaryBuffer(values->data(), values->size(),
pool_, codec_, out));
+
+ return arrow::Status::OK();
+ }
+
+ int32_t size() const {
+ return table_->size();
+ }
+
+ private:
+ arrow::MemoryPool* pool_;
+ arrow::util::Codec* codec_;
+
+ std::shared_ptr<arrow::internal::DictionaryMemoTable> table_;
+};
+
+class BinaryShuffleDictionaryStorage : public ShuffleDictionaryStorage {
+ public:
+ using MemoType = arrow::LargeBinaryType;
+
+ BinaryShuffleDictionaryStorage(arrow::MemoryPool* pool, arrow::util::Codec*
codec) : pool_(pool), codec_(codec) {
+ table_ = std::make_shared<arrow::internal::DictionaryMemoTable>(pool,
std::make_shared<MemoType>());
+ }
+
+ arrow::Result<ArrowDictionaryIndexType> getOrUpdate(const std::string_view&
view) const {
+ ArrowDictionaryIndexType memoIndex;
+ ARROW_RETURN_NOT_OK(table_->GetOrInsert<MemoType>(view, &memoIndex));
+ return memoIndex;
+ }
+
+ arrow::Status serialize(arrow::io::OutputStream* out) override {
+ std::shared_ptr<arrow::ArrayData> data;
+ ARROW_RETURN_NOT_OK(table_->GetArrayData(0, &data));
+
+ ARROW_RETURN_IF(data->buffers.size() != 3, arrow::Status::Invalid("Invalid
dictionary for binary type"));
+
+ DLOG(INFO) << "BinaryShuffleDictionaryStorage::serialize num elements: "
<< table_->size();
+
+ ARROW_ASSIGN_OR_RAISE(auto lengths, offsetToLength(data->length,
data->buffers[1]));
+ ARROW_RETURN_NOT_OK(writeDictionaryBuffer(lengths->data(),
lengths->size(), pool_, codec_, out));
+
+ const auto& values = data->buffers[2];
+ ARROW_RETURN_NOT_OK(writeDictionaryBuffer(values->data(), values->size(),
pool_, codec_, out));
+
+ return arrow::Status::OK();
+ }
+
+ int32_t size() const {
+ return table_->size();
+ }
+
+ private:
+ arrow::Result<std::shared_ptr<arrow::Buffer>> offsetToLength(
+ size_t numElements,
+ const std::shared_ptr<arrow::Buffer>& offsets) const {
+ ARROW_ASSIGN_OR_RAISE(auto lengths,
arrow::AllocateBuffer(sizeof(StringLengthType) * numElements, pool_));
+ auto* rawLengths = lengths->mutable_data_as<StringLengthType>();
+ const auto* rawOffsets =
offsets->data_as<arrow::TypeTraits<MemoType>::OffsetType::c_type>();
+
+ for (auto i = 0; i < numElements; ++i) {
+ rawLengths[i] = static_cast<StringLengthType>(rawOffsets[i + 1] -
rawOffsets[i]);
+ }
+
+ return lengths;
+ }
+
+ arrow::MemoryPool* pool_;
+ arrow::util::Codec* codec_;
+
+ std::shared_ptr<arrow::internal::DictionaryMemoTable> table_;
+};
+
+template <>
+class DictionaryStorageImpl<arrow::BinaryType> : public
BinaryShuffleDictionaryStorage {
+ public:
+ DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec)
+ : BinaryShuffleDictionaryStorage(pool, codec) {}
+};
+
+template <>
+class DictionaryStorageImpl<arrow::StringType> : public
BinaryShuffleDictionaryStorage {
+ public:
+ DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec)
+ : BinaryShuffleDictionaryStorage(pool, codec) {}
+};
+
+namespace {
+template <typename ArrowType>
+arrow::Result<std::shared_ptr<arrow::Buffer>> updateDictionary(
+ int32_t numRows,
+ const std::shared_ptr<arrow::Buffer>& validityBuffer,
+ const std::shared_ptr<arrow::Buffer>& valueBuffer,
+ const std::shared_ptr<arrow::Buffer>&,
+ const std::shared_ptr<DictionaryStorageImpl<ArrowType>>& dictionary,
+ arrow::MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(auto indices,
arrow::AllocateBuffer(sizeof(ArrowDictionaryIndexType) * numRows, pool));
+ auto rawIndices = indices->mutable_data_as<ArrowDictionaryIndexType>();
+
+ const auto* values = valueBuffer->data_as<typename ArrowType::c_type>();
+
+ if (validityBuffer != nullptr) {
+ const auto* nulls = validityBuffer->data();
+ for (auto i = 0; i < numRows; ++i) {
+ if (arrow::bit_util::GetBit(nulls, i)) {
+ ARROW_ASSIGN_OR_RAISE(*rawIndices, dictionary->getOrUpdate(values[i]));
+ }
+ ++rawIndices;
+ }
+ } else {
+ for (auto i = 0; i < numRows; ++i) {
+ ARROW_ASSIGN_OR_RAISE(*rawIndices++, dictionary->getOrUpdate(values[i]));
+ }
+ }
+
+ return indices;
+}
+
+arrow::Result<std::shared_ptr<arrow::Buffer>> updateDictionaryForBinary(
+ int32_t numRows,
+ const std::shared_ptr<arrow::Buffer>& validityBuffer,
+ const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+ const std::shared_ptr<arrow::Buffer>& valueBuffer,
+ const std::shared_ptr<BinaryShuffleDictionaryStorage>& dictionary,
+ arrow::MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(auto indices,
arrow::AllocateBuffer(sizeof(ArrowDictionaryIndexType) * numRows, pool));
+ auto rawIndices = indices->mutable_data_as<ArrowDictionaryIndexType>();
+
+ const auto* lengths = lengthBuffer->data_as<uint32_t>();
+ const auto* values = valueBuffer->data_as<char>();
+
+ size_t offset = 0;
+
+ if (validityBuffer != nullptr) {
+ const auto* nulls = validityBuffer->data();
+ for (auto i = 0; i < numRows; ++i) {
+ if (arrow::bit_util::GetBit(nulls, i)) {
+ std::string_view view(values + offset, lengths[i]);
+ offset += lengths[i];
+ ARROW_ASSIGN_OR_RAISE(*rawIndices, dictionary->getOrUpdate(view));
+ }
+ ++rawIndices;
+ }
+ } else {
+ for (auto i = 0; i < numRows; ++i) {
+ std::string_view view(values + offset, lengths[i]);
+ offset += lengths[i];
+ ARROW_ASSIGN_OR_RAISE(*rawIndices++, dictionary->getOrUpdate(view));
+ }
+ }
+
+ return indices;
+}
+
+template <>
+arrow::Result<std::shared_ptr<arrow::Buffer>>
updateDictionary<arrow::BinaryType>(
+ int32_t numRows,
+ const std::shared_ptr<arrow::Buffer>& validityBuffer,
+ const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+ const std::shared_ptr<arrow::Buffer>& valueBuffer,
+ const std::shared_ptr<DictionaryStorageImpl<arrow::BinaryType>>&
dictionary,
+ arrow::MemoryPool* pool) {
+ return updateDictionaryForBinary(numRows, validityBuffer, lengthBuffer,
valueBuffer, dictionary, pool);
+}
+
+template <>
+arrow::Result<std::shared_ptr<arrow::Buffer>>
updateDictionary<arrow::StringType>(
+ int32_t numRows,
+ const std::shared_ptr<arrow::Buffer>& validityBuffer,
+ const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+ const std::shared_ptr<arrow::Buffer>& valueBuffer,
+ const std::shared_ptr<DictionaryStorageImpl<arrow::StringType>>&
dictionary,
+ arrow::MemoryPool* pool) {
+ return updateDictionaryForBinary(numRows, validityBuffer, lengthBuffer,
valueBuffer, dictionary, pool);
+}
+} // namespace
+
+class ValueUpdater {
+ public:
+ template <typename ArrowType>
+ enable_if_dictionary_type<ArrowType, arrow::Status> Visit(const ArrowType&) {
+ bool dictionaryExists = false;
+ std::shared_ptr<DictionaryStorageImpl<ArrowType>> dictionary;
+ if (const auto& it = writer->dictionaries_.find(fieldIdx); it !=
writer->dictionaries_.end()) {
+ dictionaryExists = true;
+ dictionary =
std::dynamic_pointer_cast<DictionaryStorageImpl<ArrowType>>(it->second);
+ } else {
+ dictionary =
std::make_shared<DictionaryStorageImpl<ArrowType>>(writer->dictionaryPool_.get(),
writer->codec_);
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto indices, updateDictionary<ArrowType>(numRows, nulls, values,
binaryValues, dictionary, pool));
+
+ results.push_back(nulls);
+
+  // Discard the newly created dictionary when its cardinality exceeds
numRows * kDictionaryFactor.
+ if (!dictionaryExists && dictionary->size() > numRows * kDictionaryFactor)
{
+ dictionaryCreated = false;
+ results.push_back(values);
+ if (binaryValues != nullptr) {
+ results.push_back(binaryValues);
+ }
+ return arrow::Status::OK();
+ }
+
+ if (!dictionaryExists) {
+ writer->dictionaries_[fieldIdx] = dictionary;
+ }
+
+ dictionaryCreated = true;
+ results.push_back(indices);
+
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::Decimal128Type& type) {
+ // Only support short decimal.
+ return Visit(arrow::Int64Type());
+ }
+
+ arrow::Status Visit(const arrow::DataType& type) {
+ return arrow::Status::TypeError("Not implemented for type: ",
type.ToString());
+ }
+
+ ArrowShuffleDictionaryWriter* writer;
+ arrow::MemoryPool* pool;
+ int32_t fieldIdx;
+ int32_t numRows;
+ std::shared_ptr<arrow::Buffer> nulls{nullptr};
+ std::shared_ptr<arrow::Buffer> values{nullptr};
+ std::shared_ptr<arrow::Buffer> binaryValues{nullptr};
+
+ std::vector<std::shared_ptr<arrow::Buffer>>& results;
+ bool& dictionaryCreated;
+};
+
+arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
ArrowShuffleDictionaryWriter::updateAndGet(
+ const std::shared_ptr<arrow::Schema>& schema,
+ int32_t numRows,
+ const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+ ARROW_RETURN_NOT_OK(initSchema(schema));
+
+ std::vector<std::shared_ptr<arrow::Buffer>> results;
+
+ size_t bufferIdx = 0;
+ for (auto i = 0; i < schema->num_fields(); ++i) {
+ switch (fieldTypes_[i]) {
+ case FieldType::kNull:
+ case FieldType::kComplex:
+ break;
+ case FieldType::kFixedWidth:
+ results.emplace_back(buffers[bufferIdx++]);
+ results.emplace_back(buffers[bufferIdx++]);
+ break;
+ case FieldType::kBinary:
+ results.emplace_back(buffers[bufferIdx++]);
+ results.emplace_back(buffers[bufferIdx++]);
+ results.emplace_back(buffers[bufferIdx++]);
+ break;
+ case FieldType::kSupportsDictionary: {
+ const auto fieldType = schema_->field(i)->type();
+ bool isBinaryType =
+ fieldType->id() == arrow::BinaryType::type_id || fieldType->id()
== arrow::StringType::type_id;
+
+ bool isDictionaryCreated = false;
+ ValueUpdater valueUpdater{
+ this,
+ memoryManager_->defaultArrowMemoryPool(),
+ i,
+ numRows,
+ buffers[bufferIdx++],
+ buffers[bufferIdx++],
+ isBinaryType ? buffers[bufferIdx++] : nullptr,
+ results,
+ isDictionaryCreated};
+
+ ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*fieldType, &valueUpdater));
+
+ if (!isDictionaryCreated) {
+ ARROW_RETURN_NOT_OK(blackList(i));
+ }
+
+ break;
+ }
+ }
+ }
+
+ if (hasComplexType_) {
+ results.emplace_back(buffers[bufferIdx++]);
+ }
+
+ GLUTEN_DCHECK(bufferIdx == buffers.size(), "Not all buffers are consumed.");
+
+ return results;
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::serialize(arrow::io::OutputStream*
out) {
+ auto bitMapSize =
arrow::bit_util::RoundUpToMultipleOf8(schema_->num_fields());
+ std::vector<uint8_t> bitMap(bitMapSize);
+
+ for (auto fieldIdx : dictionaryFields_) {
+ arrow::bit_util::SetBit(bitMap.data(), fieldIdx);
+ }
+
+ ARROW_RETURN_NOT_OK(out->Write(bitMap.data(), bitMapSize));
+
+ for (auto fieldIdx : dictionaryFields_) {
+ GLUTEN_DCHECK(
+ dictionaries_.find(fieldIdx) != dictionaries_.end(),
+ "Invalid dictionary field index: " + std::to_string(fieldIdx));
+
+ const auto& dictionary = dictionaries_[fieldIdx];
+ ARROW_RETURN_NOT_OK(dictionary->serialize(out));
+
+ dictionaries_.erase(fieldIdx);
+ }
+
+ return arrow::Status::OK();
+}
+
+int64_t ArrowShuffleDictionaryWriter::numDictionaryFields() {
+ return dictionaryFields_.size();
+}
+
+int64_t ArrowShuffleDictionaryWriter::getDictionarySize() {
+ return dictionaryPool_->bytes_allocated();
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::initSchema(const
std::shared_ptr<arrow::Schema>& schema) {
+ if (schema_ == nullptr) {
+ schema_ = schema;
+
+ rowType_ = fromArrowSchema(schema);
+ fieldTypes_.resize(rowType_->size());
+
+ for (auto i = 0; i < rowType_->size(); ++i) {
+ switch (rowType_->childAt(i)->kind()) {
+ case facebook::velox::TypeKind::UNKNOWN:
+ fieldTypes_[i] = FieldType::kNull;
+ break;
+ case facebook::velox::TypeKind::ARRAY:
+ case facebook::velox::TypeKind::MAP:
+ case facebook::velox::TypeKind::ROW:
+ fieldTypes_[i] = FieldType::kComplex;
+ hasComplexType_ = true;
+ break;
+ case facebook::velox::TypeKind::VARBINARY:
+ case facebook::velox::TypeKind::VARCHAR:
+ case facebook::velox::TypeKind::DOUBLE:
+ case facebook::velox::TypeKind::BIGINT:
+ fieldTypes_[i] = FieldType::kSupportsDictionary;
+ dictionaryFields_.emplace(i);
+ break;
+ default:
+ fieldTypes_[i] = FieldType::kFixedWidth;
+ break;
+ }
+ }
+ } else if (schema_ != schema) {
+ return arrow::Status::Invalid("Schema mismatch");
+ }
+ return arrow::Status::OK();
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::blackList(int32_t fieldId) {
+ switch (const auto typeId = schema_->field(fieldId)->type()->id()) {
+ case arrow::BinaryType::type_id:
+ case arrow::StringType::type_id:
+ fieldTypes_[fieldId] = FieldType::kBinary;
+ break;
+ default: {
+ if (!arrow::is_fixed_width(typeId)) {
+ return arrow::Status::Invalid("Invalid field type: ",
schema_->field(fieldId)->type()->ToString());
+ }
+ fieldTypes_[fieldId] = FieldType::kFixedWidth;
+ break;
+ }
+ }
+
+ dictionaryFields_.erase(fieldId);
+
+ return arrow::Status::OK();
+}
+} // namespace gluten
diff --git a/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h
b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h
new file mode 100644
index 0000000000..ef0efbb668
--- /dev/null
+++ b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "shuffle/Dictionary.h"
+
+#include "velox/type/Type.h"
+
+#include <arrow/buffer.h>
+#include <arrow/type.h>
+
+#include <set>
+
+namespace gluten {
+
+class ArrowShuffleDictionaryWriter final : public ShuffleDictionaryWriter {
+ public:
+ ArrowShuffleDictionaryWriter(MemoryManager* memoryManager,
arrow::util::Codec* codec)
+ : memoryManager_(memoryManager), codec_(codec) {
+ dictionaryPool_ =
memoryManager->getOrCreateArrowMemoryPool("ArrowShuffleDictionaryWriter.dictionary");
+ }
+
+ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> updateAndGet(
+ const std::shared_ptr<arrow::Schema>& schema,
+ int32_t numRows,
+ const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) override;
+
+ arrow::Status serialize(arrow::io::OutputStream* out) override;
+
+ int64_t numDictionaryFields() override;
+
+ int64_t getDictionarySize() override;
+
+ private:
+ enum class FieldType { kNull, kFixedWidth, kBinary, kComplex,
kSupportsDictionary };
+
+ arrow::Status initSchema(const std::shared_ptr<arrow::Schema>& schema);
+
+ arrow::Status blackList(int32_t fieldId);
+
+ MemoryManager* memoryManager_;
+ // Used to count the memory allocation for dictionary data.
+ std::shared_ptr<arrow::MemoryPool> dictionaryPool_;
+
+ arrow::util::Codec* codec_;
+
+ std::shared_ptr<arrow::Schema> schema_{nullptr};
+ facebook::velox::TypePtr rowType_{nullptr};
+ std::vector<FieldType> fieldTypes_;
+ std::set<int32_t> dictionaryFields_;
+ bool hasComplexType_{false};
+ std::unordered_map<int32_t, std::shared_ptr<ShuffleDictionaryStorage>>
dictionaries_;
+
+ friend class ValueUpdater;
+};
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index ae956aa49b..46fb1716fe 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -41,6 +41,17 @@ using namespace facebook::velox;
namespace gluten {
namespace {
+
+arrow::Result<BlockType> readBlockType(arrow::io::InputStream* inputStream) {
+ BlockType type;
+ ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(BlockType),
&type));
+ if (bytes == 0) {
+    // Reached end of stream.
+ return BlockType::kEndOfStream;
+ }
+ return type;
+}
+
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
@@ -66,23 +77,25 @@ BufferPtr
convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
return wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer);
}
-template <TypeKind kind>
+template <TypeKind Kind, typename T = typename TypeTraits<Kind>::NativeType>
VectorPtr readFlatVector(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
- auto values = buffers[bufferIdx++];
- std::vector<BufferPtr> stringBuffers;
- using T = typename TypeTraits<kind>::NativeType;
- if (nulls == nullptr || nulls->size() == 0) {
- return std::make_shared<FlatVector<T>>(
- pool, type, BufferPtr(nullptr), length, std::move(values),
std::move(stringBuffers));
+ auto valuesOrIndices = buffers[bufferIdx++];
+
+ nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+ if (dictionary != nullptr) {
+ return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length,
dictionary);
}
+
return std::make_shared<FlatVector<T>>(
- pool, type, std::move(nulls), length, std::move(values),
std::move(stringBuffers));
+ pool, type, nulls, length, std::move(valuesOrIndices),
std::vector<BufferPtr>{});
}
template <>
@@ -91,6 +104,7 @@ VectorPtr readFlatVector<TypeKind::UNKNOWN>(
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
return BaseVector::createNullConstant(type, length, pool);
}
@@ -101,27 +115,28 @@ VectorPtr readFlatVector<TypeKind::HUGEINT>(
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
- auto valueBuffer = buffers[bufferIdx++];
+ auto valuesOrIndices = buffers[bufferIdx++];
+
// Because if buffer does not compress, it will get from netty, the address
maynot aligned 16B, which will cause
// int128_t = xxx coredump by instruction movdqa
- auto data = valueBuffer->as<int128_t>();
- BufferPtr values;
- if ((reinterpret_cast<uintptr_t>(data) & 0xf) == 0) {
- values = valueBuffer;
- } else {
- values = AlignedBuffer::allocate<char>(valueBuffer->size(), pool);
- gluten::fastCopy(values->asMutable<char>(), valueBuffer->as<char>(),
valueBuffer->size());
+ const auto* addr = valuesOrIndices->as<facebook::velox::int128_t>();
+ if ((reinterpret_cast<uintptr_t>(addr) & 0xf) != 0) {
+ auto alignedBuffer =
AlignedBuffer::allocate<char>(valuesOrIndices->size(), pool);
+ fastCopy(alignedBuffer->asMutable<char>(), valuesOrIndices->as<char>(),
valuesOrIndices->size());
+ valuesOrIndices = alignedBuffer;
}
- std::vector<BufferPtr> stringBuffers;
- if (nulls == nullptr || nulls->size() == 0) {
- auto vp = std::make_shared<FlatVector<int128_t>>(
- pool, type, BufferPtr(nullptr), length, std::move(values),
std::move(stringBuffers));
- return vp;
+
+ nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+ if (dictionary != nullptr) {
+ return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length,
dictionary);
}
+
return std::make_shared<FlatVector<int128_t>>(
- pool, type, std::move(nulls), length, std::move(values),
std::move(stringBuffers));
+ pool, type, nulls, length, std::move(valuesOrIndices),
std::vector<BufferPtr>{});
}
VectorPtr readFlatVectorStringView(
@@ -129,28 +144,36 @@ VectorPtr readFlatVectorStringView(
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
- auto lengthBuffer = buffers[bufferIdx++];
+ auto lengthOrIndices = buffers[bufferIdx++];
+
+ nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+ if (dictionary != nullptr) {
+ return BaseVector::wrapInDictionary(nulls, lengthOrIndices, length,
dictionary);
+ }
+
auto valueBuffer = buffers[bufferIdx++];
- const auto* rawLength = lengthBuffer->as<StringLengthType>();
- std::vector<BufferPtr> stringBuffers;
+ const auto* rawLength = lengthOrIndices->as<StringLengthType>();
+ const auto* valueBufferPtr = valueBuffer->as<char>();
+
auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length,
pool);
- auto rawValues = values->asMutable<StringView>();
- auto rawChars = valueBuffer->as<char>();
+ auto* rawValues = values->asMutable<StringView>();
+
uint64_t offset = 0;
for (int32_t i = 0; i < length; ++i) {
- rawValues[i] = StringView(rawChars + offset, rawLength[i]);
+ rawValues[i] = StringView(valueBufferPtr + offset, rawLength[i]);
offset += rawLength[i];
}
+
+ std::vector<BufferPtr> stringBuffers;
stringBuffers.emplace_back(valueBuffer);
- if (nulls == nullptr || nulls->size() == 0) {
- return std::make_shared<FlatVector<StringView>>(
- pool, type, BufferPtr(nullptr), length, std::move(values),
std::move(stringBuffers));
- }
+
return std::make_shared<FlatVector<StringView>>(
- pool, type, std::move(nulls), length, std::move(values),
std::move(stringBuffers));
+ pool, type, nulls, length, std::move(values), std::move(stringBuffers));
}
template <>
@@ -159,8 +182,9 @@ VectorPtr readFlatVector<TypeKind::VARCHAR>(
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
- return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
+ return readFlatVectorStringView(buffers, bufferIdx, length, type,
dictionary, pool);
}
template <>
@@ -169,8 +193,9 @@ VectorPtr readFlatVector<TypeKind::VARBINARY>(
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
+ const VectorPtr& dictionary,
memory::MemoryPool* pool) {
- return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
+ return readFlatVectorStringView(buffers, bufferIdx, length, type,
dictionary, pool);
}
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
@@ -209,42 +234,47 @@ RowTypePtr getComplexWriteType(const
std::vector<TypePtr>& types) {
return std::make_shared<const RowType>(std::move(complexTypeColNames),
std::move(complexTypeChildrens));
}
-void readColumns(
- std::vector<BufferPtr>& buffers,
- memory::MemoryPool* pool,
+RowVectorPtr deserialize(
+ RowTypePtr type,
uint32_t numRows,
- const std::vector<TypePtr>& types,
- std::vector<VectorPtr>& result) {
- int32_t bufferIdx = 0;
+ std::vector<BufferPtr>& buffers,
+ const std::vector<int32_t>& dictionaryFields,
+ const std::vector<VectorPtr>& dictionaries,
+ memory::MemoryPool* pool) {
+ std::vector<VectorPtr> children;
+ auto types = type->as<TypeKind::ROW>().children();
+
std::vector<VectorPtr> complexChildren;
auto complexRowType = getComplexWriteType(types);
if (complexRowType->children().size() > 0) {
complexChildren = readComplexType(buffers[buffers.size() - 1],
complexRowType, pool)->children();
}
+ int32_t bufferIdx = 0;
int32_t complexIdx = 0;
- for (int32_t i = 0; i < types.size(); ++i) {
- auto kind = types[i]->kind();
+ int32_t dictionaryIdx = 0;
+ for (size_t i = 0; i < types.size(); ++i) {
+ const auto kind = types[i]->kind();
switch (kind) {
case TypeKind::ROW:
case TypeKind::MAP:
case TypeKind::ARRAY: {
- result.emplace_back(std::move(complexChildren[complexIdx]));
+ children.emplace_back(std::move(complexChildren[complexIdx]));
complexIdx++;
} break;
default: {
+ VectorPtr dictionary{nullptr};
+ if (!dictionaryFields.empty() && dictionaryIdx <
dictionaryFields.size() &&
+ dictionaryFields[dictionaryIdx] == i) {
+ dictionary = dictionaries[dictionaryIdx++];
+ }
auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
- readFlatVector, types[i]->kind(), buffers, bufferIdx, numRows,
types[i], pool);
- result.emplace_back(std::move(res));
+ readFlatVector, kind, buffers, bufferIdx, numRows, types[i],
dictionary, pool);
+ children.emplace_back(std::move(res));
} break;
}
}
-}
-RowVectorPtr deserialize(RowTypePtr type, uint32_t numRows,
std::vector<BufferPtr>& buffers, memory::MemoryPool* pool) {
- std::vector<VectorPtr> children;
- auto childTypes = type->as<TypeKind::ROW>().children();
- readColumns(buffers, pool, numRows, childTypes, children);
return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows,
children);
}
@@ -252,6 +282,8 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
RowTypePtr type,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers,
+ const std::vector<int32_t>& dictionaryFields,
+ const std::vector<VectorPtr>& dictionaries,
memory::MemoryPool* pool,
int64_t& deserializeTime) {
ScopedTimer timer(&deserializeTime);
@@ -260,7 +292,7 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
for (auto& buffer : arrowBuffers) {
veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
}
- auto rowVector = deserialize(type, numRows, veloxBuffers, pool);
+ auto rowVector = deserialize(type, numRows, veloxBuffers, dictionaryFields,
dictionaries, pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
@@ -277,11 +309,149 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
}
- auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
+ auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {},
pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
+
+arrow::Result<BufferPtr>
+readDictionaryBuffer(arrow::io::InputStream* in,
facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
+ size_t bufferSize;
+
+ ARROW_RETURN_NOT_OK(in->Read(sizeof(bufferSize), &bufferSize));
+ auto buffer = facebook::velox::AlignedBuffer::allocate<char>(bufferSize,
pool, std::nullopt, true);
+
+ if (bufferSize == 0) {
+ return buffer;
+ }
+
+ if (codec != nullptr) {
+ size_t compressedSize;
+ ARROW_RETURN_NOT_OK(in->Read(sizeof(compressedSize), &compressedSize));
+ auto compressedBuffer =
facebook::velox::AlignedBuffer::allocate<char>(compressedSize, pool,
std::nullopt, true);
+ ARROW_RETURN_NOT_OK(in->Read(compressedSize,
compressedBuffer->asMutable<void>()));
+ ARROW_ASSIGN_OR_RAISE(
+ auto decompressedSize,
+ codec->Decompress(compressedSize, compressedBuffer->as<uint8_t>(),
bufferSize, buffer->asMutable<uint8_t>()));
+ ARROW_RETURN_IF(
+ decompressedSize != bufferSize,
+ arrow::Status::IOError(
+ fmt::format("Decompressed size doesn't equal to original size: ({}
vs {})", decompressedSize, bufferSize)));
+ } else {
+ ARROW_RETURN_NOT_OK(in->Read(bufferSize, buffer->asMutable<void>()));
+ }
+ return buffer;
+}
+
+arrow::Result<VectorPtr> readDictionaryForBinary(
+ arrow::io::InputStream* in,
+ const TypePtr& type,
+ facebook::velox::memory::MemoryPool* pool,
+ arrow::util::Codec* codec) {
+ // Read length buffer.
+ ARROW_ASSIGN_OR_RAISE(auto lengthBuffer, readDictionaryBuffer(in, pool,
codec));
+ const auto* lengthBufferPtr = lengthBuffer->as<StringLengthType>();
+
+ // Read value buffer.
+ ARROW_ASSIGN_OR_RAISE(auto valueBuffer, readDictionaryBuffer(in, pool,
codec));
+ const auto* valueBufferPtr = valueBuffer->as<char>();
+
+ // Build StringViews.
+ const auto numElements = lengthBuffer->size() / sizeof(StringLengthType);
+ auto values = AlignedBuffer::allocate<char>(sizeof(StringView) *
numElements, pool, std::nullopt, true);
+ auto* rawValues = values->asMutable<StringView>();
+
+ uint64_t offset = 0;
+ for (size_t i = 0; i < numElements; ++i) {
+ rawValues[i] = StringView(valueBufferPtr + offset, lengthBufferPtr[i]);
+ offset += lengthBufferPtr[i];
+ }
+
+ std::vector<BufferPtr> stringBuffers;
+ stringBuffers.emplace_back(valueBuffer);
+
+ return std::make_shared<FlatVector<StringView>>(
+ pool, type, BufferPtr(nullptr), numElements, std::move(values),
std::move(stringBuffers));
+}
+
+template <TypeKind Kind, typename NativeType = typename
TypeTraits<Kind>::NativeType>
+arrow::Result<VectorPtr> readDictionary(
+ arrow::io::InputStream* in,
+ const TypePtr& type,
+ facebook::velox::memory::MemoryPool* pool,
+ arrow::util::Codec* codec) {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, readDictionaryBuffer(in, pool, codec));
+
+ const auto numElements = buffer->size() / sizeof(NativeType);
+
+ return std::make_shared<FlatVector<NativeType>>(
+ pool, type, BufferPtr(nullptr), numElements, std::move(buffer),
std::vector<BufferPtr>{});
+}
+
+template <>
+arrow::Result<VectorPtr> readDictionary<TypeKind::VARCHAR>(
+ arrow::io::InputStream* in,
+ const TypePtr& type,
+ facebook::velox::memory::MemoryPool* pool,
+ arrow::util::Codec* codec) {
+ return readDictionaryForBinary(in, type, pool, codec);
+}
+
+template <>
+arrow::Result<VectorPtr> readDictionary<TypeKind::VARBINARY>(
+ arrow::io::InputStream* in,
+ const TypePtr& type,
+ facebook::velox::memory::MemoryPool* pool,
+ arrow::util::Codec* codec) {
+ return readDictionaryForBinary(in, type, pool, codec);
+}
+
} // namespace
+class VeloxDictionaryReader {
+ public:
+ VeloxDictionaryReader(
+ const facebook::velox::RowTypePtr& rowType,
+ facebook::velox::memory::MemoryPool* veloxPool,
+ arrow::util::Codec* codec)
+ : rowType_(rowType), veloxPool_(veloxPool), codec_(codec) {}
+
+ arrow::Result<std::vector<int32_t>> readFields(arrow::io::InputStream* in)
const {
+ // Read bitmap.
+ auto bitMapSize = arrow::bit_util::RoundUpToMultipleOf8(rowType_->size());
+ std::vector<uint8_t> bitMap(bitMapSize);
+
+ RETURN_NOT_OK(in->Read(bitMapSize, bitMap.data()));
+
+ std::vector<int32_t> fields;
+ for (auto i = 0; i < rowType_->size(); ++i) {
+ if (arrow::bit_util::GetBit(bitMap.data(), i)) {
+ fields.push_back(i);
+ }
+ }
+
+ return fields;
+ }
+
+ arrow::Result<std::vector<VectorPtr>>
readDictionaries(arrow::io::InputStream* in, const std::vector<int32_t>& fields)
+ const {
+ // Read dictionary buffers.
+ std::vector<VectorPtr> dictionaries;
+ for (const auto i : fields) {
+ auto dictionary = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
+ readDictionary, rowType_->childAt(i)->kind(), in,
rowType_->childAt(i), veloxPool_, codec_);
+ dictionaries.emplace_back();
+ ARROW_ASSIGN_OR_RAISE(dictionaries.back(), dictionary);
+ }
+
+ return dictionaries;
+ }
+
+ private:
+ facebook::velox::RowTypePtr rowType_;
+ facebook::velox::memory::MemoryPool* veloxPool_;
+ arrow::util::Codec* codec_;
+};
+
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
std::shared_ptr<arrow::io::InputStream> in,
const std::shared_ptr<arrow::Schema>& schema,
@@ -308,19 +478,71 @@
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
GLUTEN_ASSIGN_OR_THROW(in_,
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
}
+bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
+ // Complex type or dictionary encodings do not support merging.
+ return hasComplexType_ || !dictionaryFields_.empty();
+}
+
// Reads the type marker of the next block in the stream and updates reader
// state accordingly. Idempotent until the pending block is consumed: callers
// reset blockTypeResolved_ to false after deserializing a payload so the next
// invocation reads a fresh marker.
void VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
  // Block type for the pending payload is already known; nothing to do.
  if (blockTypeResolved_) {
    return;
  }

  blockTypeResolved_ = true;

  GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
  switch (blockType) {
    case BlockType::kEndOfStream:
      // No more blocks in this stream.
      reachEos_ = true;
      break;
    case BlockType::kDictionary: {
      // A dictionary block carries the encoded field indices plus one
      // dictionary vector per field, and must be immediately followed by a
      // dictionary payload block.
      VeloxDictionaryReader reader(rowType_, veloxPool_, codec_.get());
      GLUTEN_ASSIGN_OR_THROW(dictionaryFields_, reader.readFields(in_.get()));
      GLUTEN_ASSIGN_OR_THROW(dictionaries_, reader.readDictionaries(in_.get(), dictionaryFields_));

      GLUTEN_ASSIGN_OR_THROW(blockType, readBlockType(in_.get()));
      GLUTEN_CHECK(blockType == BlockType::kDictionaryPayload, "Invalid block type for dictionary payload");
    } break;
    case BlockType::kDictionaryPayload: {
      // A payload that reuses the most recently read dictionaries; those must
      // still be in place.
      GLUTEN_CHECK(
          !dictionaryFields_.empty() && !dictionaries_.empty(),
          "Dictionaries cannot be empty when reading dictionary payload");
    } break;
    case BlockType::kPlainPayload: {
      if (!dictionaryFields_.empty()) {
        // Clear previous dictionaries if the next block is a plain payload.
        dictionaryFields_.clear();
        dictionaries_.clear();
      }
    } break;
  }
}
+
std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
- if (hasComplexType_) {
+ resolveNextBlockType();
+
+ if (shouldSkipMerge()) {
+ // We have leftover rows from the last mergeable read.
+ if (merged_) {
+ return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_,
deserializeTime_);
+ }
+
+ if (reachEos_) {
+ return nullptr;
+ }
+
uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers,
BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows,
deserializeTime_, decompressTime_));
- if (arrowBuffers.empty()) {
- // Reach EOS.
- return nullptr;
- }
- return makeColumnarBatch(rowType_, numRows, std::move(arrowBuffers),
veloxPool_, deserializeTime_);
+
+ blockTypeResolved_ = false;
+
+ return makeColumnarBatch(
+ rowType_, numRows, std::move(arrowBuffers), dictionaryFields_,
dictionaries_, veloxPool_, deserializeTime_);
}
+ // TODO: Remove merging.
if (reachEos_) {
if (merged_) {
return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_,
deserializeTime_);
@@ -331,13 +553,19 @@ std::shared_ptr<ColumnarBatch>
VeloxHashShuffleReaderDeserializer::next() {
std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
uint32_t numRows = 0;
while (!merged_ || merged_->numRows() < batchSize_) {
+ resolveNextBlockType();
+
+ // Break the merging loop once we reach EOS or read a dictionary block.
+ if (reachEos_ || !dictionaryFields_.empty()) {
+ break;
+ }
+
GLUTEN_ASSIGN_OR_THROW(
arrowBuffers,
BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows,
deserializeTime_, decompressTime_));
- if (arrowBuffers.empty()) {
- reachEos_ = true;
- break;
- }
+
+ blockTypeResolved_ = false;
+
if (!merged_) {
merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_,
schema_, std::move(arrowBuffers));
arrowBuffers.clear();
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index ea9e78e181..4eae874765 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -46,6 +46,10 @@ class VeloxHashShuffleReaderDeserializer final : public
ColumnarBatchIterator {
std::shared_ptr<ColumnarBatch> next() override;
private:
+ bool shouldSkipMerge() const;
+
+ void resolveNextBlockType();
+
std::shared_ptr<arrow::io::InputStream> in_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
@@ -61,6 +65,10 @@ class VeloxHashShuffleReaderDeserializer final : public
ColumnarBatchIterator {
std::unique_ptr<InMemoryPayload> merged_{nullptr};
bool reachEos_{false};
+ bool blockTypeResolved_{false};
+
+ std::vector<int32_t> dictionaryFields_{};
+ std::vector<facebook::velox::VectorPtr> dictionaries_{};
};
class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 8a7b0bf187..58131442ad 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -120,7 +120,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
const std::shared_ptr<ShuffleWriterOptions>& options,
MemoryManager* memoryManager)
: ShuffleWriter(numPartitions, options->partitioning),
-
partitionBufferPool_(memoryManager->createArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
+
partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
partitionWriter_(partitionWriter) {
partitioner_ = Partitioner::make(options->partitioning, numPartitions_,
options->startPartitionId);
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 2086012e5c..ec9aee855b 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -30,7 +30,7 @@ class DummyMemoryManager final : public MemoryManager {
arrow::MemoryPool* defaultArrowMemoryPool() override {
throw GlutenException("Not yet implemented");
}
- std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override {
+ std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const
std::string& name) override {
throw GlutenException("Not yet implemented");
}
const MemoryUsageStats collectMemoryUsageStats() const override {
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index cb6f1b0feb..4d56e094b9 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -42,6 +42,7 @@ struct ShuffleTestParams {
int32_t mergeBufferSize{0};
int32_t diskWriteBufferSize{0};
bool useRadixSort{false};
+ bool enableDictionary{false};
std::string toString() const {
std::ostringstream out;
@@ -50,7 +51,8 @@ struct ShuffleTestParams {
<< ", compressionType = " <<
arrow::util::Codec::GetCodecAsString(compressionType)
<< ", compressionThreshold = " << compressionThreshold << ",
mergeBufferSize = " << mergeBufferSize
<< ", compressionBufferSize = " << diskWriteBufferSize
- << ", useRadixSort = " << (useRadixSort ? "true" : "false");
+ << ", useRadixSort = " << (useRadixSort ? "true" : "false")
+ << ", enableDictionary = " << (enableDictionary ? "true" : "false");
return out.str();
}
};
@@ -124,12 +126,15 @@ std::vector<ShuffleTestParams> getTestParams() {
for (const auto compressionThreshold : compressionThresholds) {
// Local.
for (const auto mergeBufferSize : mergeBufferSizes) {
- params.push_back(ShuffleTestParams{
- .shuffleWriterType = ShuffleWriterType::kHashShuffle,
- .partitionWriterType = PartitionWriterType::kLocal,
- .compressionType = compression,
- .compressionThreshold = compressionThreshold,
- .mergeBufferSize = mergeBufferSize});
+ for (const bool enableDictionary : {true, false}) {
+ params.push_back(ShuffleTestParams{
+ .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+ .partitionWriterType = PartitionWriterType::kLocal,
+ .compressionType = compression,
+ .compressionThreshold = compressionThreshold,
+ .mergeBufferSize = mergeBufferSize,
+ .enableDictionary = enableDictionary});
+ }
}
// Rss.
@@ -151,13 +156,15 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
const std::vector<std::string>& localDirs,
arrow::Compression::type compressionType,
int32_t mergeBufferSize,
- int32_t compressionThreshold) {
+ int32_t compressionThreshold,
+ bool enableDictionary) {
GLUTEN_ASSIGN_OR_THROW(auto codec,
arrow::util::Codec::Create(compressionType));
switch (partitionWriterType) {
case PartitionWriterType::kLocal: {
auto options = std::make_shared<LocalPartitionWriterOptions>();
options->mergeBufferSize = mergeBufferSize;
options->compressionThreshold = compressionThreshold;
+ options->enableDictionary = enableDictionary;
return std::make_shared<LocalPartitionWriter>(
numPartitions, std::move(codec), getDefaultMemoryManager(), options,
dataFile, std::move(localDirs));
}
@@ -227,23 +234,21 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
shuffleWriterOptions = defaultShuffleWriterOptions();
}
+ const auto& params = GetParam();
const auto partitionWriter = createPartitionWriter(
- GetParam().partitionWriterType,
+ params.partitionWriterType,
numPartitions,
dataFile_,
localDirs_,
- GetParam().compressionType,
- GetParam().mergeBufferSize,
- GetParam().compressionThreshold);
+ params.compressionType,
+ params.mergeBufferSize,
+ params.compressionThreshold,
+ params.enableDictionary);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxShuffleWriter::create(
- GetParam().shuffleWriterType,
- numPartitions,
- partitionWriter,
- shuffleWriterOptions,
- getDefaultMemoryManager()));
+ params.shuffleWriterType, numPartitions, partitionWriter,
shuffleWriterOptions, getDefaultMemoryManager()));
return shuffleWriter;
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 8fb73131c9..2ade39267c 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -53,6 +53,7 @@ You can add these configurations into spark-defaults.conf to
enable or disable t
| spark.gluten.sql.columnar.shuffle.merge.threshold | Set the
threshold control the minimum merged size. When a partition buffer is full, and
the number of rows is below (`threshold *
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.
[...]
| spark.gluten.sql.columnar.shuffle.readerBufferSize | Buffer size
in bytes for shuffle reader reading input stream from local or remote.
[...]
| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | Buffer size
in bytes for sort-based shuffle reader deserializing raw input to columnar
batch.
[...]
+| spark.gluten.sql.columnar.shuffle.dictionary.enabled | Enable
dictionary in hash-based shuffle.
[...]
| spark.gluten.sql.columnar.numaBinding | Set up
NUMABinding, default is false
[...]
| spark.gluten.sql.columnar.coreRange | Set up the
core range for NUMABinding, only works when numaBinding set to true. <br /> The
setting is based on the number of cores in your system. Use 72 cores as an
example.
[...]
| spark.gluten.sql.columnar.wholeStage.fallback.threshold | Configure
the threshold for whether whole stage will fall back in AQE supported case by
counting the number of ColumnarToRow & vanilla leaf node
[...]
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index 97459f82d6..eabc975e02 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -29,6 +29,8 @@ public class GlutenSplitResult {
private final long peakBytes;
private final long sortTime;
private final long c2rTime;
+ private final double avgDictionaryFields;
+ private final long dictionarySize;
public GlutenSplitResult(
long totalComputePidTime,
@@ -41,6 +43,8 @@ public class GlutenSplitResult {
long totalBytesEvicted,
long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
long peakBytes,
+ double avgDictionaryFields,
+ long dictionarySize,
long[] partitionLengths,
long[] rawPartitionLengths) {
this.totalComputePidTime = totalComputePidTime;
@@ -55,6 +59,8 @@ public class GlutenSplitResult {
this.peakBytes = peakBytes;
this.sortTime = totalSortTime;
this.c2rTime = totalC2RTime;
+ this.avgDictionaryFields = avgDictionaryFields;
+ this.dictionarySize = dictionarySize;
}
public long getTotalComputePidTime() {
@@ -108,4 +114,12 @@ public class GlutenSplitResult {
public long getC2RTime() {
return c2rTime;
}
+
  /**
   * Average number of dictionary-encoded fields, as reported by the native
   * shuffle writer through the split result.
   */
  public double getAvgDictionaryFields() {
    return avgDictionaryFields;
  }

  /**
   * Total size in bytes of the shuffle dictionaries, as reported by the
   * native shuffle writer through the split result.
   */
  public long getDictionarySize() {
    return dictionarySize;
  }
}
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
index be74df1b8a..3269e7b176 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
@@ -47,5 +47,6 @@ public class LocalPartitionWriterJniWrapper implements
RuntimeAware {
int subDirsPerLocalDir,
int shuffleFileBufferSize,
String dataFile,
- String localDirs);
+ String localDirs,
+ boolean enableDictionary);
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 16dd6b3e1b..f562d1c127 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -196,6 +196,9 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def columnarSortShuffleDeserializerBufferSize: Long =
getConf(COLUMNAR_SORT_SHUFFLE_DESERIALIZER_BUFFER_SIZE)
  // Whether dictionary encoding is enabled for hash-based shuffle
  // (backed by SHUFFLE_ENABLE_DICTIONARY).
  def columnarShuffleEnableDictionary: Boolean =
    getConf(SHUFFLE_ENABLE_DICTIONARY)
+
def maxBatchSize: Int = getConf(COLUMNAR_MAX_BATCH_SIZE)
def shuffleWriterBufferSize: Int = getConf(SHUFFLE_WRITER_BUFFER_SIZE)
@@ -1037,6 +1040,13 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1MB")
  // Internal flag controlling dictionary encoding in the hash-based shuffle
  // writer; disabled by default.
  val SHUFFLE_ENABLE_DICTIONARY =
    buildConf("spark.gluten.sql.columnar.shuffle.dictionary.enabled")
      .internal()
      .doc("Enable dictionary in hash-based shuffle.")
      .booleanConf
      .createWithDefault(false)
val COLUMNAR_MAX_BATCH_SIZE =
buildConf("spark.gluten.sql.columnar.maxBatchSize")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]