This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c48e29  ORC-666: [C++] Support timestamp with local timezone
5c48e29 is described below

commit 5c48e291f3cbb4060243895739977102f82b861a
Author: coderex2522 <[email protected]>
AuthorDate: Fri Apr 23 10:08:39 2021 +0800

    ORC-666: [C++] Support timestamp with local timezone
    
    Add TIMESTAMP_INSTANT type to C++ ORC writer/reader. Make it consistent with the Java side.
    
    This closes #688
---
 c++/include/orc/Type.hh    |  3 ++-
 c++/src/ColumnPrinter.cc   |  1 +
 c++/src/ColumnReader.cc    | 21 +++++++++++++-----
 c++/src/ColumnWriter.cc    | 26 +++++++++++++++++++----
 c++/src/Reader.cc          |  1 +
 c++/src/Statistics.cc      |  1 +
 c++/src/TypeImpl.cc        |  9 +++++++-
 c++/src/Writer.cc          |  4 ++++
 c++/test/TestType.cc       |  5 +++++
 c++/test/TestWriter.cc     | 53 ++++++++++++++++++++++++++++++++++++++++++++++
 tools/src/CSVFileImport.cc |  1 +
 11 files changed, 114 insertions(+), 11 deletions(-)

diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index cfef512..a7df830 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -43,7 +43,8 @@ namespace orc {
     DECIMAL = 14,
     DATE = 15,
     VARCHAR = 16,
-    CHAR = 17
+    CHAR = 17,
+    TIMESTAMP_INSTANT = 18
   };
 
   class Type {
diff --git a/c++/src/ColumnPrinter.cc b/c++/src/ColumnPrinter.cc
index d781eea..ab6b690 100644
--- a/c++/src/ColumnPrinter.cc
+++ b/c++/src/ColumnPrinter.cc
@@ -249,6 +249,7 @@ namespace orc {
         break;
 
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         result = new TimestampColumnPrinter(buffer);
         break;
 
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index f0d7dde..439d92a 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -310,7 +310,9 @@ namespace orc {
     const bool sameTimezone;
 
   public:
-    TimestampColumnReader(const Type& type, StripeStreams& stripe);
+    TimestampColumnReader(const Type& type,
+                          StripeStreams& stripe,
+                          bool isInstantType);
     ~TimestampColumnReader() override;
 
     uint64_t skip(uint64_t numValues) override;
@@ -325,10 +327,15 @@ namespace orc {
 
 
   TimestampColumnReader::TimestampColumnReader(const Type& type,
-                                               StripeStreams& stripe
+                                               StripeStreams& stripe,
+                                               bool isInstantType
                                ): ColumnReader(type, stripe),
-                                  writerTimezone(stripe.getWriterTimezone()),
-                                  readerTimezone(stripe.getReaderTimezone()),
+                                  writerTimezone(isInstantType ?
+                                                 getTimezoneByName("GMT") :
+                                                 stripe.getWriterTimezone()),
+                                  readerTimezone(isInstantType ?
+                                                 getTimezoneByName("GMT") :
+                                                 stripe.getReaderTimezone()),
                                   epochOffset(writerTimezone.getEpoch()),
                                  sameTimezone(&writerTimezone == &readerTimezone){
     RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
@@ -1823,7 +1830,11 @@ namespace orc {
 
     case TIMESTAMP:
       return std::unique_ptr<ColumnReader>
-        (new TimestampColumnReader(type, stripe));
+        (new TimestampColumnReader(type, stripe, false));
+
+    case TIMESTAMP_INSTANT:
+      return std::unique_ptr<ColumnReader>
+        (new TimestampColumnReader(type, stripe, true));
 
     case DECIMAL:
       // is this a Hive 0.11 or 0.12 file?
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 9a1f28b..a259594 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -1701,7 +1701,8 @@ namespace orc {
   public:
     TimestampColumnWriter(const Type& type,
                           const StreamsFactory& factory,
-                          const WriterOptions& options);
+                          const WriterOptions& options,
+                          bool isInstantType);
 
     virtual void add(ColumnVectorBatch& rowBatch,
                      uint64_t offset,
@@ -1723,15 +1724,21 @@ namespace orc {
   private:
     RleVersion rleVersion;
     const Timezone& timezone;
+    const bool isUTC;
   };
 
   TimestampColumnWriter::TimestampColumnWriter(
                              const Type& type,
                              const StreamsFactory& factory,
-                             const WriterOptions& options) :
+                             const WriterOptions& options,
+                             bool isInstantType) :
                                  ColumnWriter(type, factory, options),
                                  rleVersion(options.getRleVersion()),
-                                 timezone(options.getTimezone()){
+                                 timezone(isInstantType ?
+                                          getTimezoneByName("GMT") :
+                                          options.getTimezone()),
+                                 isUTC(isInstantType ||
+                                       options.getTimezoneName() == "GMT") {
     std::unique_ptr<BufferedOutputStream> dataStream =
         factory.createStream(proto::Stream_Kind_DATA);
     std::unique_ptr<BufferedOutputStream> secondaryStream =
@@ -1801,6 +1808,9 @@ namespace orc {
       if (notNull == nullptr || notNull[i]) {
         // TimestampVectorBatch already stores data in UTC
         int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
+        if (!isUTC) {
+          millsUTC = timezone.convertToUTC(millsUTC);
+        }
         ++count;
         if (enableBloomFilter) {
           bloomFilter->addLong(millsUTC);
@@ -2958,7 +2968,15 @@ namespace orc {
           new TimestampColumnWriter(
                                     type,
                                     factory,
-                                    options));
+                                    options,
+                                    false));
+      case TIMESTAMP_INSTANT:
+        return std::unique_ptr<ColumnWriter>(
+          new TimestampColumnWriter(
+                                    type,
+                                    factory,
+                                    options,
+                                    true));
       case DECIMAL:
         if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
           return std::unique_ptr<ColumnWriter>(
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 81a96f5..0811dbe 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -784,6 +784,7 @@ namespace orc {
       case proto::Type_Kind_BINARY:
       case proto::Type_Kind_DECIMAL:
       case proto::Type_Kind_TIMESTAMP:
+      case proto::Type_Kind_TIMESTAMP_INSTANT:
         return 3;
       case proto::Type_Kind_CHAR:
       case proto::Type_Kind_STRING:
diff --git a/c++/src/Statistics.cc b/c++/src/Statistics.cc
index 20e2fc9..b0c9de8 100644
--- a/c++/src/Statistics.cc
+++ b/c++/src/Statistics.cc
@@ -415,6 +415,7 @@ namespace orc {
         return std::unique_ptr<MutableColumnStatistics>(
           new DateColumnStatisticsImpl());
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return std::unique_ptr<MutableColumnStatistics>(
           new TimestampColumnStatisticsImpl());
       case DECIMAL:
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 3049174..4d5a5a9 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -205,6 +205,8 @@ namespace orc {
       return "binary";
     case TIMESTAMP:
       return "timestamp";
+    case TIMESTAMP_INSTANT:
+      return "timestamp with local time zone";
     case LIST:
       return "array<" + (subTypes[0] ? subTypes[0]->toString() : "void") + ">";
     case MAP:
@@ -286,6 +288,7 @@ namespace orc {
         (new StringVectorBatch(capacity, memoryPool));
 
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
       return std::unique_ptr<ColumnVectorBatch>
         (new TimestampVectorBatch(capacity, memoryPool));
 
@@ -401,6 +404,7 @@ namespace orc {
     case proto::Type_Kind_STRING:
     case proto::Type_Kind_BINARY:
     case proto::Type_Kind_TIMESTAMP:
+    case proto::Type_Kind_TIMESTAMP_INSTANT:
     case proto::Type_Kind_DATE:
       ret = std::unique_ptr<Type>
         (new TypeImpl(static_cast<TypeKind>(type.kind())));
@@ -484,6 +488,7 @@ namespace orc {
     case STRING:
     case BINARY:
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
     case DATE:
       result = new TypeImpl(fileType->getKind());
       break;
@@ -658,6 +663,8 @@ namespace orc {
       return std::unique_ptr<Type>(new TypeImpl(BINARY));
     } else if (category == "timestamp") {
       return std::unique_ptr<Type>(new TypeImpl(TIMESTAMP));
+    } else if (category == "timestamp with local time zone") {
+      return std::unique_ptr<Type>(new TypeImpl(TIMESTAMP_INSTANT));
     } else if (category == "array") {
       return parseArrayType(input, start, end);
     } else if (category == "map") {
@@ -700,7 +707,7 @@ namespace orc {
       if (input[endPos] == ':') {
         fieldName = input.substr(pos, endPos - pos);
         pos = ++endPos;
-        while (endPos < end && isalpha(input[endPos])) {
+        while (endPos < end && (isalpha(input[endPos]) || input[endPos] == ' ')) {
           ++endPos;
         }
       }
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index f6d127f..fda5f5c 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -585,6 +585,10 @@ namespace orc {
       protoType.set_kind(proto::Type_Kind_TIMESTAMP);
       break;
     }
+    case TIMESTAMP_INSTANT: {
+      protoType.set_kind(proto::Type_Kind_TIMESTAMP_INSTANT);
+      break;
+    }
     case LIST: {
       protoType.set_kind(proto::Type_Kind_LIST);
       break;
diff --git a/c++/test/TestType.cc b/c++/test/TestType.cc
index e70a9ef..3d6f2d1 100644
--- a/c++/test/TestType.cc
+++ b/c++/test/TestType.cc
@@ -295,6 +295,11 @@ namespace orc {
     EXPECT_EQ(typeStr, type->toString());
 
     typeStr =
+      "struct<a:bigint,b:struct<a:binary,b:timestamp with local time zone>>";
+    type = Type::buildTypeFromString(typeStr);
+    EXPECT_EQ(typeStr, type->toString());
+
+    typeStr =
       "struct<a:bigint,b:struct<a:binary,b:timestamp>,c:map<double,tinyint>>";
     type = Type::buildTypeFromString(typeStr);
     EXPECT_EQ(typeStr, type->toString());
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index f91aecb..506887e 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -800,6 +800,59 @@ namespace orc {
   }
 #endif
 
+  TEST_P(WriterTest, writeTimestampInstant) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<col1:timestamp with local time zone>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 102400;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream,
+                                                  fileVersion);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount);
+    StructVectorBatch * structBatch =
+      dynamic_cast<StructVectorBatch *>(batch.get());
+    TimestampVectorBatch * tsBatch =
+      dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+
+    std::vector<std::time_t> times(rowCount);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      time_t currTime = -14210715; // 1969-07-20 12:34:45
+      times[i] = static_cast<int64_t>(currTime) + static_cast<int64_t >(i * 3660);
+      tsBatch->data[i] = times[i];
+      tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000);
+    }
+    structBatch->numElements = rowCount;
+    tsBatch->numElements = rowCount;
+
+    writer->add(*batch);
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+    tsBatch = dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_EQ(times[i], tsBatch->data[i]);
+      EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
+    }
+  }
+
   TEST_P(WriterTest, writeCharAndVarcharColumn) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool * pool = getDefaultPool();
diff --git a/tools/src/CSVFileImport.cc b/tools/src/CSVFileImport.cc
index a98b511..3857adc 100644
--- a/tools/src/CSVFileImport.cc
+++ b/tools/src/CSVFileImport.cc
@@ -448,6 +448,7 @@ int main(int argc, char* argv[]) {
                            i);
             break;
           case orc::TIMESTAMP:
+          case orc::TIMESTAMP_INSTANT:
             fillTimestampValues(data,
                                 structBatch->fields[i],
                                 numValues,

Reply via email to