This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new dc8e934  ORC-1125: [C++] Support reading decimal64 in ORCv2
dc8e934 is described below

commit dc8e9346f956fd0b092a0ad06176f7c47526d543
Author: Quanlong Huang <[email protected]>
AuthorDate: Sun Mar 20 22:55:24 2022 +0800

    ORC-1125: [C++] Support reading decimal64 in ORCv2
    
    This fixes #1062
---
 c++/src/ColumnReader.cc        |  71 ++++++++++++++++---
 c++/src/ColumnReader.hh        |   6 ++
 c++/src/Reader.cc              |  15 +++-
 c++/src/Reader.hh              |   4 ++
 c++/src/StripeStream.cc        |   4 ++
 c++/src/StripeStream.hh        |   2 +
 c++/test/TestColumnReader.cc   | 151 +++++++++++++++++++++++++++++++++++++++++
 c++/test/TestRleDecoder.cc     |  22 ++++++
 tools/test/TestFileContents.cc |  25 +++++++
 9 files changed, 289 insertions(+), 11 deletions(-)

diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 439d92a..c3a4b45 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -1656,6 +1656,60 @@ namespace orc {
     }
   }
 
+  class Decimal64ColumnReaderV2: public ColumnReader {
+  protected:
+    std::unique_ptr<RleDecoder> valueDecoder;
+    int32_t precision;
+    int32_t scale;
+
+  public:
+    Decimal64ColumnReaderV2(const Type& type, StripeStreams& stripe);
+    ~Decimal64ColumnReaderV2() override;
+
+    uint64_t skip(uint64_t numValues) override;
+
+    void next(ColumnVectorBatch& rowBatch,
+              uint64_t numValues,
+              char *notNull) override;
+  };
+
+  Decimal64ColumnReaderV2::Decimal64ColumnReaderV2(const Type& type,
+                                                   StripeStreams& stripe
+                                                   ): ColumnReader(type, 
stripe) {
+    scale = static_cast<int32_t>(type.getScale());
+    precision = static_cast<int32_t>(type.getPrecision());
+    std::unique_ptr<SeekableInputStream> stream =
+        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+    if (stream == nullptr) {
+      std::stringstream ss;
+      ss << "DATA stream not found in Decimal64V2 column. ColumnId=" << 
columnId;
+      throw ParseError(ss.str());
+    }
+    valueDecoder = createRleDecoder(std::move(stream), true, RleVersion_2, 
memoryPool);
+  }
+
+  Decimal64ColumnReaderV2::~Decimal64ColumnReaderV2() {
+    // PASS
+  }
+
+  uint64_t Decimal64ColumnReaderV2::skip(uint64_t numValues) {
+    numValues = ColumnReader::skip(numValues);
+    valueDecoder->skip(numValues);
+    return numValues;
+  }
+
+  void Decimal64ColumnReaderV2::next(ColumnVectorBatch& rowBatch,
+                                     uint64_t numValues,
+                                     char *notNull) {
+    ColumnReader::next(rowBatch, numValues, notNull);
+    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
+    Decimal64VectorBatch &batch =
+      dynamic_cast<Decimal64VectorBatch&>(rowBatch);
+    valueDecoder->next(batch.values.data(), numValues, notNull);
+    batch.precision = precision;
+    batch.scale = scale;
+  }
+
   class DecimalHive11ColumnReader: public Decimal64ColumnReader {
   private:
     bool throwOnOverflow;
@@ -1841,18 +1895,19 @@ namespace orc {
       if (type.getPrecision() == 0) {
         return std::unique_ptr<ColumnReader>
           (new DecimalHive11ColumnReader(type, stripe));
-
+      }
       // can we represent the values using int64_t?
-      } else if (type.getPrecision() <=
-                 Decimal64ColumnReader::MAX_PRECISION_64) {
+      if (type.getPrecision() <= Decimal64ColumnReader::MAX_PRECISION_64) {
+        if (stripe.isDecimalAsLong()) {
+          return std::unique_ptr<ColumnReader>
+            (new Decimal64ColumnReaderV2(type, stripe));
+        }
         return std::unique_ptr<ColumnReader>
           (new Decimal64ColumnReader(type, stripe));
-
-      // otherwise we use the Int128 implementation
-      } else {
-        return std::unique_ptr<ColumnReader>
-          (new Decimal128ColumnReader(type, stripe));
       }
+      // otherwise we use the Int128 implementation
+      return std::unique_ptr<ColumnReader>
+        (new Decimal128ColumnReader(type, stripe));
 
     default:
       throw NotImplementedYet("buildReader unhandled type");
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 87994da..80b59de 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -91,6 +91,12 @@ namespace orc {
      * @return the number of scale digits
      */
     virtual int32_t getForcedScaleOnHive11Decimal() const = 0;
+
+    /**
+     * Whether decimals that have precision <=18 are encoded as fixed scale 
and values
+     * encoded in RLE.
+     */
+    virtual bool isDecimalAsLong() const = 0;
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 1bfcd1d..34cc950 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -502,6 +502,10 @@ namespace orc {
     return throwOnHive11DecimalOverflow;
   }
 
+  bool RowReaderImpl::getIsDecimalAsLong() const {
+    return contents->isDecimalAsLong;
+  }
+
   int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
     return forcedScaleOnHive11Decimal;
   }
@@ -616,9 +620,7 @@ namespace orc {
     if (contents->postscript->version_size() != 2) {
       return FileVersion::v_0_11();
     }
-    return FileVersion(
-                contents->postscript->version(0),
-                contents->postscript->version(1));
+    return {contents->postscript->version(0), 
contents->postscript->version(1)};
   }
 
   uint64_t ReaderImpl::getNumberOfRows() const {
@@ -1389,6 +1391,13 @@ namespace orc {
       contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer.get(),
         footerOffset, *contents->postscript,  *contents->pool));
     }
+    contents->isDecimalAsLong = false;
+    if (contents->postscript->version_size() == 2) {
+      FileVersion v(contents->postscript->version(0), 
contents->postscript->version(1));
+      if (v == FileVersion::UNSTABLE_PRE_2_0()) {
+        contents->isDecimalAsLong = true;
+      }
+    }
     contents->stream = std::move(stream);
     return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
                                                   options,
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index baa716f..54dbe33 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -63,6 +63,9 @@ namespace orc {
     CompressionKind compression;
     MemoryPool *pool;
     std::ostream *errorStream;
+    /// Decimal64 in ORCv2 uses RLE to store values. This flag indicates 
whether
+    /// this new encoding is used.
+    bool isDecimalAsLong;
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -224,6 +227,7 @@ namespace orc {
 
     const FileContents& getFileContents() const;
     bool getThrowOnHive11DecimalOverflow() const;
+    bool getIsDecimalAsLong() const;
     int32_t getForcedScaleOnHive11Decimal() const;
   };
 
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index eda565c..6d6dda8 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -127,6 +127,10 @@ namespace orc {
     return reader.getThrowOnHive11DecimalOverflow();
   }
 
+  bool StripeStreamsImpl::isDecimalAsLong() const {
+    return reader.getIsDecimalAsLong();
+  }
+
   int32_t StripeStreamsImpl::getForcedScaleOnHive11Decimal() const {
     return reader.getForcedScaleOnHive11Decimal();
   }
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 73ce7b3..8d9fb06 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -76,6 +76,8 @@ namespace orc {
 
     bool getThrowOnHive11DecimalOverflow() const override;
 
+    bool isDecimalAsLong() const override;
+
     int32_t getForcedScaleOnHive11Decimal() const override;
   };
 
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index bb16700..bc0ecb8 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -65,6 +65,7 @@ namespace orc {
     bool());
     MOCK_CONST_METHOD0(getForcedScaleOnHive11Decimal, int32_t()
     );
+    MOCK_CONST_METHOD0(isDecimalAsLong, bool());
 
     MemoryPool &getMemoryPool() const {
       return *getDefaultPool();
@@ -3298,6 +3299,156 @@ TEST(DecimalColumnReader, testDecimal128Skip) {
             values[4].toDecimalString(decimals->scale));
 }
 
+TEST(DecimalColumnReader, testDecimal64V2) {
+  MockStripeStreams streams;
+
+  // set getSelectedColumns() for struct<decimal(12,2)>
+  std::vector<bool> selectedColumns(2, true);
+  EXPECT_CALL(streams, getSelectedColumns())
+      .WillRepeatedly(testing::Return(selectedColumns));
+
+  // Use the decimal encoding in ORCv2
+  EXPECT_CALL(streams, isDecimalAsLong())
+      .WillRepeatedly(testing::Return(true));
+
+  // set encoding
+  proto::ColumnEncoding directEncoding;
+  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+  EXPECT_CALL(streams, getEncoding(testing::_))
+      .WillRepeatedly(testing::Return(directEncoding));
+
+  // set getStream
+  // PRESENT stream of the struct column is nullptr.
+  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
+      .WillRepeatedly(testing::Return(nullptr));
+
+  // PRESENT stream of the decimal column is in Boolean Run Length Encoding.
+  // {0x05, 0xff} -> 8 bytes of 0xff -> 64 true values.
+  // {0x04, 0x00} -> 7 bytes of 0x00 -> 56 false values.
+  // {0xff, 0x01} -> 1 byte of 0x01 -> 7 false values followed with 1 true.
+  const unsigned char buffer1[] = { 0x05, 0xff, 0x04, 0x00, 0xff, 0x01 };
+  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
+      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+                                      (buffer1, ARRAY_SIZE(buffer1))));
+
+  // DATA stream of the decimal column is in RLEv2.
+  // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]. See 
RLEv2.basicDelta5.
+  const unsigned char buffer2[] = { 0xc0, 0x40, 0x3f, 0x02 };
+  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
+      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+                                      (buffer2, ARRAY_SIZE(buffer2), 3)));
+
+  // create the row type
+  std::unique_ptr<Type> rowType = createStructType();
+  rowType->addStructField("col0", createDecimalType(12, 2));
+
+  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
+
+  StructVectorBatch batch(64, *getDefaultPool());
+  Decimal64VectorBatch *decimals = new Decimal64VectorBatch(64, 
*getDefaultPool());
+  batch.fields.push_back(decimals);
+  reader->next(batch, 64, 0);
+  EXPECT_FALSE(batch.hasNulls);
+  EXPECT_EQ(64, batch.numElements);
+  EXPECT_FALSE(decimals->hasNulls);
+  EXPECT_EQ(64, decimals->numElements);
+  EXPECT_EQ(2, decimals->scale);
+  int64_t *values = decimals->values.data();
+  for (int64_t i = 0; i < 64; ++i) {
+    EXPECT_EQ(i - 32, values[i]);
+  }
+  reader->next(batch, 64, 0);
+  EXPECT_FALSE(batch.hasNulls);
+  EXPECT_EQ(64, batch.numElements);
+  EXPECT_TRUE(decimals->hasNulls);
+  EXPECT_EQ(64, decimals->numElements);
+  for (size_t i=0; i < 63; ++i) {
+    EXPECT_EQ(0, decimals->notNull[i]);
+  }
+  EXPECT_EQ(1, decimals->notNull[63]);
+  EXPECT_EQ(32, decimals->values.data()[63]);
+}
+
+TEST(DecimalColumnReader, testDecimal64V2Skip) {
+  MockStripeStreams streams;
+
+  // set getSelectedColumns() for struct<decimal(12,2)>
+  std::vector<bool> selectedColumns(2, true);
+  EXPECT_CALL(streams, getSelectedColumns())
+      .WillRepeatedly(testing::Return(selectedColumns));
+
+  // Use the decimal encoding in ORCv2
+  EXPECT_CALL(streams, isDecimalAsLong())
+      .WillRepeatedly(testing::Return(true));
+
+  // set encoding
+  proto::ColumnEncoding directEncoding;
+  directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+  EXPECT_CALL(streams, getEncoding(testing::_))
+      .WillRepeatedly(testing::Return(directEncoding));
+
+  // set getStream
+  // PRESENT stream of the struct column is nullptr.
+  EXPECT_CALL(streams, getStreamProxy(0, proto::Stream_Kind_PRESENT, true))
+      .WillRepeatedly(testing::Return(nullptr));
+
+  // PRESENT stream of the decimal column is in Boolean Run Length Encoding.
+  // {0x05, 0xff} -> 8 bytes of 0xff -> 64 true values.
+  // {0x04, 0x00} -> 7 bytes of 0x00 -> 56 false values.
+  // {0xff, 0x01} -> 1 byte of 0x01 -> 7 false values followed with 1 true.
+  const unsigned char buffer1[] = { 0x05, 0xff, 0x04, 0x00, 0xff, 0x01 };
+  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_PRESENT, true))
+      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+                                      (buffer1, ARRAY_SIZE(buffer1))));
+
+  // DATA stream of the decimal column is in RLEv2.
+  // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]. See 
RLEv2.basicDelta5.
+  const unsigned char buffer2[] = { 0xc0, 0x40, 0x3f, 0x02 };
+  EXPECT_CALL(streams, getStreamProxy(1, proto::Stream_Kind_DATA, true))
+      .WillRepeatedly(testing::Return(new SeekableArrayInputStream
+                                      (buffer2, ARRAY_SIZE(buffer2), 3)));
+
+  // create the row type
+  std::unique_ptr<Type> rowType = createStructType();
+  rowType->addStructField("col0", createDecimalType(12, 2));
+
+  std::unique_ptr<ColumnReader> reader = buildReader(*rowType, streams);
+  StructVectorBatch batch(64, *getDefaultPool());
+  Decimal64VectorBatch *decimals = new Decimal64VectorBatch(64, 
*getDefaultPool());
+  batch.fields.push_back(decimals);
+  // Read 10 values
+  reader->next(batch, 10, 0);
+  EXPECT_FALSE(batch.hasNulls);
+  EXPECT_EQ(10, batch.numElements);
+  EXPECT_FALSE(decimals->hasNulls);
+  EXPECT_EQ(10, decimals->numElements);
+  EXPECT_EQ(2, decimals->scale);
+  int64_t *values = decimals->values.data();
+  for (int64_t i = 0; i < 10; ++i) {
+    EXPECT_EQ(i - 32, values[i]);
+  }
+  // Skip 50 values and read 10 values again
+  reader->skip(50);
+  reader->next(batch, 10, 0);
+  EXPECT_FALSE(batch.hasNulls);
+  EXPECT_EQ(10, batch.numElements);
+  EXPECT_TRUE(decimals->hasNulls);
+  values = decimals->values.data();
+  for (int64_t i = 0; i < 4; ++i) {
+    EXPECT_EQ(60 + i - 32, values[i]);
+  }
+  for (size_t i = 4; i < 10; ++i) {
+    EXPECT_EQ(0, decimals->notNull[i]);
+  }
+  // Skip 57 values and read the last value
+  reader->skip(57);
+  reader->next(batch, 1, 0);
+  EXPECT_FALSE(batch.hasNulls);
+  EXPECT_EQ(1, batch.numElements);
+  EXPECT_FALSE(decimals->hasNulls);
+  EXPECT_EQ(32, decimals->values.data()[0]);
+}
+
 TEST(DecimalColumnReader, testDecimalHive11) {
   MockStripeStreams streams;
 
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 1e1447a..1b4ca4e 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -151,6 +151,28 @@ TEST(RLEv2, basicDelta4) {
                values.size());
 };
 
+TEST(RLEv2, basicDelta5) {
+  std::vector<int64_t> values(65);
+  for (size_t i = 0; i < 65; ++i) {
+    values[i] = static_cast<int64_t>(i - 32);
+  }
+
+  // Original values: [-32, -31, -30, ..., -1, 0, 1, 2, ..., 32]
+  // 2 bytes header: 0xc0, 0x40
+  //    2 bits for encoding type(3). 5 bits for bitSize which is 0 for fixed 
delta.
+  //    9 bits for length of 65(64).
+  // Base value: -32 which is 65(0x3f) after zigzag
+  // Delta base: 1 which is 2(0x02) after zigzag
+  const unsigned char bytes[] = {0xc0, 0x40, 0x3f, 0x02};
+  unsigned long l = sizeof(bytes) / sizeof(char);
+  // Read 1 at a time, then 3 at a time, etc.
+  checkResults(values, decodeRLEv2(bytes, l, 1, values.size()), 1);
+  checkResults(values, decodeRLEv2(bytes, l, 3, values.size()), 3);
+  checkResults(values, decodeRLEv2(bytes, l, 7, values.size()), 7);
+  checkResults(values, decodeRLEv2(bytes, l, values.size(), values.size()),
+               values.size());
+}
+
 TEST(RLEv2, delta0Width) {
   const unsigned char buffer[] = {0x4e, 0x2, 0x0, 0x1, 0x2, 0xc0, 0x2, 0x42,
                                  0x0};
diff --git a/tools/test/TestFileContents.cc b/tools/test/TestFileContents.cc
index 9b942f1..3214fa4 100644
--- a/tools/test/TestFileContents.cc
+++ b/tools/test/TestFileContents.cc
@@ -142,3 +142,28 @@ TEST (TestFileContents, testInvalidName) {
   EXPECT_EQ("", output);
   EXPECT_NE(std::string::npos, error.find(error_msg));
 }
+
+TEST (TestFileContents, testDecimal64V2) {
+  const std::string pgm = findProgram("tools/src/orc-contents");
+  const std::string file = findExample("decimal64_v2.orc");
+  const std::string expected =
+      "{\"a\": 17292380420, \"b\": 24, \"c\": 36164.16, \"d\": 0.03, \"e\": 
0.01}\n"
+      "{\"a\": 17292380421, \"b\": 38, \"c\": 63351.70, \"d\": 0.08, \"e\": 
0.01}\n"
+      "{\"a\": 17292380421, \"b\": 28, \"c\": 42673.96, \"d\": 0.09, \"e\": 
0.06}\n"
+      "{\"a\": 17292380421, \"b\": 40, \"c\": 76677.60, \"d\": 0.05, \"e\": 
0.04}\n"
+      "{\"a\": 17292380421, \"b\": 2, \"c\": 2096.48, \"d\": 0.07, \"e\": 
0.07}\n"
+      "{\"a\": 17292380421, \"b\": 42, \"c\": 45284.82, \"d\": 0.07, \"e\": 
0.05}\n"
+      "{\"a\": 17292380421, \"b\": 10, \"c\": 18572.90, \"d\": 0.01, \"e\": 
0.08}\n"
+      "{\"a\": 17292380422, \"b\": 12, \"c\": 14836.80, \"d\": 0.09, \"e\": 
0.06}\n"
+      "{\"a\": 17292380422, \"b\": 41, \"c\": 82152.52, \"d\": 0.07, \"e\": 
0.02}\n"
+      "{\"a\": 17292380422, \"b\": 38, \"c\": 47240.84, \"d\": 0.10, \"e\": 
0.00}\n";
+  const std::string error_msg = "Warning: ORC file " + file +
+      " was written in an unknown format version UNSTABLE-PRE-2.0\n";
+
+  std::string output;
+  std::string error;
+
+  EXPECT_EQ(0, runProgram({pgm, file}, output, error)) << error;
+  EXPECT_EQ(expected, output);
+  EXPECT_EQ(error_msg, error);
+}

Reply via email to