This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 06c5923 HIVE-21215: Read Parquet INT64 timestamp (Marta Kuczora,
reviewed by Karen Coppage and Peter Vary)
06c5923 is described below
commit 06c5923bf24330a692718801e6726e70ac6aa84d
Author: Marta Kuczora <[email protected]>
AuthorDate: Tue Feb 4 10:51:06 2020 +0100
HIVE-21215: Read Parquet INT64 timestamp (Marta Kuczora, reviewed by Karen
Coppage and Peter Vary)
---
.../apache/hadoop/hive/common/type/Timestamp.java | 5 ++
.../hive/ql/io/parquet/convert/ETypeConverter.java | 22 ++++++
.../parquet/timestamp/ParquetTimestampUtils.java | 56 +++++++++++++++
.../vector/ParquetDataColumnReaderFactory.java | 62 ++++++++++++++---
.../vector/VectorizedPrimitiveColumnReader.java | 3 +
.../ql/io/parquet/convert/TestETypeConverter.java | 81 ++++++++++++++++++++++
6 files changed, 219 insertions(+), 10 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
index f2c1493..0193aba 100644
--- a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
+++ b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
@@ -21,6 +21,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
@@ -182,6 +183,16 @@ public class Timestamp implements Comparable<Timestamp> {
LocalDateTime.ofEpochSecond(epochSecond, nanos, ZoneOffset.UTC));
}
+  /**
+   * Creates a Timestamp from an instant given as seconds since the epoch plus a nanosecond
+   * adjustment, holding the local date-time that instant has in {@code zone}.
+   */
+  public static Timestamp ofEpochSecond(long epochSecond, long nanos, ZoneId zone) {
+    Instant instant = Instant.ofEpochSecond(epochSecond, nanos);
+    LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, zone);
+    return new Timestamp(localDateTime);
+  }
+
public static Timestamp ofEpochMilli(long epochMilli) {
return new Timestamp(LocalDateTime
.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index d67b030..490b71e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -48,6 +49,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
@@ -683,6 +685,29 @@ public enum ETypeConverter {
};
}
},
+  EINT64_TIMESTAMP_CONVERTER(TimestampWritableV2.class) {
+    /**
+     * Returns a converter for INT64 values annotated with a TIMESTAMP logical type. Each value is
+     * decoded by ParquetTimestampUtils and set on the parent as a TimestampWritableV2.
+     * The annotation is fetched and cast once here instead of once per value.
+     */
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent,
+        TypeInfo hiveTypeInfo) {
+      // Expects a TimestampLogicalTypeAnnotation; the visitor below selects this converter only
+      // for such types.
+      final TimestampLogicalTypeAnnotation logicalType =
+          (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
+      return new PrimitiveConverter() {
+        @Override
+        public void addLong(final long value) {
+          Timestamp timestamp =
+              ParquetTimestampUtils.getTimestamp(value, logicalType.getUnit(), logicalType.isAdjustedToUTC());
+          parent.set(index, new TimestampWritableV2(timestamp));
+        }
+      };
+    }
+  },
EDATE_CONVERTER(DateWritableV2.class) {
@Override
PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeInfo hiveTypeInfo) {
@@ -730,6 +755,11 @@ public enum ETypeConverter {
public Optional<PrimitiveConverter> visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
return Optional.of(EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
}
+
+          @Override
+          public Optional<PrimitiveConverter> visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+            return Optional.of(EINT64_TIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo));
+          }
});
if (converter.isPresent()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
new file mode 100644
index 0000000..9ce07e7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.timestamp;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Objects;
+
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
+/**
+ * Utilities for decoding Parquet INT64 values annotated with the TIMESTAMP logical type.
+ */
+public final class ParquetTimestampUtils {
+  private static final long MILLIS_PER_SECOND = 1_000L;
+  private static final long MICROS_PER_SECOND = 1_000_000L;
+  private static final long NANOS_PER_SECOND = 1_000_000_000L;
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+  private static final long NANOS_PER_MICRO = 1_000L;
+
+  private ParquetTimestampUtils() {
+    // Static helpers only; not instantiable.
+  }
+
+  /**
+   * Converts a raw INT64 TIMESTAMP value into a Hive {@link Timestamp}.
+   *
+   * @param value number of {@code timeUnit} units since 1970-01-01T00:00:00; may be negative
+   * @param timeUnit unit declared by the column's TIMESTAMP annotation
+   * @param isAdjustedToUTC when true the value is a UTC instant and the result holds its local
+   *     date-time in the JVM default time zone; when false the result holds the date-time in UTC,
+   *     i.e. the stored wall-clock value without any zone shift
+   * @return the decoded timestamp
+   * @throws NullPointerException if {@code timeUnit} is null
+   * @throws IllegalArgumentException if {@code timeUnit} is not MILLIS, MICROS or NANOS
+   */
+  public static Timestamp getTimestamp(long value, TimeUnit timeUnit, boolean isAdjustedToUTC) {
+    Objects.requireNonNull(timeUnit, "timeUnit");
+    ZoneId zone = isAdjustedToUTC ? ZoneId.systemDefault() : ZoneOffset.UTC;
+
+    long seconds;
+    long nanoseconds;
+    switch (timeUnit) {
+    case MILLIS:
+      seconds = value / MILLIS_PER_SECOND;
+      nanoseconds = (value % MILLIS_PER_SECOND) * NANOS_PER_MILLI;
+      break;
+    case MICROS:
+      seconds = value / MICROS_PER_SECOND;
+      nanoseconds = (value % MICROS_PER_SECOND) * NANOS_PER_MICRO;
+      break;
+    case NANOS:
+      seconds = value / NANOS_PER_SECOND;
+      nanoseconds = value % NANOS_PER_SECOND;
+      break;
+    default:
+      // Fail loudly rather than silently decoding an unknown unit as the epoch.
+      throw new IllegalArgumentException("Unsupported Parquet timestamp unit: " + timeUnit);
+    }
+    // For negative values the remainder (and so the nanosecond part) is negative;
+    // Instant.ofEpochSecond(seconds, nanos) normalizes this, so pre-epoch values decode correctly.
+    return Timestamp.ofEpochSecond(seconds, nanoseconds, zone);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 519bd81..10dfe22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -43,6 +44,8 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -427,6 +430,10 @@ public final class ParquetDataColumnReaderFactory {
*/
public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+    // Set only by the TIMESTAMP constructors; timeUnit stays null for other INT64 columns.
+    private boolean isAdjustedToUTC;
+    private TimeUnit timeUnit;
+
public TypesFromInt64PageReader(ValuesReader realReader, int length, int precision, int scale) {
super(realReader, length, precision, scale);
}
@@ -435,6 +442,18 @@ public final class ParquetDataColumnReaderFactory {
super(dict, length, precision, scale);
}
+    public TypesFromInt64PageReader(ValuesReader realReader, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(realReader, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
+    public TypesFromInt64PageReader(Dictionary dict, int length, boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(dict, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
@Override
public long readInteger() {
return super.validatedLong(valuesReader.readLong(), serdeConstants.INT_TYPE_NAME);
@@ -533,6 +552,28 @@ public final class ParquetDataColumnReaderFactory {
return convertToBytes(value);
}
+    /**
+     * Decodes a raw INT64 value with the TIMESTAMP unit and UTC flag this reader was built with.
+     */
+    private Timestamp convert(long value) {
+      if (timeUnit == null) {
+        // Built by a precision/scale constructor, i.e. the column has no TIMESTAMP annotation.
+        throw new UnsupportedOperationException(
+            "INT64 column without a TIMESTAMP logical type annotation cannot be read as timestamp");
+      }
+      return ParquetTimestampUtils.getTimestamp(value, timeUnit, isAdjustedToUTC);
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      return convert(dict.decodeToLong(id));
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      return convert(valuesReader.readLong());
+    }
+
private static String convertToString(long value) {
return Long.toString(value);
}
@@ -1844,20 +1885,29 @@ public final class ParquetDataColumnReaderFactory {
hiveScale);
}
case INT64:
+      LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
+      if (logicalType instanceof TimestampLogicalTypeAnnotation) {
+        TimestampLogicalTypeAnnotation timestampLogicalType = (TimestampLogicalTypeAnnotation) logicalType;
+        boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
+        TimeUnit timeUnit = timestampLogicalType.getUnit();
+        return isDictionary ? new TypesFromInt64PageReader(dictionary, length, isAdjustedToUTC, timeUnit)
+            : new TypesFromInt64PageReader(valuesReader, length, isAdjustedToUTC, timeUnit);
+      }
+
if (ETypeConverter.isUnsignedInteger(parquetType)) {
- return isDictionary ? new TypesFromUInt64PageReader(dictionary, length, hivePrecision,
- hiveScale) : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision,
- hiveScale);
- } else if (parquetType.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation) {
- DecimalLogicalTypeAnnotation logicalType = (DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation();
- final short scale = (short) logicalType.getScale();
+        return isDictionary ? new TypesFromUInt64PageReader(dictionary, length, hivePrecision, hiveScale)
+            : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision, hiveScale);
+      }
+
+      if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+        DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) logicalType;
+        final short scale = (short) decimalLogicalType.getScale();
return isDictionary ? new TypesFromInt64DecimalPageReader(dictionary, length, scale, hivePrecision, hiveScale)
: new TypesFromInt64DecimalPageReader(valuesReader, length, scale, hivePrecision, hiveScale);
- } else {
- return isDictionary ? new TypesFromInt64PageReader(dictionary, length, hivePrecision,
- hiveScale) : new TypesFromInt64PageReader(valuesReader, length, hivePrecision,
- hiveScale);
}
+
+      return isDictionary ? new TypesFromInt64PageReader(dictionary, length, hivePrecision, hiveScale)
+          : new TypesFromInt64PageReader(valuesReader, length, hivePrecision, hiveScale);
case FLOAT:
return isDictionary ? new TypesFromFloatPageReader(dictionary, length, hivePrecision,
hiveScale) : new TypesFromFloatPageReader(valuesReader, length, hivePrecision, hiveScale);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 2803baf..26ce573 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -448,6 +448,8 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
case INT96:
+        // INT64 timestamps are decoded by the column reader's readTimestamp(), just like INT96.
+        case INT64:
c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp());
break;
default:
throw new IOException(
"Unsupported parquet logical type: " + type.getLogicalTypeAnnotation().toString() + " for timestamp");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
index f6ee571..be4c880 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hive.ql.io.parquet.convert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.time.ZoneId;
+
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter.BinaryConverter;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
@@ -43,6 +45,8 @@ import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type.Repetition;
@@ -115,6 +119,81 @@ public class TestETypeConverter {
}
@Test
+  public void testGetTimestampProlepticConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("1572-06-15 15:12:20.0");
+    NanoTime nanoTime = NanoTimeUtils.getNanoTime(timestamp, true);
+    PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.INT96).named("value");
+    Writable writable = getWritableFromBinaryConverter(null, primitiveType, nanoTime.toBinary());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64MillisTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MILLIS);
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, timestamp.toEpochMilli());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+  }
+
+  @Test
+  public void testGetInt64MillisTimestampProlepticConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("1572-07-15 15:12:20.112");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MILLIS);
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, timestamp.toEpochMilli());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+  }
+
+  @Test
+  public void testGetInt64MicrosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112233");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MICROS);
+    long time = timestamp.toEpochSecond() * 1000000 + timestamp.getNanos() / 1000;
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  // Pre-epoch micros exercise the negative-remainder path of ParquetTimestampUtils.getTimestamp.
+  @Test
+  public void testGetInt64MicrosTimestampProlepticConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("1572-07-15 15:12:20.112233");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.MICROS);
+    long time = timestamp.toEpochSecond() * 1_000_000L + timestamp.getNanos() / 1_000;
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(false, TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosAdjustedToUTCTimestampConverter() throws Exception {
+    ZoneId zone = ZoneId.systemDefault();
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(true, TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    timestamp = Timestamp.ofEpochSecond(timestamp.toEpochSecond(), timestamp.getNanos(), zone);
+    assertEquals(timestamp.toEpochMilli(), timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
public void testGetTextConverter() throws Exception {
PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.BINARY)
.as(LogicalTypeAnnotation.stringType()).named("value");
@@ -292,9 +371,25 @@ public class TestETypeConverter {
return converterParent.getValue();
}
+  /** Feeds {@code valueToAdd} to the converter created for {@code primitiveType} and returns what it set on the parent. */
+  private Writable getWritableFromPrimitiveConverter(TypeInfo hiveTypeInfo, PrimitiveType primitiveType,
+      long valueToAdd) {
+    MyConverterParent converterParent = new MyConverterParent();
+    PrimitiveConverter converter = ETypeConverter.getNewConverter(primitiveType, 1, converterParent, hiveTypeInfo);
+    converter.addLong(valueToAdd);
+    return converterParent.getValue();
+  }
+
private PrimitiveTypeInfo createHiveTypeInfo(String typeName) {
PrimitiveTypeInfo hiveTypeInfo = new PrimitiveTypeInfo();
hiveTypeInfo.setTypeName(typeName);
return hiveTypeInfo;
}
+
+  /** Builds an optional INT64 column type annotated as TIMESTAMP(unit, isAdjustedToUTC). */
+  private PrimitiveType createInt64TimestampType(boolean isAdjustedToUTC, TimeUnit unit) {
+    TimestampLogicalTypeAnnotation logicalType = TimestampLogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
+    PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.INT64).as(logicalType).named("value");
+    return primitiveType;
+  }
}