This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 06c5923  HIVE-21215: Read Parquet INT64 timestamp (Marta Kuczora, 
reviewed by Karen Coppage and Peter Vary)
06c5923 is described below

commit 06c5923bf24330a692718801e6726e70ac6aa84d
Author: Marta Kuczora <[email protected]>
AuthorDate: Tue Feb 4 10:51:06 2020 +0100

    HIVE-21215: Read Parquet INT64 timestamp (Marta Kuczora, reviewed by Karen 
Coppage and Peter Vary)
---
 .../apache/hadoop/hive/common/type/Timestamp.java  |  5 ++
 .../hive/ql/io/parquet/convert/ETypeConverter.java | 22 ++++++
 .../parquet/timestamp/ParquetTimestampUtils.java   | 56 +++++++++++++++
 .../vector/ParquetDataColumnReaderFactory.java     | 62 ++++++++++++++---
 .../vector/VectorizedPrimitiveColumnReader.java    |  3 +
 .../ql/io/parquet/convert/TestETypeConverter.java  | 81 ++++++++++++++++++++++
 6 files changed, 219 insertions(+), 10 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java 
b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
index f2c1493..0193aba 100644
--- a/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
+++ b/common/src/java/org/apache/hadoop/hive/common/type/Timestamp.java
@@ -21,6 +21,7 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -182,6 +183,10 @@ public class Timestamp implements Comparable<Timestamp> {
         LocalDateTime.ofEpochSecond(epochSecond, nanos, ZoneOffset.UTC));
   }
 
  /**
   * Creates a Timestamp from the given epoch second and nanosecond adjustment,
   * rendered as the local date-time of the given zone.
   *
   * @param epochSecond seconds since 1970-01-01T00:00:00Z
   * @param nanos nanosecond adjustment to epochSecond
   * @param zone the time zone in which the resulting wall-clock value is expressed
   * @return the Timestamp carrying the local date-time of the instant at the given zone
   */
  public static Timestamp ofEpochSecond(long epochSecond, long nanos, ZoneId zone) {
    return new Timestamp(LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanos), zone));
  }
+
   public static Timestamp ofEpochMilli(long epochMilli) {
     return new Timestamp(LocalDateTime
         .ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC));
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
index d67b030..490b71e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -48,6 +49,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -683,6 +685,21 @@ public enum ETypeConverter {
       };
     }
   },
  // Converts INT64 values annotated with the Parquet TIMESTAMP logical type
  // into TimestampWritableV2 objects.
  EINT64_TIMESTAMP_CONVERTER(TimestampWritableV2.class) {
    @Override
    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent,
        TypeInfo hiveTypeInfo) {
      return new PrimitiveConverter() {
        @Override
        public void addLong(final long value) {
          // The time unit (MILLIS/MICROS/NANOS) and the UTC-adjustment flag
          // come from the column's logical type annotation; the raw long is
          // interpreted accordingly.
          TimestampLogicalTypeAnnotation logicalType = (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
          Timestamp timestamp =
              ParquetTimestampUtils.getTimestamp(value, logicalType.getUnit(), logicalType.isAdjustedToUTC());
          parent.set(index, new TimestampWritableV2(timestamp));
        }
      };
    }
  },
   EDATE_CONVERTER(DateWritableV2.class) {
     @Override
     PrimitiveConverter getConverter(final PrimitiveType type, final int index, 
final ConverterParent parent, TypeInfo hiveTypeInfo) {
@@ -730,6 +747,11 @@ public enum ETypeConverter {
             public Optional<PrimitiveConverter> 
visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
               return Optional.of(EDATE_CONVERTER.getConverter(type, index, 
parent, hiveTypeInfo));
             }
+
+            @Override
+            public Optional<PrimitiveConverter> 
visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+              return Optional.of(EINT64_TIMESTAMP_CONVERTER.getConverter(type, 
index, parent, hiveTypeInfo));
+            }
           });
 
       if (converter.isPresent()) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
new file mode 100644
index 0000000..9ce07e7
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/ParquetTimestampUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.timestamp;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
+public class ParquetTimestampUtils {
+  private static final long MILLI = 1000;
+  private static final long MICRO = 1_000_000;
+  private static final long NANO = 1_000_000_000;
+
+  public static Timestamp getTimestamp(long value, TimeUnit timeUnit, boolean 
isAdjustedToUTC) {
+
+    ZoneId zone = ZoneOffset.UTC;
+    if (isAdjustedToUTC) {
+      zone = ZoneId.systemDefault();
+    }
+    long seconds = 0L;
+    long nanoseconds = 0L;
+
+    switch (timeUnit) {
+    case MILLIS:
+      seconds = value / MILLI;
+      nanoseconds = (value % MILLI) * MICRO;
+      break;
+
+    case MICROS:
+      seconds = value / MICRO;
+      nanoseconds = (value % MICRO) * MILLI;
+      break;
+
+    case NANOS:
+      seconds = value / NANO;
+      nanoseconds = (value % NANO);
+      break;
+    default:
+      break;
+    }
+    return Timestamp.ofEpochSecond(seconds, nanoseconds, zone);
+  }
+}
\ No newline at end of file
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 519bd81..10dfe22 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.ParquetTimestampUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -43,6 +44,8 @@ import org.apache.parquet.schema.LogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -427,6 +430,9 @@ public final class ParquetDataColumnReaderFactory {
    */
   public static class TypesFromInt64PageReader extends 
DefaultParquetDataColumnReader {
 
+    private boolean isAdjustedToUTC;
+    private TimeUnit timeUnit;
+
     public TypesFromInt64PageReader(ValuesReader realReader, int length, int 
precision, int scale) {
       super(realReader, length, precision, scale);
     }
@@ -435,6 +441,18 @@ public final class ParquetDataColumnReaderFactory {
       super(dict, length, precision, scale);
     }
 
+    public TypesFromInt64PageReader(ValuesReader realReader, int length, 
boolean isAdjustedToUTC, TimeUnit timeUnit) {
+      super(realReader, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
+    public TypesFromInt64PageReader(Dictionary dict, int length, boolean 
isAdjustedToUTC, TimeUnit timeUnit) {
+      super(dict, length);
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.timeUnit = timeUnit;
+    }
+
     @Override
     public long readInteger() {
       return super.validatedLong(valuesReader.readLong(), 
serdeConstants.INT_TYPE_NAME);
@@ -533,6 +551,21 @@ public final class ParquetDataColumnReaderFactory {
       return convertToBytes(value);
     }
 
+    private Timestamp convert(Long value) {
+      Timestamp timestamp = ParquetTimestampUtils.getTimestamp(value, 
timeUnit, isAdjustedToUTC);
+      return timestamp;
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      return convert(dict.decodeToLong(id));
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      return convert(valuesReader.readLong());
+    }
+
     private static String convertToString(long value) {
       return Long.toString(value);
     }
@@ -1844,20 +1877,29 @@ public final class ParquetDataColumnReaderFactory {
             hiveScale);
       }
     case INT64:
+      LogicalTypeAnnotation logicalType = 
parquetType.getLogicalTypeAnnotation();
+      if (logicalType instanceof TimestampLogicalTypeAnnotation) {
+        TimestampLogicalTypeAnnotation timestampLogicalType = 
(TimestampLogicalTypeAnnotation) logicalType;
+        boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();
+        TimeUnit timeUnit = timestampLogicalType.getUnit();
+        return isDictionary ? new TypesFromInt64PageReader(dictionary, length, 
isAdjustedToUTC, timeUnit)
+          : new TypesFromInt64PageReader(valuesReader, length, 
isAdjustedToUTC, timeUnit);
+      }
+
       if (ETypeConverter.isUnsignedInteger(parquetType)) {
-        return isDictionary ? new TypesFromUInt64PageReader(dictionary, 
length, hivePrecision,
-            hiveScale) : new TypesFromUInt64PageReader(valuesReader, length, 
hivePrecision,
-            hiveScale);
-      } else if (parquetType.getLogicalTypeAnnotation() instanceof 
DecimalLogicalTypeAnnotation) {
-        DecimalLogicalTypeAnnotation logicalType = 
(DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation();
-        final short scale = (short) logicalType.getScale();
+        return isDictionary ? new TypesFromUInt64PageReader(dictionary, 
length, hivePrecision, hiveScale)
+          : new TypesFromUInt64PageReader(valuesReader, length, hivePrecision, 
hiveScale);
+      }
+
+      if (logicalType instanceof DecimalLogicalTypeAnnotation) {
+        DecimalLogicalTypeAnnotation decimalLogicalType = 
(DecimalLogicalTypeAnnotation) logicalType;
+        final short scale = (short) decimalLogicalType.getScale();
         return isDictionary ? new TypesFromInt64DecimalPageReader(dictionary, 
length, scale, hivePrecision, hiveScale)
           : new TypesFromInt64DecimalPageReader(valuesReader, length, scale, 
hivePrecision, hiveScale);
-      } else {
-        return isDictionary ? new TypesFromInt64PageReader(dictionary, length, 
hivePrecision,
-            hiveScale) : new TypesFromInt64PageReader(valuesReader, length, 
hivePrecision,
-            hiveScale);
       }
+
+      return isDictionary ? new TypesFromInt64PageReader(dictionary, length, 
hivePrecision, hiveScale)
+        : new TypesFromInt64PageReader(valuesReader, length, hivePrecision, 
hiveScale);
     case FLOAT:
       return isDictionary ? new TypesFromFloatPageReader(dictionary, length, 
hivePrecision,
           hiveScale) : new TypesFromFloatPageReader(valuesReader, length, 
hivePrecision, hiveScale);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 2803baf..26ce573 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -448,6 +448,9 @@ public class VectorizedPrimitiveColumnReader extends 
BaseVectorizedColumnReader
         case INT96:
           c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp());
           break;
+        case INT64:
+          c.set(rowId, dataColumn.readTimestamp().toSqlTimestamp());
+          break;
         default:
           throw new IOException(
               "Unsupported parquet logical type: " + 
type.getLogicalTypeAnnotation().toString() + " for timestamp");
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
index f6ee571..be4c880 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/convert/TestETypeConverter.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hive.ql.io.parquet.convert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.time.ZoneId;
+
 import org.apache.hadoop.hive.common.type.Timestamp;
 import 
org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter.BinaryConverter;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
@@ -43,6 +45,8 @@ import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
@@ -115,6 +119,69 @@ public class TestETypeConverter {
   }
 
   @Test
+  public void testGetTimestampProlepticConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("1572-06-15 15:12:20.0");
+    NanoTime nanoTime = NanoTimeUtils.getNanoTime(timestamp, true);
+    PrimitiveType primitiveType = 
Types.optional(PrimitiveTypeName.INT96).named("value");
+    Writable writable = getWritableFromBinaryConverter(null, primitiveType, 
nanoTime.toBinary());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64MillisTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112");
+    PrimitiveType primitiveType = createInt64TimestampType(false, 
TimeUnit.MILLIS);
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, 
timestamp.toEpochMilli());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), 
timestampWritable.getTimestamp().toEpochMilli());
+  }
+
+  @Test
+  public void testGetInt64MillisTimestampProlepticConverter() throws Exception 
{
+    Timestamp timestamp = Timestamp.valueOf("1572-07-15 15:12:20.112");
+    PrimitiveType primitiveType = createInt64TimestampType(false, 
TimeUnit.MILLIS);
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, 
timestamp.toEpochMilli());
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), 
timestampWritable.getTimestamp().toEpochMilli());
+  }
+
+  @Test
+  public void testGetInt64MicrosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.112233");
+    PrimitiveType primitiveType = createInt64TimestampType(false, 
TimeUnit.MICROS);
+    long time = timestamp.toEpochSecond() * 1000000 + timestamp.getNanos() / 
1000;
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, 
time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), 
timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosTimestampConverter() throws Exception {
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(false, 
TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, 
time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    assertEquals(timestamp.toEpochMilli(), 
timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
+  public void testGetInt64NanosAdjustedToUTCTimestampConverter() throws 
Exception {
+    ZoneId zone = ZoneId.systemDefault();
+    Timestamp timestamp = Timestamp.valueOf("2018-07-15 15:12:20.11223344");
+    PrimitiveType primitiveType = createInt64TimestampType(true, 
TimeUnit.NANOS);
+    long time = timestamp.toEpochSecond() * 1000000000 + timestamp.getNanos();
+    Writable writable = getWritableFromPrimitiveConverter(null, primitiveType, 
time);
+    TimestampWritableV2 timestampWritable = (TimestampWritableV2) writable;
+    timestamp = Timestamp.ofEpochSecond(timestamp.toEpochSecond(), 
timestamp.getNanos(), zone);
+    assertEquals(timestamp.toEpochMilli(), 
timestampWritable.getTimestamp().toEpochMilli());
+    assertEquals(timestamp.getNanos(), timestampWritable.getNanos());
+  }
+
+  @Test
   public void testGetTextConverter() throws Exception {
     PrimitiveType primitiveType = Types.optional(PrimitiveTypeName.BINARY)
         .as(LogicalTypeAnnotation.stringType()).named("value");
@@ -292,9 +359,23 @@ public class TestETypeConverter {
     return converterParent.getValue();
   }
 
+  private Writable getWritableFromPrimitiveConverter(TypeInfo hiveTypeInfo, 
PrimitiveType primitiveType,
+      Long valueToAdd) {
+    MyConverterParent converterParent = new MyConverterParent();
+    PrimitiveConverter converter = 
ETypeConverter.getNewConverter(primitiveType, 1, converterParent, hiveTypeInfo);
+    ((PrimitiveConverter) converter).addLong(valueToAdd);
+    return converterParent.getValue();
+  }
+
   private PrimitiveTypeInfo createHiveTypeInfo(String typeName) {
     PrimitiveTypeInfo hiveTypeInfo = new PrimitiveTypeInfo();
     hiveTypeInfo.setTypeName(typeName);
     return hiveTypeInfo;
   }
+
+  private PrimitiveType createInt64TimestampType(boolean isAdjustedToUTC, 
TimeUnit unit) {
+    TimestampLogicalTypeAnnotation logicalType = 
TimestampLogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
+    PrimitiveType primitiveType = 
Types.optional(PrimitiveTypeName.INT64).as(logicalType).named("value");
+    return primitiveType;
+  }
 }

Reply via email to