This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new f326f6a6b ORC-1388: [C++] Support schema evolution from decimal to 
timestamp/string group
f326f6a6b is described below

commit f326f6a6b627f43677f5e4c245f19bf461e46820
Author: ffacs <[email protected]>
AuthorDate: Tue Jan 30 13:52:22 2024 +0800

    ORC-1388: [C++] Support schema evolution from decimal to timestamp/string 
group
    
    ### What changes were proposed in this pull request?
    Support conversion from
    {decimal}
    to
    {string, char, varchar, timestamp, timestamp_instant}
    
    ### Why are the changes needed?
    Support schema evolution at cpp side.
    
    ### How was this patch tested?
    UT passed.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    NO
    
    Closes #1761 from ffacs/ORC-1388.
    
    Authored-by: ffacs <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 c++/src/ConvertColumnReader.cc      | 138 +++++++++++++++++++++++++++++++---
 c++/src/SchemaEvolution.cc          |   3 +-
 c++/test/TestConvertColumnReader.cc | 143 ++++++++++++++++++++++++++++++++++++
 c++/test/TestSchemaEvolution.cc     |  16 ++++
 4 files changed, 287 insertions(+), 13 deletions(-)

diff --git a/c++/src/ConvertColumnReader.cc b/c++/src/ConvertColumnReader.cc
index a1e29ba58..c139cfa9d 100644
--- a/c++/src/ConvertColumnReader.cc
+++ b/c++/src/ConvertColumnReader.cc
@@ -593,6 +593,107 @@ namespace orc {
     int32_t toScale;
   };
 
+  template <typename FileTypeBatch>
+  class DecimalToTimestampColumnReader : public ConvertToTimestampColumnReader 
{
+   public:
+    DecimalToTimestampColumnReader(const Type& _readType, const Type& fileType,
+                                   StripeStreams& stripe, bool 
_throwOnOverflow)
+        : ConvertToTimestampColumnReader(_readType, fileType, stripe, 
_throwOnOverflow),
+          precision(static_cast<int32_t>(fileType.getPrecision())),
+          scale(static_cast<int32_t>(fileType.getScale())) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) 
override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+      const auto& srcBatch = *SafeCastBatchTo<const 
FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
+      for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          convertDecimalToTimestamp(dstBatch, i, srcBatch);
+        }
+      }
+    }
+
+   private:
+    void convertDecimalToTimestamp(TimestampVectorBatch& dstBatch, uint64_t 
idx,
+                                   const FileTypeBatch& srcBatch) {
+      constexpr int SecondToNanoFactor = 9;
+      // Following constant comes from java.time.Instant
+      // '-1000000000-01-01T00:00Z'
+      constexpr int64_t MIN_EPOCH_SECONDS = -31557014167219200L;
+      // '1000000000-12-31T23:59:59.999999999Z'
+      constexpr int64_t MAX_EPOCH_SECONDS = 31556889864403199L;
+      // dummy variable, there's no risk of overflow
+      bool overflow = false;
+
+      Int128 i128(srcBatch.values[idx]);
+      Int128 integerPortion = scaleDownInt128ByPowerOfTen(i128, scale);
+      if (integerPortion < MIN_EPOCH_SECONDS || integerPortion > 
MAX_EPOCH_SECONDS) {
+        handleOverflow<Decimal, int64_t>(dstBatch, idx, throwOnOverflow);
+        return;
+      }
+      i128 -= scaleUpInt128ByPowerOfTen(integerPortion, scale, overflow);
+      Int128 fractionPortion = std::move(i128);
+      if (scale < SecondToNanoFactor) {
+        fractionPortion =
+            scaleUpInt128ByPowerOfTen(fractionPortion, SecondToNanoFactor - 
scale, overflow);
+      } else {
+        fractionPortion = scaleDownInt128ByPowerOfTen(fractionPortion, scale - 
SecondToNanoFactor);
+      }
+      if (fractionPortion < 0) {
+        fractionPortion += 1e9;
+        integerPortion -= 1;
+      }
+      // line 630 has guaranteed toLong() will not overflow
+      dstBatch.data[idx] = integerPortion.toLong();
+      dstBatch.nanoseconds[idx] = fractionPortion.toLong();
+
+      if (needConvertTimezone) {
+        dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);
+      }
+    }
+
+    const int32_t precision;
+    const int32_t scale;
+  };
+
+  template <typename FileTypeBatch>
+  class DecimalToStringVariantColumnReader : public 
ConvertToStringVariantColumnReader {
+   public:
+    DecimalToStringVariantColumnReader(const Type& _readType, const Type& 
fileType,
+                                       StripeStreams& stripe, bool 
_throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, 
_throwOnOverflow),
+          scale(fileType.getScale()) {}
+
+    uint64_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t 
numValues) override {
+      uint64_t size = 0;
+      strBuffer.resize(numValues);
+      const auto& srcBatch = *SafeCastBatchTo<const 
FileTypeBatch*>(data.get());
+      if (readType.getKind() == STRING) {
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+            strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, 
true);
+            size += strBuffer[i].size();
+          }
+        }
+      } else {
+        const auto maxLength = readType.getMaximumLength();
+        for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
+          if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+            strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, 
true);
+          }
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+      return size;
+    }
+
+   private:
+    const int32_t scale;
+  };
+
 #define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
   using FROM##To##TO##ColumnReader =                  \
       NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
@@ -621,6 +722,14 @@ namespace orc {
   using Decimal128##To##TO##ColumnReader =                               \
       DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;
 
+#define DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER                             
                  \
+  using Decimal64ToTimestampColumnReader = 
DecimalToTimestampColumnReader<Decimal64VectorBatch>; \
+  using Decimal128ToTimestampColumnReader = 
DecimalToTimestampColumnReader<Decimal128VectorBatch>;
+
+#define DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(TO)                     
                   \
+  using Decimal64To##TO##ColumnReader = 
DecimalToStringVariantColumnReader<Decimal64VectorBatch>; \
+  using Decimal128To##TO##ColumnReader = 
DecimalToStringVariantColumnReader<Decimal128VectorBatch>;
+
   DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
   DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
   DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
@@ -720,6 +829,11 @@ namespace orc {
   DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
   DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)
 
+  DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER
+  DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(String)
+  DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Char)
+  DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Varchar)
+
 #define CREATE_READER(NAME) \
   return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);
 
@@ -935,13 +1049,6 @@ namespace orc {
             CASE_EXCEPTION
         }
       }
-      case STRING:
-      case BINARY:
-      case TIMESTAMP:
-      case LIST:
-      case MAP:
-      case STRUCT:
-      case UNION:
       case DECIMAL: {
         switch (_readType.getKind()) {
           CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
@@ -951,6 +1058,11 @@ namespace orc {
           CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
           CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
           CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
+          CASE_CREATE_FROM_DECIMAL_READER(STRING, String)
+          CASE_CREATE_FROM_DECIMAL_READER(CHAR, Char)
+          CASE_CREATE_FROM_DECIMAL_READER(VARCHAR, Varchar)
+          CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP, Timestamp)
+          CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP_INSTANT, Timestamp)
           case DECIMAL: {
             if (isDecimal64(fileType)) {
               if (isDecimal64(_readType)) {
@@ -966,11 +1078,6 @@ namespace orc {
               }
             }
           }
-          case STRING:
-          case CHAR:
-          case VARCHAR:
-          case TIMESTAMP:
-          case TIMESTAMP_INSTANT:
           case BINARY:
           case LIST:
           case MAP:
@@ -980,6 +1087,13 @@ namespace orc {
             CASE_EXCEPTION
         }
       }
+      case STRING:
+      case BINARY:
+      case TIMESTAMP:
+      case LIST:
+      case MAP:
+      case STRUCT:
+      case UNION:
       case DATE:
       case VARCHAR:
       case CHAR:
diff --git a/c++/src/SchemaEvolution.cc b/c++/src/SchemaEvolution.cc
index b8c4fd404..dc30e6118 100644
--- a/c++/src/SchemaEvolution.cc
+++ b/c++/src/SchemaEvolution.cc
@@ -99,7 +99,8 @@ namespace orc {
           break;
         }
         case DECIMAL: {
-          ret.isValid = ret.needConvert = isNumeric(readType);
+          ret.isValid = ret.needConvert =
+              isNumeric(readType) || isStringVariant(readType) || 
isTimestamp(readType);
           break;
         }
         case STRING:
diff --git a/c++/test/TestConvertColumnReader.cc 
b/c++/test/TestConvertColumnReader.cc
index 4aabce047..83798289d 100644
--- a/c++/test/TestConvertColumnReader.cc
+++ b/c++/test/TestConvertColumnReader.cc
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include "Timezone.hh"
 #include "orc/Type.hh"
 #include "wrap/gtest-wrapper.h"
 
@@ -672,4 +673,146 @@ namespace orc {
     }
   }
 
+  TEST(ConvertColumnReader, TestConvertDecimalToTimestamp) {
+    constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
+    constexpr int TEST_CASES = 1024;
+    std::string writerTimezoneName = "America/New_York";
+    std::string readerTimezoneName = "Australia/Sydney";
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    std::unique_ptr<Type> fileType(
+        
Type::buildTypeFromString("struct<c1:decimal(14,4),c2:decimal(25,10)>"));
+    std::shared_ptr<Type> readType(
+        Type::buildTypeFromString("struct<c1:timestamp,c2:timestamp with local 
time zone>"));
+    WriterOptions options;
+    options.setUseTightNumericVector(true);
+    options.setTimezoneName(writerTimezoneName);
+    auto writer = createWriter(*fileType, &memStream, options);
+    auto batch = writer->createRowBatch(TEST_CASES);
+    auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
+    auto& c2 = dynamic_cast<Decimal128VectorBatch&>(*structBatch->fields[1]);
+
+    auto convertToSeconds = [](const Timezone& writerTimezone, const 
std::string& date) {
+      tm timeStruct;
+      if (strptime(date.c_str(), "%Y-%m-%d %H:%M:%S", &timeStruct) == nullptr) 
{
+        throw TimezoneError("bad time " + date);
+      }
+      return writerTimezone.convertFromUTC(timegm(&timeStruct));
+    };
+
+    std::vector<std::string> timeStrings;
+    for (int i = 0; i < TEST_CASES; i++) {
+      int64_t year = 1960 + (i / 12);
+      int64_t month = i % 12 + 1;
+      int64_t day = 27;
+      std::string others = "23:45:56";
+      std::stringstream ss;
+      ss << year << "-";
+      ss << std::setfill('0') << std::setw(2) << month << "-" << day << " " << 
others;
+      timeStrings.push_back(ss.str());
+    }
+    std::vector<int64_t> ts[2];
+    for (auto& time : timeStrings) {
+      ts[0].emplace_back(convertToSeconds(getTimezoneByName("GMT"), time));
+      
ts[1].emplace_back(convertToSeconds(getTimezoneByName(writerTimezoneName), 
time));
+    }
+    bool overflow = false;
+
+    for (int i = 0; i < TEST_CASES; i++) {
+      c1.values[i] = ts[0][i] * 10000 + 1234;
+      c2.values[i] = scaleUpInt128ByPowerOfTen(Int128(ts[1][i]), 10, overflow) 
+=
+          Int128("1234567895");
+      assert(!overflow);
+    }
+
+    structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
+    structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
+    writer->add(*batch);
+    writer->close();
+    auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength());
+    auto pool = getDefaultPool();
+    auto reader = createReader(*pool, std::move(inStream));
+    RowReaderOptions rowReaderOptions;
+    rowReaderOptions.setUseTightNumericVector(true);
+    rowReaderOptions.setReadType(readType);
+    rowReaderOptions.setTimezoneName(readerTimezoneName);
+    auto rowReader = reader->createRowReader(rowReaderOptions);
+    auto readBatch = rowReader->createRowBatch(TEST_CASES);
+    EXPECT_EQ(true, rowReader->next(*readBatch));
+
+    auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& readC1 = 
dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[0]);
+    auto& readC2 = 
dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[1]);
+    for (int i = 0; i < TEST_CASES; i++) {
+      size_t idx = static_cast<size_t>(i);
+      EXPECT_TRUE(readC1.notNull[idx]) << i;
+      EXPECT_TRUE(readC2.notNull[idx]) << i;
+      
EXPECT_EQ(getTimezoneByName(readerTimezoneName).convertToUTC(readC1.data[i]), 
ts[0][i]);
+      EXPECT_TRUE(readC1.nanoseconds[i] == 123400000);
+      EXPECT_EQ(readC2.data[i], ts[1][i]);
+      if (readC2.data[i] < 0) {
+        EXPECT_EQ(readC2.nanoseconds[i], 123456790) << timeStrings[i];
+      } else {
+        EXPECT_EQ(readC2.nanoseconds[i], 123456789) << timeStrings[i];
+      }
+    }
+  }
+
+  TEST(ConvertColumnReader, TestConvertDecimalToStringVariant) {
+    constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
+    constexpr int TEST_CASES = 1024;
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    std::unique_ptr<Type> fileType(
+        
Type::buildTypeFromString("struct<c1:decimal(14,3),c2:decimal(14,3),c3:decimal(14,3)>"));
+    std::shared_ptr<Type> readType(
+        
Type::buildTypeFromString("struct<c1:char(5),c2:varchar(5),c3:string>"));
+    WriterOptions options;
+    auto writer = createWriter(*fileType, &memStream, options);
+    auto batch = writer->createRowBatch(TEST_CASES);
+    auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
+    auto& c2 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[1]);
+    auto& c3 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[2]);
+
+    for (int i = 0; i < TEST_CASES; i++) {
+      c1.values[i] = i * 1000 + 123;
+      c2.values[i] = i * 1000 + 456;
+      c3.values[i] = i * 1000 + 789;
+    }
+    structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
+    structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
+    writer->add(*batch);
+    writer->close();
+    auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), 
memStream.getLength());
+    auto pool = getDefaultPool();
+    auto reader = createReader(*pool, std::move(inStream));
+    RowReaderOptions rowReaderOptions;
+    rowReaderOptions.setUseTightNumericVector(true);
+    rowReaderOptions.setReadType(readType);
+    auto rowReader = reader->createRowReader(rowReaderOptions);
+    auto readBatch = rowReader->createRowBatch(TEST_CASES);
+    EXPECT_EQ(true, rowReader->next(*readBatch));
+
+    auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& readC1 = 
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[0]);
+    auto& readC2 = 
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[1]);
+    auto& readC3 = 
dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[2]);
+    for (int i = 0; i < TEST_CASES; i++) {
+      if (i < 10) {
+        EXPECT_EQ(std::to_string(i) + ".123", std::string(readC1.data[i], 
readC1.length[i]));
+        EXPECT_EQ(std::to_string(i) + ".456", std::string(readC2.data[i], 
readC2.length[i]));
+      } else if (i >= 10 && i < 100) {
+        EXPECT_EQ(std::to_string(i) + ".12", std::string(readC1.data[i], 
readC1.length[i]));
+        EXPECT_EQ(std::to_string(i) + ".45", std::string(readC2.data[i], 
readC2.length[i]));
+      } else if (i >= 100 && i < 1000) {
+        EXPECT_EQ(std::to_string(i) + ".1", std::string(readC1.data[i], 
readC1.length[i]));
+        EXPECT_EQ(std::to_string(i) + ".4", std::string(readC2.data[i], 
readC2.length[i]));
+      } else {
+        EXPECT_EQ(std::to_string(i) + ".", std::string(readC1.data[i], 
readC1.length[i]));
+        EXPECT_EQ(std::to_string(i) + ".", std::string(readC2.data[i], 
readC2.length[i]));
+      }
+      EXPECT_EQ(std::to_string(i) + ".789", std::string(readC3.data[i], 
readC3.length[i]));
+    }
+  }
+
 }  // namespace orc
diff --git a/c++/test/TestSchemaEvolution.cc b/c++/test/TestSchemaEvolution.cc
index f57e40086..c52ba009f 100644
--- a/c++/test/TestSchemaEvolution.cc
+++ b/c++/test/TestSchemaEvolution.cc
@@ -132,6 +132,22 @@ namespace orc {
       }
     }
 
+    // conversion from decimal to string/char/varchar
+    for (size_t i = 12; i <= 13; i++) {
+      for (size_t j = 7; j <= 11; j++) {
+        canConvert[i][j] = true;
+        needConvert[i][j] = true;
+      }
+    }
+
+    // conversion from decimal to timestamp
+    for (size_t i = 12; i <= 13; i++) {
+      for (size_t j = 14; j <= 15; j++) {
+        canConvert[i][j] = true;
+        needConvert[i][j] = true;
+      }
+    }
+
     for (size_t i = 0; i < typesSize; i++) {
       for (size_t j = 0; j < typesSize; j++) {
         testConvertReader(types[i], types[j], canConvert[i][j], 
needConvert[i][j]);

Reply via email to