This is an automated email from the ASF dual-hosted git repository.
xndai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new bf5b780 ORC-469:[C++] Add EncodedStringVectorBatch to expose string
dictionary in VectorBatch (#367)
bf5b780 is described below
commit bf5b7800930bfa030db83aba925d9d3b75852839
Author: Yurui Zhou <[email protected]>
AuthorDate: Wed Mar 13 13:11:51 2019 +0800
ORC-469:[C++] Add EncodedStringVectorBatch to expose string dictionary in
VectorBatch (#367)
* ORC-469:[C++] Add EncodedStringVectorBatch to expose string dictionary
in VectorBatch
* resolve pull request comments on code style and class naming
* add test to cover direct string, map and list
* Pass encoded param through CreateBatch
fix #367
---
c++/include/orc/Reader.hh | 13 ++
c++/include/orc/Type.hh | 3 +-
c++/include/orc/Vector.hh | 36 ++++++
c++/src/ColumnReader.cc | 208 +++++++++++++++++++++++++-------
c++/src/ColumnReader.hh | 18 +++
c++/src/ColumnWriter.cc | 24 ++--
c++/src/Options.hh | 11 ++
c++/src/Reader.cc | 12 +-
c++/src/Reader.hh | 1 +
c++/src/TypeImpl.cc | 18 +--
c++/src/TypeImpl.hh | 3 +-
c++/src/Vector.cc | 24 ++++
c++/test/TestColumnReader.cc | 280 ++++++++++++++++++++++++++++++-------------
13 files changed, 502 insertions(+), 149 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 5818b46..32549b5 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -181,6 +181,19 @@ namespace orc {
RowReaderOptions& forcedScaleOnHive11Decimal(int32_t forcedScale);
/**
+ * Enable or disable lazy decoding of dictionary-encoded string columns.
+ * When enabled, the row reader does not decode such columns; it instead
+ * returns an index array referencing the column's string dictionary
+ * (see EncodedStringVectorBatch and ColumnVectorBatch::isEncoded).
+ */
+ RowReaderOptions& setEnableLazyDecoding(bool enable);
+
+ /**
+ * Whether lazy decoding of dictionary-encoded string columns is enabled.
+ */
+ bool getEnableLazyDecoding() const;
+
+ /**
* Were the field ids set?
*/
bool getIndexesSet() const;
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index 68f5ec2..c0cbf2d 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -64,7 +64,8 @@ namespace orc {
* Create a row batch for this type.
*/
virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(uint64_t size,
- MemoryPool& pool
+ MemoryPool& pool,
+ bool encoded =
false
) const = 0;
/**
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index c977998..9391da3 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -50,6 +50,8 @@ namespace orc {
DataBuffer<char> notNull;
// whether there are any null values
bool hasNulls;
+ // whether the vector batch is encoded
+ bool isEncoded;
// custom memory pool
MemoryPool& memoryPool;
@@ -113,6 +115,40 @@ namespace orc {
DataBuffer<int64_t> length;
};
+ struct StringDictionary {
+ StringDictionary(MemoryPool& pool);
+ // Bytes of all dictionary entries, stored back to back.
+ DataBuffer<char> dictionaryBlob;
+ // Start offset of each entry in dictionaryBlob, plus a final end offset.
+ DataBuffer<int64_t> dictionaryOffset;
+
+ // Set valPtr/length to entry `index`; valPtr points into dictionaryBlob.
+ void getValueByIndex(int64_t index, char*& valPtr, int64_t& length) {
+ // There are size() - 1 entries, so offsetPtr[index + 1] must also exist.
+ if (index < 0 || static_cast<uint64_t>(index) + 1 >= dictionaryOffset.size()) {
+ throw std::out_of_range("index out of range.");
+ }
+ int64_t* offsetPtr = dictionaryOffset.data();
+ valPtr = dictionaryBlob.data() + offsetPtr[index];
+ length = offsetPtr[index + 1] - offsetPtr[index];
+ }
+ };
+
+ /**
+ * String vector batch that can carry dictionary indexes instead of values.
+ * When isEncoded is set, read index[i] and pass it to
+ * dictionary->getValueByIndex() to obtain the string pointer and length.
+ */
+ struct EncodedStringVectorBatch : public StringVectorBatch {
+ EncodedStringVectorBatch(uint64_t capacity, MemoryPool& pool);
+ virtual ~EncodedStringVectorBatch();
+ std::string toString() const;
+ // Dictionary shared with the column reader that filled this batch.
+ std::shared_ptr<StringDictionary> dictionary;
+ // Per-row dictionary entry id; presumably only valid where notNull is set.
+ DataBuffer<int64_t> index;
+ };
+
struct StructVectorBatch: public ColumnVectorBatch {
StructVectorBatch(uint64_t capacity, MemoryPool& pool);
virtual ~StructVectorBatch();
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index d4a5691..01b9925 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -479,10 +479,8 @@ namespace orc {
class StringDictionaryColumnReader: public ColumnReader {
private:
- DataBuffer<char> dictionaryBlob;
- DataBuffer<int64_t> dictionaryOffset;
+ std::shared_ptr<StringDictionary> dictionary;
std::unique_ptr<RleDecoder> rle;
- uint64_t dictionaryCount;
public:
StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
@@ -493,46 +491,44 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char* notNull) override;
};
StringDictionaryColumnReader::StringDictionaryColumnReader
(const Type& type,
StripeStreams& stripe
): ColumnReader(type, stripe),
- dictionaryBlob(stripe.getMemoryPool()),
- dictionaryOffset(stripe.getMemoryPool()) {
+ dictionary(new StringDictionary(stripe.getMemoryPool())) {
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
.kind());
- dictionaryCount = stripe.getEncoding(columnId).dictionarysize();
+ // Entry count is a local now that the dictionaryCount member is gone.
+ uint64_t dictionaryCount = stripe.getEncoding(columnId).dictionarysize();
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr)
throw ParseError("DATA stream not found in StringDictionaryColumn");
rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
if (dictionaryCount > 0 && stream == nullptr) {
throw ParseError("LENGTH stream not found in StringDictionaryColumn");
}
std::unique_ptr<RleDecoder> lengthDecoder =
createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
- dictionaryOffset.resize(dictionaryCount+1);
- int64_t* lengthArray = dictionaryOffset.data();
+ // Offsets are prefix sums of entry lengths: dictionaryCount + 1 values.
+ dictionary->dictionaryOffset.resize(dictionaryCount + 1);
+ int64_t* lengthArray = dictionary->dictionaryOffset.data();
lengthDecoder->next(lengthArray + 1, dictionaryCount, nullptr);
lengthArray[0] = 0;
for (uint64_t i = 1; i < dictionaryCount + 1; ++i) {
if (lengthArray[i] < 0)
throw ParseError("Negative dictionary entry length");
lengthArray[i] += lengthArray[i - 1];
}
int64_t blobSize = lengthArray[dictionaryCount];
- dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
+ dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
std::unique_ptr<SeekableInputStream> blobStream =
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
if (blobSize > 0 && blobStream == nullptr) {
throw ParseError(
"DICTIONARY_DATA stream not found in StringDictionaryColumn");
}
- readFully(dictionaryBlob.data(), blobSize, blobStream.get());
+ readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());
}
StringDictionaryColumnReader::~StringDictionaryColumnReader() {
@@ -552,16 +548,17 @@ namespace orc {
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
- char *blob = dictionaryBlob.data();
- int64_t *dictionaryOffsets = dictionaryOffset.data();
+ char *blob = dictionary->dictionaryBlob.data();
+ int64_t *dictionaryOffsets = dictionary->dictionaryOffset.data();
char **outputStarts = byteBatch.data.data();
int64_t *outputLengths = byteBatch.length.data();
rle->next(outputLengths, numValues, notNull);
+ uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1;
if (notNull) {
for(uint64_t i=0; i < numValues; ++i) {
if (notNull[i]) {
int64_t entry = outputLengths[i];
- if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
+ if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount ) {
throw ParseError("Entry index out of range in
StringDictionaryColumn");
}
outputStarts[i] = blob + dictionaryOffsets[entry];
@@ -582,6 +579,20 @@ namespace orc {
}
}
+ void StringDictionaryColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char* notNull) {
+ ColumnReader::next(rowBatch, numValues, notNull);
+ notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+ rowBatch.isEncoded = true;
+ // Hand the batch a shared reference to this stripe's dictionary.
+ EncodedStringVectorBatch& batch = dynamic_cast<EncodedStringVectorBatch&>(rowBatch);
+ batch.dictionary = this->dictionary;
+ // Write raw dictionary entry ids into batch.index. Unlike next(), ids are
+ // not range-checked here; StringDictionary::getValueByIndex() checks them.
+ rle->next(batch.index.data(), numValues, notNull);
+ }
+
class StringDirectColumnReader: public ColumnReader {
private:
DataBuffer<char> blobBuffer;
@@ -785,6 +796,16 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) override;
+
+ private:
+ template<bool encoded>
+ void nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull);
};
StructColumnReader::StructColumnReader(const Type& type,
@@ -826,16 +847,35 @@ namespace orc {
void StructColumnReader::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) {
+ nextInternal<false>(rowBatch, numValues, notNull);
+ }
+
+ void StructColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ nextInternal<true>(rowBatch, numValues, notNull);
+ }
+
+ template<bool encoded>
+ void StructColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
uint64_t i=0;
notNull = rowBatch.hasNulls? rowBatch.notNull.data() : nullptr;
for(std::vector<ColumnReader*>::iterator ptr=children.begin();
ptr != children.end(); ++ptr, ++i) {
- (*ptr)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
- numValues, notNull);
+ if (encoded) {
+
(*ptr)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
+ numValues, notNull);
+ } else {
+ (*ptr)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
+ numValues, notNull);
+ }
}
}
+
class ListColumnReader: public ColumnReader {
private:
std::unique_ptr<ColumnReader> child;
@@ -850,6 +890,16 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) override;
+
+ private:
+ template<bool encoded>
+ void nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull);
};
ListColumnReader::ListColumnReader(const Type& type,
@@ -897,6 +947,19 @@ namespace orc {
}
void ListColumnReader::next(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ nextInternal<false>(rowBatch, numValues, notNull);
+ }
+
+ void ListColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ nextInternal<true>(rowBatch, numValues, notNull);
+ }
+
+ template<bool encoded>
+ void ListColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
@@ -925,7 +988,11 @@ namespace orc {
offsets[numValues] = static_cast<int64_t>(totalChildren);
ColumnReader *childReader = child.get();
if (childReader) {
- childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
+ if (encoded) {
+ childReader->nextEncoded(*(listBatch.elements.get()), totalChildren,
nullptr);
+ } else {
+ childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
+ }
}
}
@@ -944,6 +1011,16 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) override;
+
+ private:
+ template<bool encoded>
+ void nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull);
};
MapColumnReader::MapColumnReader(const Type& type,
@@ -1002,6 +1079,21 @@ namespace orc {
void MapColumnReader::next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
+ char *notNull)
+ {
+ nextInternal<false>(rowBatch, numValues, notNull);
+ }
+
+ void MapColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull)
+ {
+ nextInternal<true>(rowBatch, numValues, notNull);
+ }
+
+ template<bool encoded>
+ void MapColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
MapVectorBatch &mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
@@ -1029,11 +1121,19 @@ namespace orc {
offsets[numValues] = static_cast<int64_t>(totalChildren);
ColumnReader *rawKeyReader = keyReader.get();
if (rawKeyReader) {
- rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
+ if (encoded) {
+ rawKeyReader->nextEncoded(*(mapBatch.keys.get()), totalChildren,
nullptr);
+ } else {
+ rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
+ }
}
ColumnReader *rawElementReader = elementReader.get();
if (rawElementReader) {
- rawElementReader->next(*(mapBatch.elements.get()), totalChildren,
nullptr);
+ if (encoded) {
+ rawElementReader->nextEncoded(*(mapBatch.elements.get()),
totalChildren, nullptr);
+ } else {
+ rawElementReader->next(*(mapBatch.elements.get()), totalChildren,
nullptr);
+ }
}
}
@@ -1053,6 +1153,16 @@ namespace orc {
void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) override;
+
+ void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) override;
+
+ private:
+ template<bool encoded>
+ void nextInternal(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull);
};
UnionColumnReader::UnionColumnReader(const Type& type,
@@ -1108,6 +1218,19 @@ namespace orc {
}
void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ nextInternal<false>(rowBatch, numValues, notNull);
+ }
+
+ void UnionColumnReader::nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ nextInternal<true>(rowBatch, numValues, notNull);
+ }
+
+ template<bool encoded>
+ void UnionColumnReader::nextInternal(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
@@ -1135,8 +1258,13 @@ namespace orc {
// read the right number of each child column
for(size_t i=0; i < numChildren; ++i) {
if (childrenReader[i] != nullptr) {
- childrenReader[i]->next(*(unionBatch.children[i]),
- static_cast<uint64_t>(counts[i]), nullptr);
+ if (encoded) {
+ childrenReader[i]->nextEncoded(*(unionBatch.children[i]),
+ static_cast<uint64_t>(counts[i]), nullptr);
+ } else {
+ childrenReader[i]->next(*(unionBatch.children[i]),
+ static_cast<uint64_t>(counts[i]), nullptr);
+ }
}
}
}
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 52704fc..ca65872 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -20,6 +20,7 @@
#define ORC_COLUMN_READER_HH
#include "orc/Vector.hh"
+
#include "ByteRLE.hh"
#include "Compression.hh"
#include "Timezone.hh"
@@ -117,6 +118,23 @@ namespace orc {
virtual void next(ColumnVectorBatch& rowBatch,
uint64_t numValues,
char* notNull);
+
+ /**
+ * Read the next group of values, leaving them encoded when the reader
+ * supports it (and setting rowBatch.isEncoded accordingly).
+ * @param rowBatch the memory to read into.
+ * @param numValues the number of values to read
+ * @param notNull if null, all values are not null. Otherwise, it is
+ * a mask (with at least numValues bytes) for which values to set.
+ */
+ virtual void nextEncoded(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char* notNull) {
+ // Default: no encoded form is available, so decode normally via next().
+ rowBatch.isEncoded = false;
+ next(rowBatch, numValues, notNull);
+ }
+
};
/**
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 94fdb3a..62bef3b 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -840,7 +840,7 @@ namespace orc {
/**
* Implementation of increasing sorted string dictionary
*/
- class StringDictionary {
+ class SortedStringDictionary {
public:
struct DictEntry {
DictEntry(const char * str, size_t len):data(str),length(len) {}
@@ -848,7 +848,7 @@ namespace orc {
size_t length;
};
- StringDictionary():totalLength(0) {}
+ SortedStringDictionary():totalLength(0) {}
// insert a new string into dictionary, return its insertion order
size_t insert(const char * data, size_t len);
@@ -895,7 +895,7 @@ namespace orc {
};
// insert a new string into dictionary, return its insertion order
- size_t StringDictionary::insert(const char * str, size_t len) {
+ size_t SortedStringDictionary::insert(const char * str, size_t len) {
auto ret = dict.insert({DictEntry(str, len), dict.size()});
if (ret.second) {
// make a copy to internal storage
@@ -910,7 +910,7 @@ namespace orc {
}
// write dictionary data & length to output buffer
- void StringDictionary::flush(AppendOnlyBufferedStream * dataStream,
+ void SortedStringDictionary::flush(AppendOnlyBufferedStream * dataStream,
RleEncoder * lengthEncoder) const {
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
dataStream->write(it->first.data, it->first.length);
@@ -928,7 +928,7 @@ namespace orc {
* the indexes from insertion order to dictionary value order for final
* output.
*/
- void StringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
+ void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value
order
std::vector<size_t> mapping(dict.size());
size_t dictIdx = 0;
@@ -944,7 +944,7 @@ namespace orc {
}
// get dict entries in insertion order
- void StringDictionary::getEntriesInInsertionOrder(
+ void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry *>& entries) const {
entries.resize(dict.size());
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
@@ -953,16 +953,16 @@ namespace orc {
}
// return count of entries
- size_t StringDictionary::size() const {
+ size_t SortedStringDictionary::size() const {
return dict.size();
}
// return total length of strings in the dictioanry
- uint64_t StringDictionary::length() const {
+ uint64_t SortedStringDictionary::length() const {
return totalLength;
}
- void StringDictionary::clear() {
+ void SortedStringDictionary::clear() {
totalLength = 0;
data.clear();
dict.clear();
@@ -1021,7 +1021,7 @@ namespace orc {
/**
* dictionary related variables
*/
- StringDictionary dictionary;
+ SortedStringDictionary dictionary;
// whether or not dictionary checking is done
bool doneDictionaryCheck;
// whether or not it should be used
@@ -1317,11 +1317,11 @@ namespace orc {
}
// get dictionary entries in insertion order
- std::vector<const StringDictionary::DictEntry *> entries;
+ std::vector<const SortedStringDictionary::DictEntry *> entries;
dictionary.getEntriesInInsertionOrder(entries);
// store each length of the data into a vector
- const StringDictionary::DictEntry * dictEntry = nullptr;
+ const SortedStringDictionary::DictEntry * dictEntry = nullptr;
for (uint64_t i = 0; i != dictionary.idxInDictBuffer.size(); ++i) {
// write one row data in direct encoding
dictEntry = entries[static_cast<size_t>(dictionary.idxInDictBuffer[i])];
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 0c644af..795e166 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -129,6 +129,7 @@ namespace orc {
uint64_t dataLength;
bool throwOnHive11DecimalOverflow;
int32_t forcedScaleOnHive11Decimal;
+ bool enableLazyDecoding;
RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
@@ -136,6 +137,7 @@ namespace orc {
dataLength = std::numeric_limits<uint64_t>::max();
throwOnHive11DecimalOverflow = true;
forcedScaleOnHive11Decimal = 6;
+ enableLazyDecoding = false;
}
};
@@ -242,6 +244,15 @@ namespace orc {
int32_t RowReaderOptions::getForcedScaleOnHive11Decimal() const {
return privateBits->forcedScaleOnHive11Decimal;
}
+
+ // Store the flag the row reader uses to choose nextEncoded(); chainable.
+ RowReaderOptions& RowReaderOptions::setEnableLazyDecoding(bool enable) {
+ this->privateBits->enableLazyDecoding = enable;
+ return *this;
+ }
+
+ // Report the stored lazy-decoding flag (false unless set).
+ bool RowReaderOptions::getEnableLazyDecoding() const {
+ return this->privateBits->enableLazyDecoding;
+ }
}
#endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 4b7977a..8f29327 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -187,7 +187,8 @@ namespace orc {
throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
footer(contents->footer.get()),
- firstRowOfStripe(*contents->pool, 0) {
+ firstRowOfStripe(*contents->pool, 0),
+ enableEncodedBlock(opts.getEnableLazyDecoding()) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
currentStripe = numberOfStripes;
@@ -856,7 +857,12 @@ namespace orc {
std::min(static_cast<uint64_t>(data.capacity),
rowsInCurrentStripe - currentRowInStripe);
data.numElements = rowsToRead;
- reader->next(data, rowsToRead, nullptr);
+ if (enableEncodedBlock) {
+ reader->nextEncoded(data, rowsToRead, nullptr);
+ }
+ else {
+ reader->next(data, rowsToRead, nullptr);
+ }
// update row number
previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
currentRowInStripe += rowsToRead;
@@ -869,7 +875,7 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
(uint64_t capacity) const {
- return getSelectedType().createRowBatch(capacity, *contents->pool);
+ return getSelectedType().createRowBatch(capacity, *contents->pool,
enableEncodedBlock);
}
void ensureOrcFooter(InputStream* stream,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 03ef9dd..4efa894 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -136,6 +136,7 @@ namespace orc {
proto::StripeFooter currentStripeFooter;
std::unique_ptr<ColumnReader> reader;
+ bool enableEncodedBlock;
// internal methods
void startNextStripe();
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index d1d9c62..c154f2a 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -229,7 +229,8 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch>
TypeImpl::createRowBatch(uint64_t capacity,
- MemoryPool& memoryPool) const {
+ MemoryPool& memoryPool,
+ bool encoded) const {
switch (static_cast<int64_t>(kind)) {
case BOOLEAN:
case BYTE:
@@ -249,7 +250,10 @@ namespace orc {
case BINARY:
case CHAR:
case VARCHAR:
- return std::unique_ptr<ColumnVectorBatch>
+ return encoded ?
+ std::unique_ptr<ColumnVectorBatch>
+ (new EncodedStringVectorBatch(capacity, memoryPool))
+ : std::unique_ptr<ColumnVectorBatch>
(new StringVectorBatch(capacity, memoryPool));
case TIMESTAMP:
@@ -262,7 +266,7 @@ namespace orc {
for(uint64_t i=0; i < getSubtypeCount(); ++i) {
result->fields.push_back(getSubtype(i)->
createRowBatch(capacity,
- memoryPool).release());
+ memoryPool,
encoded).release());
}
return return_value;
}
@@ -271,7 +275,7 @@ namespace orc {
ListVectorBatch* result = new ListVectorBatch(capacity, memoryPool);
std::unique_ptr<ColumnVectorBatch> return_value =
std::unique_ptr<ColumnVectorBatch>(result);
if (getSubtype(0) != nullptr) {
- result->elements = getSubtype(0)->createRowBatch(capacity, memoryPool);
+ result->elements = getSubtype(0)->createRowBatch(capacity, memoryPool,
encoded);
}
return return_value;
}
@@ -280,10 +284,10 @@ namespace orc {
MapVectorBatch* result = new MapVectorBatch(capacity, memoryPool);
std::unique_ptr<ColumnVectorBatch> return_value =
std::unique_ptr<ColumnVectorBatch>(result);
if (getSubtype(0) != nullptr) {
- result->keys = getSubtype(0)->createRowBatch(capacity, memoryPool);
+ result->keys = getSubtype(0)->createRowBatch(capacity, memoryPool,
encoded);
}
if (getSubtype(1) != nullptr) {
- result->elements = getSubtype(1)->createRowBatch(capacity, memoryPool);
+ result->elements = getSubtype(1)->createRowBatch(capacity, memoryPool,
encoded);
}
return return_value;
}
@@ -303,7 +307,7 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> return_value =
std::unique_ptr<ColumnVectorBatch>(result);
for(uint64_t i=0; i < getSubtypeCount(); ++i) {
result->children.push_back(getSubtype(i)->createRowBatch(capacity,
- memoryPool)
+ memoryPool,
encoded)
.release());
}
return return_value;
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index 4cf50f3..054ceab 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -85,7 +85,8 @@ namespace orc {
Type* addUnionChild(std::unique_ptr<Type> fieldType) override;
std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size,
- MemoryPool& memoryPool
+ MemoryPool& memoryPool,
+ bool encoded = false
) const override;
/**
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 9e7e2d3..ae8ab0f 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -33,6 +33,7 @@ namespace orc {
numElements(0),
notNull(pool, cap),
hasNulls(false),
+ isEncoded(false),
memoryPool(pool) {
std::memset(notNull.data(), 1, capacity);
}
@@ -112,6 +113,29 @@ namespace orc {
+ static_cast<uint64_t>(data.capacity() * sizeof(double));
}
+ StringDictionary::StringDictionary(MemoryPool& pool)
+ : dictionaryBlob(pool),
+ dictionaryOffset(pool) {
+ // Buffers are sized and filled later by StringDictionaryColumnReader.
+ }
+
+ EncodedStringVectorBatch::EncodedStringVectorBatch(uint64_t capacity, MemoryPool& pool)
+ : StringVectorBatch(capacity, pool),
+ dictionary(nullptr),
+ index(pool, capacity) {
+ // dictionary stays null until a reader attaches one in nextEncoded().
+ }
+
+ EncodedStringVectorBatch::~EncodedStringVectorBatch() {
+ // No explicit cleanup; member destructors run automatically.
+ }
+
+ std::string EncodedStringVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Encoded string vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool
): ColumnVectorBatch(capacity, pool),
data(pool, capacity),
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index c67229a..9444616 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -31,48 +31,73 @@
#ifdef __clang__
DIAGNOSTIC_IGNORE("-Winconsistent-missing-override")
+ DIAGNOSTIC_IGNORE("-Wmissing-variable-declarations")
#endif
namespace orc {
+ using ::testing::TestWithParam;
+ using ::testing::Values;
+
+ // gmock-backed StripeStreams that lets tests supply hand-built streams.
+ class MockStripeStreams : public StripeStreams {
+ public:
+ ~MockStripeStreams() override;
+
+ // Takes ownership of whatever the mocked getStreamProxy() returns.
+ std::unique_ptr<SeekableInputStream> getStream(uint64_t columnId,
+ proto::Stream_Kind kind,
+ bool stream) const override;
+
+ MOCK_CONST_METHOD0(getSelectedColumns, const std::vector<bool>());
+ MOCK_CONST_METHOD1(getEncoding, proto::ColumnEncoding(uint64_t));
+ MOCK_CONST_METHOD3(getStreamProxy, SeekableInputStream*
+ (uint64_t, proto::Stream_Kind, bool));
+ MOCK_CONST_METHOD0(getErrorStream, std::ostream*());
+ MOCK_CONST_METHOD0(getThrowOnHive11DecimalOverflow, bool());
+ MOCK_CONST_METHOD0(getForcedScaleOnHive11Decimal, int32_t());
+
+ MemoryPool& getMemoryPool() const {
+ return *getDefaultPool();
+ }
+
+ const Timezone& getWriterTimezone() const override {
+ return getTimezoneByName("America/Los_Angeles");
+ }
+ };
-class MockStripeStreams: public StripeStreams {
-public:
- ~MockStripeStreams() override;
- std::unique_ptr<SeekableInputStream> getStream(uint64_t columnId,
- proto::Stream_Kind kind,
- bool stream) const override;
- MOCK_CONST_METHOD0(getSelectedColumns, const std::vector<bool>());
- MOCK_CONST_METHOD1(getEncoding, proto::ColumnEncoding (uint64_t));
- MOCK_CONST_METHOD3(getStreamProxy, SeekableInputStream*
- (uint64_t, proto::Stream_Kind, bool));
- MOCK_CONST_METHOD0(getErrorStream, std::ostream*());
- MOCK_CONST_METHOD0(getThrowOnHive11DecimalOverflow, bool());
- MOCK_CONST_METHOD0(getForcedScaleOnHive11Decimal, int32_t());
-
- MemoryPool& getMemoryPool() const {
- return *getDefaultPool();
+ MockStripeStreams::~MockStripeStreams() {
+ // PASS
}
- const Timezone& getWriterTimezone() const override {
- return getTimezoneByName("America/Los_Angeles");
+ std::unique_ptr<SeekableInputStream>
+ MockStripeStreams::getStream(uint64_t columnId,
+ proto::Stream_Kind kind,
+ bool shouldStream) const {
+ return std::unique_ptr<SeekableInputStream>
+ (getStreamProxy(columnId, kind, shouldStream));
}
-};
-MockStripeStreams::~MockStripeStreams() {
- // PASS
-}
+ bool isNotNull(tm *timeptr) {
+ return timeptr != nullptr;
+ }
-std::unique_ptr<SeekableInputStream>
-MockStripeStreams::getStream(uint64_t columnId,
- proto::Stream_Kind kind,
- bool shouldStream) const {
- return std::unique_ptr < SeekableInputStream >
- (getStreamProxy(columnId, kind, shouldStream));
-}
+ class TestColumnReaderEncoded : public TestWithParam<bool> {
+ virtual void SetUp();
-bool isNotNull(tm *timeptr) {
- return timeptr != nullptr;
-}
+ protected:
+ bool encoded;
+ };
+
+ void TestColumnReaderEncoded::SetUp() {
+ encoded = GetParam();
+ }
TEST(TestColumnReader, testBooleanWithNulls) {
MockStripeStreams streams;
@@ -363,7 +388,7 @@ TEST(TestColumnReader, testIntegerWithNulls) {
}
}
-TEST(TestColumnReader, testDictionaryWithNulls) {
+TEST_P(TestColumnReaderEncoded, testDictionaryWithNulls) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -409,32 +434,65 @@ TEST(TestColumnReader, testDictionaryWithNulls) {
std::unique_ptr<ColumnReader> reader =
buildReader(*rowType, streams);
- StringVectorBatch *stringBatch = new StringVectorBatch(1024,
- *getDefaultPool());
- StructVectorBatch batch(1024, *getDefaultPool());
- batch.fields.push_back(stringBatch);
- reader->next(batch, 200, 0);
- ASSERT_EQ(200, batch.numElements);
- ASSERT_EQ(true, !batch.hasNulls);
- ASSERT_EQ(200, stringBatch->numElements);
- ASSERT_EQ(true, stringBatch->hasNulls);
- for (size_t i = 0; i < batch.numElements; ++i) {
- if (i & 4) {
- EXPECT_EQ(0, stringBatch->notNull[i]);
- } else {
- EXPECT_EQ(1, stringBatch->notNull[i]);
- const char* expected = i < 98 ? "ORC" : "Owen";
- ASSERT_EQ(strlen(expected), stringBatch->length[i])
- << "Wrong length at " << i;
- for (size_t letter = 0; letter < strlen(expected); ++letter) {
+
+ if (encoded) {
+ EncodedStringVectorBatch *encodedStringBatch = new
EncodedStringVectorBatch(1024,
+ *getDefaultPool());
+ StructVectorBatch batch(1024, *getDefaultPool());
+ batch.fields.push_back(encodedStringBatch);
+ reader->nextEncoded(batch, 200, 0);
+ ASSERT_EQ(200, batch.numElements);
+ ASSERT_EQ(true, !batch.hasNulls);
+ ASSERT_EQ(200, encodedStringBatch->numElements);
+ ASSERT_EQ(true, encodedStringBatch->hasNulls);
+ for (size_t i = 0; i < batch.numElements; ++i) {
+ if (i & 4) {
+ EXPECT_EQ(0, encodedStringBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, encodedStringBatch->notNull[i]);
+ const char* expected = i < 98 ? "ORC" : "Owen";
+ int64_t index = encodedStringBatch->index.data()[i];
+
+ char* actualString;
+ int64_t actualLength;
+ encodedStringBatch->dictionary->getValueByIndex(index, actualString,
actualLength);
+ ASSERT_EQ(strlen(expected), actualLength)
+ << "Wrong length at " << i;
+
+ for (size_t letter = 0; letter < strlen(expected); ++letter) {
+ EXPECT_EQ(expected[letter], actualString[letter])
+ << "Wrong contents at " << i << ", " << letter;
+ }
+ }
+ }
+ } else {
+ StringVectorBatch *stringBatch = new StringVectorBatch(1024,
+ *getDefaultPool());
+ StructVectorBatch batch(1024, *getDefaultPool());
+ batch.fields.push_back(stringBatch);
+ reader->next(batch, 200, 0);
+ ASSERT_EQ(200, batch.numElements);
+ ASSERT_EQ(true, !batch.hasNulls);
+ ASSERT_EQ(200, stringBatch->numElements);
+ ASSERT_EQ(true, stringBatch->hasNulls);
+ for (size_t i = 0; i < batch.numElements; ++i) {
+ if (i & 4) {
+ EXPECT_EQ(0, stringBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, stringBatch->notNull[i]);
+ const char* expected = i < 98 ? "ORC" : "Owen";
+ ASSERT_EQ(strlen(expected), stringBatch->length[i])
+ << "Wrong length at " << i;
+ for (size_t letter = 0; letter < strlen(expected); ++letter) {
EXPECT_EQ(expected[letter], stringBatch->data[i][letter])
- << "Wrong contents at " << i << ", " << letter;
+ << "Wrong contents at " << i << ", " << letter;
+ }
}
}
}
}
-TEST(TestColumnReader, testVarcharDictionaryWithNulls) {
+TEST_P(TestColumnReaderEncoded, testVarcharDictionaryWithNulls) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -508,31 +566,64 @@ TEST(TestColumnReader, testVarcharDictionaryWithNulls) {
->addStructField("col1", createPrimitiveType(CHAR))
->addStructField("col2", createPrimitiveType(STRING));
+
std::unique_ptr<ColumnReader> reader =
- buildReader(*rowType, streams);
- StructVectorBatch batch(1024, *getDefaultPool());
- StringVectorBatch *stringBatch = new StringVectorBatch(1024,
- *getDefaultPool());
- StringVectorBatch *nullBatch = new StringVectorBatch(1024,
- *getDefaultPool());
- batch.fields.push_back(stringBatch);
- batch.fields.push_back(nullBatch);
- reader->next(batch, 200, 0);
- ASSERT_EQ(200, batch.numElements);
- ASSERT_EQ(true, !batch.hasNulls);
- ASSERT_EQ(200, stringBatch->numElements);
- ASSERT_EQ(true, !stringBatch->hasNulls);
- ASSERT_EQ(200, nullBatch->numElements);
- ASSERT_EQ(true, nullBatch->hasNulls);
- for (size_t i = 0; i < batch.numElements; ++i) {
- EXPECT_EQ(true, stringBatch->notNull[i]);
- EXPECT_EQ(true, !nullBatch->notNull[i]);
- const char* expected = i < 100 ? "Owen" : "ORC";
- ASSERT_EQ(strlen(expected), stringBatch->length[i])
- << "Wrong length at " << i;
- for (size_t letter = 0; letter < strlen(expected); ++letter) {
- EXPECT_EQ(expected[letter], stringBatch->data[i][letter])
- << "Wrong contents at " << i << ", " << letter;
+ buildReader(*rowType, streams);
+ if (encoded) {
+ StructVectorBatch batch(1024, *getDefaultPool());
+ EncodedStringVectorBatch *encodedStringBatch = new
EncodedStringVectorBatch(1024,
+ *getDefaultPool());
+ EncodedStringVectorBatch *nullBatch = new EncodedStringVectorBatch(1024,
+ *getDefaultPool());
+ batch.fields.push_back(encodedStringBatch);
+ batch.fields.push_back(nullBatch);
+ reader->nextEncoded(batch, 200, 0);
+ ASSERT_EQ(200, batch.numElements);
+ ASSERT_EQ(true, !batch.hasNulls);
+ ASSERT_EQ(200, encodedStringBatch->numElements);
+ ASSERT_EQ(true, !encodedStringBatch->hasNulls);
+ ASSERT_EQ(200, nullBatch->numElements);
+ ASSERT_EQ(true, nullBatch->hasNulls);
+ for (size_t i = 0; i < batch.numElements; ++i) {
+ EXPECT_EQ(true, encodedStringBatch->notNull[i]);
+ EXPECT_EQ(true, !nullBatch->notNull[i]);
+ const char* expected = i < 100 ? "Owen" : "ORC";
+ int64_t index = encodedStringBatch->index.data()[i];
+ char* actualString;
+ int64_t actualLength;
+ encodedStringBatch->dictionary->getValueByIndex(index, actualString,
actualLength);
+ ASSERT_EQ(strlen(expected), actualLength)
+ << "Wrong length at " << i;
+ for (size_t letter = 0; letter < strlen(expected); ++letter) {
+ EXPECT_EQ(expected[letter], actualString[letter])
+ << "Wrong contents at " << i << ", " << letter;
+ }
+ }
+ } else {
+ StructVectorBatch batch(1024, *getDefaultPool());
+ StringVectorBatch *stringBatch = new StringVectorBatch(1024,
+ *getDefaultPool());
+ StringVectorBatch *nullBatch = new StringVectorBatch(1024,
+ *getDefaultPool());
+ batch.fields.push_back(stringBatch);
+ batch.fields.push_back(nullBatch);
+ reader->next(batch, 200, 0);
+ ASSERT_EQ(200, batch.numElements);
+ ASSERT_EQ(true, !batch.hasNulls);
+ ASSERT_EQ(200, stringBatch->numElements);
+ ASSERT_EQ(true, !stringBatch->hasNulls);
+ ASSERT_EQ(200, nullBatch->numElements);
+ ASSERT_EQ(true, nullBatch->hasNulls);
+ for (size_t i = 0; i < batch.numElements; ++i) {
+ EXPECT_EQ(true, stringBatch->notNull[i]);
+ EXPECT_EQ(true, !nullBatch->notNull[i]);
+ const char* expected = i < 100 ? "Owen" : "ORC";
+ ASSERT_EQ(strlen(expected), stringBatch->length[i])
+ << "Wrong length at " << i;
+ for (size_t letter = 0; letter < strlen(expected); ++letter) {
+ EXPECT_EQ(expected[letter], stringBatch->data[i][letter])
+ << "Wrong contents at " << i << ", " << letter;
+ }
}
}
}
@@ -899,7 +990,7 @@ TEST(TestColumnReader, testShortBlobError) {
EXPECT_THROW(reader->next(batch, 100, 0), ParseError);
}
-TEST(TestColumnReader, testStringDirectShortBuffer) {
+TEST_P(TestColumnReaderEncoded, testStringDirectShortBuffer) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -946,7 +1037,11 @@ TEST(TestColumnReader, testStringDirectShortBuffer) {
StringVectorBatch *strings = new StringVectorBatch(25, *getDefaultPool());
batch.fields.push_back(strings);
for (size_t i = 0; i < 4; ++i) {
- reader->next(batch, 25, 0);
+ if (encoded) {
+ reader->nextEncoded(batch, 25, 0);
+ } else {
+ reader->next(batch, 25, 0);
+ }
ASSERT_EQ(25, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(25, strings->numElements);
@@ -959,7 +1054,7 @@ TEST(TestColumnReader, testStringDirectShortBuffer) {
}
}
-TEST(TestColumnReader, testStringDirectShortBufferWithNulls) {
+TEST_P(TestColumnReaderEncoded, testStringDirectShortBufferWithNulls) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -1009,7 +1104,11 @@ TEST(TestColumnReader, testStringDirectShortBufferWithNulls) {
batch.fields.push_back(strings);
size_t next = 0;
for (size_t i = 0; i < 8; ++i) {
- reader->next(batch, 64, 0);
+ if (encoded) {
+ reader->nextEncoded(batch, 64, 0);
+ } else {
+ reader->next(batch, 64, 0);
+ }
ASSERT_EQ(64, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(64, strings->numElements);
@@ -1286,7 +1385,7 @@ TEST(TestColumnReader, testStringDirectSkipWithNulls) {
}
}
-TEST(TestColumnReader, testList) {
+TEST_P(TestColumnReaderEncoded, testList) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -1341,7 +1440,11 @@ TEST(TestColumnReader, testList) {
LongVectorBatch *longs = new LongVectorBatch(512, *getDefaultPool());
batch.fields.push_back(lists);
lists->elements = std::unique_ptr < ColumnVectorBatch > (longs);
- reader->next(batch, 512, 0);
+ if (encoded) {
+ reader->nextEncoded(batch, 512, 0);
+ } else {
+ reader->next(batch, 512, 0);
+ }
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, lists->numElements);
@@ -1787,7 +1890,7 @@ TEST(TestColumnReader, testListSkipWithNullsNoData) {
EXPECT_EQ(19, lists->offsets[2]);
}
-TEST(TestColumnReader, testMap) {
+TEST_P(TestColumnReaderEncoded, testMap) {
MockStripeStreams streams;
// set getSelectedColumns()
@@ -1860,7 +1963,11 @@ TEST(TestColumnReader, testMap) {
batch.fields.push_back(maps);
maps->keys = std::unique_ptr < ColumnVectorBatch > (keys);
maps->elements = std::unique_ptr < ColumnVectorBatch > (elements);
- reader->next(batch, 512, 0);
+ if (encoded) {
+ reader->nextEncoded(batch, 512, 0);
+ } else {
+ reader->next(batch, 512, 0);
+ }
ASSERT_EQ(512, batch.numElements);
ASSERT_EQ(true, !batch.hasNulls);
ASSERT_EQ(512, maps->numElements);
@@ -4370,4 +4477,7 @@ TEST(TestColumnReader, testUnionWithManyVariants) {
}
}
+
+INSTANTIATE_TEST_CASE_P(OrcColumnReaderTest, TestColumnReaderEncoded,
Values(true, false));
+
} // namespace orc