This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new dc8e934 ORC-1125: [C++] Support reading decimal64 in ORCv2
dc8e934 is described below
commit dc8e9346f956fd0b092a0ad06176f7c47526d543
Author: Quanlong Huang <[email protected]>
AuthorDate: Sun Mar 20 22:55:24 2022 +0800
ORC-1125: [C++] Support reading decimal64 in ORCv2
This fixes #1062
---
c++/src/ColumnReader.cc | 71 ++++++++++++++++---
c++/src/ColumnReader.hh | 6 ++
c++/src/Reader.cc | 15 +++-
c++/src/Reader.hh | 4 ++
c++/src/StripeStream.cc | 4 ++
c++/src/StripeStream.hh | 2 +
c++/test/TestColumnReader.cc | 151 +++++++++++++++++++++++++++++++++++++++++
c++/test/TestRleDecoder.cc | 22 ++++++
tools/test/TestFileContents.cc | 25 +++++++
9 files changed, 289 insertions(+), 11 deletions(-)
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 439d92a..c3a4b45 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -1656,6 +1656,60 @@ namespace orc {
}
}
+ class Decimal64ColumnReaderV2: public ColumnReader {
+ protected:
+ std::unique_ptr<RleDecoder> valueDecoder;
+ int32_t precision;
+ int32_t scale;
+
+ public:
+ Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
+ ~Decimal64ColumnReaderV2() override;
+
+ uint64_t skip(uint64_t numValues) override;
+
+ void next(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) override;
+ };
+
+ Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type,
+ StripeStreams& stripe
+ ): ColumnReader(type,
stripe) {
+ scale = static_cast<int32_t>(type.getScale());
+ precision = static_cast<int32_t>(type.getPrecision());
+ std::unique_ptr<SeekableInputStream> stream =
+ stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+ if (stream == nullptr) {
+ std::stringstream ss;
+ ss << "DATA stream not found in Decimal64V2 column. ColumnId=" <<
columnId;
+ throw ParseError(ss.str());
+ }
+ valueDecoder = createRleDecoder(std::move(stream), true, RleVersion_2,
memoryPool);
+ }
+
+ Decimal64ColumnReaderV2::~Decimal64ColumnReaderV2() {
+ // PASS
+ }
+
+ uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
+ numValues = ColumnReader::skip(numValues);
+ valueDecoder->skip(numValues);
+ return numValues;
+ }
+
+ void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch,
+ uint64_t numValues,
+ char *notNull) {
+ ColumnReader::next(rowBatch, numValues, notNull);
+ notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+ Decimal64VectorBatch &batch =
+ dynamic_cast<Decimal64VectorBatch&>(rowBatch);
+ valueDecoder->next(batch.values.data(), numValues, notNull);
+ batch.precision = precision;
+ batch.scale = scale;
+ }
+
class DecimalHive11ColumnReader: public Decimal64ColumnReader {
private:
bool throwOnOverflow;
@@ -1841,18 +1895,19 @@ namespace orc {
if (type.getPrecision() == 0) {
return std::unique_ptr<ColumnReader>
(new DecimalHive11ColumnReader(type, stripe));
-
+ }
// can we represent the values using int64_t?
- } else if (type.getPrecision() <=
- Decimal64ColumnReader::MAX_PRECISION_64) {
+ if (type.getPrecision() <= Decimal64ColumnReader::MAX_PRECISION_64) {
+ if (stripe.isDecimalAsLong()) {
+ return std::unique_ptr<ColumnReader>
+ (new Decimal64ColumnReaderV2(type, stripe));
+ }
return std::unique_ptr<ColumnReader>
(new Decimal64ColumnReader(type, stripe));
-
- // otherwise we use the Int128 implementation
- } else {
- return std::unique_ptr<ColumnReader>
- (new Decimal128ColumnReader(type, stripe));
}
+ // otherwise we use the Int128 implementation
+ return std::unique_ptr<ColumnReader>
+ (new Decimal128ColumnReader(type, stripe));
default:
throw NotImplementedYet("buildReader unhandled type");
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 87994da..80b59de 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -91,6 +91,12 @@ namespace orc {
* @return the number of scale digits
*/
virtual int32_t getForcedScaleOnHive11Decimal() const = 0;
+
+ /**
+ * Whether decimals that have precision <=18 are encoded as fixed scale and values
+ * encoded in RLE.
+ */
+ virtual bool isDecimalAsLong() const = 0;
};
/**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 1bfcd1d..34cc950 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -502,6 +502,10 @@ namespace orc {
return throwOnHive11DecimalOverflow;
}
+ bool RowReaderImpl::getIsDecimalAsLong() const {
+ return contents->isDecimalAsLong;
+ }
+
int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
return forcedScaleOnHive11Decimal;
}
@@ -616,9 +620,7 @@ namespace orc {
if (contents->postscript->version_size() != 2) {
return FileVersion::v_0_11();
}
- return FileVersion(
- contents->postscript->version(0),
- contents->postscript->version(1));
+ return {contents->postscript->version(0),
contents->postscript->version(1)};
}
uint64_t ReaderImpl::getNumberOfRows() const {
@@ -1389,6 +1391,13 @@ namespace orc {
contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer.get(),
footerOffset, *contents->postscript, *contents->pool));
}
+ contents->isDecimalAsLong = false;
+ if (contents->postscript->version_size() == 2) {
+ FileVersion v(contents->postscript->version(0),
contents->postscript->version(1));
+ if (v == FileVersion::UNSTABLE_PRE_2_0()) {
+ contents->isDecimalAsLong = true;
+ }
+ }
contents->stream = std::move(stream);
return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
options,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index baa716f..54dbe33 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -63,6 +63,9 @@ namespace orc {
CompressionKind compression;
MemoryPool *pool;
std::ostream *errorStream;
+ /// Decimal64 in ORCv2 uses RLE to store values. This flag indicates whether
+ /// this new encoding is used.
+ bool isDecimalAsLong;
};
proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -224,6 +227,7 @@ namespace orc {
const FileContents& getFileContents() const;
bool getThrowOnHive11DecimalOverflow() const;
+ bool getIsDecimalAsLong() const;
int32_t getForcedScaleOnHive11Decimal() const;
};
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index eda565c..6d6dda8 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -127,6 +127,10 @@ namespace orc {
return reader.getThrowOnHive11DecimalOverflow();
}
+ bool StripeStreamsImpl::isDecimalAsLong() const {
+ return reader.getIsDecimalAsLong();
+ }
+
int32_t StripeStreamsImpl::getForcedScaleOnHive11Decimal() const {
return reader.getForcedScaleOnHive11Decimal();
}
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 73ce7b3..8d9fb06 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -76,6 +76,8 @@ namespace orc {
bool getThrowOnHive11DecimalOverflow() const override;
+ bool isDecimalAsLong() const override;
+
int32_t getForcedScaleOnHive11Decimal() const override;
};
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index bb16700..bc0ecb8 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -65,6 +65,7 @@ namespace orc {
bool());
MOCK_CONST_METHOD0(getForcedScaleOnHive11Decimal, int32_t()
);
+ MOCK_CONST_METHOD0(isDecimalAsLong, bool());
MemoryPool &getMemoryPool() const {
return *getDefaultPool();
@@ -3298,6 +3299,156 @@ TEST(DecimalColumnReader, testDecimal128Skip) {
values[4].toDecimalString(decimals->scale));
}
+TEST(DecimalColumnReader, testDecimal64V2) {
+ MockStripeStreams streams;
+
+ // set getSelectedColumns() for struct<decimal(12,2)>
+ std::vector<bool> selectedColumns(2, true);
+ EXPECT_CALL(streams, getSelectedColumns())
+ .WillRepeatedly(testing::Return(selectedColumns));
+
+ // Use the decimal encoding in ORCv2
+ EXPECT_CALL(streams, isDecimalAsLong())
+ .WillRepeatedly(testing::Return(true));
+
+ // set encoding
+ proto::ColumnEncoding directEncoding;
+ directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ EXPECT_CALL(streams, getEncoding(testing::_))
+ .WillRepeatedly(testing::Return(directEncoding));
+
+ // set getStream
+ // PRESENT stream of the struct column is nullptr.
+ EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
+ .WillRepeatedly(testing::Return(nullptr));
+
+ // PRESENT stream of the decimal column is in Boolean Run Length Encoding.
+ // {0x05, 0xff} -> 8 bytes of 0xff -> 64 true values.
+ // {0x04, 0x00} -> 7 bytes of 0x00 -> 56 false values.
+ // {0xff, 0x01} -> 1 byte of 0x01 -> 7 false values followed by 1 true.
+ const unsigned char buffer1[] = { 0x05, 0xff, 0x04, 0x00, 0xff, 0x01 };
+ EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
+ .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+ (buffer1, ARRAY_SIZE(buffer1))));
+
+ // DATA stream of the decimal column is in RLEv2.
+ // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]. See RLEv2.basicDelta5.
+ const unsigned char buffer2[] = { 0xc0, 0x40, 0x3f, 0x02 };
+ EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
+ .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+ (buffer2, ARRAY_SIZE(buffer2), 3)));
+
+ // create the row type
+ std::unique_ptr<Type> rowType = createStructType();
+ rowType->addStructField("col0", createDecimalType(12, 2));
+
+ std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
+
+ StructVectorBatch batch(64, *getDefaultPool());
+ Decimal64VectorBatch *decimals = new Decimal64VectorBatch(64,
*getDefaultPool());
+ batch.fields.push_back(decimals);
+ reader->next(batch, 64, 0);
+ EXPECT_FALSE(batch.hasNulls);
+ EXPECT_EQ(64, batch.numElements);
+ EXPECT_FALSE(decimals->hasNulls);
+ EXPECT_EQ(64, decimals->numElements);
+ EXPECT_EQ(2, decimals->scale);
+ int64_t *values = decimals->values.data();
+ for (int64_t i = 0; i < 64; ++i) {
+ EXPECT_EQ(i - 32, values[i]);
+ }
+ reader->next(batch, 64, 0);
+ EXPECT_FALSE(batch.hasNulls);
+ EXPECT_EQ(64, batch.numElements);
+ EXPECT_TRUE(decimals->hasNulls);
+ EXPECT_EQ(64, decimals->numElements);
+ for (size_t i=0; i < 63; ++i) {
+ EXPECT_EQ(0, decimals->notNull[i]);
+ }
+ EXPECT_EQ(1, decimals->notNull[63]);
+ EXPECT_EQ(32, decimals->values.data()[63]);
+}
+
+TEST(DecimalColumnReader, testDecimal64V2Skip) {
+ MockStripeStreams streams;
+
+ // set getSelectedColumns() for struct<decimal(12,2)>
+ std::vector<bool> selectedColumns(2, true);
+ EXPECT_CALL(streams, getSelectedColumns())
+ .WillRepeatedly(testing::Return(selectedColumns));
+
+ // Use the decimal encoding in ORCv2
+ EXPECT_CALL(streams, isDecimalAsLong())
+ .WillRepeatedly(testing::Return(true));
+
+ // set encoding
+ proto::ColumnEncoding directEncoding;
+ directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+ EXPECT_CALL(streams, getEncoding(testing::_))
+ .WillRepeatedly(testing::Return(directEncoding));
+
+ // set getStream
+ // PRESENT stream of the struct column is nullptr.
+ EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
+ .WillRepeatedly(testing::Return(nullptr));
+
+ // PRESENT stream of the decimal column is in Boolean Run Length Encoding.
+ // {0x05, 0xff} -> 8 bytes of 0xff -> 64 true values.
+ // {0x04, 0x00} -> 7 bytes of 0x00 -> 56 false values.
+ // {0xff, 0x01} -> 1 byte of 0x01 -> 7 false values followed by 1 true.
+ const unsigned char buffer1[] = { 0x05, 0xff, 0x04, 0x00, 0xff, 0x01 };
+ EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
+ .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+ (buffer1, ARRAY_SIZE(buffer1))));
+
+ // DATA stream of the decimal column is in RLEv2.
+ // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]. See RLEv2.basicDelta5.
+ const unsigned char buffer2[] = { 0xc0, 0x40, 0x3f, 0x02 };
+ EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
+ .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+ (buffer2, ARRAY_SIZE(buffer2), 3)));
+
+ // create the row type
+ std::unique_ptr<Type> rowType = createStructType();
+ rowType->addStructField("col0", createDecimalType(12, 2));
+
+ std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
+ StructVectorBatch batch(64, *getDefaultPool());
+ Decimal64VectorBatch *decimals = new Decimal64VectorBatch(64,
*getDefaultPool());
+ batch.fields.push_back(decimals);
+ // Read 10 values
+ reader->next(batch, 10, 0);
+ EXPECT_FALSE(batch.hasNulls);
+ EXPECT_EQ(10, batch.numElements);
+ EXPECT_FALSE(decimals->hasNulls);
+ EXPECT_EQ(10, decimals->numElements);
+ EXPECT_EQ(2, decimals->scale);
+ int64_t *values = decimals->values.data();
+ for (int64_t i = 0; i < 10; ++i) {
+ EXPECT_EQ(i - 32, values[i]);
+ }
+ // Skip 50 values and read 10 values again
+ reader->skip(50);
+ reader->next(batch, 10, 0);
+ EXPECT_FALSE(batch.hasNulls);
+ EXPECT_EQ(10, batch.numElements);
+ EXPECT_TRUE(decimals->hasNulls);
+ values = decimals->values.data();
+ for (int64_t i = 0; i < 4; ++i) {
+ EXPECT_EQ(60 + i - 32, values[i]);
+ }
+ for (size_t i = 4; i < 10; ++i) {
+ EXPECT_EQ(0, decimals->notNull[i]);
+ }
+ // Skip 57 values and read the last value
+ reader->skip(57);
+ reader->next(batch, 1, 0);
+ EXPECT_FALSE(batch.hasNulls);
+ EXPECT_EQ(1, batch.numElements);
+ EXPECT_FALSE(decimals->hasNulls);
+ EXPECT_EQ(32, decimals->values.data()[0]);
+}
+
TEST(DecimalColumnReader, testDecimalHive11) {
MockStripeStreams streams;
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 1e1447a..1b4ca4e 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -151,6 +151,28 @@ TEST(RLEv2, basicDelta4) {
values.size());
};
+TEST(RLEv2, basicDelta5) {
+ std::vector<int64_t> values(65);
+ for (size_t i = 0; i < 65; ++i) {
+ values[i] = static_cast<int64_t>(i - 32);
+ }
+
+ // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]
+ // 2 bytes header: 0xc0, 0x40
+ // 2 bits for encoding type(3). 5 bits for bitSize which is 0 for fixed delta.
+ // 9 bits for the run length: 65 values, stored as 64 (length - 1).
+ // Base value: -32 which is 63(0x3f) after zigzag
+ // Delta base: 1 which is 2(0x02) after zigzag
+ const unsigned char bytes[] = {0xc0, 0x40, 0x3f, 0x02};
+ unsigned long l = sizeof(bytes) / sizeof(char);
+ // Read 1 at a time, then 3 at a time, etc.
+ checkResults(values, decodeRLEv2(bytes, l, 1, values.size()), 1);
+ checkResults(values, decodeRLEv2(bytes, l, 3, values.size()), 3);
+ checkResults(values, decodeRLEv2(bytes, l, 7, values.size()), 7);
+ checkResults(values, decodeRLEv2(bytes, l, values.size(), values.size()),
+ values.size());
+}
+
TEST(RLEv2, delta0Width) {
const unsigned char buffer[] = {0x4e, 0x2, 0x0, 0x1, 0x2, 0xc0, 0x2, 0x42,
0x0};
diff --git a/tools/test/TestFileContents.cc b/tools/test/TestFileContents.cc
index 9b942f1..3214fa4 100644
--- a/tools/test/TestFileContents.cc
+++ b/tools/test/TestFileContents.cc
@@ -142,3 +142,28 @@ TEST (TestFileContents, testInvalidName) {
EXPECT_EQ("", output);
EXPECT_NE(std::string::npos, error.find(error_msg));
}
+
+TEST (TestFileContents, testDecimal64V2) {
+ const std::string pgm = findProgram("tools/src/orc-contents");
+ const std::string file = findExample("decimal64_v2.orc");
+ const std::string expected =
+ "{\"a\": 17292380420, \"b\": 24, \"c\": 36164.16, \"d\": 0.03, \"e\":
0.01}\n"
+ "{\"a\": 17292380421, \"b\": 38, \"c\": 63351.70, \"d\": 0.08, \"e\":
0.01}\n"
+ "{\"a\": 17292380421, \"b\": 28, \"c\": 42673.96, \"d\": 0.09, \"e\":
0.06}\n"
+ "{\"a\": 17292380421, \"b\": 40, \"c\": 76677.60, \"d\": 0.05, \"e\":
0.04}\n"
+ "{\"a\": 17292380421, \"b\": 2, \"c\": 2096.48, \"d\": 0.07, \"e\":
0.07}\n"
+ "{\"a\": 17292380421, \"b\": 42, \"c\": 45284.82, \"d\": 0.07, \"e\":
0.05}\n"
+ "{\"a\": 17292380421, \"b\": 10, \"c\": 18572.90, \"d\": 0.01, \"e\":
0.08}\n"
+ "{\"a\": 17292380422, \"b\": 12, \"c\": 14836.80, \"d\": 0.09, \"e\":
0.06}\n"
+ "{\"a\": 17292380422, \"b\": 41, \"c\": 82152.52, \"d\": 0.07, \"e\":
0.02}\n"
+ "{\"a\": 17292380422, \"b\": 38, \"c\": 47240.84, \"d\": 0.10, \"e\":
0.00}\n";
+ const std::string error_msg = "Warning: ORC file " + file +
+ " was written in an unknown format version UNSTABLE-PRE-2.0\n";
+
+ std::string output;
+ std::string error;
+
+ EXPECT_EQ(0, runProgram({pgm, file}, output, error)) << error;
+ EXPECT_EQ(expected, output);
+ EXPECT_EQ(error_msg, error);
+}