This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 5c48e29 ORC-666: [C++] Support timestamp with local timezone
5c48e29 is described below
commit 5c48e291f3cbb4060243895739977102f82b861a
Author: coderex2522 <[email protected]>
AuthorDate: Fri Apr 23 10:08:39 2021 +0800
ORC-666: [C++] Support timestamp with local timezone
Add TIMESTAMP_INSTANT type to C++ ORC writer/reader. Make it consistent
with the Java side.
This closes #688
---
c++/include/orc/Type.hh | 3 ++-
c++/src/ColumnPrinter.cc | 1 +
c++/src/ColumnReader.cc | 21 +++++++++++++-----
c++/src/ColumnWriter.cc | 26 +++++++++++++++++++----
c++/src/Reader.cc | 1 +
c++/src/Statistics.cc | 1 +
c++/src/TypeImpl.cc | 9 +++++++-
c++/src/Writer.cc | 4 ++++
c++/test/TestType.cc | 5 +++++
c++/test/TestWriter.cc | 53 ++++++++++++++++++++++++++++++++++++++++++++++
tools/src/CSVFileImport.cc | 1 +
11 files changed, 114 insertions(+), 11 deletions(-)
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index cfef512..a7df830 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -43,7 +43,8 @@ namespace orc {
DECIMAL = 14,
DATE = 15,
VARCHAR = 16,
- CHAR = 17
+ CHAR = 17,
+ TIMESTAMP_INSTANT = 18
};
class Type {
diff --git a/c++/src/ColumnPrinter.cc b/c++/src/ColumnPrinter.cc
index d781eea..ab6b690 100644
--- a/c++/src/ColumnPrinter.cc
+++ b/c++/src/ColumnPrinter.cc
@@ -249,6 +249,7 @@ namespace orc {
break;
case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
result = new TimestampColumnPrinter(buffer);
break;
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index f0d7dde..439d92a 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -310,7 +310,9 @@ namespace orc {
const bool sameTimezone;
public:
- TimestampColumnReader(const Type& type, StripeStreams& stripe);
+ TimestampColumnReader(const Type& type,
+ StripeStreams& stripe,
+ bool isInstantType);
~TimestampColumnReader() override;
uint64_t skip(uint64_t numValues) override;
@@ -325,10 +327,15 @@ namespace orc {
TimestampColumnReader::TimestampColumnReader(const Type& type,
- StripeStreams& stripe
+ StripeStreams& stripe,
+ bool isInstantType
): ColumnReader(type, stripe),
- writerTimezone(stripe.getWriterTimezone()),
- readerTimezone(stripe.getReaderTimezone()),
+ writerTimezone(isInstantType ?
+ getTimezoneByName("GMT") :
+ stripe.getWriterTimezone()),
+ readerTimezone(isInstantType ?
+ getTimezoneByName("GMT") :
+ stripe.getReaderTimezone()),
epochOffset(writerTimezone.getEpoch()),
sameTimezone(&writerTimezone ==
&readerTimezone){
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
@@ -1823,7 +1830,11 @@ namespace orc {
case TIMESTAMP:
return std::unique_ptr<ColumnReader>
- (new TimestampColumnReader(type, stripe));
+ (new TimestampColumnReader(type, stripe, false));
+
+ case TIMESTAMP_INSTANT:
+ return std::unique_ptr<ColumnReader>
+ (new TimestampColumnReader(type, stripe, true));
case DECIMAL:
// is this a Hive 0.11 or 0.12 file?
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 9a1f28b..a259594 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -1701,7 +1701,8 @@ namespace orc {
public:
TimestampColumnWriter(const Type& type,
const StreamsFactory& factory,
- const WriterOptions& options);
+ const WriterOptions& options,
+ bool isInstantType);
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
@@ -1723,15 +1724,21 @@ namespace orc {
private:
RleVersion rleVersion;
const Timezone& timezone;
+ const bool isUTC;
};
TimestampColumnWriter::TimestampColumnWriter(
const Type& type,
const StreamsFactory& factory,
- const WriterOptions& options) :
+ const WriterOptions& options,
+ bool isInstantType) :
ColumnWriter(type, factory, options),
rleVersion(options.getRleVersion()),
- timezone(options.getTimezone()){
+ timezone(isInstantType ?
+ getTimezoneByName("GMT") :
+ options.getTimezone()),
+ isUTC(isInstantType ||
+ options.getTimezoneName() == "GMT") {
std::unique_ptr<BufferedOutputStream> dataStream =
factory.createStream(proto::Stream_Kind_DATA);
std::unique_ptr<BufferedOutputStream> secondaryStream =
@@ -1801,6 +1808,9 @@ namespace orc {
if (notNull == nullptr || notNull[i]) {
// TimestampVectorBatch already stores data in UTC
int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
+ if (!isUTC) {
+ millsUTC = timezone.convertToUTC(millsUTC);
+ }
++count;
if (enableBloomFilter) {
bloomFilter->addLong(millsUTC);
@@ -2958,7 +2968,15 @@ namespace orc {
new TimestampColumnWriter(
type,
factory,
- options));
+ options,
+ false));
+ case TIMESTAMP_INSTANT:
+ return std::unique_ptr<ColumnWriter>(
+ new TimestampColumnWriter(
+ type,
+ factory,
+ options,
+ true));
case DECIMAL:
if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
return std::unique_ptr<ColumnWriter>(
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 81a96f5..0811dbe 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -784,6 +784,7 @@ namespace orc {
case proto::Type_Kind_BINARY:
case proto::Type_Kind_DECIMAL:
case proto::Type_Kind_TIMESTAMP:
+ case proto::Type_Kind_TIMESTAMP_INSTANT:
return 3;
case proto::Type_Kind_CHAR:
case proto::Type_Kind_STRING:
diff --git a/c++/src/Statistics.cc b/c++/src/Statistics.cc
index 20e2fc9..b0c9de8 100644
--- a/c++/src/Statistics.cc
+++ b/c++/src/Statistics.cc
@@ -415,6 +415,7 @@ namespace orc {
return std::unique_ptr<MutableColumnStatistics>(
new DateColumnStatisticsImpl());
case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
return std::unique_ptr<MutableColumnStatistics>(
new TimestampColumnStatisticsImpl());
case DECIMAL:
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index 3049174..4d5a5a9 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -205,6 +205,8 @@ namespace orc {
return "binary";
case TIMESTAMP:
return "timestamp";
+ case TIMESTAMP_INSTANT:
+ return "timestamp with local time zone";
case LIST:
return "array<" + (subTypes[0] ? subTypes[0]->toString() : "void") + ">";
case MAP:
@@ -286,6 +288,7 @@ namespace orc {
(new StringVectorBatch(capacity, memoryPool));
case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
return std::unique_ptr<ColumnVectorBatch>
(new TimestampVectorBatch(capacity, memoryPool));
@@ -401,6 +404,7 @@ namespace orc {
case proto::Type_Kind_STRING:
case proto::Type_Kind_BINARY:
case proto::Type_Kind_TIMESTAMP:
+ case proto::Type_Kind_TIMESTAMP_INSTANT:
case proto::Type_Kind_DATE:
ret = std::unique_ptr<Type>
(new TypeImpl(static_cast<TypeKind>(type.kind())));
@@ -484,6 +488,7 @@ namespace orc {
case STRING:
case BINARY:
case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
case DATE:
result = new TypeImpl(fileType->getKind());
break;
@@ -658,6 +663,8 @@ namespace orc {
return std::unique_ptr<Type>(new TypeImpl(BINARY));
} else if (category == "timestamp") {
return std::unique_ptr<Type>(new TypeImpl(TIMESTAMP));
+ } else if (category == "timestamp with local time zone") {
+ return std::unique_ptr<Type>(new TypeImpl(TIMESTAMP_INSTANT));
} else if (category == "array") {
return parseArrayType(input, start, end);
} else if (category == "map") {
@@ -700,7 +707,7 @@ namespace orc {
if (input[endPos] == ':') {
fieldName = input.substr(pos, endPos - pos);
pos = ++endPos;
- while (endPos < end && isalpha(input[endPos])) {
+ while (endPos < end && (isalpha(input[endPos]) || input[endPos] == '
')) {
++endPos;
}
}
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index f6d127f..fda5f5c 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -585,6 +585,10 @@ namespace orc {
protoType.set_kind(proto::Type_Kind_TIMESTAMP);
break;
}
+ case TIMESTAMP_INSTANT: {
+ protoType.set_kind(proto::Type_Kind_TIMESTAMP_INSTANT);
+ break;
+ }
case LIST: {
protoType.set_kind(proto::Type_Kind_LIST);
break;
diff --git a/c++/test/TestType.cc b/c++/test/TestType.cc
index e70a9ef..3d6f2d1 100644
--- a/c++/test/TestType.cc
+++ b/c++/test/TestType.cc
@@ -295,6 +295,11 @@ namespace orc {
EXPECT_EQ(typeStr, type->toString());
typeStr =
+ "struct<a:bigint,b:struct<a:binary,b:timestamp with local time zone>>";
+ type = Type::buildTypeFromString(typeStr);
+ EXPECT_EQ(typeStr, type->toString());
+
+ typeStr =
"struct<a:bigint,b:struct<a:binary,b:timestamp>,c:map<double,tinyint>>";
type = Type::buildTypeFromString(typeStr);
EXPECT_EQ(typeStr, type->toString());
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index f91aecb..506887e 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -800,6 +800,59 @@ namespace orc {
}
#endif
+ TEST_P(WriterTest, writeTimestampInstant) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ std::unique_ptr<Type> type(Type::buildTypeFromString(
+ "struct<col1:timestamp with local time zone>"));
+
+ uint64_t stripeSize = 16 * 1024;
+ uint64_t compressionBlockSize = 1024;
+ uint64_t rowCount = 102400;
+
+ std::unique_ptr<Writer> writer = createWriter(stripeSize,
+ compressionBlockSize,
+ CompressionKind_ZLIB,
+ *type,
+ pool,
+ &memStream,
+ fileVersion);
+ std::unique_ptr<ColumnVectorBatch> batch =
writer->createRowBatch(rowCount);
+ StructVectorBatch * structBatch =
+ dynamic_cast<StructVectorBatch *>(batch.get());
+ TimestampVectorBatch * tsBatch =
+ dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+
+ std::vector<std::time_t> times(rowCount);
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ time_t currTime = -14210715; // 1969-07-20 12:34:45
+ times[i] = static_cast<int64_t>(currTime) + static_cast<int64_t >(i *
3660);
+ tsBatch->data[i] = times[i];
+ tsBatch->nanoseconds[i] = static_cast<int64_t>(i * 1000);
+ }
+ structBatch->numElements = rowCount;
+ tsBatch->numElements = rowCount;
+
+ writer->add(*batch);
+ writer->close();
+
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream (memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+ EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+ batch = rowReader->createRowBatch(rowCount);
+ EXPECT_EQ(true, rowReader->next(*batch));
+
+ structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+ tsBatch = dynamic_cast<TimestampVectorBatch *>(structBatch->fields[0]);
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ EXPECT_EQ(times[i], tsBatch->data[i]);
+ EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
+ }
+ }
+
TEST_P(WriterTest, writeCharAndVarcharColumn) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
diff --git a/tools/src/CSVFileImport.cc b/tools/src/CSVFileImport.cc
index a98b511..3857adc 100644
--- a/tools/src/CSVFileImport.cc
+++ b/tools/src/CSVFileImport.cc
@@ -448,6 +448,7 @@ int main(int argc, char* argv[]) {
i);
break;
case orc::TIMESTAMP:
+ case orc::TIMESTAMP_INSTANT:
fillTimestampValues(data,
structBatch->fields[i],
numValues,