Repository: orc Updated Branches: refs/heads/master 03b9ce8de -> ebf89f571
ORC-224: Implement column writers of primitive types Fixes #149 Signed-off-by: Deepak Majeti <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ebf89f57 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ebf89f57 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ebf89f57 Branch: refs/heads/master Commit: ebf89f571110d4ab47ee4a99bbf15d3a4262c113 Parents: 03b9ce8 Author: Gang Wu <[email protected]> Authored: Tue Aug 8 15:07:06 2017 -0700 Committer: Deepak Majeti <[email protected]> Committed: Tue Nov 7 11:23:46 2017 -0500 ---------------------------------------------------------------------- c++/include/orc/Int128.hh | 17 + c++/include/orc/Vector.hh | 2 + c++/src/ColumnWriter.cc | 1246 ++++++++++++++++++++++++++++++++++++++-- c++/src/ColumnWriter.hh | 2 +- c++/src/Exceptions.cc | 19 + c++/src/Exceptions.hh | 10 + c++/src/Writer.cc | 9 +- c++/test/TestWriter.cc | 611 +++++++++++++++++++- 8 files changed, 1853 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/include/orc/Int128.hh ---------------------------------------------------------------------- diff --git a/c++/include/orc/Int128.hh b/c++/include/orc/Int128.hh index 14215b4..160f1b0 100644 --- a/c++/include/orc/Int128.hh +++ b/c++/include/orc/Int128.hh @@ -93,6 +93,12 @@ namespace orc { return *this; } + Int128 abs() const { + Int128 value = *this; + value.abs(); + return value; + } + Int128& invert() { lowbits = ~lowbits; highbits = ~highbits; @@ -173,6 +179,17 @@ namespace orc { } /** + * Logical and between two Int128. + * @param right the number to and in + * @return logical and result + */ + Int128 operator&(const Int128 &right) { + Int128 value = *this; + value &= right; + return value; + } + + /** * Shift left by the given number of bits. * Values larger than 2**127 will shift into the sign bit. */ http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/include/orc/Vector.hh ---------------------------------------------------------------------- diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh index f3f1343..65101db 100644 --- a/c++/include/orc/Vector.hh +++ b/c++/include/orc/Vector.hh @@ -216,6 +216,7 @@ namespace orc { */ DataBuffer<int64_t> readScales; friend class Decimal64ColumnReader; + friend class Decimal64ColumnWriter; }; struct Decimal128VectorBatch: public ColumnVectorBatch { @@ -241,6 +242,7 @@ namespace orc { DataBuffer<int64_t> readScales; friend class Decimal128ColumnReader; friend class DecimalHive11ColumnReader; + friend class Decimal128ColumnWriter; }; /** http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/ColumnWriter.cc ---------------------------------------------------------------------- diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc index ad18b0c..60878c3 100644 --- a/c++/src/ColumnWriter.cc +++ b/c++/src/ColumnWriter.cc @@ -179,14 +179,16 @@ namespace orc { notNullEncoder->recordPosition(rowIndexPosition.get()); } - void ColumnWriter::resetIndex() { - // clear row index - rowIndex->clear_entry(); - rowIndexEntry->clear_positions(); - rowIndexEntry->clear_statistics(); + void ColumnWriter::reset() { + if (enableIndex) { + // clear row index + rowIndex->clear_entry(); + rowIndexEntry->clear_positions(); + rowIndexEntry->clear_statistics(); - // write current positions - recordPosition(); + // write current positions + recordPosition(); + } } class StructColumnWriter : public ColumnWriter { @@ -222,7 +224,7 @@ namespace orc { virtual void writeIndex( std::vector<proto::Stream> &streams) const override; - virtual void resetIndex() override; + virtual void reset() override; private: std::vector<ColumnWriter *> children; @@ -250,24 +252,26 @@ namespace orc { } void StructColumnWriter::add( - ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues) { + ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { ColumnWriter::add(rowBatch, offset, numValues); - - const StructVectorBatch & structBatch = - dynamic_cast<const StructVectorBatch &>(rowBatch); + const StructVectorBatch* structBatch = + dynamic_cast<const StructVectorBatch *>(&rowBatch); + if (structBatch == nullptr) { + throw InvalidArgument("Failed to cast to StructVectorBatch"); + } for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->add(*structBatch.fields[i], offset, numValues); + children[i]->add(*structBatch->fields[i], offset, numValues); } // update stats bool hasNull = false; - if (!structBatch.hasNulls) { + if (!structBatch->hasNulls) { colIndexStatistics->increase(numValues); } else { - const char* notNull = structBatch.notNull.data() + offset; + const char* notNull = structBatch->notNull.data() + offset; for (uint64_t i = 0; i < numValues; ++i) { if (notNull[i]) { colIndexStatistics->increase(1); @@ -355,11 +359,11 @@ namespace orc { } } - void StructColumnWriter::resetIndex() { - ColumnWriter::resetIndex(); + void StructColumnWriter::reset() { + ColumnWriter::reset(); for (uint32_t i = 0; i < children.size(); ++i) { - children[i]->resetIndex(); + children[i]->reset(); } } @@ -391,11 +395,11 @@ namespace orc { }; IntegerColumnWriter::IntegerColumnWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) : - ColumnWriter(type, factory, options), - rleVersion(RleVersion_1) { + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(RleVersion_1) { std::unique_ptr<BufferedOutputStream> dataStream = factory.createStream(proto::Stream_Kind_DATA); rleEncoder = createRleEncoder( @@ -410,31 +414,40 @@ namespace orc { } void IntegerColumnWriter::add( - ColumnVectorBatch& rowBatch, - uint64_t offset, - uint64_t numValues) { + ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { ColumnWriter::add(rowBatch, offset, numValues); - const LongVectorBatch & longBatch = - dynamic_cast<const LongVectorBatch &>(rowBatch); + const LongVectorBatch* longBatch = + dynamic_cast<const LongVectorBatch*>(&rowBatch); + if (longBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } - const int64_t* data = longBatch.data.data() + offset; - const char* notNull = longBatch.hasNulls ? - longBatch.notNull.data() + offset : nullptr; + const int64_t* data = longBatch->data.data() + offset; + const char* notNull = longBatch->hasNulls ? + longBatch->notNull.data() + offset : nullptr; rleEncoder->add(data, numValues, notNull); // update stats IntegerColumnStatisticsImpl* intStats = dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); + if (intStats == nullptr) { + throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); + } + + bool hasNull = false; for (uint64_t i = 0; i < numValues; ++i) { if (notNull == nullptr || notNull[i]) { intStats->increase(1); intStats->update(data[i], 1); - } else if (!intStats->hasNull()) { - intStats->setHasNull(true); + } else if (!hasNull) { + hasNull = true; } } + intStats->setHasNull(hasNull); } void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) { @@ -468,25 +481,1150 @@ namespace orc { rleEncoder->recordPosition(rowIndexPosition.get()); } - std::unique_ptr<ColumnWriter> buildWriter( - const Type& type, - const StreamsFactory& factory, - const WriterOptions& options) { - switch (static_cast<int64_t>(type.getKind())) { - case STRUCT: - return std::unique_ptr<ColumnWriter>( - new StructColumnWriter( - type, - factory, - options)); - case INT: - case LONG: - case SHORT: - return std::unique_ptr<ColumnWriter>( - new IntegerColumnWriter( - type, - factory, - options)); + class ByteColumnWriter : public ColumnWriter { + public: + ByteColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + std::unique_ptr<ByteRleEncoder> byteRleEncoder; + }; + + ByteColumnWriter::ByteColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + byteRleEncoder = createByteRleEncoder(std::move(dataStream)); + + if (enableIndex) { + recordPosition(); + } + } + + void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + + LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); + if (byteBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + + int64_t* data = byteBatch->data.data() + offset; + const char* notNull = byteBatch->hasNulls ? + byteBatch->notNull.data() + offset : nullptr; + + char* byteData = reinterpret_cast<char*>(data); + for (uint64_t i = 0; i < numValues; ++i) { + byteData[i] = static_cast<char>(data[i]); + } + byteRleEncoder->add(byteData, numValues, notNull); + + IntegerColumnStatisticsImpl* intStats = + dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get()); + if (intStats == nullptr) { + throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl"); + } + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + intStats->increase(1); + intStats->update(static_cast<int64_t>(byteData[i]), 1); + } else if (!hasNull) { + hasNull = true; + } + } + intStats->setHasNull(hasNull); + } + + void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(byteRleEncoder->flush()); + streams.push_back(stream); + } + + uint64_t ByteColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += byteRleEncoder->getBufferSize(); + return size; + } + + void ByteColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void ByteColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + byteRleEncoder->recordPosition(rowIndexPosition.get()); + } + + class BooleanColumnWriter : public ColumnWriter { + public: + BooleanColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + std::unique_ptr<ByteRleEncoder> rleEncoder; + }; + + BooleanColumnWriter::BooleanColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options) { + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + rleEncoder = createBooleanRleEncoder(std::move(dataStream)); + + if (enableIndex) { + recordPosition(); + } + } + + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + + LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch); + if (byteBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + int64_t* data = byteBatch->data.data() + offset; + const char* notNull = byteBatch->hasNulls ? + byteBatch->notNull.data() + offset : nullptr; + + char* byteData = reinterpret_cast<char*>(data); + for (uint64_t i = 0; i < numValues; ++i) { + byteData[i] = static_cast<char>(data[i]); + } + rleEncoder->add(byteData, numValues, notNull); + + BooleanColumnStatisticsImpl* boolStats = + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get()); + if (boolStats == nullptr) { + throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl"); + } + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + boolStats->increase(1); + boolStats->update(byteData[i], 1); + } else if (!hasNull) { + hasNull = true; + } + } + boolStats->setHasNull(hasNull); + } + + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(rleEncoder->flush()); + streams.push_back(stream); + } + + uint64_t BooleanColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += rleEncoder->getBufferSize(); + return size; + } + + void BooleanColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void BooleanColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + rleEncoder->recordPosition(rowIndexPosition.get()); + } + + class DoubleColumnWriter : public ColumnWriter { + public: + DoubleColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options, + bool isFloat); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + bool isFloat; + std::unique_ptr<AppendOnlyBufferedStream> dataStream; + DataBuffer<char> buffer; + }; + + DoubleColumnWriter::DoubleColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options, + bool isFloatType) : + ColumnWriter(type, factory, options), + isFloat(isFloatType), + buffer(*options.getMemoryPool()) { + dataStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto::Stream_Kind_DATA))); + buffer.resize(isFloat ? 4 : 8); + + if (enableIndex) { + recordPosition(); + } + } + + // Floating point types are stored using IEEE 754 floating point bit layout. + // Float columns use 4 bytes per value and double columns use 8 bytes. + template <typename FLOAT_TYPE, typename INTEGER_TYPE> + inline void encodeFloatNum(FLOAT_TYPE input, char* output) { + INTEGER_TYPE* intBits = reinterpret_cast<INTEGER_TYPE*>(&input); + for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) { + output[i] = static_cast<char>(((*intBits) >> (8 * i)) & 0xff); + } + } + + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + const DoubleVectorBatch* dblBatch = + dynamic_cast<const DoubleVectorBatch*>(&rowBatch); + if (dblBatch == nullptr) { + throw InvalidArgument("Failed to cast to DoubleVectorBatch"); + } + + const double* doubleData = dblBatch->data.data() + offset; + const char* notNull = dblBatch->hasNulls ? + dblBatch->notNull.data() + offset : nullptr; + + DoubleColumnStatisticsImpl* doubleStats = + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get()); + if (doubleStats == nullptr) { + throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl"); + } + + size_t bytes = isFloat ? 4 : 8; + char* data = buffer.data(); + bool hasNull = false; + + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + if (isFloat) { + encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data); + } else { + encodeFloatNum<double, int64_t>(doubleData[i], data); + } + dataStream->write(data, bytes); + + doubleStats->increase(1); + doubleStats->update(doubleData[i]); + } else if (!hasNull) { + hasNull = true; + } + } + doubleStats->setHasNull(hasNull); + } + + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(dataStream->flush()); + streams.push_back(stream); + } + + uint64_t DoubleColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += dataStream->getSize(); + return size; + } + + void DoubleColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void DoubleColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + dataStream->recordPosition(rowIndexPosition.get()); + } + + class StringColumnWriter : public ColumnWriter { + public: + StringColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + std::unique_ptr<RleEncoder> lengthEncoder; + std::unique_ptr<AppendOnlyBufferedStream> dataStream; + RleVersion rleVersion; + }; + + StringColumnWriter::StringColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(RleVersion_1) { + std::unique_ptr<BufferedOutputStream> lengthStream = + factory.createStream(proto::Stream_Kind_LENGTH); + lengthEncoder = createRleEncoder(std::move(lengthStream), + false, + rleVersion, + memPool); + dataStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto::Stream_Kind_DATA))); + + if (enableIndex) { + recordPosition(); + } + } + + void StringColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + const StringVectorBatch* stringBatch = + dynamic_cast<const StringVectorBatch*>(&rowBatch); + if (stringBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + char *const * data = stringBatch->data.data() + offset; + const int64_t* length = stringBatch->length.data() + offset; + const char* notNull = stringBatch->hasNulls ? + stringBatch->notNull.data() + offset : nullptr; + + lengthEncoder->add(length, numValues, notNull); + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + dataStream->write(data[i], static_cast<size_t>(length[i])); + strStats->update(data[i], static_cast<size_t>(length[i])); + strStats->increase(1); + } else if (!hasNull) { + hasNull = true; + } + } + strStats->setHasNull(hasNull); + } + + void StringColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream length; + length.set_kind(proto::Stream_Kind_LENGTH); + length.set_column(static_cast<uint32_t>(columnId)); + length.set_length(lengthEncoder->flush()); + streams.push_back(length); + + proto::Stream data; + data.set_kind(proto::Stream_Kind_DATA); + data.set_column(static_cast<uint32_t>(columnId)); + data.set_length(dataStream->flush()); + streams.push_back(data); + } + + uint64_t StringColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += lengthEncoder->getBufferSize(); + size += dataStream->getSize(); + return size; + } + + void StringColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(rleVersion == RleVersion_1 ? + proto::ColumnEncoding_Kind_DIRECT : + proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void StringColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + dataStream->recordPosition(rowIndexPosition.get()); + lengthEncoder->recordPosition(rowIndexPosition.get()); + } + + class CharColumnWriter : public StringColumnWriter { + public: + CharColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options), + fixedLength(type.getMaximumLength()), + padBuffer(*options.getMemoryPool(), + type.getMaximumLength()) { + // PASS + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + private: + uint64_t fixedLength; + DataBuffer<char> padBuffer; + }; + + void CharColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (charsBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + char** data = charsBatch->data.data() + offset; + int64_t* length = charsBatch->length.data() + offset; + const char* notNull = charsBatch->hasNulls ? + charsBatch->notNull.data() + offset : nullptr; + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + bool hasNull = false; + + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + char *charData = data[i]; + uint64_t oriLength = static_cast<uint64_t>(length[i]); + if (oriLength < fixedLength) { + memcpy(padBuffer.data(), data[i], oriLength); + memset(padBuffer.data() + oriLength, ' ', fixedLength - oriLength); + charData = padBuffer.data(); + } + length[i] = static_cast<int64_t>(fixedLength); + dataStream->write(charData, fixedLength); + + strStats->update(charData, fixedLength); + strStats->increase(1); + } else if (!hasNull) { + hasNull = true; + } + } + lengthEncoder->add(length, numValues, notNull); + strStats->setHasNull(hasNull); + } + + class VarCharColumnWriter : public StringColumnWriter { + public: + VarCharColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options), + maxLength(type.getMaximumLength()) { + // PASS + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + private: + uint64_t maxLength; + }; + + void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (charsBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + + char* const* data = charsBatch->data.data() + offset; + int64_t* length = charsBatch->length.data() + offset; + const char* notNull = charsBatch->hasNulls ? + charsBatch->notNull.data() + offset : nullptr; + + StringColumnStatisticsImpl* strStats = + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get()); + if (strStats == nullptr) { + throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl"); + } + bool hasNull = false; + + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + if (length[i] > static_cast<int64_t>(maxLength)) { + length[i] = static_cast<int64_t>(maxLength); + } + dataStream->write(data[i], static_cast<size_t>(length[i])); + + strStats->update(data[i], static_cast<size_t>(length[i])); + strStats->increase(1); + } else if (!hasNull) { + hasNull = true; + } + } + lengthEncoder->add(length, numValues, notNull); + strStats->setHasNull(hasNull); + } + + class BinaryColumnWriter : public StringColumnWriter { + public: + BinaryColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + StringColumnWriter(type, factory, options) { + // PASS + } + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + }; + + void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + + StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch); + if (binBatch == nullptr) { + throw InvalidArgument("Failed to cast to StringVectorBatch"); + } + char** data = binBatch->data.data() + offset; + int64_t* length = binBatch->length.data() + offset; + const char* notNull = binBatch->hasNulls ? + binBatch->notNull.data() + offset : nullptr; + + BinaryColumnStatisticsImpl* binStats = + dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get()); + if (binStats == nullptr) { + throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl"); + } + + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + uint64_t unsignedLength = static_cast<uint64_t>(length[i]); + if (!notNull || notNull[i]) { + dataStream->write(data[i], unsignedLength); + + binStats->update(unsignedLength); + binStats->increase(1); + } else if (!hasNull) { + hasNull = true; + } + } + lengthEncoder->add(length, numValues, notNull); + binStats->setHasNull(hasNull); + } + + class TimestampColumnWriter : public ColumnWriter { + public: + TimestampColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder; + + private: + RleVersion rleVersion; + const Timezone& timezone; + }; + + TimestampColumnWriter::TimestampColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(RleVersion_1), + timezone(getLocalTimezone()){ + std::unique_ptr<BufferedOutputStream> dataStream = + factory.createStream(proto::Stream_Kind_DATA); + std::unique_ptr<BufferedOutputStream> secondaryStream = + factory.createStream(proto::Stream_Kind_SECONDARY); + secRleEncoder = createRleEncoder(std::move(dataStream), + true, + rleVersion, + memPool); + nanoRleEncoder = createRleEncoder(std::move(secondaryStream), + false, + rleVersion, + memPool); + + if (enableIndex) { + recordPosition(); + } + } + + // Because the number of nanoseconds often has a large number of trailing zeros, + // the number has trailing decimal zero digits removed and the last three bits + // are used to record how many zeros were removed. Thus 1000 nanoseconds would + // be serialized as 0x0b and 100000 would be serialized as 0x0d. + static int64_t formatNano(int64_t nanos) { + if (nanos == 0) { + return 0; + } else if (nanos % 100 != 0) { + return (nanos) << 3; + } else { + nanos /= 100; + int64_t trailingZeros = 1; + while (nanos % 10 == 0 && trailingZeros < 7) { + nanos /= 10; + trailingZeros += 1; + } + return (nanos) << 3 | trailingZeros; + } + } + + void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + TimestampVectorBatch* tsBatch = + dynamic_cast<TimestampVectorBatch*>(&rowBatch); + if (tsBatch == nullptr) { + throw InvalidArgument("Failed to cast to TimestampVectorBatch"); + } + + const char* notNull = tsBatch->hasNulls ? + tsBatch->notNull.data() + offset : nullptr; + int64_t *secs = tsBatch->data.data() + offset; + int64_t *nanos = tsBatch->nanoseconds.data() + offset; + + TimestampColumnStatisticsImpl* tsStats = + dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get()); + if (tsStats == nullptr) { + throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl"); + } + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + if (notNull == nullptr || notNull[i]) { + // TimestampVectorBatch already stores data in UTC + int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000; + tsStats->increase(1); + tsStats->update(millsUTC); + + secs[i] -= timezone.getVariant(secs[i]).gmtOffset; + secs[i] -= timezone.getEpoch(); + nanos[i] = formatNano(nanos[i]); + } else if (!hasNull) { + hasNull = true; + } + } + tsStats->setHasNull(hasNull); + + secRleEncoder->add(secs, numValues, notNull); + nanoRleEncoder->add(nanos, numValues, notNull); + } + + void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(secRleEncoder->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(nanoRleEncoder->flush()); + streams.push_back(secondaryStream); + } + + uint64_t TimestampColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += secRleEncoder->getBufferSize(); + size += nanoRleEncoder->getBufferSize(); + return size; + } + + void TimestampColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(rleVersion == RleVersion_1 ? + proto::ColumnEncoding_Kind_DIRECT : + proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void TimestampColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + secRleEncoder->recordPosition(rowIndexPosition.get()); + nanoRleEncoder->recordPosition(rowIndexPosition.get()); + } + + class DateColumnWriter : public IntegerColumnWriter { + public: + DateColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + }; + + DateColumnWriter::DateColumnWriter( + const Type &type, + const StreamsFactory &factory, + const WriterOptions &options) : + IntegerColumnWriter(type, factory, options) { + // PASS + } + + void DateColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + const LongVectorBatch* longBatch = + dynamic_cast<const LongVectorBatch*>(&rowBatch); + if (longBatch == nullptr) { + throw InvalidArgument("Failed to cast to LongVectorBatch"); + } + + const int64_t* data = longBatch->data.data() + offset; + const char* notNull = longBatch->hasNulls ? + longBatch->notNull.data() + offset : nullptr; + + rleEncoder->add(data, numValues, notNull); + + DateColumnStatisticsImpl* dateStats = + dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get()); + if (dateStats == nullptr) { + throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl"); + } + bool hasNull = false; + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + dateStats->increase(1); + dateStats->update(static_cast<int32_t>(data[i])); + } else if (!hasNull) { + hasNull = true; + } + } + dateStats->setHasNull(hasNull); + } + + class Decimal64ColumnWriter : public ColumnWriter { + public: + static const uint32_t MAX_PRECISION_64 = 18; + static const uint32_t MAX_PRECISION_128 = 38; + + Decimal64ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + RleVersion rleVersion; + uint64_t precision; + uint64_t scale; + std::unique_ptr<AppendOnlyBufferedStream> valueStream; + std::unique_ptr<RleEncoder> scaleEncoder; + + private: + char buffer[8]; + }; + + Decimal64ColumnWriter::Decimal64ColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + ColumnWriter(type, factory, options), + rleVersion(RleVersion_1), + precision(type.getPrecision()), + scale(type.getScale()) { + valueStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto::Stream_Kind_DATA))); + std::unique_ptr<BufferedOutputStream> scaleStream = + factory.createStream(proto::Stream_Kind_SECONDARY); + scaleEncoder = createRleEncoder(std::move(scaleStream), + true, + rleVersion, + memPool); + + if (enableIndex) { + recordPosition(); + } + } + + void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + const Decimal64VectorBatch* decBatch = + dynamic_cast<const Decimal64VectorBatch*>(&rowBatch); + if (decBatch == nullptr) { + throw InvalidArgument("Failed to cast to Decimal64VectorBatch"); + } + + const char* notNull = decBatch->hasNulls ? + decBatch->notNull.data() + offset : nullptr; + const int64_t* values = decBatch->values.data() + offset; + DecimalColumnStatisticsImpl* decStats = + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); + if (decStats == nullptr) { + throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); + } + bool hasNull = false; + + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + int64_t val = zigZag(values[i]); + char* data = buffer; + while (true) { + if ((val & ~0x7f) == 0) { + *(data++) = (static_cast<char>(val)); + break; + } else { + *(data++) = static_cast<char>(0x80 | (val & 0x7f)); + // cast val to unsigned so as to force 0-fill right shift + val = (static_cast<uint64_t>(val) >> 7); + } + } + valueStream->write(buffer, static_cast<size_t>(data - buffer)); + + decStats->increase(1); + decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); + } else if (!hasNull) { + hasNull = true; + } + } + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); + scaleEncoder->add(scales.data(), numValues, notNull); + + decStats->setHasNull(hasNull); + } + + void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(valueStream->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(scaleEncoder->flush()); + streams.push_back(secondaryStream); + } + + uint64_t Decimal64ColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += valueStream->getSize(); + size += scaleEncoder->getBufferSize(); + return size; + } + + void Decimal64ColumnWriter::getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + } + + void Decimal64ColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + valueStream->recordPosition(rowIndexPosition.get()); + scaleEncoder->recordPosition(rowIndexPosition.get()); + } + + class Decimal128ColumnWriter : public Decimal64ColumnWriter { + public: + Decimal128ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + private: + char buffer[16]; + }; + + Decimal128ColumnWriter::Decimal128ColumnWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) : + Decimal64ColumnWriter(type, factory, options) { + // PASS + } + + // Zigzag encoding moves the sign bit to the least significant bit using the + // expression (val « 1) ^ (val » 63) and derives its name from the fact that + // positive and negative numbers alternate once encoded. + Int128 zigZagInt128(const Int128& value) { + bool isNegative = value < 0; + Int128 val = value.abs(); + val <<= 1; + if (isNegative) { + val -= 1; + } + return val; + } + + void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) { + ColumnWriter::add(rowBatch, offset, numValues); + const Decimal128VectorBatch* decBatch = + dynamic_cast<const Decimal128VectorBatch*>(&rowBatch); + if (decBatch == nullptr) { + throw InvalidArgument("Failed to cast to Decimal128VectorBatch"); + } + + const char* notNull = decBatch->hasNulls ? + decBatch->notNull.data() + offset : nullptr; + const Int128* values = decBatch->values.data() + offset; + DecimalColumnStatisticsImpl* decStats = + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get()); + if (decStats == nullptr) { + throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl"); + } + bool hasNull = false; + + // The current encoding of decimal columns stores the integer representation + // of the value as an unbounded length zigzag encoded base 128 varint. + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + Int128 val = zigZagInt128(values[i]); + char* data = buffer; + while (true) { + if ((val & ~0x7f) == 0) { + *(data++) = (static_cast<char>(val.getLowBits())); + break; + } else { + *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f)); + val >>= 7; + } + } + valueStream->write(buffer, static_cast<size_t>(data - buffer)); + + decStats->increase(1); + decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); + } else if (!hasNull) { + hasNull = true; + } + } + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale)); + scaleEncoder->add(scales.data(), numValues, notNull); + + decStats->setHasNull(hasNull); + } + + std::unique_ptr<ColumnWriter> buildWriter( + const Type& type, + const StreamsFactory& factory, + const WriterOptions& options) { + switch (static_cast<int64_t>(type.getKind())) { + case STRUCT: + return std::unique_ptr<ColumnWriter>( + new StructColumnWriter( + type, + factory, + options)); + case INT: + case LONG: + case SHORT: + return std::unique_ptr<ColumnWriter>( + new IntegerColumnWriter( + type, + factory, + options)); + case BYTE: + return std::unique_ptr<ColumnWriter>( + new ByteColumnWriter( + type, + factory, + options)); + case BOOLEAN: + return std::unique_ptr<ColumnWriter>( + new BooleanColumnWriter( + type, + factory, + options)); + case DOUBLE: + return std::unique_ptr<ColumnWriter>( + new DoubleColumnWriter( + type, + factory, + options, + false)); + case FLOAT: + return std::unique_ptr<ColumnWriter>( + new DoubleColumnWriter( + type, + factory, + options, + true)); + case BINARY: + return std::unique_ptr<ColumnWriter>( + new BinaryColumnWriter( + type, + factory, + options)); + case STRING: + return std::unique_ptr<ColumnWriter>( + new StringColumnWriter( + type, + factory, + options)); + case CHAR: + return std::unique_ptr<ColumnWriter>( + new CharColumnWriter( + type, + factory, + options)); + case VARCHAR: + return std::unique_ptr<ColumnWriter>( + new VarCharColumnWriter( + type, + factory, + options)); + case DATE: + return std::unique_ptr<ColumnWriter>( + new DateColumnWriter( + type, + factory, + options)); + case TIMESTAMP: + return std::unique_ptr<ColumnWriter>( + new TimestampColumnWriter( + type, + factory, + options)); + case DECIMAL: + if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) { + return std::unique_ptr<ColumnWriter>( + new Decimal64ColumnWriter( + type, + factory, + options)); + } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) { + return std::unique_ptr<ColumnWriter>( + new Decimal128ColumnWriter( + type, + factory, + options)); + } else { + throw NotImplementedYet("Decimal precision more than 38 is not " + "supported"); + } default: throw NotImplementedYet("Type is not supported yet for creating " "ColumnWriter."); http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/ColumnWriter.hh ---------------------------------------------------------------------- diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh index 774f9b5..e6f076d 100644 --- a/c++/src/ColumnWriter.hh +++ b/c++/src/ColumnWriter.hh @@ -163,7 +163,7 @@ namespace orc { /** * Reset positions for index */ - virtual void resetIndex(); + virtual void reset(); protected: /** http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Exceptions.cc ---------------------------------------------------------------------- diff --git a/c++/src/Exceptions.cc b/c++/src/Exceptions.cc index ae0e3d1..ef6c0c2 100644 --- a/c++/src/Exceptions.cc +++ b/c++/src/Exceptions.cc @@ -56,4 +56,23 @@ namespace orc { ParseError::~ParseError() noexcept { // PASS } + + InvalidArgument::InvalidArgument(const std::string& what_arg + ): runtime_error(what_arg) { + // PASS + } + + InvalidArgument::InvalidArgument(const char* what_arg + ): runtime_error(what_arg) { + // PASS + } + + InvalidArgument::InvalidArgument(const InvalidArgument& error + ): runtime_error(error) { + // PASS + } + + InvalidArgument::~InvalidArgument() noexcept { + // PASS + } } http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Exceptions.hh ---------------------------------------------------------------------- diff --git a/c++/src/Exceptions.hh b/c++/src/Exceptions.hh index 4706085..34b7818 100644 --- a/c++/src/Exceptions.hh +++ b/c++/src/Exceptions.hh @@ -45,6 +45,16 @@ namespace orc { private: ParseError& operator=(const ParseError&); }; + + class InvalidArgument: public std::runtime_error { + public: + explicit InvalidArgument(const std::string& what_arg); + explicit InvalidArgument(const char* what_arg); + virtual ~InvalidArgument() noexcept; + InvalidArgument(const InvalidArgument&); + private: + InvalidArgument& operator=(const InvalidArgument&); + }; } #endif http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/src/Writer.cc ---------------------------------------------------------------------- diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc index a4ae090..22f5750 100644 --- a/c++/src/Writer.cc +++ b/c++/src/Writer.cc @@ -369,11 +369,6 @@ namespace orc { // write streams like PRESENT, DATA, etc. columnWriter->flush(streams); - // only until all streams are flushed can we reset positions - if (options.getEnableIndex()) { - columnWriter->resetIndex(); - } - // generate and write stripe footer proto::StripeFooter stripeFooter; for (uint32_t i = 0; i < streams.size(); ++i) { @@ -426,6 +421,8 @@ namespace orc { currentOffset = currentOffset + indexLength + dataLength + footerLength; totalRows += stripeRows; + columnWriter->reset(); + initStripe(); } @@ -568,7 +565,7 @@ namespace orc { std::unique_ptr<Writer> createWriter( const Type& type, OutputStream* stream, - const WriterOptions& options) { + const WriterOptions& options) { return std::unique_ptr<Writer>( new WriterImpl( type, http://git-wip-us.apache.org/repos/asf/orc/blob/ebf89f57/c++/test/TestWriter.cc ---------------------------------------------------------------------- diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc index 3df4626..d401ca3 100644 --- a/c++/test/TestWriter.cc +++ b/c++/test/TestWriter.cc @@ -31,7 +31,7 @@ namespace orc { - const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M + const int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024; // 100M std::unique_ptr<Writer> createWriter( uint64_t stripeSize, @@ -213,5 +213,612 @@ namespace orc { } EXPECT_FALSE(rowReader->next(*batch)); } -} + TEST(Writer, writeStringAndBinaryColumn) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:string,col2:binary>")); + + uint64_t stripeSize = 1024; // 1K + uint64_t compressionBlockSize = 1024; // 1k + + char dataBuffer[327675]; + uint64_t offset = 0; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + StringVectorBatch * strBatch = + dynamic_cast<StringVectorBatch *>(structBatch->fields[0]); + StringVectorBatch * binBatch = + dynamic_cast<StringVectorBatch *>(structBatch->fields[1]); + + for (uint64_t i = 0; i < 65535; ++i) { + std::ostringstream os; + os << i; + strBatch->data[i] = dataBuffer + offset; + strBatch->length[i] = static_cast<int64_t>(os.str().size()); + binBatch->data[i] = dataBuffer + offset; + binBatch->length[i] = static_cast<int64_t>(os.str().size()); + memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); + offset += os.str().size(); + } + + structBatch->numElements = 65535; + strBatch->numElements = 65535; + binBatch->numElements = 65535; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), + memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(65535, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(65535); + EXPECT_EQ(true, rowReader->next(*batch)); + EXPECT_EQ(65535, batch->numElements); + + for (uint64_t i = 0; i < 65535; ++i) { + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + strBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[0]); + binBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[1]); + std::string str( + strBatch->data[i], + static_cast<size_t>(strBatch->length[i])); + std::string bin( + binBatch->data[i], + static_cast<size_t>(binBatch->length[i])); + EXPECT_EQ(i, static_cast<uint64_t>(atoi(str.c_str()))); + EXPECT_EQ(i, static_cast<uint64_t>(atoi(bin.c_str()))); + } + + EXPECT_EQ(false, rowReader->next(*batch)); + } + + TEST(Writer, writeFloatAndDoubleColumn) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:double,col2:float>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 655350; + + double data[655350]; + for (uint64_t i = 0; i < rowCount; ++i) { + data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX); + } + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + DoubleVectorBatch * doubleBatch = + dynamic_cast<DoubleVectorBatch *>(structBatch->fields[0]); + DoubleVectorBatch * floatBatch = + dynamic_cast<DoubleVectorBatch *>(structBatch->fields[1]); + + for (uint64_t i = 0; i < rowCount; ++i) { + doubleBatch->data[i] = data[i]; + floatBatch->data[i] = data[i]; + } + + structBatch->numElements = rowCount; + doubleBatch->numElements = rowCount; + floatBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + EXPECT_EQ(rowCount, batch->numElements); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + doubleBatch = dynamic_cast<DoubleVectorBatch *>(structBatch->fields[0]); + floatBatch = dynamic_cast<DoubleVectorBatch *>(structBatch->fields[1]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001); + EXPECT_TRUE(std::abs(static_cast<float>(data[i]) - + static_cast<float>(floatBatch->data[i])) < 0.000001f); + } + EXPECT_EQ(false, rowReader->next(*batch)); + } + + TEST(Writer, writeShortIntLong) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:smallint,col2:int,col3:bigint>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 65535; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + LongVectorBatch * smallIntBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + LongVectorBatch * intBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[1]); + LongVectorBatch * bigIntBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[2]); + + for (uint64_t i = 0; i < rowCount; ++i) { + smallIntBatch->data[i] = static_cast<int16_t>(i); + intBatch->data[i] = static_cast<int32_t>(i); + bigIntBatch->data[i] = static_cast<int64_t>(i); + } + structBatch->numElements = rowCount; + smallIntBatch->numElements = rowCount; + intBatch->numElements = rowCount; + bigIntBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + smallIntBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + intBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[1]); + bigIntBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[2]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(static_cast<int16_t>(i), smallIntBatch->data[i]); + EXPECT_EQ(static_cast<int32_t>(i), intBatch->data[i]); + EXPECT_EQ(static_cast<int64_t>(i), bigIntBatch->data[i]); + } + } + + TEST(Writer, writeTinyint) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:tinyint>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 65535; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + LongVectorBatch * byteBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + + for (uint64_t i = 0; i < rowCount; ++i) { + byteBatch->data[i] = static_cast<int8_t>(i); + } + structBatch->numElements = rowCount; + byteBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + byteBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(static_cast<int8_t>(i), static_cast<int8_t>(byteBatch->data[i])); + } + } + + TEST(Writer, writeBooleanColumn) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool* pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:boolean>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 65535; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + LongVectorBatch * byteBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + + for (uint64_t i = 0; i < rowCount; ++i) { + byteBatch->data[i] = (i % 3) == 0; + } + structBatch->numElements = rowCount; + byteBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + byteBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ((i % 3) == 0, byteBatch->data[i]); + } + } + + TEST(Writer, writeDate) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool* pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:date>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 1024; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + LongVectorBatch * longBatch = + dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + + for (uint64_t i = 0; i < rowCount; ++i) { + longBatch->data[i] = static_cast<int32_t>(i); + } + structBatch->numElements = rowCount; + longBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + longBatch = dynamic_cast<LongVectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(static_cast<int32_t>(i), longBatch->data[i]); + } + } + + TEST(Writer, writeTimestamp) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool* pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:timestamp>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 1024; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + TimestampVectorBatch * tsBatch = + dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]); + + std::vector<std::time_t> times(rowCount); + for (uint64_t i = 0; i < rowCount; ++i) { + time_t currTime = std::time(nullptr); + times[i] = static_cast<int64_t>(currTime) - static_cast<int64_t >(i * 60); + tsBatch->data[i] = times[i]; + tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000); + } + structBatch->numElements = rowCount; + tsBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + tsBatch = dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(times[i], tsBatch->data[i]); + EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]); + } + } + + TEST(Writer, writeCharAndVarcharColumn) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:char(3),col2:varchar(4)>")); + + uint64_t stripeSize = 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 65535; + + char dataBuffer[327675]; + uint64_t offset = 0; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + StringVectorBatch * charBatch = + dynamic_cast<StringVectorBatch *>(structBatch->fields[0]); + StringVectorBatch * varcharBatch = + dynamic_cast<StringVectorBatch *>(structBatch->fields[1]); + + for (uint64_t i = 0; i < rowCount; ++i) { + std::ostringstream os; + os << i; + charBatch->data[i] = dataBuffer + offset; + charBatch->length[i] = static_cast<int64_t>(os.str().size()); + + varcharBatch->data[i] = charBatch->data[i]; + varcharBatch->length[i] = charBatch->length[i]; + + memcpy(dataBuffer + offset, os.str().c_str(), os.str().size()); + offset += os.str().size(); + } + + structBatch->numElements = rowCount; + charBatch->numElements = rowCount; + varcharBatch->numElements = rowCount; + + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount, reader->getNumberOfRows()); + + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + EXPECT_EQ(rowCount, batch->numElements); + + for (uint64_t i = 0; i < rowCount; ++i) { + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + charBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[0]); + varcharBatch = dynamic_cast<StringVectorBatch *>(structBatch->fields[1]); + + EXPECT_EQ(3, charBatch->length[i]); + EXPECT_FALSE(varcharBatch->length[i] > 4); + + // test char data + std::string charsRead( + charBatch->data[i], + static_cast<size_t>(charBatch->length[i])); + + std::ostringstream os; + os << i; + std::string charsExpected = os.str().substr(0, 3); + while (charsExpected.length() < 3) { + charsExpected += ' '; + } + EXPECT_EQ(charsExpected, charsRead); + + // test varchar data + std::string varcharRead( + varcharBatch->data[i], + static_cast<size_t>(varcharBatch->length[i])); + std::string varcharExpected = os.str().substr(0, 4); + EXPECT_EQ(varcharRead, varcharExpected); + } + + EXPECT_EQ(false, rowReader->next(*batch)); + } + + TEST(Writer, writeDecimal64Column) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool* pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:decimal(16,5)>")); + + uint64_t stripeSize = 16 * 1024; // 16K + uint64_t compressionBlockSize = 1024; // 1k + uint64_t rowCount = 1024; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + Decimal64VectorBatch * decBatch = + dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]); + + // write positive decimals + for (uint64_t i = 0; i < rowCount; ++i) { + decBatch->values[i] = static_cast<int64_t>(i + 10000); + } + structBatch->numElements = decBatch->numElements = rowCount; + writer->add(*batch); + + // write negative decimals + for (uint64_t i = 0; i < rowCount; ++i) { + decBatch->values[i] = static_cast<int64_t>(i - 10000); + } + structBatch->numElements = decBatch->numElements = rowCount; + writer->add(*batch); + + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount * 2, reader->getNumberOfRows()); + + // test reading positive decimals + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + decBatch = dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(static_cast<int64_t>(i + 10000), decBatch->values[i]); + } + + // test reading negative decimals + EXPECT_EQ(true, rowReader->next(*batch)); + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + decBatch = dynamic_cast<Decimal64VectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + EXPECT_EQ(static_cast<int64_t>(i - 10000), decBatch->values[i]); + } + } + + TEST(Writer, writeDecimal128Column) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool* pool = getDefaultPool(); + std::unique_ptr<Type> type(Type::buildTypeFromString( + "struct<col1:decimal(30,10)>")); + + uint64_t stripeSize = 16 * 1024; + uint64_t compressionBlockSize = 1024; + uint64_t rowCount = 1024; + + std::unique_ptr<Writer> writer = createWriter(stripeSize, + compressionBlockSize, + CompressionKind_ZLIB, + *type, + pool, + &memStream); + std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount); + StructVectorBatch * structBatch = + dynamic_cast<StructVectorBatch *>(batch.get()); + Decimal128VectorBatch * decBatch = + dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]); + + // write positive decimals + std::string base = "1" + std::string(1, '0'); + for (uint64_t i = 0; i < rowCount; ++i) { + std::ostringstream os; + os << i; + decBatch->values[i] = Int128(base + os.str()); + } + structBatch->numElements = decBatch->numElements = rowCount; + writer->add(*batch); + + // write negative decimals + std::string nbase = "-" + base; + for (uint64_t i = 0; i < rowCount; ++i) { + std::ostringstream os; + os << i; + decBatch->values[i] = Int128(nbase + os.str()); + } + structBatch->numElements = rowCount; + decBatch->numElements = rowCount; + writer->add(*batch); + writer->close(); + + std::unique_ptr<InputStream> inStream( + new MemoryInputStream (memStream.getData(), memStream.getLength())); + std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream)); + std::unique_ptr<RowReader> rowReader = createRowReader(reader.get()); + EXPECT_EQ(rowCount * 2, reader->getNumberOfRows()); + + // test reading positive decimals + batch = rowReader->createRowBatch(rowCount); + EXPECT_EQ(true, rowReader->next(*batch)); + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + decBatch = dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + std::ostringstream os; + os << i; + EXPECT_EQ(base + os.str(), decBatch->values[i].toString()); + } + + // test reading negative decimals and different scales + EXPECT_EQ(true, rowReader->next(*batch)); + structBatch = dynamic_cast<StructVectorBatch *>(batch.get()); + decBatch = dynamic_cast<Decimal128VectorBatch *>(structBatch->fields[0]); + for (uint64_t i = 0; i < rowCount; ++i) { + std::ostringstream os; + os << i; + EXPECT_EQ(nbase + os.str(), decBatch->values[i].toString()); + } + } +}
