This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new acfbbbf ORC-784: [C++] Support setting timezone to timestamp column writer/reader
acfbbbf is described below
commit acfbbbf3e4cff72b21d71fc4c85add4105d20b53
Author: coderex2522 <[email protected]>
AuthorDate: Mon Apr 19 22:05:17 2021 +0800
ORC-784: [C++] Support setting timezone to timestamp column writer/reader
This closes #683
---
c++/include/orc/Reader.hh | 10 +++++
c++/include/orc/Writer.hh | 18 ++++++++
c++/src/ColumnReader.cc | 21 ++++++++-
c++/src/ColumnReader.hh | 5 +++
c++/src/ColumnWriter.cc | 2 +-
c++/src/Options.hh | 11 +++++
c++/src/Reader.cc | 6 ++-
c++/src/Reader.hh | 4 ++
c++/src/StripeStream.cc | 10 ++++-
c++/src/StripeStream.hh | 6 ++-
c++/src/Timezone.hh | 4 ++
c++/src/Writer.cc | 22 +++++++--
c++/test/TestColumnReader.cc | 4 ++
c++/test/TestWriter.cc | 105 ++++++++++++++++++++++++++++++++++++++++++-
14 files changed, 215 insertions(+), 13 deletions(-)
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index eee9551..cbc766a 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -256,6 +256,16 @@ namespace orc {
* Get search argument for predicate push down
*/
std::shared_ptr<SearchArgument> getSearchArgument() const;
+
+ /**
+ * Set desired timezone to return data of timestamp type
+ */
+ RowReaderOptions& setTimezoneName(const std::string& zoneName);
+
+ /**
+ * Get desired timezone to return data of timestamp type
+ */
+ const std::string& getTimezoneName() const;
};
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 5b33386..78b0b97 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -217,6 +217,24 @@ namespace orc {
* Get version of BloomFilter
*/
BloomFilterVersion getBloomFilterVersion() const;
+
+ /**
+ * Get writer timezone
+ * @return writer timezone
+ */
+ const Timezone& getTimezone() const;
+
+ /**
+ * Get writer timezone name
+ * @return writer timezone name
+ */
+ const std::string& getTimezoneName() const;
+
+ /**
+ * Set writer timezone
+ * @param zone writer timezone name
+ */
+ WriterOptions& setTimezoneName(const std::string& zone);
};
class Writer {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 9694107..f0d7dde 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -305,7 +305,9 @@ namespace orc {
std::unique_ptr<orc::RleDecoder> secondsRle;
std::unique_ptr<orc::RleDecoder> nanoRle;
const Timezone& writerTimezone;
+ const Timezone& readerTimezone;
const int64_t epochOffset;
+ const bool sameTimezone;
public:
TimestampColumnReader(const Type& type, StripeStreams& stripe);
@@ -326,7 +328,9 @@ namespace orc {
StripeStreams& stripe
): ColumnReader(type, stripe),
writerTimezone(stripe.getWriterTimezone()),
- epochOffset(writerTimezone.getEpoch()) {
+ readerTimezone(stripe.getReaderTimezone()),
+ epochOffset(writerTimezone.getEpoch()),
+ sameTimezone(&writerTimezone ==
&readerTimezone){
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
@@ -373,7 +377,20 @@ namespace orc {
}
}
int64_t writerTime = secsBuffer[i] + epochOffset;
- secsBuffer[i] = writerTimezone.convertToUTC(writerTime);
+ if (!sameTimezone) {
+ // adjust timestamp value to same wall clock time if writer and reader
+ // time zones have different rules, which is required for Apache Orc.
+ const auto& wv = writerTimezone.getVariant(writerTime);
+ const auto& rv = readerTimezone.getVariant(writerTime);
+ if (!wv.hasSameTzRule(rv)) {
+ // If the timezone adjustment moves the timestamp across a DST boundary,
+ // we need to reevaluate the offsets.
+ int64_t adjustedTime = writerTime + wv.gmtOffset - rv.gmtOffset;
+ const auto& adjustedReader =
readerTimezone.getVariant(adjustedTime);
+ writerTime = writerTime + wv.gmtOffset - adjustedReader.gmtOffset;
+ }
+ }
+ secsBuffer[i] = writerTime;
if (secsBuffer[i] < 0 && nanoBuffer[i] > 999999) {
secsBuffer[i] -= 1;
}
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 0c64e5b..87994da 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -69,6 +69,11 @@ namespace orc {
virtual const Timezone& getWriterTimezone() const = 0;
/**
+ * Get the reader's timezone, so that we can convert their dates correctly.
+ */
+ virtual const Timezone& getReaderTimezone() const = 0;
+
+ /**
* Get the error stream.
* @return a pointer to the stream that should get error messages
*/
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 5af4922..9a1f28b 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -1731,7 +1731,7 @@ namespace orc {
const WriterOptions& options) :
ColumnWriter(type, factory, options),
rleVersion(options.getRleVersion()),
- timezone(getTimezoneByName("GMT")){
+ timezone(options.getTimezone()){
std::unique_ptr<BufferedOutputStream> dataStream =
factory.createStream(proto::Stream_Kind_DATA);
std::unique_ptr<BufferedOutputStream> secondaryStream =
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 2808ffa..61d4731 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -129,6 +129,7 @@ namespace orc {
int32_t forcedScaleOnHive11Decimal;
bool enableLazyDecoding;
std::shared_ptr<SearchArgument> sargs;
+ std::string readerTimezone;
RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
@@ -137,6 +138,7 @@ namespace orc {
throwOnHive11DecimalOverflow = true;
forcedScaleOnHive11Decimal = 6;
enableLazyDecoding = false;
+ readerTimezone = "GMT";
}
};
@@ -259,6 +261,15 @@ namespace orc {
std::shared_ptr<SearchArgument> RowReaderOptions::getSearchArgument() const {
return privateBits->sargs;
}
+
+ RowReaderOptions& RowReaderOptions::setTimezoneName(const std::string&
zoneName) {
+ privateBits->readerTimezone = zoneName;
+ return *this;
+ }
+
+ const std::string& RowReaderOptions::getTimezoneName() const {
+ return privateBits->readerTimezone;
+ }
}
#endif
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 42ab41c..81a96f5 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -195,7 +195,8 @@ namespace orc {
forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
footer(contents->footer.get()),
firstRowOfStripe(*contents->pool, 0),
- enableEncodedBlock(opts.getEnableLazyDecoding()) {
+ enableEncodedBlock(opts.getEnableLazyDecoding()),
+
readerTimezone(getTimezoneByName(opts.getTimezoneName())) {
uint64_t numberOfStripes;
numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
currentStripe = numberOfStripes;
@@ -978,7 +979,8 @@ namespace orc {
currentStripeFooter,
currentStripeInfo.offset(),
*contents->stream,
- writerTimezone);
+ writerTimezone,
+ readerTimezone);
reader = buildReader(*contents->schema, stripeStreams);
if (sargsApplier) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 0693c62..7240975 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -69,6 +69,7 @@ namespace orc {
const FileContents& contents);
class ReaderImpl;
+ class Timezone;
class ColumnSelector {
private:
@@ -147,6 +148,9 @@ namespace orc {
std::shared_ptr<SearchArgument> sargs;
std::unique_ptr<SargsApplier> sargsApplier;
+ // desired timezone to return data of timestamp types.
+ const Timezone& readerTimezone;
+
// load stripe index if not done so
void loadStripeIndex();
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index b63f19d..eda565c 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -30,14 +30,16 @@ namespace orc {
const proto::StripeFooter& _footer,
uint64_t _stripeStart,
InputStream& _input,
- const Timezone& _writerTimezone
+ const Timezone& _writerTimezone,
+ const Timezone& _readerTimezone
): reader(_reader),
stripeInfo(_stripeInfo),
footer(_footer),
stripeIndex(_index),
stripeStart(_stripeStart),
input(_input),
- writerTimezone(_writerTimezone) {
+ writerTimezone(_writerTimezone),
+ readerTimezone(_readerTimezone) {
// PASS
}
@@ -71,6 +73,10 @@ namespace orc {
return writerTimezone;
}
+ const Timezone& StripeStreamsImpl::getReaderTimezone() const {
+ return readerTimezone;
+ }
+
std::ostream* StripeStreamsImpl::getErrorStream() const {
return reader.getFileContents().errorStream;
}
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 5cbaf60..73ce7b3 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -43,6 +43,7 @@ namespace orc {
const uint64_t stripeStart;
InputStream& input;
const Timezone& writerTimezone;
+ const Timezone& readerTimezone;
public:
StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
@@ -50,7 +51,8 @@ namespace orc {
const proto::StripeFooter& footer,
uint64_t stripeStart,
InputStream& input,
- const Timezone& writerTimezone);
+ const Timezone& writerTimezone,
+ const Timezone& readerTimezone);
virtual ~StripeStreamsImpl() override;
@@ -68,6 +70,8 @@ namespace orc {
const Timezone& getWriterTimezone() const override;
+ const Timezone& getReaderTimezone() const override;
+
std::ostream* getErrorStream() const override;
bool getThrowOnHive11DecimalOverflow() const override;
diff --git a/c++/src/Timezone.hh b/c++/src/Timezone.hh
index 136b7a1..6c8b861 100644
--- a/c++/src/Timezone.hh
+++ b/c++/src/Timezone.hh
@@ -42,6 +42,10 @@ namespace orc {
bool isDst;
std::string name;
+ bool hasSameTzRule(const TimezoneVariant& other) const {
+ return gmtOffset == other.gmtOffset && isDst == other.isDst;
+ }
+
std::string toString() const;
};
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 66ecede..730d7ff 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -41,6 +41,7 @@ namespace orc {
std::set<uint64_t> columnsUseBloomFilter;
double bloomFilterFalsePositiveProb;
BloomFilterVersion bloomFilterVersion;
+ std::string timezone;
WriterOptionsPrivate() :
fileVersion(FileVersion::v_0_12()) { // default to
Hive_0_12
@@ -56,6 +57,10 @@ namespace orc {
enableIndex = true;
bloomFilterFalsePositiveProb = 0.05;
bloomFilterVersion = UTF8;
+ //Writer timezone uses "GMT" by default to get rid of potential issues
+ //introduced by moving timestamps between different timezones.
+ //Explicitly set the writer timezone if the use case depends on it.
+ timezone = "GMT";
}
};
@@ -229,6 +234,19 @@ namespace orc {
return privateBits->bloomFilterVersion;
}
+ const Timezone& WriterOptions::getTimezone() const {
+ return getTimezoneByName(privateBits->timezone);
+ }
+
+ const std::string& WriterOptions::getTimezoneName() const {
+ return privateBits->timezone;
+ }
+
+ WriterOptions& WriterOptions::setTimezoneName(const std::string& zone) {
+ privateBits->timezone = zone;
+ return *this;
+ }
+
Writer::~Writer() {
// PASS
}
@@ -439,9 +457,7 @@ namespace orc {
*stripeFooter.add_columns() = encodings[i];
}
- // use GMT to guarantee TimestampVectorBatch from reader can write
- // same wall clock time
- stripeFooter.set_writertimezone("GMT");
+ stripeFooter.set_writertimezone(options.getTimezoneName());
// add stripe statistics to metadata
proto::StripeStatistics* stripeStats = metadata.add_stripestats();
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index 5acb19a..bb16700 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -73,6 +73,10 @@ namespace orc {
const Timezone &getWriterTimezone() const override {
return getTimezoneByName("America/Los_Angeles");
}
+
+ const Timezone& getReaderTimezone() const override {
+ return getTimezoneByName("GMT");
+ }
};
MockStripeStreams::~MockStripeStreams() {
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 3e63b91..f91aecb 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -48,7 +48,8 @@ namespace orc {
MemoryPool* memoryPool,
OutputStream* stream,
FileVersion version,
- uint64_t stride = 0){
+ uint64_t stride = 0,
+ const std::string& timezone = "GMT"){
WriterOptions options;
options.setStripeSize(stripeSize);
options.setCompressionBlockSize(compresionblockSize);
@@ -56,6 +57,7 @@ namespace orc {
options.setMemoryPool(memoryPool);
options.setRowIndexStride(stride);
options.setFileVersion(version);
+ options.setTimezoneName(timezone);
return createWriter(type, stream, options);
}
@@ -67,8 +69,11 @@ namespace orc {
return createReader(std::move(stream), options);
}
- std::unique_ptr<RowReader> createRowReader(Reader* reader) {
+ std::unique_ptr<RowReader> createRowReader(
+ Reader* reader,
+ const std::string& timezone =
"GMT") {
RowReaderOptions rowReaderOpts;
+ rowReaderOpts.setTimezoneName(timezone);
return reader->createRowReader(rowReaderOpts);
}
@@ -699,6 +704,102 @@ namespace orc {
}
}
+//TODO: Disable the test below for Windows for the following reasons:
+//First, the timezone name provided by Windows cannot be used as
+//a parameter to the getTimezoneByName function. Secondly, the
+//function of setting timezone in Windows is different from Linux.
+#ifndef _MSC_VER
+ void testWriteTimestampWithTimezone(FileVersion fileVersion,
+ const char* writerTimezone,
+ const char* readerTimezone,
+ const std::string& tsStr,
+ int isDst = 0) {
+ char* tzBk = getenv("TZ"); // backup TZ env
+
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ std::unique_ptr<Type>
type(Type::buildTypeFromString("struct<col1:timestamp>"));
+
+ uint64_t stripeSize = 16 * 1024;
+ uint64_t compressionBlockSize = 1024;
+ uint64_t rowCount = 1;
+
+ std::unique_ptr<Writer> writer = createWriter(stripeSize,
+ compressionBlockSize,
+ CompressionKind_ZLIB,
+ *type,
+ pool,
+ &memStream,
+ fileVersion,
+ 0,
+ writerTimezone);
+ auto batch = writer->createRowBatch(rowCount);
+ auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+ auto& tsBatch =
dynamic_cast<TimestampVectorBatch&>(*structBatch.fields[0]);
+
+ // write timestamp in the writer timezone
+ setenv("TZ", writerTimezone, 1); tzset();
+ struct tm tm;
+ memset(&tm, 0, sizeof(struct tm));
+ strptime(tsStr.c_str(), "%Y-%m-%d %H:%M:%S", &tm);
+ // mktime() relies on the tm_isdst hint to resolve daylight saving time
+ tm.tm_isdst = isDst;
+ tsBatch.data[0] = mktime(&tm);
+ tsBatch.nanoseconds[0] = 0;
+ structBatch.numElements = rowCount;
+ tsBatch.numElements = rowCount;
+ writer->add(*batch);
+ writer->close();
+
+ // read timestamp from the reader timezone
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream (memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(),
readerTimezone);
+ EXPECT_EQ(true, rowReader->next(*batch));
+
+ // verify we get same wall clock in reader timezone
+ setenv("TZ", readerTimezone, 1); tzset();
+ memset(&tm, 0, sizeof(struct tm));
+ time_t ttime = tsBatch.data[0];
+ localtime_r(&ttime, &tm);
+ char buf[20];
+ strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm);
+ EXPECT_TRUE(strncmp(buf, tsStr.c_str(), tsStr.size()) == 0);
+
+ // restore TZ env
+ if (tzBk) {
+ setenv("TZ", tzBk, 1); tzset();
+ } else {
+ unsetenv("TZ"); tzset();
+ }
+ }
+
+ TEST_P(WriterTest, writeTimestampWithTimezone) {
+ const int IS_DST = 1, NOT_DST = 0;
+ testWriteTimestampWithTimezone(fileVersion, "GMT", "GMT", "2001-11-12
18:31:01");
+ // behavior for Apache Orc (writer & reader timezone can change)
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"America/Los_Angeles", "2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai",
"Asia/Shanghai", "2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai",
"America/Los_Angeles", "2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "GMT", "Asia/Shanghai",
"2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai", "GMT",
"2001-11-12 18:31:01");
+ testWriteTimestampWithTimezone(fileVersion, "Asia/Shanghai",
"America/Los_Angeles", "2018-01-01 23:59:59");
+ // daylight saving started at 2012-03-11 02:00:00 in Los Angeles
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-03-11 01:59:59", NOT_DST);
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-03-11 03:00:00", IS_DST);
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-03-11 03:00:01", IS_DST);
+ // daylight saving ended at 2012-11-04 02:00:00 in Los Angeles
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-11-04 01:59:59", IS_DST);
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-11-04 02:00:00", NOT_DST);
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2012-11-04 02:00:01", NOT_DST);
+ // other daylight saving time
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"Asia/Shanghai", "2014-06-06 12:34:56", IS_DST);
+ testWriteTimestampWithTimezone(fileVersion, "America/Los_Angeles",
"America/Los_Angeles", "2014-06-06 12:34:56", IS_DST);
+ }
+#endif
+
TEST_P(WriterTest, writeCharAndVarcharColumn) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();