This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new acfbbbf  ORC-784: [C++] Support setting timezone to timestamp column writer/reader
acfbbbf is described below

commit acfbbbf3e4cff72b21d71fc4c85add4105d20b53
Author: coderex2522 <[email protected]>
AuthorDate: Mon Apr 19 22:05:17 2021 +0800

    ORC-784: [C++] Support setting timezone to timestamp column writer/reader
    
    This closes #683
---
 c++/include/orc/Reader.hh    |  10 +++++
 c++/include/orc/Writer.hh    |  18 ++++++++
 c++/src/ColumnReader.cc      |  21 ++++++++-
 c++/src/ColumnReader.hh      |   5 +++
 c++/src/ColumnWriter.cc      |   2 +-
 c++/src/Options.hh           |  11 +++++
 c++/src/Reader.cc            |   6 ++-
 c++/src/Reader.hh            |   4 ++
 c++/src/StripeStream.cc      |  10 ++++-
 c++/src/StripeStream.hh      |   6 ++-
 c++/src/Timezone.hh          |   4 ++
 c++/src/Writer.cc            |  22 +++++++--
 c++/test/TestColumnReader.cc |   4 ++
 c++/test/TestWriter.cc       | 105 ++++++++++++++++++++++++++++++++++++++++++-
 14 files changed, 215 insertions(+), 13 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index eee9551..cbc766a 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -256,6 +256,16 @@ namespace orc {
      * Get search argument for predicate push down
      */
     std::shared_ptr<SearchArgument> getSearchArgument() const;
+
+    /**
+     * Set desired timezone to return data of timestamp type
+     */
+    RowReaderOptions& setTimezoneName(const std::string& zoneName);
+
+    /**
+     * Get desired timezone to return data of timestamp type
+     */
+    const std::string& getTimezoneName() const;
   };
 
 
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 5b33386..78b0b97 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -217,6 +217,24 @@ namespace orc {
      * Get version of BloomFilter
      */
     BloomFilterVersion getBloomFilterVersion() const;
+
+    /**
+     * Get writer timezone
+     * @return writer timezone
+     */
+    const Timezone& getTimezone() const;
+
+    /**
+     * Get writer timezone name
+     * @return writer timezone name
+     */
+    const std::string& getTimezoneName() const;
+
+    /**
+     * Set writer timezone
+     * @param zone writer timezone name
+     */
+    WriterOptions& setTimezoneName(const std::string& zone);
   };
 
   class Writer {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 9694107..f0d7dde 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -305,7 +305,9 @@ namespace orc {
     std::unique_ptr<orc::RleDecoder> secondsRle;
     std::unique_ptr<orc::RleDecoder> nanoRle;
     const Timezone& writerTimezone;
+    const Timezone& readerTimezone;
     const int64_t epochOffset;
+    const bool sameTimezone;
 
   public:
     TimestampColumnReader(const Type& type, StripeStreams& stripe);
@@ -326,7 +328,9 @@ namespace orc {
                                                StripeStreams& stripe
                                ): ColumnReader(type, stripe),
                                   writerTimezone(stripe.getWriterTimezone()),
-                                  epochOffset(writerTimezone.getEpoch()) {
+                                  readerTimezone(stripe.getReaderTimezone()),
+                                  epochOffset(writerTimezone.getEpoch()),
+                                  sameTimezone(&writerTimezone == 
&readerTimezone){
     RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
     std::unique_ptr<SeekableInputStream> stream =
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
@@ -373,7 +377,20 @@ namespace orc {
           }
         }
         int64_t writerTime = secsBuffer[i] + epochOffset;
-        secsBuffer[i] = writerTimezone.convertToUTC(writerTime);
+        if (!sameTimezone) {
+          // adjust timestamp value to same wall clock time if writer and reader
+          // time zones have different rules, which is required for Apache Orc.
+          const auto& wv = writerTimezone.getVariant(writerTime);
+          const auto& rv = readerTimezone.getVariant(writerTime);
+          if (!wv.hasSameTzRule(rv)) {
+            // If the timezone adjustment moves the millis across a DST boundary,
+            // we need to reevaluate the offsets.
+            int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
+            const auto& adjustedReader = 
readerTimezone.getVariant(adjustedTime);
+            writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
+          }
+        }
+        secsBuffer[i] = writerTime;
         if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) {
           secsBuffer[i] -= 1;
         }
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 0c64e5b..87994da 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -69,6 +69,11 @@ namespace orc {
     virtual const Timezone& getWriterTimezone() const = 0;
 
     /**
+     * Get the reader's timezone, so that we can convert their dates correctly.
+     */
+    virtual const Timezone& getReaderTimezone() const = 0;
+
+    /**
      * Get the error stream.
      * @return a pointer to the stream that should get error messages
      */
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 5af4922..9a1f28b 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -1731,7 +1731,7 @@ namespace orc {
                              const WriterOptions& options) :
                                  ColumnWriter(type, factory, options),
                                  rleVersion(options.getRleVersion()),
-                                 timezone(getTimezoneByName("GMT")){
+                                 timezone(options.getTimezone()){
     std::unique_ptr<BufferedOutputStream> dataStream =
         factory.createStream(proto::Stream_Kind_DATA);
     std::unique_ptr<BufferedOutputStream> secondaryStream =
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 2808ffa..61d4731 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -129,6 +129,7 @@ namespace orc {
     int32_t forcedScaleOnHive11Decimal;
     bool enableLazyDecoding;
     std::shared_ptr<SearchArgument> sargs;
+    std::string readerTimezone;
 
     RowReaderOptionsPrivate() {
       selection = ColumnSelection_NONE;
@@ -137,6 +138,7 @@ namespace orc {
       throwOnHive11DecimalOverflow = true;
       forcedScaleOnHive11Decimal = 6;
       enableLazyDecoding = false;
+      readerTimezone = "GMT";
     }
   };
 
@@ -259,6 +261,15 @@ namespace orc {
   std::shared_ptr<SearchArgument> RowReaderOptions::getSearchArgument() const {
     return privateBits->sargs;
   }
+
+  RowReaderOptions& RowReaderOptions::setTimezoneName(const std::string& 
zoneName) {
+    privateBits->readerTimezone = zoneName;
+    return *this;
+  }
+
+  const std::string& RowReaderOptions::getTimezoneName() const {
+    return privateBits->readerTimezone;
+  }
 }
 
 #endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 42ab41c..81a96f5 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -195,7 +195,8 @@ namespace orc {
                             
forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
                             footer(contents->footer.get()),
                             firstRowOfStripe(*contents->pool, 0),
-                            enableEncodedBlock(opts.getEnableLazyDecoding()) {
+                            enableEncodedBlock(opts.getEnableLazyDecoding()),
+                            
readerTimezone(getTimezoneByName(opts.getTimezoneName())) {
     uint64_t numberOfStripes;
     numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
     currentStripe = numberOfStripes;
@@ -978,7 +979,8 @@ namespace orc {
                                       currentStripeFooter,
                                       currentStripeInfo.offset(),
                                       *contents->stream,
-                                      writerTimezone);
+                                      writerTimezone,
+                                      readerTimezone);
       reader = buildReader(*contents->schema, stripeStreams);
 
       if (sargsApplier) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0693c62..7240975 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -69,6 +69,7 @@ namespace orc {
                                       const FileContents& contents);
 
   class ReaderImpl;
+  class Timezone;
 
   class ColumnSelector {
    private:
@@ -147,6 +148,9 @@ namespace orc {
     std::shared_ptr<SearchArgument> sargs;
     std::unique_ptr<SargsApplier> sargsApplier;
 
+    // desired timezone to return data of timestamp types.
+    const Timezone& readerTimezone;
+
     // load stripe index if not done so
     void loadStripeIndex();
 
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index b63f19d..eda565c 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -30,14 +30,16 @@ namespace orc {
                                        const proto::StripeFooter& _footer,
                                        uint64_t _stripeStart,
                                        InputStream& _input,
-                                       const Timezone& _writerTimezone
+                                       const Timezone& _writerTimezone,
+                                       const Timezone& _readerTimezone
                                        ): reader(_reader),
                                           stripeInfo(_stripeInfo),
                                           footer(_footer),
                                           stripeIndex(_index),
                                           stripeStart(_stripeStart),
                                           input(_input),
-                                          writerTimezone(_writerTimezone) {
+                                          writerTimezone(_writerTimezone),
+                                          readerTimezone(_readerTimezone) {
     // PASS
   }
 
@@ -71,6 +73,10 @@ namespace orc {
     return writerTimezone;
   }
 
+  const Timezone& StripeStreamsImpl::getReaderTimezone() const {
+    return readerTimezone;
+  }
+
   std::ostream* StripeStreamsImpl::getErrorStream() const {
     return reader.getFileContents().errorStream;
   }
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 5cbaf60..73ce7b3 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -43,6 +43,7 @@ namespace orc {
     const uint64_t stripeStart;
     InputStream& input;
     const Timezone& writerTimezone;
+    const Timezone& readerTimezone;
 
   public:
     StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
@@ -50,7 +51,8 @@ namespace orc {
                       const proto::StripeFooter& footer,
                       uint64_t stripeStart,
                       InputStream& input,
-                      const Timezone& writerTimezone);
+                      const Timezone& writerTimezone,
+                      const Timezone& readerTimezone);
 
     virtual ~StripeStreamsImpl() override;
 
@@ -68,6 +70,8 @@ namespace orc {
 
     const Timezone& getWriterTimezone() const override;
 
+    const Timezone& getReaderTimezone() const override;
+
     std::ostream* getErrorStream() const override;
 
     bool getThrowOnHive11DecimalOverflow() const override;
diff --git a/c++/src/Timezone.hh b/c++/src/Timezone.hh
index 136b7a1..6c8b861 100644
--- a/c++/src/Timezone.hh
+++ b/c++/src/Timezone.hh
@@ -42,6 +42,10 @@ namespace orc {
     bool isDst;
     std::string name;
 
+    bool hasSameTzRule(const TimezoneVariant& other) const {
+      return gmtOffset == other.gmtOffset && isDst == other.isDst;
+    }
+
     std::string toString() const;
   };
 
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 66ecede..730d7ff 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -41,6 +41,7 @@ namespace orc {
     std::set<uint64_t> columnsUseBloomFilter;
     double bloomFilterFalsePositiveProb;
     BloomFilterVersion bloomFilterVersion;
+    std::string timezone;
 
     WriterOptionsPrivate() :
                             fileVersion(FileVersion::v_0_12()) { // default to 
Hive_0_12
@@ -56,6 +57,10 @@ namespace orc {
       enableIndex = true;
       bloomFilterFalsePositiveProb = 0.05;
       bloomFilterVersion = UTF8;
+      //Writer timezone uses "GMT" by default to get rid of potential issues
+      //introduced by moving timestamps between different timezones.
+      //Explicitly set the writer timezone if the use case depends on it.
+      timezone = "GMT";
     }
   };
 
@@ -229,6 +234,19 @@ namespace orc {
     return privateBits->bloomFilterVersion;
   }
 
+  const Timezone& WriterOptions::getTimezone() const {
+    return getTimezoneByName(privateBits->timezone);
+  }
+
+  const std::string& WriterOptions::getTimezoneName() const {
+    return privateBits->timezone;
+  }
+
+  WriterOptions& WriterOptions::setTimezoneName(const std::string& zone) {
+    privateBits->timezone = zone;
+    return *this;
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -439,9 +457,7 @@ namespace orc {
       *stripeFooter.add_columns() = encodings[i];
     }
 
-    // use GMT to guarantee TimestampVectorBatch from reader can write
-    // same wall clock time
-    stripeFooter.set_writertimezone("GMT");
+    stripeFooter.set_writertimezone(options.getTimezoneName());
 
     // add stripe statistics to metadata
     proto::StripeStatistics* stripeStats = metadata.add_stripestats();
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index 5acb19a..bb16700 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -73,6 +73,10 @@ namespace orc {
     const Timezone &getWriterTimezone() const override {
       return getTimezoneByName("America/Los_Angeles");
     }
+
+    const Timezone& getReaderTimezone() const override {
+      return getTimezoneByName("GMT");
+    }
   };
 
   MockStripeStreams::~MockStripeStreams() {
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 3e63b91..f91aecb 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -48,7 +48,8 @@ namespace orc {
                                       MemoryPool* memoryPool,
                                       OutputStream* stream,
                                       FileVersion version,
-                                      uint64_t stride = 0){
+                                      uint64_t stride = 0,
+                                      const std::string& timezone = "GMT"){
     WriterOptions options;
     options.setStripeSize(stripeSize);
     options.setCompressionBlockSize(compresionblockSize);
@@ -56,6 +57,7 @@ namespace orc {
     options.setMemoryPool(memoryPool);
     options.setRowIndexStride(stride);
     options.setFileVersion(version);
+    options.setTimezoneName(timezone);
     return createWriter(type, stream, options);
   }
 
@@ -67,8 +69,11 @@ namespace orc {
     return createReader(std::move(stream), options);
   }
 
-  std::unique_ptr<RowReader> createRowReader(Reader* reader) {
+  std::unique_ptr<RowReader> createRowReader(
+                                            Reader* reader,
+                                            const std::string& timezone = 
"GMT") {
     RowReaderOptions rowReaderOpts;
+    rowReaderOpts.setTimezoneName(timezone);
     return reader->createRowReader(rowReaderOpts);
   }
 
@@ -699,6 +704,102 @@ namespace orc {
     }
   }
 
+//TODO: Disable the test below for Windows for following reasons:
+//First, the timezone name provided by Windows cannot be used as
+//a parameter to the getTimezoneByName function. Secondly, the
+//function of setting timezone in Windows is different from Linux.
+#ifndef _MSC_VER
+  void testWriteTimestampWithTimezone(FileVersion fileVersion,
+                                      const char* writerTimezone,
+                                      const char* readerTimezone,
+                                      const std::string& tsStr,
+                                      int isDst = 0) {
+    char* tzBk = getenv("TZ");  // backup TZ env
+
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> 
type(Type::buildTypeFromString("struct<col1:timestamp>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 1;
+
+    std::unique_ptr<Writer> writer = createWriter(stripeSize,
+                                                  compressionBlockSize,
+                                                  CompressionKind_ZLIB,
+                                                  *type,
+                                                  pool,
+                                                  &memStream,
+                                                  fileVersion,
+                                                  0,
+                                                  writerTimezone);
+    auto batch = writer->createRowBatch(rowCount);
+    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    auto& tsBatch = 
dynamic_cast<TimestampVectorBatch&>(*structBatch.fields[0]);
+
+    // write timestamp in the writer timezone
+    setenv("TZ", writerTimezone, 1); tzset();
+    struct tm tm;
+    memset(&tm, 0, sizeof(struct tm));
+    strptime(tsStr.c_str(), "%Y-%m-%d %H:%M:%S", &tm);
+    // mktime() does depend on external hint for daylight saving time
+    tm.tm_isdst = isDst;
+    tsBatch.data[0] = mktime(&tm);
+    tsBatch.nanoseconds[0] = 0;
+    structBatch.numElements = rowCount;
+    tsBatch.numElements = rowCount;
+    writer->add(*batch);
+    writer->close();
+
+    // read timestamp from the reader timezone
+    std::unique_ptr<InputStream> inStream(
+      new MemoryInputStream (memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(), 
readerTimezone);
+    EXPECT_EQ(true, rowReader->next(*batch));
+
+    // verify we get same wall clock in reader timezone
+    setenv("TZ", readerTimezone, 1); tzset();
+    memset(&tm, 0, sizeof(struct tm));
+    time_t ttime = tsBatch.data[0];
+    localtime_r(&ttime, &tm);
+    char buf[20];
+    strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
+    EXPECT_TRUE(strncmp(buf, tsStr.c_str(), tsStr.size()) == 0);
+
+    // restore TZ env
+    if (tzBk) {
+      setenv("TZ", tzBk, 1); tzset();
+    } else {
+      unsetenv("TZ"); tzset();
+    }
+  }
+
+  TEST_P(WriterTest, writeTimestampWithTimezone) {
+    const int IS_DST = 1, NOT_DST = 0;
+    testWriteTimestampWithTimezone(fileVersion, "GMT", "GMT", "2001-11-12 
18:31:01");
+    // behavior for Apache Orc (writer & reader timezone can change)
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"America/Los_Angeles", "2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", 
"Asia/Shanghai", "2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", 
"America/Los_Angeles", "2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "GMT", "Asia/Shanghai", 
"2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "GMT", 
"2001-11-12 18:31:01");
+    testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", 
"America/Los_Angeles", "2018-01-01 23:59:59");
+    // daylight saving started at 2012-03-11 02:00:00 in Los Angeles
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-03-11 01:59:59", NOT_DST);
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-03-11 03:00:00", IS_DST);
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-03-11 03:00:01", IS_DST);
+    // daylight saving ended at 2012-11-04 02:00:00 in Los Angeles
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-11-04 01:59:59", IS_DST);
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-11-04 02:00:00", NOT_DST);
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2012-11-04 02:00:01", NOT_DST);
+    // other daylight saving time
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"Asia/Shanghai", "2014-06-06 12:34:56", IS_DST);
+    testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles", 
"America/Los_Angeles", "2014-06-06 12:34:56", IS_DST);
+  }
+#endif
+
   TEST_P(WriterTest, writeCharAndVarcharColumn) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool * pool = getDefaultPool();

Reply via email to