This is an automated email from the ASF dual-hosted git repository.

ffacs pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 97a37972e ORC-1767: [C++] Improve writing performance of encoded 
string column and support EncodedStringVectorBatch for StringColumnWriter
97a37972e is described below

commit 97a37972eba0bfecf3cd31f6de23216159814129
Author: taiyang-li <[email protected]>
AuthorDate: Tue Sep 3 19:05:46 2024 +0800

    ORC-1767: [C++] Improve writing performance of encoded string column and 
support EncodedStringVectorBatch for StringColumnWriter
    
    ### What changes were proposed in this pull request?
    
    Improve writing performance of encoded string column and support 
EncodedStringVectorBatch for StringColumnWriter.
    Performance was measured in https://github.com/ClickHouse/orc/pull/15
    
    ### Why are the changes needed?
    
    ### How was this patch tested?
    
    original tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #2010 from taiyang-li/apache_improve_dict_write.
    
    Lead-authored-by: taiyang-li <[email protected]>
    Co-authored-by: 李扬 <[email protected]>
    Signed-off-by: ffacs <[email protected]>
---
 c++/include/orc/Vector.hh          | 26 +++++++++++++++++
 c++/src/ColumnWriter.cc            | 60 +++++++++++++++++++++++---------------
 c++/src/Vector.cc                  | 45 ++++++++++++++++++++++++++++
 c++/test/TestDictionaryEncoding.cc | 53 +++++++++++++++++++++++++++++++++
 4 files changed, 160 insertions(+), 24 deletions(-)

diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 0dfe92696..663bef9cd 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -57,6 +57,8 @@ namespace orc {
     bool hasNulls;
     // whether the vector batch is encoded
     bool isEncoded;
+    // whether the dictionary is decoded into vector batch
+    bool dictionaryDecoded;
 
     // custom memory pool
     MemoryPool& memoryPool;
@@ -88,6 +90,14 @@ namespace orc {
      */
     virtual bool hasVariableLength();
 
+    /**
+     * Decode possible dictionary into vector batch.
+     */
+    void decodeDictionary();
+
+   protected:
+    virtual void decodeDictionaryImpl() {}
+
    private:
     ColumnVectorBatch(const ColumnVectorBatch&);
     ColumnVectorBatch& operator=(const ColumnVectorBatch&);
@@ -248,6 +258,10 @@ namespace orc {
     ~EncodedStringVectorBatch() override;
     std::string toString() const override;
     void resize(uint64_t capacity) override;
+
+    // Calculate data and length in StringVectorBatch from dictionary and index
+    void decodeDictionaryImpl() override;
+
     std::shared_ptr<StringDictionary> dictionary;
 
     // index for dictionary entry
@@ -264,6 +278,9 @@ namespace orc {
     bool hasVariableLength() override;
 
     std::vector<ColumnVectorBatch*> fields;
+
+   protected:
+    void decodeDictionaryImpl() override;
   };
 
   struct ListVectorBatch : public ColumnVectorBatch {
@@ -283,6 +300,9 @@ namespace orc {
 
     // the concatenated elements
     std::unique_ptr<ColumnVectorBatch> elements;
+
+   protected:
+    void decodeDictionaryImpl() override;
   };
 
   struct MapVectorBatch : public ColumnVectorBatch {
@@ -304,6 +324,9 @@ namespace orc {
     std::unique_ptr<ColumnVectorBatch> keys;
     // the concatenated elements
     std::unique_ptr<ColumnVectorBatch> elements;
+
+   protected:
+    void decodeDictionaryImpl() override;
   };
 
   struct UnionVectorBatch : public ColumnVectorBatch {
@@ -327,6 +350,9 @@ namespace orc {
 
     // the sub-columns
     std::vector<ColumnVectorBatch*> children;
+
+   protected:
+    void decodeDictionaryImpl() override;
   };
 
   struct Decimal {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 0a9576e17..7adca1440 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -889,10 +889,17 @@ namespace orc {
       size_t length;
     };
 
+    struct DictEntryWithIndex {
+      DictEntryWithIndex(const char* str, size_t len, size_t index)
+          : entry(str, len), index(index) {}
+      DictEntry entry;
+      size_t index;
+    };
+
     SortedStringDictionary() : totalLength_(0) {}
 
     // insert a new string into dictionary, return its insertion order
-    size_t insert(const char* data, size_t len);
+    size_t insert(const char* str, size_t len);
 
     // write dictionary data & length to output buffer
     void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* 
lengthEncoder) const;
@@ -913,7 +920,9 @@ namespace orc {
 
    private:
     struct LessThan {
-      bool operator()(const DictEntry& left, const DictEntry& right) const {
+      bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& 
r) {
+        const auto& left = l.entry;
+        const auto& right = r.entry;
         int ret = memcmp(left.data, right.data, std::min(left.length, 
right.length));
         if (ret != 0) {
           return ret < 0;
@@ -922,8 +931,8 @@ namespace orc {
       }
     };
 
-    std::map<DictEntry, size_t, LessThan> dict_;
-    std::vector<std::vector<char>> data_;
+    mutable std::vector<DictEntryWithIndex> flatDict_;
+    std::unordered_map<std::string, size_t> keyToIndex_;
     uint64_t totalLength_;
 
     // use friend class here to avoid being bothered by const function calls
@@ -936,14 +945,10 @@ namespace orc {
 
   // insert a new string into dictionary, return its insertion order
   size_t SortedStringDictionary::insert(const char* str, size_t len) {
-    auto ret = dict_.insert({DictEntry(str, len), dict_.size()});
+    size_t index = flatDict_.size();
+    auto ret = keyToIndex_.emplace(std::string(str, len), index);
     if (ret.second) {
-      // make a copy to internal storage
-      data_.push_back(std::vector<char>(len));
-      memcpy(data_.back().data(), str, len);
-      // update dictionary entry to link pointer to internal storage
-      DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
-      entry->data = data_.back().data();
+      flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), 
index);
       totalLength_ += len;
     }
     return ret.first->second;
@@ -952,9 +957,12 @@ namespace orc {
   // write dictionary data & length to output buffer
   void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
                                      RleEncoder* lengthEncoder) const {
-    for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
-      dataStream->write(it->first.data, it->first.length);
-      lengthEncoder->write(static_cast<int64_t>(it->first.length));
+    std::sort(flatDict_.begin(), flatDict_.end(), LessThan());
+
+    for (const auto& entryWithIndex : flatDict_) {
+      const auto& entry = entryWithIndex.entry;
+      dataStream->write(entry.data, entry.length);
+      lengthEncoder->write(static_cast<int64_t>(entry.length));
     }
   }
 
@@ -970,10 +978,9 @@ namespace orc {
    */
   void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
     // iterate the dictionary to get mapping from insertion order to value 
order
-    std::vector<size_t> mapping(dict_.size());
-    size_t dictIdx = 0;
-    for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
-      mapping[it->second] = dictIdx++;
+    std::vector<size_t> mapping(flatDict_.size());
+    for (size_t i = 0; i < flatDict_.size(); ++i) {
+      mapping[flatDict_[i].index] = i;
     }
 
     // do the transformation
@@ -985,15 +992,20 @@ namespace orc {
   // get dict entries in insertion order
   void SortedStringDictionary::getEntriesInInsertionOrder(
       std::vector<const DictEntry*>& entries) const {
-    entries.resize(dict_.size());
-    for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
-      entries[it->second] = &(it->first);
+    std::sort(flatDict_.begin(), flatDict_.end(),
+              [](const DictEntryWithIndex& left, const DictEntryWithIndex& 
right) {
+                return left.index < right.index;
+              });
+
+    entries.resize(flatDict_.size());
+    for (size_t i = 0; i < flatDict_.size(); ++i) {
+      entries[i] = &(flatDict_[i].entry);
     }
   }
 
   // return count of entries
   size_t SortedStringDictionary::size() const {
-    return dict_.size();
+    return flatDict_.size();
   }
 
   // return total length of strings in the dictioanry
@@ -1003,8 +1015,8 @@ namespace orc {
 
   void SortedStringDictionary::clear() {
     totalLength_ = 0;
-    data_.clear();
-    dict_.clear();
+    keyToIndex_.clear();
+    flatDict_.clear();
   }
 
   class StringColumnWriter : public ColumnWriter {
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index bc4446995..49f47aeb0 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -34,6 +34,7 @@ namespace orc {
         notNull(pool, cap),
         hasNulls(false),
         isEncoded(false),
+        dictionaryDecoded(false),
         memoryPool(pool) {
     std::memset(notNull.data(), 1, capacity);
   }
@@ -61,6 +62,13 @@ namespace orc {
     return false;
   }
 
+  void ColumnVectorBatch::decodeDictionary() {
+    if (dictionaryDecoded) return;
+
+    decodeDictionaryImpl();
+    dictionaryDecoded = true;
+  }
+
   StringDictionary::StringDictionary(MemoryPool& pool)
       : dictionaryBlob(pool), dictionaryOffset(pool) {
     // PASS
@@ -88,6 +96,17 @@ namespace orc {
     }
   }
 
+  void EncodedStringVectorBatch::decodeDictionaryImpl() {
+    size_t n = index.size();
+    resize(n);
+
+    for (size_t i = 0; i < n; ++i) {
+      if (!hasNulls || notNull[i]) {
+        dictionary->getValueByIndex(index[i], data[i], length[i]);
+      }
+    }
+  }
+
   StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool)
       : ColumnVectorBatch(capacity, pool),
         data(pool, capacity),
@@ -174,6 +193,12 @@ namespace orc {
     return false;
   }
 
+  void StructVectorBatch::decodeDictionaryImpl() {
+    for (const auto& field : fields) {
+      field->decodeDictionary();
+    }
+  }
+
   ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
       : ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
     offsets.zeroOut();
@@ -211,6 +236,10 @@ namespace orc {
     return true;
   }
 
+  void ListVectorBatch::decodeDictionaryImpl() {
+    elements->decodeDictionary();
+  }
+
   MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
       : ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
     offsets.zeroOut();
@@ -251,6 +280,16 @@ namespace orc {
     return true;
   }
 
+  void MapVectorBatch::decodeDictionaryImpl() {
+    if (keys) {
+      keys->decodeDictionary();
+    }
+
+    if (elements) {
+      elements->decodeDictionary();
+    }
+  }
+
   UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
       : ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
     tags.zeroOut();
@@ -310,6 +349,12 @@ namespace orc {
     return false;
   }
 
+  void UnionVectorBatch::decodeDictionaryImpl() {
+    for (const auto& child : children) {
+      child->decodeDictionary();
+    }
+  }
+
   Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
       : ColumnVectorBatch(cap, pool),
         precision(0),
diff --git a/c++/test/TestDictionaryEncoding.cc 
b/c++/test/TestDictionaryEncoding.cc
index d2eeb6eb2..343c2c558 100644
--- a/c++/test/TestDictionaryEncoding.cc
+++ b/c++/test/TestDictionaryEncoding.cc
@@ -434,4 +434,57 @@ namespace orc {
     testDictionaryMultipleStripes(DICT_THRESHOLD, false);
     testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
   }
+
+  TEST(DictionaryEncoding, decodeDictionary) {
+    size_t rowCount = 8192;
+    size_t dictionarySize = 100;
+    auto* memoryPool = getDefaultPool();
+
+    auto encodedStringBatch = 
std::make_shared<EncodedStringVectorBatch>(rowCount, *memoryPool);
+    EXPECT_FALSE(encodedStringBatch->dictionaryDecoded);
+    encodedStringBatch->numElements = rowCount;
+    encodedStringBatch->hasNulls = true;
+    encodedStringBatch->isEncoded = true;
+    encodedStringBatch->dictionary = 
std::make_shared<StringDictionary>(*memoryPool);
+
+    auto& dictionary = *encodedStringBatch->dictionary;
+    dictionary.dictionaryBlob.resize(3 * dictionarySize);
+    dictionary.dictionaryOffset.resize(dictionarySize + 1);
+    dictionary.dictionaryOffset[0] = 0;
+    for (uint64_t i = 0; i < dictionarySize; ++i) {
+      std::ostringstream oss;
+      oss << std::setw(3) << std::setfill('0') << i;
+
+      auto str = oss.str();
+      memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size());
+      dictionary.dictionaryOffset[i + 1] = 3 * (i + 1);
+    }
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      if (i % 10 == 0) {
+        encodedStringBatch->notNull[i] = 0;
+        encodedStringBatch->index[i] = 0;
+      } else {
+        encodedStringBatch->notNull[i] = 1;
+        encodedStringBatch->index[i] = i % dictionarySize;
+      }
+    }
+
+    encodedStringBatch->decodeDictionary();
+    EXPECT_TRUE(encodedStringBatch->dictionaryDecoded);
+    EXPECT_EQ(0, encodedStringBatch->blob.size());
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      if (encodedStringBatch->notNull[i]) {
+        auto index = encodedStringBatch->index[i];
+        char* buf = nullptr;
+        int64_t buf_size = 0;
+        dictionary.getValueByIndex(index, buf, buf_size);
+
+        EXPECT_EQ(buf, encodedStringBatch->data[i]);
+        EXPECT_EQ(buf_size, encodedStringBatch->length[i]);
+      }
+    }
+  }
+
 }  // namespace orc

Reply via email to