This is an automated email from the ASF dual-hosted git repository.
ffacs pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 97a37972e ORC-1767: [C++] Improve writing performance of encoded
string column and support EncodedStringVectorBatch for StringColumnWriter
97a37972e is described below
commit 97a37972eba0bfecf3cd31f6de23216159814129
Author: taiyang-li <[email protected]>
AuthorDate: Tue Sep 3 19:05:46 2024 +0800
ORC-1767: [C++] Improve writing performance of encoded string column and
support EncodedStringVectorBatch for StringColumnWriter
### What changes were proposed in this pull request?
Improve the write performance of encoded string columns and add support for
EncodedStringVectorBatch in StringColumnWriter.
Performance measurements are available at https://github.com/ClickHouse/orc/pull/15
### Why are the changes needed?
### How was this patch tested?
Existing tests, plus the new DictionaryEncoding.decodeDictionary unit test added in this commit.
### Was this patch authored or co-authored using generative AI tooling?
Closes #2010 from taiyang-li/apache_improve_dict_write.
Lead-authored-by: taiyang-li <[email protected]>
Co-authored-by: 李扬 <[email protected]>
Signed-off-by: ffacs <[email protected]>
---
c++/include/orc/Vector.hh | 26 +++++++++++++++++
c++/src/ColumnWriter.cc | 60 +++++++++++++++++++++++---------------
c++/src/Vector.cc | 45 ++++++++++++++++++++++++++++
c++/test/TestDictionaryEncoding.cc | 53 +++++++++++++++++++++++++++++++++
4 files changed, 160 insertions(+), 24 deletions(-)
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 0dfe92696..663bef9cd 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -57,6 +57,8 @@ namespace orc {
bool hasNulls;
// whether the vector batch is encoded
bool isEncoded;
+ // whether the dictionary is decoded into vector batch
+ bool dictionaryDecoded;
// custom memory pool
MemoryPool& memoryPool;
@@ -88,6 +90,14 @@ namespace orc {
*/
virtual bool hasVariableLength();
+ /**
+ * Decode possible dictionary into vector batch.
+ */
+ void decodeDictionary();
+
+ protected:
+ virtual void decodeDictionaryImpl() {}
+
private:
ColumnVectorBatch(const ColumnVectorBatch&);
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
@@ -248,6 +258,10 @@ namespace orc {
~EncodedStringVectorBatch() override;
std::string toString() const override;
void resize(uint64_t capacity) override;
+
+ // Calculate data and length in StringVectorBatch from dictionary and index
+ void decodeDictionaryImpl() override;
+
std::shared_ptr<StringDictionary> dictionary;
// index for dictionary entry
@@ -264,6 +278,9 @@ namespace orc {
bool hasVariableLength() override;
std::vector<ColumnVectorBatch*> fields;
+
+ protected:
+ void decodeDictionaryImpl() override;
};
struct ListVectorBatch : public ColumnVectorBatch {
@@ -283,6 +300,9 @@ namespace orc {
// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;
+
+ protected:
+ void decodeDictionaryImpl() override;
};
struct MapVectorBatch : public ColumnVectorBatch {
@@ -304,6 +324,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> keys;
// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;
+
+ protected:
+ void decodeDictionaryImpl() override;
};
struct UnionVectorBatch : public ColumnVectorBatch {
@@ -327,6 +350,9 @@ namespace orc {
// the sub-columns
std::vector<ColumnVectorBatch*> children;
+
+ protected:
+ void decodeDictionaryImpl() override;
};
struct Decimal {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 0a9576e17..7adca1440 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -889,10 +889,17 @@ namespace orc {
size_t length;
};
+ struct DictEntryWithIndex {
+ DictEntryWithIndex(const char* str, size_t len, size_t index)
+ : entry(str, len), index(index) {}
+ DictEntry entry;
+ size_t index;
+ };
+
SortedStringDictionary() : totalLength_(0) {}
// insert a new string into dictionary, return its insertion order
- size_t insert(const char* data, size_t len);
+ size_t insert(const char* str, size_t len);
// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder*
lengthEncoder) const;
@@ -913,7 +920,9 @@ namespace orc {
private:
struct LessThan {
- bool operator()(const DictEntry& left, const DictEntry& right) const {
+ bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex&
r) {
+ const auto& left = l.entry;
+ const auto& right = r.entry;
int ret = memcmp(left.data, right.data, std::min(left.length,
right.length));
if (ret != 0) {
return ret < 0;
@@ -922,8 +931,8 @@ namespace orc {
}
};
- std::map<DictEntry, size_t, LessThan> dict_;
- std::vector<std::vector<char>> data_;
+ mutable std::vector<DictEntryWithIndex> flatDict_;
+ std::unordered_map<std::string, size_t> keyToIndex_;
uint64_t totalLength_;
// use friend class here to avoid being bothered by const function calls
@@ -936,14 +945,10 @@ namespace orc {
// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
- auto ret = dict_.insert({DictEntry(str, len), dict_.size()});
+ size_t index = flatDict_.size();
+ auto ret = keyToIndex_.emplace(std::string(str, len), index);
if (ret.second) {
- // make a copy to internal storage
- data_.push_back(std::vector<char>(len));
- memcpy(data_.back().data(), str, len);
- // update dictionary entry to link pointer to internal storage
- DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
- entry->data = data_.back().data();
+ flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(),
index);
totalLength_ += len;
}
return ret.first->second;
@@ -952,9 +957,12 @@ namespace orc {
// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
- for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
- dataStream->write(it->first.data, it->first.length);
- lengthEncoder->write(static_cast<int64_t>(it->first.length));
+ std::sort(flatDict_.begin(), flatDict_.end(), LessThan());
+
+ for (const auto& entryWithIndex : flatDict_) {
+ const auto& entry = entryWithIndex.entry;
+ dataStream->write(entry.data, entry.length);
+ lengthEncoder->write(static_cast<int64_t>(entry.length));
}
}
@@ -970,10 +978,9 @@ namespace orc {
*/
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value order
- std::vector<size_t> mapping(dict_.size());
- size_t dictIdx = 0;
- for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
- mapping[it->second] = dictIdx++;
+ std::vector<size_t> mapping(flatDict_.size());
+ for (size_t i = 0; i < flatDict_.size(); ++i) {
+ mapping[flatDict_[i].index] = i;
}
// do the transformation
@@ -985,15 +992,20 @@ namespace orc {
// get dict entries in insertion order
void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry*>& entries) const {
- entries.resize(dict_.size());
- for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
- entries[it->second] = &(it->first);
+ std::sort(flatDict_.begin(), flatDict_.end(),
+ [](const DictEntryWithIndex& left, const DictEntryWithIndex&
right) {
+ return left.index < right.index;
+ });
+
+ entries.resize(flatDict_.size());
+ for (size_t i = 0; i < flatDict_.size(); ++i) {
+ entries[i] = &(flatDict_[i].entry);
}
}
// return count of entries
size_t SortedStringDictionary::size() const {
- return dict_.size();
+ return flatDict_.size();
}
// return total length of strings in the dictionary
@@ -1003,8 +1015,8 @@ namespace orc {
void SortedStringDictionary::clear() {
totalLength_ = 0;
- data_.clear();
- dict_.clear();
+ keyToIndex_.clear();
+ flatDict_.clear();
}
class StringColumnWriter : public ColumnWriter {
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index bc4446995..49f47aeb0 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -34,6 +34,7 @@ namespace orc {
notNull(pool, cap),
hasNulls(false),
isEncoded(false),
+ dictionaryDecoded(false),
memoryPool(pool) {
std::memset(notNull.data(), 1, capacity);
}
@@ -61,6 +62,13 @@ namespace orc {
return false;
}
+ void ColumnVectorBatch::decodeDictionary() {
+ if (dictionaryDecoded) return;
+
+ decodeDictionaryImpl();
+ dictionaryDecoded = true;
+ }
+
StringDictionary::StringDictionary(MemoryPool& pool)
: dictionaryBlob(pool), dictionaryOffset(pool) {
// PASS
@@ -88,6 +96,17 @@ namespace orc {
}
}
+ void EncodedStringVectorBatch::decodeDictionaryImpl() {
+ size_t n = index.size();
+ resize(n);
+
+ for (size_t i = 0; i < n; ++i) {
+ if (!hasNulls || notNull[i]) {
+ dictionary->getValueByIndex(index[i], data[i], length[i]);
+ }
+ }
+ }
+
StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool)
: ColumnVectorBatch(capacity, pool),
data(pool, capacity),
@@ -174,6 +193,12 @@ namespace orc {
return false;
}
+ void StructVectorBatch::decodeDictionaryImpl() {
+ for (const auto& field : fields) {
+ field->decodeDictionary();
+ }
+ }
+
ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
@@ -211,6 +236,10 @@ namespace orc {
return true;
}
+ void ListVectorBatch::decodeDictionaryImpl() {
+ elements->decodeDictionary();
+ }
+
MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
@@ -251,6 +280,16 @@ namespace orc {
return true;
}
+ void MapVectorBatch::decodeDictionaryImpl() {
+ if (keys) {
+ keys->decodeDictionary();
+ }
+
+ if (elements) {
+ elements->decodeDictionary();
+ }
+ }
+
UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
tags.zeroOut();
@@ -310,6 +349,12 @@ namespace orc {
return false;
}
+ void UnionVectorBatch::decodeDictionaryImpl() {
+ for (const auto& child : children) {
+ child->decodeDictionary();
+ }
+ }
+
Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool),
precision(0),
diff --git a/c++/test/TestDictionaryEncoding.cc b/c++/test/TestDictionaryEncoding.cc
index d2eeb6eb2..343c2c558 100644
--- a/c++/test/TestDictionaryEncoding.cc
+++ b/c++/test/TestDictionaryEncoding.cc
@@ -434,4 +434,57 @@ namespace orc {
testDictionaryMultipleStripes(DICT_THRESHOLD, false);
testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
}
+
+ TEST(DictionaryEncoding, decodeDictionary) {
+ size_t rowCount = 8192;
+ size_t dictionarySize = 100;
+ auto* memoryPool = getDefaultPool();
+
+ auto encodedStringBatch =
std::make_shared<EncodedStringVectorBatch>(rowCount, *memoryPool);
+ EXPECT_FALSE(encodedStringBatch->dictionaryDecoded);
+ encodedStringBatch->numElements = rowCount;
+ encodedStringBatch->hasNulls = true;
+ encodedStringBatch->isEncoded = true;
+ encodedStringBatch->dictionary =
std::make_shared<StringDictionary>(*memoryPool);
+
+ auto& dictionary = *encodedStringBatch->dictionary;
+ dictionary.dictionaryBlob.resize(3 * dictionarySize);
+ dictionary.dictionaryOffset.resize(dictionarySize + 1);
+ dictionary.dictionaryOffset[0] = 0;
+ for (uint64_t i = 0; i < dictionarySize; ++i) {
+ std::ostringstream oss;
+ oss << std::setw(3) << std::setfill('0') << i;
+
+ auto str = oss.str();
+ memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size());
+ dictionary.dictionaryOffset[i + 1] = 3 * (i + 1);
+ }
+
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ if (i % 10 == 0) {
+ encodedStringBatch->notNull[i] = 0;
+ encodedStringBatch->index[i] = 0;
+ } else {
+ encodedStringBatch->notNull[i] = 1;
+ encodedStringBatch->index[i] = i % dictionarySize;
+ }
+ }
+
+ encodedStringBatch->decodeDictionary();
+ EXPECT_TRUE(encodedStringBatch->dictionaryDecoded);
+ EXPECT_EQ(0, encodedStringBatch->blob.size());
+
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ if (encodedStringBatch->notNull[i]) {
+ auto index = encodedStringBatch->index[i];
+ char* buf = nullptr;
+ int64_t buf_size = 0;
+ dictionary.getValueByIndex(index, buf, buf_size);
+
+ EXPECT_EQ(buf, encodedStringBatch->data[i]);
+ EXPECT_EQ(buf_size, encodedStringBatch->length[i]);
+ }
+ }
+ }
+
} // namespace orc