This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new de4af5c5e0 [GLUTEN-8855][VL] Support dictionary in hash based shuffle 
(#9727)
de4af5c5e0 is described below

commit de4af5c5e0825501abb2b0c368235484ef62ec5b
Author: Rong Ma <[email protected]>
AuthorDate: Wed Jun 25 09:11:02 2025 +0100

    [GLUTEN-8855][VL] Support dictionary in hash based shuffle (#9727)
---
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |   5 +-
 .../spark/shuffle/ColumnarShuffleWriter.scala      |   5 +-
 cpp/core/CMakeLists.txt                            |   1 +
 cpp/core/jni/JniWrapper.cc                         |  17 +-
 cpp/core/memory/MemoryManager.h                    |   6 +-
 cpp/core/shuffle/Dictionary.cc                     |  41 ++
 cpp/core/shuffle/Dictionary.h                      |  60 +++
 cpp/core/shuffle/LocalPartitionWriter.cc           | 107 ++++-
 cpp/core/shuffle/Options.h                         |  11 +-
 cpp/core/shuffle/PartitionWriter.h                 |   2 +-
 cpp/core/shuffle/Payload.cc                        |  32 +-
 cpp/core/shuffle/Payload.h                         |   3 +
 cpp/core/shuffle/ShuffleWriter.cc                  |   8 +
 cpp/core/shuffle/ShuffleWriter.h                   |   4 +
 cpp/core/shuffle/rss/RssPartitionWriter.cc         |   4 +
 cpp/velox/CMakeLists.txt                           |   1 +
 cpp/velox/benchmarks/GenericBenchmark.cc           |   3 +
 cpp/velox/compute/VeloxBackend.cc                  |   6 +
 cpp/velox/memory/VeloxMemoryManager.cc             |  12 +-
 cpp/velox/memory/VeloxMemoryManager.h              |   2 +-
 cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc  | 483 +++++++++++++++++++++
 cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h   |  71 +++
 cpp/velox/shuffle/VeloxShuffleReader.cc            | 354 ++++++++++++---
 cpp/velox/shuffle/VeloxShuffleReader.h             |   8 +
 cpp/velox/shuffle/VeloxShuffleWriter.h             |   2 +-
 cpp/velox/tests/RuntimeTest.cc                     |   2 +-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          |  39 +-
 docs/Configuration.md                              |   1 +
 .../gluten/vectorized/GlutenSplitResult.java       |  14 +
 .../vectorized/LocalPartitionWriterJniWrapper.java |   3 +-
 .../org/apache/gluten/config/GlutenConfig.scala    |  10 +
 31 files changed, 1197 insertions(+), 120 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 304f764aa0..14d4059679 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -321,7 +321,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       )
     } else {
       baseMetrics ++ Map(
-        "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
to split")
+        "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
to split"),
+        "avgDictionaryFields" -> SQLMetrics
+          .createAverageMetric(sparkContext, "avg dictionary fields"),
+        "dictionarySize" -> SQLMetrics.createSizeMetric(sparkContext, 
"dictionary size")
       )
     }
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
 
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 1597f45b88..6f5348ac0b 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -145,7 +145,8 @@ class ColumnarShuffleWriter[K, V](
             blockManager.subDirsPerLocalDir,
             conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt,
             tempDataFile.getAbsolutePath,
-            localDirs
+            localDirs,
+            GlutenConfig.get.columnarShuffleEnableDictionary
           )
 
           nativeShuffleWriter = if (isSort) {
@@ -223,6 +224,8 @@ class ColumnarShuffleWriter[K, V](
           dep.metrics("shuffleWallTime").value - splitResult.getTotalSpillTime 
-
             splitResult.getTotalWriteTime -
             splitResult.getTotalCompressTime)
+      
dep.metrics("avgDictionaryFields").set(splitResult.getAvgDictionaryFields)
+      dep.metrics("dictionarySize").add(splitResult.getDictionarySize)
     } else {
       dep.metrics("sortTime").add(splitResult.getSortTime)
       dep.metrics("c2rTime").add(splitResult.getC2RTime)
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index b164f2a953..5264eeb63e 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -128,6 +128,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
     memory/MemoryManager.cc
     memory/ArrowMemoryPool.cc
     memory/ColumnarBatch.cc
+    shuffle/Dictionary.cc
     shuffle/FallbackRangePartitioner.cc
     shuffle/HashPartitioner.cc
     shuffle/LocalPartitionWriter.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 59c0fa5078..1d2bddfcff 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -147,7 +147,7 @@ class InternalMemoryManager : public MemoryManager {
     throw GlutenException("Not implemented");
   }
 
-  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& 
name) override {
+  std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const 
std::string& name) override {
     throw GlutenException("Not yet implemented");
   }
 
@@ -215,7 +215,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, 
"close", "()V");
 
   splitResultClass = createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/GlutenSplitResult;");
-  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", 
"(JJJJJJJJJJ[J[J)V");
+  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", 
"(JJJJJJJJJJDJ[J[J)V");
 
   metricsBuilderClass = createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/metrics/Metrics;");
 
@@ -794,7 +794,8 @@ 
Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
     jint numSubDirs,
     jint shuffleFileBufferSize,
     jstring dataFileJstr,
-    jstring localDirsJstr) {
+    jstring localDirsJstr,
+    jboolean enableDictionary) {
   JNI_METHOD_START
 
   const auto ctx = getRuntime(env, wrapper);
@@ -803,7 +804,13 @@ 
Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
   auto localDirs = splitPaths(jStringToCString(env, localDirsJstr));
 
   auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>(
-      shuffleFileBufferSize, compressionBufferSize, compressionThreshold, 
mergeBufferSize, mergeThreshold, numSubDirs);
+      shuffleFileBufferSize,
+      compressionBufferSize,
+      compressionThreshold,
+      mergeBufferSize,
+      mergeThreshold,
+      numSubDirs,
+      enableDictionary);
 
   auto partitionWriter = std::make_shared<LocalPartitionWriter>(
       numPartitions,
@@ -980,6 +987,8 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
       shuffleWriter->totalBytesEvicted(),
       shuffleWriter->totalBytesToEvict(),
       shuffleWriter->peakBytesAllocated(),
+      shuffleWriter->avgDictionaryFields(),
+      shuffleWriter->dictionarySize(),
       partitionLengthArr,
       rawPartitionLengthArr);
 
diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h
index cb7512d23c..93d875ad1e 100644
--- a/cpp/core/memory/MemoryManager.h
+++ b/cpp/core/memory/MemoryManager.h
@@ -42,10 +42,10 @@ class MemoryManager {
   // Get the default Arrow memory pool for this memory manager. This memory 
pool is held by the memory manager.
   virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0;
 
-  // Create a new Arrow memory pool with the given name. The caller is 
responsible for managing the lifetime of the
+  // Return the Arrow memory pool with the given name. The caller is 
responsible for managing the lifetime of the
   // returned memory pool. Memory manager only holds the weak reference to the 
memory pool for collecting memory usage.
-  // If the name is already used by an existing memory pool, the creation will 
fail.
-  virtual std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const 
std::string& name) = 0;
+  // If the memory pool with the given name does not exist, it will create a 
new one and return it.
+  virtual std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const 
std::string& name) = 0;
 
   virtual const MemoryUsageStats collectMemoryUsageStats() const = 0;
 
diff --git a/cpp/core/shuffle/Dictionary.cc b/cpp/core/shuffle/Dictionary.cc
new file mode 100644
index 0000000000..05bfb4131a
--- /dev/null
+++ b/cpp/core/shuffle/Dictionary.cc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/Dictionary.h"
+#include "utils/Exception.h"
+
+namespace gluten {
+
+static ShuffleDictionaryWriterFactory dictionaryWriterFactory;
+
+void registerShuffleDictionaryWriterFactory(ShuffleDictionaryWriterFactory 
factory) {
+  if (dictionaryWriterFactory) {
+    throw GlutenException("DictionaryWriter factory already registered.");
+  }
+  dictionaryWriterFactory = std::move(factory);
+}
+
+std::unique_ptr<ShuffleDictionaryWriter> createDictionaryWriter(
+    MemoryManager* memoryManager,
+    arrow::util::Codec* codec) {
+  if (!dictionaryWriterFactory) {
+    throw GlutenException("DictionaryWriter factory not registered.");
+  }
+  return dictionaryWriterFactory(memoryManager, codec);
+}
+
+} // namespace gluten
diff --git a/cpp/core/shuffle/Dictionary.h b/cpp/core/shuffle/Dictionary.h
new file mode 100644
index 0000000000..246406d5ae
--- /dev/null
+++ b/cpp/core/shuffle/Dictionary.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <arrow/io/api.h>
+
+#include "memory/MemoryManager.h"
+
+namespace gluten {
+
+enum class BlockType : uint8_t { kEndOfStream = 0, kPlainPayload = 1, 
kDictionary = 2, kDictionaryPayload = 3 };
+
+class ShuffleDictionaryStorage {
+ public:
+  virtual ~ShuffleDictionaryStorage() = default;
+
+  virtual arrow::Status serialize(arrow::io::OutputStream* out) = 0;
+};
+
+class ShuffleDictionaryWriter {
+ public:
+  virtual ~ShuffleDictionaryWriter() = default;
+
+  virtual arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
updateAndGet(
+      const std::shared_ptr<arrow::Schema>& schema,
+      int32_t numRows,
+      const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) = 0;
+
+  virtual arrow::Status serialize(arrow::io::OutputStream* out) = 0;
+
+  virtual int64_t numDictionaryFields() = 0;
+
+  virtual int64_t getDictionarySize() = 0;
+};
+
+using ShuffleDictionaryWriterFactory =
+    std::function<std::unique_ptr<ShuffleDictionaryWriter>(MemoryManager* 
memoryManager, arrow::util::Codec* codec)>;
+
+void registerShuffleDictionaryWriterFactory(ShuffleDictionaryWriterFactory 
factory);
+
+std::unique_ptr<ShuffleDictionaryWriter> createDictionaryWriter(
+    MemoryManager* memoryManager,
+    arrow::util::Codec* codec);
+
+} // namespace gluten
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc 
b/cpp/core/shuffle/LocalPartitionWriter.cc
index eb7668185e..457db60a5c 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -17,6 +17,7 @@
 
 #include "shuffle/LocalPartitionWriter.h"
 
+#include "shuffle/Dictionary.h"
 #include "shuffle/Payload.h"
 #include "shuffle/Spill.h"
 #include "shuffle/Utils.h"
@@ -71,6 +72,10 @@ class LocalPartitionWriter::LocalSpiller {
 
   arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload> 
payload) {
     ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
+
+    static constexpr uint8_t kSpillBlockType = 
static_cast<uint8_t>(BlockType::kPlainPayload);
+
+    RETURN_NOT_OK(os_->Write(&kSpillBlockType, sizeof(kSpillBlockType)));
     RETURN_NOT_OK(payload->serialize(os_.get()));
 
     ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
@@ -283,8 +288,19 @@ class LocalPartitionWriter::PayloadMerger {
 
 class LocalPartitionWriter::PayloadCache {
  public:
-  PayloadCache(uint32_t numPartitions, arrow::util::Codec* codec, int32_t 
compressionThreshold, arrow::MemoryPool* pool)
-      : numPartitions_(numPartitions), codec_(codec), 
compressionThreshold_(compressionThreshold), pool_(pool) {}
+  PayloadCache(
+      uint32_t numPartitions,
+      arrow::util::Codec* codec,
+      int32_t compressionThreshold,
+      bool enableDictionary,
+      arrow::MemoryPool* pool,
+      MemoryManager* memoryManager)
+      : numPartitions_(numPartitions),
+        codec_(codec),
+        compressionThreshold_(compressionThreshold),
+        enableDictionary_(enableDictionary),
+        pool_(pool),
+        memoryManager_(memoryManager) {}
 
   arrow::Status cache(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
payload) {
     PartitionScopeGuard cacheGuard(partitionInUse_, partitionId);
@@ -293,6 +309,13 @@ class LocalPartitionWriter::PayloadCache {
       partitionCachedPayload_[partitionId] = 
std::list<std::unique_ptr<BlockPayload>>{};
     }
 
+    if (enableDictionary_) {
+      if (partitionDictionaries_.find(partitionId) == 
partitionDictionaries_.end()) {
+        partitionDictionaries_[partitionId] = 
createDictionaryWriter(memoryManager_, codec_);
+      }
+      
RETURN_NOT_OK(payload->createDictionaries(partitionDictionaries_[partitionId]));
+    }
+
     bool shouldCompress = codec_ != nullptr && payload->numRows() >= 
compressionThreshold_;
     ARROW_ASSIGN_OR_RAISE(
         auto block,
@@ -309,12 +332,17 @@ class LocalPartitionWriter::PayloadCache {
         "Invalid status: partitionInUse_ is set: " + 
std::to_string(partitionInUse_.value()));
 
     if (hasCachedPayloads(partitionId)) {
+      ARROW_ASSIGN_OR_RAISE(const bool hasDictionaries, 
writeDictionaries(partitionId, os));
+
       auto& payloads = partitionCachedPayload_[partitionId];
       while (!payloads.empty()) {
-        auto payload = std::move(payloads.front());
+        const auto payload = std::move(payloads.front());
         payloads.pop_front();
 
         // Write the cached payload to disk.
+        uint8_t blockType =
+            static_cast<uint8_t>(hasDictionaries ? 
BlockType::kDictionaryPayload : BlockType::kPlainPayload);
+        RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
         RETURN_NOT_OK(payload->serialize(os));
 
         compressTime_ += payload->getCompressTime();
@@ -342,7 +370,7 @@ class LocalPartitionWriter::PayloadCache {
       arrow::util::Codec* codec,
       const int64_t bufferSize,
       int64_t& totalBytesToEvict) {
-    ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile, bufferSize));
+    ARROW_ASSIGN_OR_RAISE(const auto os, openFile(spillFile, bufferSize));
 
     int64_t start = 0;
     auto diskSpill = std::make_shared<Spill>();
@@ -353,6 +381,8 @@ class LocalPartitionWriter::PayloadCache {
       }
 
       if (hasCachedPayloads(pid)) {
+        ARROW_ASSIGN_OR_RAISE(const bool hasDictionaries, 
writeDictionaries(pid, os.get()));
+
         auto& payloads = partitionCachedPayload_[pid];
         while (!payloads.empty()) {
           auto payload = std::move(payloads.front());
@@ -360,7 +390,11 @@ class LocalPartitionWriter::PayloadCache {
           totalBytesToEvict += payload->rawSize();
 
           // Spill the cached payload to disk.
+          uint8_t blockType =
+              static_cast<uint8_t>(hasDictionaries ? 
BlockType::kDictionaryPayload : BlockType::kPlainPayload);
+          RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
           RETURN_NOT_OK(payload->serialize(os.get()));
+
           compressTime_ += payload->getCompressTime();
           spillTime_ += payload->getWriteTime();
         }
@@ -394,22 +428,71 @@ class LocalPartitionWriter::PayloadCache {
     return writeTime_;
   }
 
+  double getAvgDictionaryFields() const {
+    if (numDictionaryPayloads_ == 0 || dictionaryFieldCount_ == 0) {
+      return 0.0;
+    }
+    return dictionaryFieldCount_ / static_cast<double>(numDictionaryPayloads_);
+  }
+
+  int64_t getDictionarySize() const {
+    return dictionarySize_;
+  }
+
  private:
   bool hasCachedPayloads(uint32_t partitionId) {
     return partitionCachedPayload_.find(partitionId) != 
partitionCachedPayload_.end() &&
         !partitionCachedPayload_[partitionId].empty();
   }
 
+  arrow::Result<bool> writeDictionaries(uint32_t partitionId, 
arrow::io::OutputStream* os) {
+    if (!enableDictionary_) {
+      return false;
+    }
+
+    const auto& dict = partitionDictionaries_.find(partitionId);
+    GLUTEN_DCHECK(
+        dict != partitionDictionaries_.end(),
+        "Dictionary for partition " + std::to_string(partitionId) + " not 
found.");
+
+    const auto numDictionaryFields = dict->second->numDictionaryFields();
+    dictionaryFieldCount_ += numDictionaryFields;
+    ++numDictionaryPayloads_;
+
+    if (numDictionaryFields == 0) {
+      // No dictionary fields, no need to write.
+      return false;
+    }
+
+    dictionarySize_ += 
partitionDictionaries_[partitionId]->getDictionarySize();
+
+    static constexpr uint8_t kDictionaryBlock = 
static_cast<uint8_t>(BlockType::kDictionary);
+    RETURN_NOT_OK(os->Write(&kDictionaryBlock, sizeof(kDictionaryBlock)));
+    RETURN_NOT_OK(partitionDictionaries_[partitionId]->serialize(os));
+
+    partitionDictionaries_.erase(partitionId);
+
+    return true;
+  }
+
   uint32_t numPartitions_;
   arrow::util::Codec* codec_;
   int32_t compressionThreshold_;
+  bool enableDictionary_;
   arrow::MemoryPool* pool_;
+  MemoryManager* memoryManager_;
 
   int64_t compressTime_{0};
   int64_t spillTime_{0};
   int64_t writeTime_{0};
   std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>> 
partitionCachedPayload_;
 
+  std::unordered_map<uint32_t, std::shared_ptr<ShuffleDictionaryWriter>> 
partitionDictionaries_;
+
+  int64_t dictionaryFieldCount_{0};
+  int64_t numDictionaryPayloads_{0};
+  int64_t dictionarySize_{0};
+
   std::optional<uint32_t> partitionInUse_{std::nullopt};
 };
 
@@ -603,7 +686,12 @@ arrow::Status LocalPartitionWriter::finishMerger() {
       if (maybeMerged.has_value()) {
         if (payloadCache_ == nullptr) {
           payloadCache_ = std::make_shared<PayloadCache>(
-              numPartitions_, codec_.get(), options_->compressionThreshold, 
payloadPool_.get());
+              numPartitions_,
+              codec_.get(),
+              options_->compressionThreshold,
+              options_->enableDictionary,
+              payloadPool_.get(),
+              memoryManager_);
         }
         // Spill can be triggered by compressing or building dictionaries.
         RETURN_NOT_OK(payloadCache_->cache(pid, 
std::move(maybeMerged.value())));
@@ -646,7 +734,12 @@ arrow::Status LocalPartitionWriter::hashEvict(
   if (!merged.empty()) {
     if (UNLIKELY(!payloadCache_)) {
       payloadCache_ = std::make_shared<PayloadCache>(
-          numPartitions_, codec_.get(), options_->compressionThreshold, 
payloadPool_.get());
+          numPartitions_,
+          codec_.get(),
+          options_->compressionThreshold,
+          options_->enableDictionary,
+          payloadPool_.get(),
+          memoryManager_);
     }
     for (auto& payload : merged) {
       RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
@@ -744,6 +837,8 @@ arrow::Status 
LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
     spillTime_ += payloadCache_->getSpillTime();
     writeTime_ += payloadCache_->getWriteTime();
     compressTime_ += payloadCache_->getCompressTime();
+    metrics->avgDictionaryFields = payloadCache_->getAvgDictionaryFields();
+    metrics->dictionarySize = payloadCache_->getDictionarySize();
   }
 
   metrics->totalCompressTime += compressTime_;
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index a5f051848c..717f75dea5 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -41,6 +41,7 @@ static constexpr int32_t kDefaultSortBufferSize = 4096;
 static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
 static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
 static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
+static constexpr bool kDefaultEnableDictionary = false;
 
 enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
 
@@ -141,6 +142,8 @@ struct LocalPartitionWriterOptions {
 
   int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories
 
+  bool enableDictionary = kDefaultEnableDictionary;
+
   LocalPartitionWriterOptions() = default;
 
   LocalPartitionWriterOptions(
@@ -149,13 +152,15 @@ struct LocalPartitionWriterOptions {
       int64_t compressionThreshold,
       int32_t mergeBufferSize,
       double mergeThreshold,
-      int32_t numSubDirs)
+      int32_t numSubDirs,
+      bool enableDictionary)
       : shuffleFileBufferSize(shuffleFileBufferSize),
         compressionBufferSize(compressionBufferSize),
         compressionThreshold(compressionThreshold),
         mergeBufferSize(mergeBufferSize),
         mergeThreshold(mergeThreshold),
-        numSubDirs(numSubDirs) {}
+        numSubDirs(numSubDirs),
+        enableDictionary(enableDictionary) {}
 };
 
 struct RssPartitionWriterOptions {
@@ -179,6 +184,8 @@ struct ShuffleWriterMetrics {
   int64_t totalWriteTime{0};
   int64_t totalEvictTime{0};
   int64_t totalCompressTime{0};
+  double avgDictionaryFields{0};
+  int64_t dictionarySize{0};
   std::vector<int64_t> partitionLengths{};
   std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
 };
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index 7fbcd73f58..cedae6d2fd 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -33,7 +33,7 @@ class PartitionWriter : public Reclaimable {
  public:
   PartitionWriter(uint32_t numPartitions, std::unique_ptr<arrow::util::Codec> 
codec, MemoryManager* memoryManager)
       : numPartitions_(numPartitions), codec_(std::move(codec)), 
memoryManager_(memoryManager) {
-    payloadPool_ = 
memoryManager->createArrowMemoryPool("PartitionWriter.cached_payload");
+    payloadPool_ = 
memoryManager->getOrCreateArrowMemoryPool("PartitionWriter.cached_payload");
   }
 
   static inline std::string typeToString(PartitionWriterType type) {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 352f46ffb6..27e7d9c85c 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -50,13 +50,10 @@ T* advance(uint8_t** dst) {
   return ptr;
 }
 
-arrow::Result<uint8_t> readType(arrow::io::InputStream* inputStream) {
+arrow::Result<uint8_t> readPayloadType(arrow::io::InputStream* is) {
   uint8_t type;
-  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(Payload::Type), 
&type));
-  if (bytes == 0) {
-    // Reach EOS.
-    return 0;
-  }
+  ARROW_ASSIGN_OR_RAISE(auto bytes, is->Read(sizeof(Payload::Type), &type));
+  ARROW_RETURN_IF(bytes == 0, arrow::Status::IOError("Failed to read bytes. 
Reached EOS."));
   return type;
 }
 
@@ -298,12 +295,8 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
BlockPayload::deseria
     int64_t& deserializeTime,
     int64_t& decompressTime) {
   auto timer = std::make_unique<ScopedTimer>(&deserializeTime);
-  static const std::vector<std::shared_ptr<arrow::Buffer>> kEmptyBuffers{};
-  ARROW_ASSIGN_OR_RAISE(auto type, readType(inputStream));
-  if (type == 0) {
-    numRows = 0;
-    return kEmptyBuffers;
-  }
+  ARROW_ASSIGN_OR_RAISE(auto type, readPayloadType(inputStream));
+
   RETURN_NOT_OK(inputStream->Read(sizeof(uint32_t), &numRows));
   uint32_t numBuffers;
   RETURN_NOT_OK(inputStream->Read(sizeof(uint32_t), &numBuffers));
@@ -481,6 +474,11 @@ std::shared_ptr<arrow::Schema> InMemoryPayload::schema() 
const {
   return schema_;
 }
 
+arrow::Status InMemoryPayload::createDictionaries(const 
std::shared_ptr<ShuffleDictionaryWriter>& dictionaryWriter) {
+  ARROW_ASSIGN_OR_RAISE(buffers_, dictionaryWriter->updateAndGet(schema_, 
numRows_, buffers_));
+  return arrow::Status::OK();
+}
+
 UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
     Type type,
     uint32_t numRows,
@@ -512,11 +510,17 @@ arrow::Status 
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
 
   GLUTEN_CHECK(codec_ != nullptr, "Codec is null when serializing 
Payload::kToBeCompressed.");
 
+  uint8_t blockType;
+  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(blockType), 
&blockType));
+  ARROW_RETURN_IF(bytes == 0, arrow::Status::Invalid("Cannot serialize 
payload. Reached EOS."));
+
+  RETURN_NOT_OK(outputStream->Write(&blockType, sizeof(blockType)));
+
   RETURN_NOT_OK(outputStream->Write(&kCompressedType, 
sizeof(kCompressedType)));
   RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
 
   uint32_t numBuffers = 0;
-  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(uint32_t), 
&numBuffers));
+  ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t), 
&numBuffers));
   ARROW_RETURN_IF(bytes == 0 || numBuffers == 0, 
arrow::Status::Invalid("Cannot serialize payload with 0 buffers."));
 
   RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
@@ -524,7 +528,7 @@ arrow::Status 
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
   ARROW_ASSIGN_OR_RAISE(auto start, inputStream_->Tell());
 
   auto pos = start;
-  auto rawBufferSize = rawSize_ - sizeof(numBuffers);
+  auto rawBufferSize = rawSize_ - sizeof(blockType) - sizeof(numBuffers);
 
   while (pos - start < rawBufferSize) {
     ARROW_ASSIGN_OR_RAISE(auto uncompressed, readUncompressedBuffer());
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 2f481c5197..dfe98cafd5 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -21,6 +21,7 @@
 #include <arrow/io/interfaces.h>
 #include <arrow/memory_pool.h>
 
+#include "shuffle/Dictionary.h"
 #include "shuffle/Options.h"
 #include "shuffle/Utils.h"
 
@@ -148,6 +149,8 @@ class InMemoryPayload final : public Payload {
 
   std::shared_ptr<arrow::Schema> schema() const;
 
+  arrow::Status createDictionaries(const 
std::shared_ptr<ShuffleDictionaryWriter>& dictionaryWriter);
+
  private:
   std::shared_ptr<arrow::Schema> schema_;
   std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
diff --git a/cpp/core/shuffle/ShuffleWriter.cc 
b/cpp/core/shuffle/ShuffleWriter.cc
index 16e8b32624..099794ddb7 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -87,6 +87,14 @@ int64_t ShuffleWriter::totalC2RTime() const {
   return 0;
 }
 
+double ShuffleWriter::avgDictionaryFields() const {
+  return metrics_.avgDictionaryFields;
+}
+
+int64_t ShuffleWriter::dictionarySize() const {
+  return metrics_.dictionarySize;
+}
+
 const std::vector<int64_t>& ShuffleWriter::partitionLengths() const {
   return metrics_.partitionLengths;
 }
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 224c2726e4..1028b0a318 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -59,6 +59,10 @@ class ShuffleWriter : public Reclaimable {
 
   virtual int64_t totalC2RTime() const;
 
+  double avgDictionaryFields() const;
+
+  int64_t dictionarySize() const;
+
   const std::vector<int64_t>& partitionLengths() const;
 
   const std::vector<int64_t>& rawPartitionLengths() const;
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc 
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 83a31cac1e..d52bd5dbf3 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -118,6 +118,10 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t 
partitionId, std::unique_ptr<
       auto payload, inMemoryPayload->toBlockPayload(payloadType, 
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
   // Copy payload to arrow buffered os.
   ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, 
arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize));
+
+  static constexpr uint8_t kRssBlock = 
static_cast<uint8_t>(BlockType::kPlainPayload);
+  RETURN_NOT_OK(rssBufferOs->Write(&kRssBlock, sizeof(kRssBlock)));
+
   RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
   payload = nullptr; // Invalidate payload immediately.
 
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 4deb062174..213a33dc64 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -148,6 +148,7 @@ set(VELOX_SRCS
     operators/serializer/VeloxRowToColumnarConverter.cc
     operators/writer/VeloxColumnarBatchWriter.cc
     operators/writer/VeloxParquetDataSource.cc
+    shuffle/ArrowShuffleDictionaryWriter.cc
     shuffle/VeloxHashShuffleWriter.cc
     shuffle/VeloxRssSortShuffleWriter.cc
     shuffle/VeloxShuffleReader.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index bc811c19cf..ad591c91fb 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -63,6 +63,7 @@ DEFINE_string(
     "lz4",
     "Specify the compression codec. Valid options are none, lz4, zstd, 
qat_gzip, qat_zstd, iaa_gzip");
 DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) 
partitions");
+DEFINE_bool(shuffle_dictionary, false, "Whether to enable dictionary encoding 
for shuffle write.");
 
 DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
 DEFINE_string(
@@ -208,7 +209,9 @@ createPartitionWriter(Runtime* runtime, const std::string& 
dataFile, const std::
     return std::make_shared<RssPartitionWriter>(
         FLAGS_shuffle_partitions, createCodec(), runtime->memoryManager(), 
options, std::move(rssClient));
   }
+
   auto options = std::make_shared<LocalPartitionWriterOptions>();
+  options->enableDictionary = FLAGS_shuffle_dictionary;
   return std::make_unique<LocalPartitionWriter>(
       FLAGS_shuffle_partitions, createCodec(), runtime->memoryManager(), 
options, dataFile, localDirs);
 }
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index ada113f946..7ba2392f9b 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -33,10 +33,12 @@
 #ifdef GLUTEN_ENABLE_GPU
 #include "velox/experimental/cudf/exec/ToCudf.h"
 #endif
+
 #include "compute/VeloxRuntime.h"
 #include "config/VeloxConfig.h"
 #include "jni/JniFileSystem.h"
 #include "operators/functions/SparkExprToSubfieldFilterParser.h"
+#include "shuffle/ArrowShuffleDictionaryWriter.h"
 #include "udf/UdfLoader.h"
 #include "utils/Exception.h"
 #include "velox/common/caching/SsdCache.h"
@@ -210,6 +212,10 @@ void VeloxBackend::init(
   // local cache persistent relies on the cache pool from root memory pool so 
we need to init this
   // after the memory manager instanced
   initCache();
+
+  registerShuffleDictionaryWriterFactory([](MemoryManager* memoryManager, 
arrow::util::Codec* codec) {
+    return std::make_unique<ArrowShuffleDictionaryWriter>(memoryManager, 
codec);
+  });
 }
 
 facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() 
const {
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 669a72eb62..b0e3a799b3 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -172,8 +172,8 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
     try {
       listener_->allocationChanged(neededBytes);
     } catch (const std::exception&) {
-       VLOG(2) << "ListenableArbitrator growCapacityInternal failed, 
stacktrace: "
-               << velox::process::StackTrace().toString();
+      VLOG(2) << "ListenableArbitrator growCapacityInternal failed, 
stacktrace: "
+              << velox::process::StackTrace().toString();
       // if allocationChanged failed, we need to free the reclaimed bytes
       listener_->allocationChanged(-reclaimedFreeBytes);
       std::rethrow_exception(std::current_exception());
@@ -324,9 +324,13 @@ int64_t 
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
 }
 } // namespace
 
-std::shared_ptr<arrow::MemoryPool> 
VeloxMemoryManager::createArrowMemoryPool(const std::string& name) {
+std::shared_ptr<arrow::MemoryPool> 
VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
   std::lock_guard<std::mutex> l(mutex_);
-  VELOX_CHECK_EQ(arrowPools_.count(name), 0, "Arrow memory pool {} already 
exists", name);
+  if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
+    auto pool = it->second.lock();
+    VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed", 
name);
+    return pool;
+  }
   auto pool = std::make_shared<ArrowMemoryPool>(
       blockListener_.get(), [this, name](arrow::MemoryPool* pool) { 
this->dropMemoryPool(name); });
   arrowPools_.emplace(name, pool);
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 28c0f79614..15e298629e 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -83,7 +83,7 @@ class VeloxMemoryManager final : public MemoryManager {
     return defaultArrowPool_.get();
   }
 
-  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& 
name) override;
+  std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const 
std::string& name) override;
 
   const MemoryUsageStats collectMemoryUsageStats() const override;
 
diff --git a/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc 
b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc
new file mode 100644
index 0000000000..fcb43de106
--- /dev/null
+++ b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.cc
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/ArrowShuffleDictionaryWriter.h"
+#include "shuffle/Utils.h"
+#include "utils/VeloxArrowUtils.h"
+
+#include <arrow/array/builder_dict.h>
+
+namespace gluten {
+
+static constexpr double kDictionaryFactor = 0.5;
+
+using ArrowDictionaryIndexType = int32_t;
+
+template <typename T>
+using is_dictionary_binary_type =
+    std::integral_constant<bool, std::is_same_v<arrow::BinaryType, T> || 
std::is_same_v<arrow::StringType, T>>;
+
+template <typename T>
+using is_dictionary_primitive_type = std::integral_constant<
+    bool,
+    std::is_same_v<arrow::Int64Type, T> || std::is_same_v<arrow::DoubleType, 
T> ||
+        std::is_same_v<arrow::TimestampType, T>>;
+
+template <typename T, typename R = void>
+using enable_if_dictionary_type =
+    std::enable_if_t<is_dictionary_primitive_type<T>::value || 
is_dictionary_binary_type<T>::value, R>;
+
+namespace {
+arrow::Status writeDictionaryBuffer(
+    const uint8_t* data,
+    size_t size,
+    arrow::MemoryPool* pool,
+    arrow::util::Codec* codec,
+    arrow::io::OutputStream* out) {
+  ARROW_RETURN_NOT_OK(out->Write(&size, sizeof(size_t)));
+
+  if (size == 0) {
+    return arrow::Status::OK();
+  }
+
+  GLUTEN_DCHECK(data != nullptr, "Cannot write null data");
+
+  if (codec != nullptr) {
+    size_t compressedLength = codec->MaxCompressedLen(size, data);
+    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateBuffer(compressedLength, 
pool));
+    ARROW_ASSIGN_OR_RAISE(
+        compressedLength, codec->Compress(size, data, compressedLength, 
buffer->mutable_data_as<uint8_t>()));
+
+    ARROW_RETURN_NOT_OK(out->Write(&compressedLength, sizeof(size_t)));
+    ARROW_RETURN_NOT_OK(out->Write(buffer->data_as<void>(), compressedLength));
+  } else {
+    ARROW_RETURN_NOT_OK(out->Write(data, size));
+  }
+
+  return arrow::Status::OK();
+}
+
+} // namespace
+
+template <typename ArrowType>
+class DictionaryStorageImpl : public ShuffleDictionaryStorage {
+ public:
+  using ValueType = typename ArrowType::c_type;
+
+  DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec) : 
pool_(pool), codec_(codec) {
+    table_ = std::make_shared<arrow::internal::DictionaryMemoTable>(pool, 
std::make_shared<ArrowType>());
+  }
+
+  arrow::Result<ArrowDictionaryIndexType> getOrUpdate(const ValueType& value) {
+    ArrowDictionaryIndexType memoIndex;
+    ARROW_RETURN_NOT_OK(table_->GetOrInsert<ArrowType>(value, &memoIndex));
+    return memoIndex;
+  }
+
+  arrow::Status serialize(arrow::io::OutputStream* out) override {
+    std::shared_ptr<arrow::ArrayData> data;
+    ARROW_RETURN_NOT_OK(table_->GetArrayData(0, &data));
+
+    DLOG(INFO) << "ShuffleDictionaryStorage::serialize num elements: " << 
data->length;
+
+    ARROW_RETURN_IF(
+        data->buffers.size() != 2, arrow::Status::Invalid("Invalid dictionary 
for type: ", data->type->ToString()));
+
+    const auto& values = data->buffers[1];
+    ARROW_RETURN_NOT_OK(writeDictionaryBuffer(values->data(), values->size(), 
pool_, codec_, out));
+
+    return arrow::Status::OK();
+  }
+
+  int32_t size() const {
+    return table_->size();
+  }
+
+ private:
+  arrow::MemoryPool* pool_;
+  arrow::util::Codec* codec_;
+
+  std::shared_ptr<arrow::internal::DictionaryMemoTable> table_;
+};
+
+class BinaryShuffleDictionaryStorage : public ShuffleDictionaryStorage {
+ public:
+  using MemoType = arrow::LargeBinaryType;
+
+  BinaryShuffleDictionaryStorage(arrow::MemoryPool* pool, arrow::util::Codec* 
codec) : pool_(pool), codec_(codec) {
+    table_ = std::make_shared<arrow::internal::DictionaryMemoTable>(pool, 
std::make_shared<MemoType>());
+  }
+
+  arrow::Result<ArrowDictionaryIndexType> getOrUpdate(const std::string_view& 
view) const {
+    ArrowDictionaryIndexType memoIndex;
+    ARROW_RETURN_NOT_OK(table_->GetOrInsert<MemoType>(view, &memoIndex));
+    return memoIndex;
+  }
+
+  arrow::Status serialize(arrow::io::OutputStream* out) override {
+    std::shared_ptr<arrow::ArrayData> data;
+    ARROW_RETURN_NOT_OK(table_->GetArrayData(0, &data));
+
+    ARROW_RETURN_IF(data->buffers.size() != 3, arrow::Status::Invalid("Invalid 
dictionary for binary type"));
+
+    DLOG(INFO) << "BinaryShuffleDictionaryStorage::serialize num elements: " 
<< table_->size();
+
+    ARROW_ASSIGN_OR_RAISE(auto lengths, offsetToLength(data->length, 
data->buffers[1]));
+    ARROW_RETURN_NOT_OK(writeDictionaryBuffer(lengths->data(), 
lengths->size(), pool_, codec_, out));
+
+    const auto& values = data->buffers[2];
+    ARROW_RETURN_NOT_OK(writeDictionaryBuffer(values->data(), values->size(), 
pool_, codec_, out));
+
+    return arrow::Status::OK();
+  }
+
+  int32_t size() const {
+    return table_->size();
+  }
+
+ private:
+  arrow::Result<std::shared_ptr<arrow::Buffer>> offsetToLength(
+      size_t numElements,
+      const std::shared_ptr<arrow::Buffer>& offsets) const {
+    ARROW_ASSIGN_OR_RAISE(auto lengths, 
arrow::AllocateBuffer(sizeof(StringLengthType) * numElements, pool_));
+    auto* rawLengths = lengths->mutable_data_as<StringLengthType>();
+    const auto* rawOffsets = 
offsets->data_as<arrow::TypeTraits<MemoType>::OffsetType::c_type>();
+
+    for (auto i = 0; i < numElements; ++i) {
+      rawLengths[i] = static_cast<StringLengthType>(rawOffsets[i + 1] - 
rawOffsets[i]);
+    }
+
+    return lengths;
+  }
+
+  arrow::MemoryPool* pool_;
+  arrow::util::Codec* codec_;
+
+  std::shared_ptr<arrow::internal::DictionaryMemoTable> table_;
+};
+
+template <>
+class DictionaryStorageImpl<arrow::BinaryType> : public 
BinaryShuffleDictionaryStorage {
+ public:
+  DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec)
+      : BinaryShuffleDictionaryStorage(pool, codec) {}
+};
+
+template <>
+class DictionaryStorageImpl<arrow::StringType> : public 
BinaryShuffleDictionaryStorage {
+ public:
+  DictionaryStorageImpl(arrow::MemoryPool* pool, arrow::util::Codec* codec)
+      : BinaryShuffleDictionaryStorage(pool, codec) {}
+};
+
+namespace {
+template <typename ArrowType>
+arrow::Result<std::shared_ptr<arrow::Buffer>> updateDictionary(
+    int32_t numRows,
+    const std::shared_ptr<arrow::Buffer>& validityBuffer,
+    const std::shared_ptr<arrow::Buffer>& valueBuffer,
+    const std::shared_ptr<arrow::Buffer>&,
+    const std::shared_ptr<DictionaryStorageImpl<ArrowType>>& dictionary,
+    arrow::MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto indices, 
arrow::AllocateBuffer(sizeof(ArrowDictionaryIndexType) * numRows, pool));
+  auto rawIndices = indices->mutable_data_as<ArrowDictionaryIndexType>();
+
+  const auto* values = valueBuffer->data_as<typename ArrowType::c_type>();
+
+  if (validityBuffer != nullptr) {
+    const auto* nulls = validityBuffer->data();
+    for (auto i = 0; i < numRows; ++i) {
+      if (arrow::bit_util::GetBit(nulls, i)) {
+        ARROW_ASSIGN_OR_RAISE(*rawIndices, dictionary->getOrUpdate(values[i]));
+      }
+      ++rawIndices;
+    }
+  } else {
+    for (auto i = 0; i < numRows; ++i) {
+      ARROW_ASSIGN_OR_RAISE(*rawIndices++, dictionary->getOrUpdate(values[i]));
+    }
+  }
+
+  return indices;
+}
+
+arrow::Result<std::shared_ptr<arrow::Buffer>> updateDictionaryForBinary(
+    int32_t numRows,
+    const std::shared_ptr<arrow::Buffer>& validityBuffer,
+    const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+    const std::shared_ptr<arrow::Buffer>& valueBuffer,
+    const std::shared_ptr<BinaryShuffleDictionaryStorage>& dictionary,
+    arrow::MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto indices, 
arrow::AllocateBuffer(sizeof(ArrowDictionaryIndexType) * numRows, pool));
+  auto rawIndices = indices->mutable_data_as<ArrowDictionaryIndexType>();
+
+  const auto* lengths = lengthBuffer->data_as<uint32_t>();
+  const auto* values = valueBuffer->data_as<char>();
+
+  size_t offset = 0;
+
+  if (validityBuffer != nullptr) {
+    const auto* nulls = validityBuffer->data();
+    for (auto i = 0; i < numRows; ++i) {
+      if (arrow::bit_util::GetBit(nulls, i)) {
+        std::string_view view(values + offset, lengths[i]);
+        offset += lengths[i];
+        ARROW_ASSIGN_OR_RAISE(*rawIndices, dictionary->getOrUpdate(view));
+      }
+      ++rawIndices;
+    }
+  } else {
+    for (auto i = 0; i < numRows; ++i) {
+      std::string_view view(values + offset, lengths[i]);
+      offset += lengths[i];
+      ARROW_ASSIGN_OR_RAISE(*rawIndices++, dictionary->getOrUpdate(view));
+    }
+  }
+
+  return indices;
+}
+
+template <>
+arrow::Result<std::shared_ptr<arrow::Buffer>> 
updateDictionary<arrow::BinaryType>(
+    int32_t numRows,
+    const std::shared_ptr<arrow::Buffer>& validityBuffer,
+    const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+    const std::shared_ptr<arrow::Buffer>& valueBuffer,
+    const std::shared_ptr<DictionaryStorageImpl<arrow::BinaryType>>& 
dictionary,
+    arrow::MemoryPool* pool) {
+  return updateDictionaryForBinary(numRows, validityBuffer, lengthBuffer, 
valueBuffer, dictionary, pool);
+}
+
+template <>
+arrow::Result<std::shared_ptr<arrow::Buffer>> 
updateDictionary<arrow::StringType>(
+    int32_t numRows,
+    const std::shared_ptr<arrow::Buffer>& validityBuffer,
+    const std::shared_ptr<arrow::Buffer>& lengthBuffer,
+    const std::shared_ptr<arrow::Buffer>& valueBuffer,
+    const std::shared_ptr<DictionaryStorageImpl<arrow::StringType>>& 
dictionary,
+    arrow::MemoryPool* pool) {
+  return updateDictionaryForBinary(numRows, validityBuffer, lengthBuffer, 
valueBuffer, dictionary, pool);
+}
+} // namespace
+
+class ValueUpdater {
+ public:
+  template <typename ArrowType>
+  enable_if_dictionary_type<ArrowType, arrow::Status> Visit(const ArrowType&) {
+    bool dictionaryExists = false;
+    std::shared_ptr<DictionaryStorageImpl<ArrowType>> dictionary;
+    if (const auto& it = writer->dictionaries_.find(fieldIdx); it != 
writer->dictionaries_.end()) {
+      dictionaryExists = true;
+      dictionary = 
std::dynamic_pointer_cast<DictionaryStorageImpl<ArrowType>>(it->second);
+    } else {
+      dictionary = 
std::make_shared<DictionaryStorageImpl<ArrowType>>(writer->dictionaryPool_.get(),
 writer->codec_);
+    }
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto indices, updateDictionary<ArrowType>(numRows, nulls, values, 
binaryValues, dictionary, pool));
+
+    results.push_back(nulls);
+
+    // Discard dictionary.
+    if (!dictionaryExists && dictionary->size() > numRows * kDictionaryFactor) 
{
+      dictionaryCreated = false;
+      results.push_back(values);
+      if (binaryValues != nullptr) {
+        results.push_back(binaryValues);
+      }
+      return arrow::Status::OK();
+    }
+
+    if (!dictionaryExists) {
+      writer->dictionaries_[fieldIdx] = dictionary;
+    }
+
+    dictionaryCreated = true;
+    results.push_back(indices);
+
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Visit(const arrow::Decimal128Type& type) {
+    // Only support short decimal.
+    return Visit(arrow::Int64Type());
+  }
+
+  arrow::Status Visit(const arrow::DataType& type) {
+    return arrow::Status::TypeError("Not implemented for type: ", 
type.ToString());
+  }
+
+  ArrowShuffleDictionaryWriter* writer;
+  arrow::MemoryPool* pool;
+  int32_t fieldIdx;
+  int32_t numRows;
+  std::shared_ptr<arrow::Buffer> nulls{nullptr};
+  std::shared_ptr<arrow::Buffer> values{nullptr};
+  std::shared_ptr<arrow::Buffer> binaryValues{nullptr};
+
+  std::vector<std::shared_ptr<arrow::Buffer>>& results;
+  bool& dictionaryCreated;
+};
+
+arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
ArrowShuffleDictionaryWriter::updateAndGet(
+    const std::shared_ptr<arrow::Schema>& schema,
+    int32_t numRows,
+    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+  ARROW_RETURN_NOT_OK(initSchema(schema));
+
+  std::vector<std::shared_ptr<arrow::Buffer>> results;
+
+  size_t bufferIdx = 0;
+  for (auto i = 0; i < schema->num_fields(); ++i) {
+    switch (fieldTypes_[i]) {
+      case FieldType::kNull:
+      case FieldType::kComplex:
+        break;
+      case FieldType::kFixedWidth:
+        results.emplace_back(buffers[bufferIdx++]);
+        results.emplace_back(buffers[bufferIdx++]);
+        break;
+      case FieldType::kBinary:
+        results.emplace_back(buffers[bufferIdx++]);
+        results.emplace_back(buffers[bufferIdx++]);
+        results.emplace_back(buffers[bufferIdx++]);
+        break;
+      case FieldType::kSupportsDictionary: {
+        const auto fieldType = schema_->field(i)->type();
+        bool isBinaryType =
+            fieldType->id() == arrow::BinaryType::type_id || fieldType->id() 
== arrow::StringType::type_id;
+
+        bool isDictionaryCreated = false;
+        ValueUpdater valueUpdater{
+            this,
+            memoryManager_->defaultArrowMemoryPool(),
+            i,
+            numRows,
+            buffers[bufferIdx++],
+            buffers[bufferIdx++],
+            isBinaryType ? buffers[bufferIdx++] : nullptr,
+            results,
+            isDictionaryCreated};
+
+        ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*fieldType, &valueUpdater));
+
+        if (!isDictionaryCreated) {
+          ARROW_RETURN_NOT_OK(blackList(i));
+        }
+
+        break;
+      }
+    }
+  }
+
+  if (hasComplexType_) {
+    results.emplace_back(buffers[bufferIdx++]);
+  }
+
+  GLUTEN_DCHECK(bufferIdx == buffers.size(), "Not all buffers are consumed.");
+
+  return results;
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::serialize(arrow::io::OutputStream* 
out) {
+  auto bitMapSize = 
arrow::bit_util::RoundUpToMultipleOf8(schema_->num_fields());
+  std::vector<uint8_t> bitMap(bitMapSize);
+
+  for (auto fieldIdx : dictionaryFields_) {
+    arrow::bit_util::SetBit(bitMap.data(), fieldIdx);
+  }
+
+  ARROW_RETURN_NOT_OK(out->Write(bitMap.data(), bitMapSize));
+
+  for (auto fieldIdx : dictionaryFields_) {
+    GLUTEN_DCHECK(
+        dictionaries_.find(fieldIdx) != dictionaries_.end(),
+        "Invalid dictionary field index: " + std::to_string(fieldIdx));
+
+    const auto& dictionary = dictionaries_[fieldIdx];
+    ARROW_RETURN_NOT_OK(dictionary->serialize(out));
+
+    dictionaries_.erase(fieldIdx);
+  }
+
+  return arrow::Status::OK();
+}
+
+int64_t ArrowShuffleDictionaryWriter::numDictionaryFields() {
+  return dictionaryFields_.size();
+}
+
+int64_t ArrowShuffleDictionaryWriter::getDictionarySize() {
+  return dictionaryPool_->bytes_allocated();
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::initSchema(const 
std::shared_ptr<arrow::Schema>& schema) {
+  if (schema_ == nullptr) {
+    schema_ = schema;
+
+    rowType_ = fromArrowSchema(schema);
+    fieldTypes_.resize(rowType_->size());
+
+    for (auto i = 0; i < rowType_->size(); ++i) {
+      switch (rowType_->childAt(i)->kind()) {
+        case facebook::velox::TypeKind::UNKNOWN:
+          fieldTypes_[i] = FieldType::kNull;
+          break;
+        case facebook::velox::TypeKind::ARRAY:
+        case facebook::velox::TypeKind::MAP:
+        case facebook::velox::TypeKind::ROW:
+          fieldTypes_[i] = FieldType::kComplex;
+          hasComplexType_ = true;
+          break;
+        case facebook::velox::TypeKind::VARBINARY:
+        case facebook::velox::TypeKind::VARCHAR:
+        case facebook::velox::TypeKind::DOUBLE:
+        case facebook::velox::TypeKind::BIGINT:
+          fieldTypes_[i] = FieldType::kSupportsDictionary;
+          dictionaryFields_.emplace(i);
+          break;
+        default:
+          fieldTypes_[i] = FieldType::kFixedWidth;
+          break;
+      }
+    }
+  } else if (schema_ != schema) {
+    return arrow::Status::Invalid("Schema mismatch");
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status ArrowShuffleDictionaryWriter::blackList(int32_t fieldId) {
+  switch (const auto typeId = schema_->field(fieldId)->type()->id()) {
+    case arrow::BinaryType::type_id:
+    case arrow::StringType::type_id:
+      fieldTypes_[fieldId] = FieldType::kBinary;
+      break;
+    default: {
+      if (!arrow::is_fixed_width(typeId)) {
+        return arrow::Status::Invalid("Invalid field type: ", 
schema_->field(fieldId)->type()->ToString());
+      }
+      fieldTypes_[fieldId] = FieldType::kFixedWidth;
+      break;
+    }
+  }
+
+  dictionaryFields_.erase(fieldId);
+
+  return arrow::Status::OK();
+}
+} // namespace gluten
diff --git a/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h 
b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h
new file mode 100644
index 0000000000..ef0efbb668
--- /dev/null
+++ b/cpp/velox/shuffle/ArrowShuffleDictionaryWriter.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "shuffle/Dictionary.h"
+
+#include "velox/type/Type.h"
+
+#include <arrow/buffer.h>
+#include <arrow/type.h>
+
+#include <set>
+
+namespace gluten {
+
+class ArrowShuffleDictionaryWriter final : public ShuffleDictionaryWriter {
+ public:
+  ArrowShuffleDictionaryWriter(MemoryManager* memoryManager, 
arrow::util::Codec* codec)
+      : memoryManager_(memoryManager), codec_(codec) {
+    dictionaryPool_ = 
memoryManager->getOrCreateArrowMemoryPool("ArrowShuffleDictionaryWriter.dictionary");
+  }
+
+  arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> updateAndGet(
+      const std::shared_ptr<arrow::Schema>& schema,
+      int32_t numRows,
+      const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) override;
+
+  arrow::Status serialize(arrow::io::OutputStream* out) override;
+
+  int64_t numDictionaryFields() override;
+
+  int64_t getDictionarySize() override;
+
+ private:
+  enum class FieldType { kNull, kFixedWidth, kBinary, kComplex, 
kSupportsDictionary };
+
+  arrow::Status initSchema(const std::shared_ptr<arrow::Schema>& schema);
+
+  arrow::Status blackList(int32_t fieldId);
+
+  MemoryManager* memoryManager_;
+  // Used to count the memory allocation for dictionary data.
+  std::shared_ptr<arrow::MemoryPool> dictionaryPool_;
+
+  arrow::util::Codec* codec_;
+
+  std::shared_ptr<arrow::Schema> schema_{nullptr};
+  facebook::velox::TypePtr rowType_{nullptr};
+  std::vector<FieldType> fieldTypes_;
+  std::set<int32_t> dictionaryFields_;
+  bool hasComplexType_{false};
+  std::unordered_map<int32_t, std::shared_ptr<ShuffleDictionaryStorage>> 
dictionaries_;
+
+  friend class ValueUpdater;
+};
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index ae956aa49b..46fb1716fe 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -41,6 +41,17 @@ using namespace facebook::velox;
 
 namespace gluten {
 namespace {
+
+arrow::Result<BlockType> readBlockType(arrow::io::InputStream* inputStream) {
+  BlockType type;
+  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(BlockType), 
&type));
+  if (bytes == 0) {
+    // Reach EOS.
+    return BlockType::kEndOfStream;
+  }
+  return type;
+}
+
 struct BufferViewReleaser {
   BufferViewReleaser() : BufferViewReleaser(nullptr) {}
 
@@ -66,23 +77,25 @@ BufferPtr 
convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
   return wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer);
 }
 
-template <TypeKind kind>
+template <TypeKind Kind, typename T = typename TypeTraits<Kind>::NativeType>
 VectorPtr readFlatVector(
     std::vector<BufferPtr>& buffers,
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
   auto nulls = buffers[bufferIdx++];
-  auto values = buffers[bufferIdx++];
-  std::vector<BufferPtr> stringBuffers;
-  using T = typename TypeTraits<kind>::NativeType;
-  if (nulls == nullptr || nulls->size() == 0) {
-    return std::make_shared<FlatVector<T>>(
-        pool, type, BufferPtr(nullptr), length, std::move(values), 
std::move(stringBuffers));
+  auto valuesOrIndices = buffers[bufferIdx++];
+
+  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+  if (dictionary != nullptr) {
+    return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length, 
dictionary);
   }
+
   return std::make_shared<FlatVector<T>>(
-      pool, type, std::move(nulls), length, std::move(values), 
std::move(stringBuffers));
+      pool, type, nulls, length, std::move(valuesOrIndices), 
std::vector<BufferPtr>{});
 }
 
 template <>
@@ -91,6 +104,7 @@ VectorPtr readFlatVector<TypeKind::UNKNOWN>(
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
   return BaseVector::createNullConstant(type, length, pool);
 }
@@ -101,27 +115,28 @@ VectorPtr readFlatVector<TypeKind::HUGEINT>(
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
   auto nulls = buffers[bufferIdx++];
-  auto valueBuffer = buffers[bufferIdx++];
+  auto valuesOrIndices = buffers[bufferIdx++];
+
   // Because if buffer does not compress, it will get from netty, the address 
maynot aligned 16B, which will cause
   // int128_t = xxx coredump by instruction movdqa
-  auto data = valueBuffer->as<int128_t>();
-  BufferPtr values;
-  if ((reinterpret_cast<uintptr_t>(data) & 0xf) == 0) {
-    values = valueBuffer;
-  } else {
-    values = AlignedBuffer::allocate<char>(valueBuffer->size(), pool);
-    gluten::fastCopy(values->asMutable<char>(), valueBuffer->as<char>(), 
valueBuffer->size());
+  const auto* addr = valuesOrIndices->as<facebook::velox::int128_t>();
+  if ((reinterpret_cast<uintptr_t>(addr) & 0xf) != 0) {
+    auto alignedBuffer = 
AlignedBuffer::allocate<char>(valuesOrIndices->size(), pool);
+    fastCopy(alignedBuffer->asMutable<char>(), valuesOrIndices->as<char>(), 
valuesOrIndices->size());
+    valuesOrIndices = alignedBuffer;
   }
-  std::vector<BufferPtr> stringBuffers;
-  if (nulls == nullptr || nulls->size() == 0) {
-    auto vp = std::make_shared<FlatVector<int128_t>>(
-        pool, type, BufferPtr(nullptr), length, std::move(values), 
std::move(stringBuffers));
-    return vp;
+
+  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+  if (dictionary != nullptr) {
+    return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length, 
dictionary);
   }
+
   return std::make_shared<FlatVector<int128_t>>(
-      pool, type, std::move(nulls), length, std::move(values), 
std::move(stringBuffers));
+      pool, type, nulls, length, std::move(valuesOrIndices), 
std::vector<BufferPtr>{});
 }
 
 VectorPtr readFlatVectorStringView(
@@ -129,28 +144,36 @@ VectorPtr readFlatVectorStringView(
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
   auto nulls = buffers[bufferIdx++];
-  auto lengthBuffer = buffers[bufferIdx++];
+  auto lengthOrIndices = buffers[bufferIdx++];
+
+  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+  if (dictionary != nullptr) {
+    return BaseVector::wrapInDictionary(nulls, lengthOrIndices, length, 
dictionary);
+  }
+
   auto valueBuffer = buffers[bufferIdx++];
-  const auto* rawLength = lengthBuffer->as<StringLengthType>();
 
-  std::vector<BufferPtr> stringBuffers;
+  const auto* rawLength = lengthOrIndices->as<StringLengthType>();
+  const auto* valueBufferPtr = valueBuffer->as<char>();
+
   auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, 
pool);
-  auto rawValues = values->asMutable<StringView>();
-  auto rawChars = valueBuffer->as<char>();
+  auto* rawValues = values->asMutable<StringView>();
+
   uint64_t offset = 0;
   for (int32_t i = 0; i < length; ++i) {
-    rawValues[i] = StringView(rawChars + offset, rawLength[i]);
+    rawValues[i] = StringView(valueBufferPtr + offset, rawLength[i]);
     offset += rawLength[i];
   }
+
+  std::vector<BufferPtr> stringBuffers;
   stringBuffers.emplace_back(valueBuffer);
-  if (nulls == nullptr || nulls->size() == 0) {
-    return std::make_shared<FlatVector<StringView>>(
-        pool, type, BufferPtr(nullptr), length, std::move(values), 
std::move(stringBuffers));
-  }
+
   return std::make_shared<FlatVector<StringView>>(
-      pool, type, std::move(nulls), length, std::move(values), 
std::move(stringBuffers));
+      pool, type, nulls, length, std::move(values), std::move(stringBuffers));
 }
 
 template <>
@@ -159,8 +182,9 @@ VectorPtr readFlatVector<TypeKind::VARCHAR>(
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
-  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
+  return readFlatVectorStringView(buffers, bufferIdx, length, type, 
dictionary, pool);
 }
 
 template <>
@@ -169,8 +193,9 @@ VectorPtr readFlatVector<TypeKind::VARBINARY>(
     int32_t& bufferIdx,
     uint32_t length,
     std::shared_ptr<const Type> type,
+    const VectorPtr& dictionary,
     memory::MemoryPool* pool) {
-  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
+  return readFlatVectorStringView(buffers, bufferIdx, length, type, 
dictionary, pool);
 }
 
 std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
@@ -209,42 +234,47 @@ RowTypePtr getComplexWriteType(const 
std::vector<TypePtr>& types) {
   return std::make_shared<const RowType>(std::move(complexTypeColNames), 
std::move(complexTypeChildrens));
 }
 
-void readColumns(
-    std::vector<BufferPtr>& buffers,
-    memory::MemoryPool* pool,
+RowVectorPtr deserialize(
+    RowTypePtr type,
     uint32_t numRows,
-    const std::vector<TypePtr>& types,
-    std::vector<VectorPtr>& result) {
-  int32_t bufferIdx = 0;
+    std::vector<BufferPtr>& buffers,
+    const std::vector<int32_t>& dictionaryFields,
+    const std::vector<VectorPtr>& dictionaries,
+    memory::MemoryPool* pool) {
+  std::vector<VectorPtr> children;
+  auto types = type->as<TypeKind::ROW>().children();
+
   std::vector<VectorPtr> complexChildren;
   auto complexRowType = getComplexWriteType(types);
   if (complexRowType->children().size() > 0) {
     complexChildren = readComplexType(buffers[buffers.size() - 1], 
complexRowType, pool)->children();
   }
 
+  int32_t bufferIdx = 0;
   int32_t complexIdx = 0;
-  for (int32_t i = 0; i < types.size(); ++i) {
-    auto kind = types[i]->kind();
+  int32_t dictionaryIdx = 0;
+  for (size_t i = 0; i < types.size(); ++i) {
+    const auto kind = types[i]->kind();
     switch (kind) {
       case TypeKind::ROW:
       case TypeKind::MAP:
       case TypeKind::ARRAY: {
-        result.emplace_back(std::move(complexChildren[complexIdx]));
+        children.emplace_back(std::move(complexChildren[complexIdx]));
         complexIdx++;
       } break;
       default: {
+        VectorPtr dictionary{nullptr};
+        if (!dictionaryFields.empty() && dictionaryIdx < 
dictionaryFields.size() &&
+            dictionaryFields[dictionaryIdx] == i) {
+          dictionary = dictionaries[dictionaryIdx++];
+        }
         auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
-            readFlatVector, types[i]->kind(), buffers, bufferIdx, numRows, 
types[i], pool);
-        result.emplace_back(std::move(res));
+            readFlatVector, kind, buffers, bufferIdx, numRows, types[i], 
dictionary, pool);
+        children.emplace_back(std::move(res));
       } break;
     }
   }
-}
 
-RowVectorPtr deserialize(RowTypePtr type, uint32_t numRows, 
std::vector<BufferPtr>& buffers, memory::MemoryPool* pool) {
-  std::vector<VectorPtr> children;
-  auto childTypes = type->as<TypeKind::ROW>().children();
-  readColumns(buffers, pool, numRows, childTypes, children);
   return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows, 
children);
 }
 
@@ -252,6 +282,8 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
     RowTypePtr type,
     uint32_t numRows,
     std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers,
+    const std::vector<int32_t>& dictionaryFields,
+    const std::vector<VectorPtr>& dictionaries,
     memory::MemoryPool* pool,
     int64_t& deserializeTime) {
   ScopedTimer timer(&deserializeTime);
@@ -260,7 +292,7 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
   for (auto& buffer : arrowBuffers) {
     veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
   }
-  auto rowVector = deserialize(type, numRows, veloxBuffers, pool);
+  auto rowVector = deserialize(type, numRows, veloxBuffers, dictionaryFields, 
dictionaries, pool);
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
@@ -277,11 +309,149 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
     GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
     veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
   }
-  auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
+  auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {}, 
pool);
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
+
+arrow::Result<BufferPtr>
+readDictionaryBuffer(arrow::io::InputStream* in, 
facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
+  size_t bufferSize;
+
+  ARROW_RETURN_NOT_OK(in->Read(sizeof(bufferSize), &bufferSize));
+  auto buffer = facebook::velox::AlignedBuffer::allocate<char>(bufferSize, 
pool, std::nullopt, true);
+
+  if (bufferSize == 0) {
+    return buffer;
+  }
+
+  if (codec != nullptr) {
+    size_t compressedSize;
+    ARROW_RETURN_NOT_OK(in->Read(sizeof(compressedSize), &compressedSize));
+    auto compressedBuffer = 
facebook::velox::AlignedBuffer::allocate<char>(compressedSize, pool, 
std::nullopt, true);
+    ARROW_RETURN_NOT_OK(in->Read(compressedSize, 
compressedBuffer->asMutable<void>()));
+    ARROW_ASSIGN_OR_RAISE(
+        auto decompressedSize,
+        codec->Decompress(compressedSize, compressedBuffer->as<uint8_t>(), 
bufferSize, buffer->asMutable<uint8_t>()));
+    ARROW_RETURN_IF(
+        decompressedSize != bufferSize,
+        arrow::Status::IOError(
+            fmt::format("Decompressed size doesn't equal to original size: ({} 
vs {})", decompressedSize, bufferSize)));
+  } else {
+    ARROW_RETURN_NOT_OK(in->Read(bufferSize, buffer->asMutable<void>()));
+  }
+  return buffer;
+}
+
+arrow::Result<VectorPtr> readDictionaryForBinary(
+    arrow::io::InputStream* in,
+    const TypePtr& type,
+    facebook::velox::memory::MemoryPool* pool,
+    arrow::util::Codec* codec) {
+  // Read length buffer.
+  ARROW_ASSIGN_OR_RAISE(auto lengthBuffer, readDictionaryBuffer(in, pool, 
codec));
+  const auto* lengthBufferPtr = lengthBuffer->as<StringLengthType>();
+
+  // Read value buffer.
+  ARROW_ASSIGN_OR_RAISE(auto valueBuffer, readDictionaryBuffer(in, pool, 
codec));
+  const auto* valueBufferPtr = valueBuffer->as<char>();
+
+  // Build StringViews.
+  const auto numElements = lengthBuffer->size() / sizeof(StringLengthType);
+  auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * 
numElements, pool, std::nullopt, true);
+  auto* rawValues = values->asMutable<StringView>();
+
+  uint64_t offset = 0;
+  for (size_t i = 0; i < numElements; ++i) {
+    rawValues[i] = StringView(valueBufferPtr + offset, lengthBufferPtr[i]);
+    offset += lengthBufferPtr[i];
+  }
+
+  std::vector<BufferPtr> stringBuffers;
+  stringBuffers.emplace_back(valueBuffer);
+
+  return std::make_shared<FlatVector<StringView>>(
+      pool, type, BufferPtr(nullptr), numElements, std::move(values), 
std::move(stringBuffers));
+}
+
+template <TypeKind Kind, typename NativeType = typename 
TypeTraits<Kind>::NativeType>
+arrow::Result<VectorPtr> readDictionary(
+    arrow::io::InputStream* in,
+    const TypePtr& type,
+    facebook::velox::memory::MemoryPool* pool,
+    arrow::util::Codec* codec) {
+  ARROW_ASSIGN_OR_RAISE(auto buffer, readDictionaryBuffer(in, pool, codec));
+
+  const auto numElements = buffer->size() / sizeof(NativeType);
+
+  return std::make_shared<FlatVector<NativeType>>(
+      pool, type, BufferPtr(nullptr), numElements, std::move(buffer), 
std::vector<BufferPtr>{});
+}
+
+template <>
+arrow::Result<VectorPtr> readDictionary<TypeKind::VARCHAR>(
+    arrow::io::InputStream* in,
+    const TypePtr& type,
+    facebook::velox::memory::MemoryPool* pool,
+    arrow::util::Codec* codec) {
+  return readDictionaryForBinary(in, type, pool, codec);
+}
+
+template <>
+arrow::Result<VectorPtr> readDictionary<TypeKind::VARBINARY>(
+    arrow::io::InputStream* in,
+    const TypePtr& type,
+    facebook::velox::memory::MemoryPool* pool,
+    arrow::util::Codec* codec) {
+  return readDictionaryForBinary(in, type, pool, codec);
+}
+
 } // namespace
 
+class VeloxDictionaryReader {
+ public:
+  VeloxDictionaryReader(
+      const facebook::velox::RowTypePtr& rowType,
+      facebook::velox::memory::MemoryPool* veloxPool,
+      arrow::util::Codec* codec)
+      : rowType_(rowType), veloxPool_(veloxPool), codec_(codec) {}
+
+  arrow::Result<std::vector<int32_t>> readFields(arrow::io::InputStream* in) 
const {
+    // Read bitmap.
+    auto bitMapSize = arrow::bit_util::RoundUpToMultipleOf8(rowType_->size());
+    std::vector<uint8_t> bitMap(bitMapSize);
+
+    RETURN_NOT_OK(in->Read(bitMapSize, bitMap.data()));
+
+    std::vector<int32_t> fields;
+    for (auto i = 0; i < rowType_->size(); ++i) {
+      if (arrow::bit_util::GetBit(bitMap.data(), i)) {
+        fields.push_back(i);
+      }
+    }
+
+    return fields;
+  }
+
+  arrow::Result<std::vector<VectorPtr>> 
readDictionaries(arrow::io::InputStream* in, const std::vector<int32_t>& fields)
+      const {
+    // Read dictionary buffers.
+    std::vector<VectorPtr> dictionaries;
+    for (const auto i : fields) {
+      auto dictionary = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
+          readDictionary, rowType_->childAt(i)->kind(), in, 
rowType_->childAt(i), veloxPool_, codec_);
+      dictionaries.emplace_back();
+      ARROW_ASSIGN_OR_RAISE(dictionaries.back(), dictionary);
+    }
+
+    return dictionaries;
+  }
+
+ private:
+  facebook::velox::RowTypePtr rowType_;
+  facebook::velox::memory::MemoryPool* veloxPool_;
+  arrow::util::Codec* codec_;
+};
+
 VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
     std::shared_ptr<arrow::io::InputStream> in,
     const std::shared_ptr<arrow::Schema>& schema,
@@ -308,19 +478,71 @@ 
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
   GLUTEN_ASSIGN_OR_THROW(in_, 
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
 }
 
+bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
+  // Complex type or dictionary encodings do not support merging.
+  return hasComplexType_ || !dictionaryFields_.empty();
+}
+
+void VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
+  if (blockTypeResolved_) {
+    return;
+  }
+
+  blockTypeResolved_ = true;
+
+  GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
+  switch (blockType) {
+    case BlockType::kEndOfStream:
+      reachEos_ = true;
+      break;
+    case BlockType::kDictionary: {
+      VeloxDictionaryReader reader(rowType_, veloxPool_, codec_.get());
+      GLUTEN_ASSIGN_OR_THROW(dictionaryFields_, reader.readFields(in_.get()));
+      GLUTEN_ASSIGN_OR_THROW(dictionaries_, reader.readDictionaries(in_.get(), 
dictionaryFields_));
+
+      GLUTEN_ASSIGN_OR_THROW(blockType, readBlockType(in_.get()));
+      GLUTEN_CHECK(blockType == BlockType::kDictionaryPayload, "Invalid block 
type for dictionary payload");
+    } break;
+    case BlockType::kDictionaryPayload: {
+      GLUTEN_CHECK(
+          !dictionaryFields_.empty() && !dictionaries_.empty(),
+          "Dictionaries cannot be empty when reading dictionary payload");
+    } break;
+    case BlockType::kPlainPayload: {
+      if (!dictionaryFields_.empty()) {
+        // Clear previous dictionaries if the next block is a plain payload.
+        dictionaryFields_.clear();
+        dictionaries_.clear();
+      }
+    } break;
+  }
+}
+
 std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
-  if (hasComplexType_) {
+  resolveNextBlockType();
+
+  if (shouldSkipMerge()) {
+    // We have leftover rows from the last mergeable read.
+    if (merged_) {
+      return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, 
deserializeTime_);
+    }
+
+    if (reachEos_) {
+      return nullptr;
+    }
+
     uint32_t numRows = 0;
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers,
         BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, 
deserializeTime_, decompressTime_));
-    if (arrowBuffers.empty()) {
-      // Reach EOS.
-      return nullptr;
-    }
-    return makeColumnarBatch(rowType_, numRows, std::move(arrowBuffers), 
veloxPool_, deserializeTime_);
+
+    blockTypeResolved_ = false;
+
+    return makeColumnarBatch(
+        rowType_, numRows, std::move(arrowBuffers), dictionaryFields_, 
dictionaries_, veloxPool_, deserializeTime_);
   }
 
+  // TODO: Remove merging.
   if (reachEos_) {
     if (merged_) {
       return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, 
deserializeTime_);
@@ -331,13 +553,19 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
   std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
   uint32_t numRows = 0;
   while (!merged_ || merged_->numRows() < batchSize_) {
+    resolveNextBlockType();
+
+    // Break the merging loop once we reach EOS or read a dictionary block.
+    if (reachEos_ || !dictionaryFields_.empty()) {
+      break;
+    }
+
     GLUTEN_ASSIGN_OR_THROW(
         arrowBuffers,
         BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, 
deserializeTime_, decompressTime_));
-    if (arrowBuffers.empty()) {
-      reachEos_ = true;
-      break;
-    }
+
+    blockTypeResolved_ = false;
+
     if (!merged_) {
       merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, 
schema_, std::move(arrowBuffers));
       arrowBuffers.clear();
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index ea9e78e181..4eae874765 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -46,6 +46,10 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
   std::shared_ptr<ColumnarBatch> next() override;
 
  private:
+  bool shouldSkipMerge() const;
+
+  void resolveNextBlockType();
+
   std::shared_ptr<arrow::io::InputStream> in_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
@@ -61,6 +65,10 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
 
   std::unique_ptr<InMemoryPayload> merged_{nullptr};
   bool reachEos_{false};
+  bool blockTypeResolved_{false};
+
+  std::vector<int32_t> dictionaryFields_{};
+  std::vector<facebook::velox::VectorPtr> dictionaries_{};
 };
 
 class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 8a7b0bf187..58131442ad 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -120,7 +120,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
       const std::shared_ptr<ShuffleWriterOptions>& options,
       MemoryManager* memoryManager)
       : ShuffleWriter(numPartitions, options->partitioning),
-        
partitionBufferPool_(memoryManager->createArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
+        
partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
         
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
         partitionWriter_(partitionWriter) {
     partitioner_ = Partitioner::make(options->partitioning, numPartitions_, 
options->startPartitionId);
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 2086012e5c..ec9aee855b 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -30,7 +30,7 @@ class DummyMemoryManager final : public MemoryManager {
   arrow::MemoryPool* defaultArrowMemoryPool() override {
     throw GlutenException("Not yet implemented");
   }
-  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& 
name) override {
+  std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const 
std::string& name) override {
     throw GlutenException("Not yet implemented");
   }
   const MemoryUsageStats collectMemoryUsageStats() const override {
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index cb6f1b0feb..4d56e094b9 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -42,6 +42,7 @@ struct ShuffleTestParams {
   int32_t mergeBufferSize{0};
   int32_t diskWriteBufferSize{0};
   bool useRadixSort{false};
+  bool enableDictionary{false};
 
   std::string toString() const {
     std::ostringstream out;
@@ -50,7 +51,8 @@ struct ShuffleTestParams {
         << ", compressionType = " << 
arrow::util::Codec::GetCodecAsString(compressionType)
         << ", compressionThreshold = " << compressionThreshold << ", 
mergeBufferSize = " << mergeBufferSize
         << ", compressionBufferSize = " << diskWriteBufferSize
-        << ", useRadixSort = " << (useRadixSort ? "true" : "false");
+        << ", useRadixSort = " << (useRadixSort ? "true" : "false")
+        << ", enableDictionary = " << (enableDictionary ? "true" : "false");
     return out.str();
   }
 };
@@ -124,12 +126,15 @@ std::vector<ShuffleTestParams> getTestParams() {
     for (const auto compressionThreshold : compressionThresholds) {
       // Local.
       for (const auto mergeBufferSize : mergeBufferSizes) {
-        params.push_back(ShuffleTestParams{
-            .shuffleWriterType = ShuffleWriterType::kHashShuffle,
-            .partitionWriterType = PartitionWriterType::kLocal,
-            .compressionType = compression,
-            .compressionThreshold = compressionThreshold,
-            .mergeBufferSize = mergeBufferSize});
+        for (const bool enableDictionary : {true, false}) {
+          params.push_back(ShuffleTestParams{
+              .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+              .partitionWriterType = PartitionWriterType::kLocal,
+              .compressionType = compression,
+              .compressionThreshold = compressionThreshold,
+              .mergeBufferSize = mergeBufferSize,
+              .enableDictionary = enableDictionary});
+        }
       }
 
       // Rss.
@@ -151,13 +156,15 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
     const std::vector<std::string>& localDirs,
     arrow::Compression::type compressionType,
     int32_t mergeBufferSize,
-    int32_t compressionThreshold) {
+    int32_t compressionThreshold,
+    bool enableDictionary) {
   GLUTEN_ASSIGN_OR_THROW(auto codec, 
arrow::util::Codec::Create(compressionType));
   switch (partitionWriterType) {
     case PartitionWriterType::kLocal: {
       auto options = std::make_shared<LocalPartitionWriterOptions>();
       options->mergeBufferSize = mergeBufferSize;
       options->compressionThreshold = compressionThreshold;
+      options->enableDictionary = enableDictionary;
       return std::make_shared<LocalPartitionWriter>(
           numPartitions, std::move(codec), getDefaultMemoryManager(), options, 
dataFile, std::move(localDirs));
     }
@@ -227,23 +234,21 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
       shuffleWriterOptions = defaultShuffleWriterOptions();
     }
 
+    const auto& params = GetParam();
     const auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType,
+        params.partitionWriterType,
         numPartitions,
         dataFile_,
         localDirs_,
-        GetParam().compressionType,
-        GetParam().mergeBufferSize,
-        GetParam().compressionThreshold);
+        params.compressionType,
+        params.mergeBufferSize,
+        params.compressionThreshold,
+        params.enableDictionary);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
         VeloxShuffleWriter::create(
-            GetParam().shuffleWriterType,
-            numPartitions,
-            partitionWriter,
-            shuffleWriterOptions,
-            getDefaultMemoryManager()));
+            params.shuffleWriterType, numPartitions, partitionWriter, 
shuffleWriterOptions, getDefaultMemoryManager()));
 
     return shuffleWriter;
   }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 8fb73131c9..2ade39267c 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -53,6 +53,7 @@ You can add these configurations into spark-defaults.conf to 
enable or disable t
 | spark.gluten.sql.columnar.shuffle.merge.threshold             | Set the 
threshold control the minimum merged size. When a partition buffer is full, and 
the number of rows is below (`threshold * 
spark.gluten.sql.columnar.maxBatchSize`), it will be saved for merging.         
                                                                                
                                                                                
                                                         [...]
 | spark.gluten.sql.columnar.shuffle.readerBufferSize            | Buffer size 
in bytes for shuffle reader reading input stream from local or remote.          
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | Buffer size 
in bytes for sort-based shuffle reader deserializing raw input to columnar 
batch.                                                                          
                                                                                
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.columnar.shuffle.dictionary.enabled          | Enable 
dictionary in hash-based shuffle.                                               
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
 | spark.gluten.sql.columnar.numaBinding                         | Set up 
NUMABinding, default is false                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
 | spark.gluten.sql.columnar.coreRange                           | Set up the 
core range for NUMABinding, only works when numaBinding set to true. <br /> The 
setting is based on the number of cores in your system. Use 72 cores as an 
example.                                                                        
                                                                                
                                                                                
                     [...]
 | spark.gluten.sql.columnar.wholeStage.fallback.threshold       | Configure 
the threshold for whether whole stage will fall back in AQE supported case by 
counting the number of ColumnarToRow & vanilla leaf node                        
                                                                                
                                                                                
                                                                                
                   [...]
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index 97459f82d6..eabc975e02 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -29,6 +29,8 @@ public class GlutenSplitResult {
   private final long peakBytes;
   private final long sortTime;
   private final long c2rTime;
+  private final double avgDictionaryFields;
+  private final long dictionarySize;
 
   public GlutenSplitResult(
       long totalComputePidTime,
@@ -41,6 +43,8 @@ public class GlutenSplitResult {
       long totalBytesEvicted,
       long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
       long peakBytes,
+      double avgDictionaryFields,
+      long dictionarySize,
       long[] partitionLengths,
       long[] rawPartitionLengths) {
     this.totalComputePidTime = totalComputePidTime;
@@ -55,6 +59,8 @@ public class GlutenSplitResult {
     this.peakBytes = peakBytes;
     this.sortTime = totalSortTime;
     this.c2rTime = totalC2RTime;
+    this.avgDictionaryFields = avgDictionaryFields;
+    this.dictionarySize = dictionarySize;
   }
 
   public long getTotalComputePidTime() {
@@ -108,4 +114,12 @@ public class GlutenSplitResult {
   public long getC2RTime() {
     return c2rTime;
   }
+
+  public double getAvgDictionaryFields() {
+    return avgDictionaryFields;
+  }
+
+  public long getDictionarySize() {
+    return dictionarySize;
+  }
 }
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
index be74df1b8a..3269e7b176 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java
@@ -47,5 +47,6 @@ public class LocalPartitionWriterJniWrapper implements 
RuntimeAware {
       int subDirsPerLocalDir,
       int shuffleFileBufferSize,
       String dataFile,
-      String localDirs);
+      String localDirs,
+      boolean enableDictionary);
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 16dd6b3e1b..f562d1c127 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -196,6 +196,9 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
   def columnarSortShuffleDeserializerBufferSize: Long =
     getConf(COLUMNAR_SORT_SHUFFLE_DESERIALIZER_BUFFER_SIZE)
 
+  def columnarShuffleEnableDictionary: Boolean =
+    getConf(SHUFFLE_ENABLE_DICTIONARY)
+
   def maxBatchSize: Int = getConf(COLUMNAR_MAX_BATCH_SIZE)
 
   def shuffleWriterBufferSize: Int = getConf(SHUFFLE_WRITER_BUFFER_SIZE)
@@ -1037,6 +1040,13 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1MB")
 
+  val SHUFFLE_ENABLE_DICTIONARY =
+    buildConf("spark.gluten.sql.columnar.shuffle.dictionary.enabled")
+      .internal()
+      .doc("Enable dictionary in hash-based shuffle.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_MAX_BATCH_SIZE =
     buildConf("spark.gluten.sql.columnar.maxBatchSize")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to