This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6739efe  ORC: Implement TestGenericData and fix related issues (#778)
6739efe is described below

commit 6739efe0fc701ed8b40ef0880c58d4d26fc5a451
Author: Shardul Mahadik <[email protected]>
AuthorDate: Tue Mar 31 14:54:30 2020 -0700

    ORC: Implement TestGenericData and fix related issues (#778)
---
 .../apache/iceberg/data/orc/GenericOrcReader.java  | 152 ++++++++++++++++---
 .../apache/iceberg/data/orc/GenericOrcWriter.java  | 168 ++++++++++++++++++---
 .../apache/iceberg/data/orc/TestGenericData.java   | 130 ++++++++++++++++
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  |   2 +-
 .../java/org/apache/iceberg/orc/ORCSchemaUtil.java |  67 +++++---
 .../org/apache/iceberg/orc/OrcFileAppender.java    |   2 +-
 site/docs/spec.md                                  |  41 ++---
 .../apache/iceberg/spark/data/SparkOrcReader.java  |  12 +-
 .../apache/iceberg/spark/data/SparkOrcWriter.java  |   6 +-
 versions.lock                                      |   2 +-
 versions.props                                     |   2 +-
 11 files changed, 485 insertions(+), 99 deletions(-)

diff --git 
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
index e5f0619..88c407d 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -23,13 +23,22 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.ORCSchemaUtil;
 import org.apache.iceberg.orc.OrcValueReader;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
@@ -53,6 +62,9 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
   private final List<TypeDescription> columns;
   private final Converter[] converters;
 
+  private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
   private GenericOrcReader(Schema expectedSchema, TypeDescription readSchema) {
     this.schema = expectedSchema;
     this.columns = readSchema.getChildren();
@@ -70,8 +82,7 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
     return newConverters;
   }
 
-  public static OrcValueReader<Record> buildReader(Schema expectedSchema,
-                                                   TypeDescription fileSchema) 
{
+  public static OrcValueReader<Record> buildReader(Schema expectedSchema, 
TypeDescription fileSchema) {
     return new GenericOrcReader(expectedSchema, fileSchema);
   }
 
@@ -136,6 +147,30 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
     }
   }
 
+  private static class TimeConverter implements Converter<LocalTime> {
+    @Override
+    public LocalTime convert(ColumnVector vector, int row) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
+        return null;
+      } else {
+        return LocalTime.ofNanoOfDay(((LongColumnVector) 
vector).vector[rowIndex] * 1_000);
+      }
+    }
+  }
+
+  private static class DateConverter implements Converter<LocalDate> {
+    @Override
+    public LocalDate convert(ColumnVector vector, int row) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
+        return null;
+      } else {
+        return EPOCH_DAY.plusDays((int) ((LongColumnVector) 
vector).vector[rowIndex]);
+      }
+    }
+  }
+
   private static class LongConverter implements Converter<Long> {
     @Override
     public Long convert(ColumnVector vector, int row) {
@@ -172,14 +207,13 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
     }
   }
 
-  private static class TimestampConverter implements Converter<Long> {
-    private Long convert(TimestampColumnVector vector, int row) {
-      // compute microseconds past 1970.
-      return (vector.time[row] / 1000) * 1_000_000 + vector.nanos[row] / 1000;
+  private static class TimestampTzConverter implements 
Converter<OffsetDateTime> {
+    private OffsetDateTime convert(TimestampColumnVector vector, int row) {
+      return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000), 
vector.nanos[row]).atOffset(ZoneOffset.UTC);
     }
 
     @Override
-    public Long convert(ColumnVector vector, int row) {
+    public OffsetDateTime convert(ColumnVector vector, int row) {
       int rowIndex = vector.isRepeating ? 0 : row;
       if (!vector.noNulls && vector.isNull[rowIndex]) {
         return null;
@@ -189,7 +223,25 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
     }
   }
 
-  private static class BinaryConverter implements Converter<byte[]> {
+  private static class TimestampConverter implements Converter<LocalDateTime> {
+
+    private LocalDateTime convert(TimestampColumnVector vector, int row) {
+      return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000), 
vector.nanos[row]).atOffset(ZoneOffset.UTC)
+          .toLocalDateTime();
+    }
+
+    @Override
+    public LocalDateTime convert(ColumnVector vector, int row) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
+        return null;
+      } else {
+        return convert((TimestampColumnVector) vector, rowIndex);
+      }
+    }
+  }
+
+  private static class FixedConverter implements Converter<byte[]> {
     @Override
     public byte[] convert(ColumnVector vector, int row) {
       int rowIndex = vector.isRepeating ? 0 : row;
@@ -197,22 +249,53 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
         return null;
       } else {
         BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        return Arrays.copyOfRange(bytesVector.vector[rowIndex],
-            bytesVector.start[rowIndex],
+        return Arrays.copyOfRange(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex],
             bytesVector.start[rowIndex] + bytesVector.length[rowIndex]);
       }
     }
   }
 
+  private static class BinaryConverter implements Converter<ByteBuffer> {
+    @Override
+    public ByteBuffer convert(ColumnVector vector, int row) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
+        return null;
+      } else {
+        BytesColumnVector bytesVector = (BytesColumnVector) vector;
+        return ByteBuffer.wrap(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
+      }
+    }
+  }
+
+  private static class UUIDConverter implements Converter<UUID> {
+    @Override
+    public UUID convert(ColumnVector vector, int row) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
+        return null;
+      } else {
+        BytesColumnVector bytesVector = (BytesColumnVector) vector;
+        ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex],
+            bytesVector.length[rowIndex]);
+        long mostSigBits = buf.getLong();
+        long leastSigBits = buf.getLong();
+        return new UUID(mostSigBits, leastSigBits);
+      }
+    }
+  }
+
   private static class StringConverter implements Converter<String> {
     @Override
     public String convert(ColumnVector vector, int row) {
-      BinaryConverter converter = new BinaryConverter();
-      byte[] byteData = converter.convert(vector, row);
-      if (byteData == null) {
+      int rowIndex = vector.isRepeating ? 0 : row;
+      if (!vector.noNulls && vector.isNull[rowIndex]) {
         return null;
+      } else {
+        BytesColumnVector bytesVector = (BytesColumnVector) vector;
+        return new String(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex], bytesVector.length[rowIndex],
+            StandardCharsets.UTF_8);
       }
-      return new String(byteData, StandardCharsets.UTF_8);
     }
   }
 
@@ -223,8 +306,8 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
       if (!vector.noNulls && vector.isNull[rowIndex]) {
         return null;
       } else {
-        return ((DecimalColumnVector) vector).vector[rowIndex]
-            .getHiveDecimal().bigDecimalValue();
+        DecimalColumnVector cv = (DecimalColumnVector) vector;
+        return 
cv.vector[rowIndex].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
       }
     }
   }
@@ -283,10 +366,9 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
       final int offset = (int) vector.offsets[row];
       final int length = (int) vector.lengths[row];
 
-      // serialize the keys
-      Map<String, Object> map = Maps.newHashMapWithExpectedSize(length);
+      Map<Object, Object> map = Maps.newHashMapWithExpectedSize(length);
       for (int c = 0; c < length; ++c) {
-        String key = String.valueOf(keyConvert.convert(vector.keys, offset + 
c));
+        Object key = keyConvert.convert(vector.keys, offset + c);
         Object value = valueConvert.convert(vector.values, offset + c);
         map.put(key, value);
       }
@@ -341,8 +423,7 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
     }
   }
 
-  private static Converter buildConverter(final Types.NestedField icebergField,
-                                          final TypeDescription schema) {
+  private static Converter buildConverter(final Types.NestedField 
icebergField, final TypeDescription schema) {
     switch (schema.getCategory()) {
       case BOOLEAN:
         return new BooleanConverter();
@@ -351,20 +432,45 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
       case SHORT:
         return new ShortConverter();
       case DATE:
+        return new DateConverter();
       case INT:
         return new IntConverter();
       case LONG:
-        return new LongConverter();
+        String longAttributeValue = 
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.LongType longType = longAttributeValue == null ? 
ORCSchemaUtil.LongType.LONG :
+            ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+        switch (longType) {
+          case TIME:
+            return new TimeConverter();
+          case LONG:
+            return new LongConverter();
+          default:
+            throw new IllegalStateException("Unhandled Long type found in ORC 
type attribute: " + longType);
+        }
       case FLOAT:
         return new FloatConverter();
       case DOUBLE:
         return new DoubleConverter();
       case TIMESTAMP:
         return new TimestampConverter();
+      case TIMESTAMP_INSTANT:
+        return new TimestampTzConverter();
       case DECIMAL:
         return new DecimalConverter();
       case BINARY:
-        return new BinaryConverter();
+        String binaryAttributeValue = 
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? 
ORCSchemaUtil.BinaryType.BINARY :
+            ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+        switch (binaryType) {
+          case UUID:
+            return new UUIDConverter();
+          case FIXED:
+            return new FixedConverter();
+          case BINARY:
+            return new BinaryConverter();
+          default:
+            throw new IllegalStateException("Unhandled Binary type found in 
ORC type attribute: " + binaryType);
+        }
       case STRING:
       case CHAR:
       case VARCHAR:
diff --git 
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 78c6f0d..d6f2025 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -22,10 +22,20 @@ package org.apache.iceberg.data.orc;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.ORCSchemaUtil;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
@@ -40,10 +50,10 @@ import 
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
-/**
- */
 public class GenericOrcWriter implements OrcValueWriter<Record> {
   private final Converter[] converters;
+  private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
   private GenericOrcWriter(TypeDescription schema) {
     this.converters = buildConverters(schema);
@@ -151,6 +161,23 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
     }
   }
 
+  static class TimeConverter implements Converter<LocalTime> {
+    @Override
+    public Class<LocalTime> getJavaClass() {
+      return LocalTime.class;
+    }
+
+    public void addValue(int rowId, LocalTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000;
+      }
+    }
+  }
+
   static class LongConverter implements Converter<Long> {
     @Override
     public Class<Long> getJavaClass() {
@@ -224,7 +251,44 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
     }
   }
 
-  static class BytesConverter implements Converter<byte[]> {
+  static class BytesConverter implements Converter<ByteBuffer> {
+    @Override
+    public Class<ByteBuffer> getJavaClass() {
+      return ByteBuffer.class;
+    }
+
+    public void addValue(int rowId, ByteBuffer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((BytesColumnVector) output).setRef(rowId, data.array(), 0, 
data.array().length);
+      }
+    }
+  }
+
+  static class UUIDConverter implements Converter<UUID> {
+    @Override
+    public Class<UUID> getJavaClass() {
+      return UUID.class;
+    }
+
+    public void addValue(int rowId, UUID data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ByteBuffer buffer = ByteBuffer.allocate(16);
+        buffer.putLong(data.getMostSignificantBits());
+        buffer.putLong(data.getLeastSignificantBits());
+        ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, 
buffer.array().length);
+      }
+    }
+  }
+
+  static class FixedConverter implements Converter<byte[]> {
     @Override
     public Class<byte[]> getJavaClass() {
       return byte[].class;
@@ -237,30 +301,67 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
         output.isNull[rowId] = true;
       } else {
         output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
         ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
       }
     }
   }
 
-  static class TimestampConverter implements Converter<Long> {
+  static class DateConverter implements Converter<LocalDate> {
     @Override
-    public Class<Long> getJavaClass() {
-      return Long.class;
+    public Class<LocalDate> getJavaClass() {
+      return LocalDate.class;
     }
 
     @Override
-    public void addValue(int rowId, Long data, ColumnVector output) {
+    public void addValue(int rowId, LocalDate data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = 
ChronoUnit.DAYS.between(EPOCH_DAY, data);
+      }
+    }
+  }
+
+  static class TimestampTzConverter implements Converter<OffsetDateTime> {
+    @Override
+    public Class<OffsetDateTime> getJavaClass() {
+      return OffsetDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, OffsetDateTime data, ColumnVector output) {
       if (data == null) {
         output.noNulls = false;
         output.isNull[rowId] = true;
       } else {
         output.isNull[rowId] = false;
         TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data;
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
+        cv.time[rowId] = data.toInstant().toEpochMilli(); // millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos 
to only keep microsecond precision
+      }
+    }
+  }
+
+  static class TimestampConverter implements Converter<LocalDateTime> {
+
+    @Override
+    public Class<LocalDateTime> getJavaClass() {
+      return LocalDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalDateTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        cv.setIsUTC(true);
+        cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // 
millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos 
to only keep microsecond precision
       }
     }
   }
@@ -286,7 +387,7 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
       } else {
         output.isNull[rowId] = false;
         ((DecimalColumnVector) output).vector[rowId]
-            .setFromLongAndScale(data.longValueExact(), scale);
+            .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
       }
     }
   }
@@ -308,7 +409,7 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
         output.isNull[rowId] = true;
       } else {
         output.isNull[rowId] = false;
-        ((DecimalColumnVector) 
output).vector[rowId].set(HiveDecimal.create(data));
+        ((DecimalColumnVector) 
output).vector[rowId].set(HiveDecimal.create(data, false));
       }
     }
   }
@@ -402,10 +503,10 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
         output.isNull[rowId] = true;
       } else {
         output.isNull[rowId] = false;
-        Map<String, Object> map = (Map<String, Object>) data;
-        List<String> keys = Lists.newArrayListWithExpectedSize(map.size());
+        Map<Object, Object> map = (Map<Object, Object>) data;
+        List<Object> keys = Lists.newArrayListWithExpectedSize(map.size());
         List<Object> values = Lists.newArrayListWithExpectedSize(map.size());
-        for (Map.Entry<String, ?> entry : map.entrySet()) {
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
           keys.add(entry.getKey());
           values.add(entry.getValue());
         }
@@ -436,26 +537,49 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
       case SHORT:
         return new ShortConverter();
       case DATE:
+        return new DateConverter();
       case INT:
         return new IntConverter();
       case LONG:
-        return new LongConverter();
+        String longAttributeValue = 
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.LongType longType = longAttributeValue == null ? 
ORCSchemaUtil.LongType.LONG :
+            ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+        switch (longType) {
+          case TIME:
+            return new TimeConverter();
+          case LONG:
+            return new LongConverter();
+          default:
+            throw new IllegalStateException("Unhandled Long type found in ORC 
type attribute: " + longType);
+        }
       case FLOAT:
         return new FloatConverter();
       case DOUBLE:
         return new DoubleConverter();
       case BINARY:
-        return new BytesConverter();
+        String binaryAttributeValue = 
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? 
ORCSchemaUtil.BinaryType.BINARY :
+            ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+        switch (binaryType) {
+          case UUID:
+            return new UUIDConverter();
+          case FIXED:
+            return new FixedConverter();
+          case BINARY:
+            return new BytesConverter();
+          default:
+            throw new IllegalStateException("Unhandled Binary type found in 
ORC type attribute: " + binaryType);
+        }
       case STRING:
       case CHAR:
       case VARCHAR:
         return new StringConverter();
       case DECIMAL:
-        return schema.getPrecision() <= 18 ?
-            new Decimal18Converter(schema) :
-            new Decimal38Converter(schema);
+        return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : 
new Decimal38Converter(schema);
       case TIMESTAMP:
         return new TimestampConverter();
+      case TIMESTAMP_INSTANT:
+        return new TimestampTzConverter();
       case STRUCT:
         return new StructConverter(schema);
       case LIST:
diff --git 
a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java 
b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
new file mode 100644
index 0000000..670b455
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTest;
+import org.apache.iceberg.data.DataTestHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestGenericData extends DataTest {
+
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+        .schema(schema)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .build()) {
+      for (Record rec : expected) {
+        writer.add(rec);
+      }
+    }
+
+    List<Record> rows;
+    try (CloseableIterable<Record> reader = 
ORC.read(Files.localInput(testFile))
+        .project(schema)
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, 
fileSchema))
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), 
rows.get(i));
+    }
+  }
+
+  @Test
+  public void writeAndValidateTimestamps() throws IOException {
+    Schema timestampSchema = new Schema(
+        required(1, "tsTzCol", Types.TimestampType.withZone()),
+        required(2, "tsCol", Types.TimestampType.withoutZone())
+    );
+
+    // Write using America/New_York timezone
+    TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
+    GenericRecord record1 = GenericRecord.create(timestampSchema);
+    record1.setField("tsTzCol", 
OffsetDateTime.parse("2017-01-16T17:10:34-08:00"));
+    record1.setField("tsCol", LocalDateTime.parse("1970-01-01T00:01:00"));
+    GenericRecord record2 = GenericRecord.create(timestampSchema);
+    record2.setField("tsTzCol", 
OffsetDateTime.parse("2017-05-16T17:10:34-08:00"));
+    record2.setField("tsCol", LocalDateTime.parse("1970-05-01T00:01:00"));
+    GenericRecord record3 = GenericRecord.create(timestampSchema);
+    record3.setField("tsTzCol", 
OffsetDateTime.parse("1935-01-16T17:10:34-08:00"));
+    record3.setField("tsCol", LocalDateTime.parse("1935-01-01T00:01:00"));
+    GenericRecord record4 = GenericRecord.create(timestampSchema);
+    record4.setField("tsTzCol", 
OffsetDateTime.parse("1935-05-16T17:10:34-08:00"));
+    record4.setField("tsCol", LocalDateTime.parse("1935-05-01T00:01:00"));
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+        .schema(timestampSchema)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .build()) {
+      writer.add(record1);
+      writer.add(record2);
+      writer.add(record3);
+      writer.add(record4);
+    }
+
+    // Read using Asia/Kolkata timezone
+    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata"));
+    List<Record> rows;
+    try (CloseableIterable<Record> reader = 
ORC.read(Files.localInput(testFile))
+        .project(timestampSchema)
+        .createReaderFunc(fileSchema -> 
GenericOrcReader.buildReader(timestampSchema, fileSchema))
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals(OffsetDateTime.parse("2017-01-17T01:10:34Z"), 
rows.get(0).getField("tsTzCol"));
+    Assert.assertEquals(LocalDateTime.parse("1970-01-01T00:01:00"), 
rows.get(0).getField("tsCol"));
+    Assert.assertEquals(OffsetDateTime.parse("2017-05-17T01:10:34Z"), 
rows.get(1).getField("tsTzCol"));
+    Assert.assertEquals(LocalDateTime.parse("1970-05-01T00:01:00"), 
rows.get(1).getField("tsCol"));
+    Assert.assertEquals(OffsetDateTime.parse("1935-01-17T01:10:34Z"), 
rows.get(2).getField("tsTzCol"));
+    Assert.assertEquals(LocalDateTime.parse("1935-01-01T00:01:00"), 
rows.get(2).getField("tsCol"));
+    Assert.assertEquals(OffsetDateTime.parse("1935-05-17T01:10:34Z"), 
rows.get(3).getField("tsTzCol"));
+    Assert.assertEquals(LocalDateTime.parse("1935-05-01T00:01:00"), 
rows.get(3).getField("tsCol"));
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java 
b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 7a3e0ce..7236a3c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -183,7 +183,7 @@ public class ORC {
   }
 
   static Reader newFileReader(InputFile file, Configuration config) {
-    ReaderOptions readerOptions = OrcFile.readerOptions(config);
+    ReaderOptions readerOptions = 
OrcFile.readerOptions(config).useUTCTimestamp(true);
     if (file instanceof HadoopInputFile) {
       readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
     }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java 
b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 558650a..3af6f58 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -41,12 +41,12 @@ import org.apache.orc.TypeDescription;
  */
 public final class ORCSchemaUtil {
 
-  private enum BinaryType {
+  public enum BinaryType {
     UUID, FIXED, BINARY
   }
 
-  private enum IntegerType {
-    TIME, INTEGER
+  public enum LongType {
+    TIME, LONG
   }
 
   private static class OrcField {
@@ -70,20 +70,27 @@ public final class ORCSchemaUtil {
   private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
   private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
 
-  private static final String ICEBERG_BINARY_TYPE_ATTRIBUTE = 
"iceberg.binary-type";
-  private static final String ICEBERG_INTEGER_TYPE_ATTRIBUTE = 
"iceberg.integer-type";
+  /**
+   * The name of the ORC {@link TypeDescription} attribute indicating the 
Iceberg type corresponding to an
+   * ORC binary type. The values for this attribute are denoted in {@code 
BinaryType}.
+   */
+  public static final String ICEBERG_BINARY_TYPE_ATTRIBUTE = 
"iceberg.binary-type";
+  /**
+   * The name of the ORC {@link TypeDescription} attribute indicating the 
Iceberg type corresponding to an
+   * ORC long type. The values for this attribute are denoted in {@code 
LongType}.
+   */
+  public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
   private static final String ICEBERG_FIELD_LENGTH = "iceberg.length";
 
   private static final ImmutableMap<Type.TypeID, TypeDescription.Category> 
TYPE_MAPPING =
       ImmutableMap.<Type.TypeID, TypeDescription.Category>builder()
           .put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN)
           .put(Type.TypeID.INTEGER, TypeDescription.Category.INT)
-          .put(Type.TypeID.TIME, TypeDescription.Category.INT)
           .put(Type.TypeID.LONG, TypeDescription.Category.LONG)
+          .put(Type.TypeID.TIME, TypeDescription.Category.LONG)
           .put(Type.TypeID.FLOAT, TypeDescription.Category.FLOAT)
           .put(Type.TypeID.DOUBLE, TypeDescription.Category.DOUBLE)
           .put(Type.TypeID.DATE, TypeDescription.Category.DATE)
-          .put(Type.TypeID.TIMESTAMP, TypeDescription.Category.TIMESTAMP)
           .put(Type.TypeID.STRING, TypeDescription.Category.STRING)
           .put(Type.TypeID.UUID, TypeDescription.Category.BINARY)
           .put(Type.TypeID.FIXED, TypeDescription.Category.BINARY)
@@ -112,14 +119,14 @@ public final class ORCSchemaUtil {
         break;
       case INTEGER:
         orcType = TypeDescription.createInt();
-        orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE, 
IntegerType.INTEGER.toString());
         break;
       case TIME:
-        orcType = TypeDescription.createInt();
-        orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE, 
IntegerType.TIME.toString());
+        orcType = TypeDescription.createLong();
+        orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE, 
LongType.TIME.toString());
         break;
       case LONG:
         orcType = TypeDescription.createLong();
+        orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE, 
LongType.LONG.toString());
         break;
       case FLOAT:
         orcType = TypeDescription.createFloat();
@@ -131,7 +138,12 @@ public final class ORCSchemaUtil {
         orcType = TypeDescription.createDate();
         break;
       case TIMESTAMP:
-        orcType = TypeDescription.createTimestamp();
+        Types.TimestampType tsType = (Types.TimestampType) type;
+        if (tsType.shouldAdjustToUTC()) {
+          orcType = TypeDescription.createTimestampInstant();
+        } else {
+          orcType = TypeDescription.createTimestamp();
+        }
         break;
       case STRING:
         orcType = TypeDescription.createString();
@@ -355,7 +367,14 @@ public final class ORCSchemaUtil {
   }
 
   private static boolean isSameType(TypeDescription orcType, Type icebergType) 
{
-    return Objects.equals(TYPE_MAPPING.get(icebergType.typeId()), 
orcType.getCategory());
+    if (icebergType.typeId() == Type.TypeID.TIMESTAMP) {
+      Types.TimestampType tsType = (Types.TimestampType) icebergType;
+      return Objects.equals(
+          tsType.shouldAdjustToUTC() ? 
TypeDescription.Category.TIMESTAMP_INSTANT : TypeDescription.Category.TIMESTAMP,
+          orcType.getCategory());
+    } else {
+      return Objects.equals(TYPE_MAPPING.get(icebergType.typeId()), 
orcType.getCategory());
+    }
   }
 
   private static Optional<Integer> icebergID(TypeDescription orcType) {
@@ -390,19 +409,18 @@ public final class ORCSchemaUtil {
       case BYTE:
       case SHORT:
       case INT:
-        IntegerType integerType = IntegerType.valueOf(
-            orcType.getAttributeValue(ICEBERG_INTEGER_TYPE_ATTRIBUTE)
-        );
-        switch (integerType) {
+        return getIcebergType(icebergID, name, Types.IntegerType.get(), 
isRequired);
+      case LONG:
+        String longAttributeValue = 
orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
+        LongType longType = longAttributeValue == null ? LongType.LONG : 
LongType.valueOf(longAttributeValue);
+        switch (longType) {
           case TIME:
             return getIcebergType(icebergID, name, Types.TimeType.get(), 
isRequired);
-          case INTEGER:
-            return getIcebergType(icebergID, name, Types.IntegerType.get(), 
isRequired);
+          case LONG:
+            return getIcebergType(icebergID, name, Types.LongType.get(), 
isRequired);
           default:
-            throw new IllegalStateException("Invalid Integer type found in ORC 
type attribute");
+            throw new IllegalStateException("Invalid Long type found in ORC 
type attribute");
         }
-      case LONG:
-        return getIcebergType(icebergID, name, Types.LongType.get(), 
isRequired);
       case FLOAT:
         return getIcebergType(icebergID, name, Types.FloatType.get(), 
isRequired);
       case DOUBLE:
@@ -412,8 +430,9 @@ public final class ORCSchemaUtil {
       case VARCHAR:
         return getIcebergType(icebergID, name, Types.StringType.get(), 
isRequired);
       case BINARY:
-        BinaryType binaryType = BinaryType.valueOf(
-            orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE));
+        String binaryAttributeValue = 
orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
+        BinaryType binaryType = binaryAttributeValue == null ? 
BinaryType.BINARY :
+            BinaryType.valueOf(binaryAttributeValue);
         switch (binaryType) {
           case UUID:
             return getIcebergType(icebergID, name, Types.UUIDType.get(), 
isRequired);
@@ -428,6 +447,8 @@ public final class ORCSchemaUtil {
       case DATE:
         return getIcebergType(icebergID, name, Types.DateType.get(), 
isRequired);
       case TIMESTAMP:
+        return getIcebergType(icebergID, name, 
Types.TimestampType.withoutZone(), isRequired);
+      case TIMESTAMP_INSTANT:
         return getIcebergType(icebergID, name, Types.TimestampType.withZone(), 
isRequired);
       case DECIMAL:
         return getIcebergType(icebergID, name,
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java 
b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 99e3f51..78c8acf 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -67,7 +67,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
     TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema);
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
-    OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+    OrcFile.WriterOptions options = 
OrcFile.writerOptions(conf).useUTCTimestamp(true);
     if (file instanceof HadoopOutputFile) {
       options.fileSystem(((HadoopOutputFile) file).getFileSystem());
     }
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 8a7e7a0..df1c0c4 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -486,26 +486,29 @@ Lists must use the [3-level 
representation](https://github.com/apache/parquet-fo
 
 **Data Type Mappings**
 
-| Type               | ORC type    | Notes                                     
                                              |
-|--------------------|-------------|-----------------------------------------------------------------------------------------|
-| **`boolean`**      | `boolean`   |                                           
                                              |
-| **`int`**          | `int`       | ORC `tinyint` and `smallint` would also 
map to **`int`**.                               |
-| **`long`**         | `long`      |                                           
                                              |
-| **`float`**        | `float`     |                                           
                                              |
-| **`double`**       | `double`    |                                           
                                              |
-| **`decimal(P,S)`** | `decimal`   |                                           
                                              |
-| **`date`**         | `date`      |                                           
                                              |
-| **`time`**         | `int`       | Stores microseconds from midnight.        
                                              |
-| **`timestamp`**    | `timestamp` |                                           
                                              |
-| **`timestamptz`**  | `struct`    | We should add this to ORC’s type model 
(ORC-294).                                       |
-| **`string`**       | `string`    | ORC `varchar` and `char` would also map 
to **`string`**.                                |
-| **`uuid`**         | `binary`    |                                           
                                              |
-| **`fixed(L)`**     | `binary`    | The length would not be checked by the 
ORC reader and should be checked by the adapter. |
-| **`binary`**       | `binary`    |                                           
                                              |
-| **`struct`**       | `struct`    | ORC `uniontype` would also map to 
**`struct`**.                                         |
-| **`list`**         | `array`     |                                           
                                              |
-| **`map`**          | `map`       |                                           
                                              |
+| Type               | ORC type            | ORC type attributes               
                   | Notes                                                      
                             |
+|--------------------|---------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------|
+| **`boolean`**      | `boolean`           |                                   
                   |                                                            
                             |
+| **`int`**          | `int`               |                                   
                   | ORC `tinyint` and `smallint` would also map to **`int`**.  
                             |
+| **`long`**         | `long`              |                                   
                   |                                                            
                             |
+| **`float`**        | `float`             |                                   
                   |                                                            
                             |
+| **`double`**       | `double`            |                                   
                   |                                                            
                             |
+| **`decimal(P,S)`** | `decimal`           |                                   
                   |                                                            
                             |
+| **`date`**         | `date`              |                                   
                   |                                                            
                             |
+| **`time`**         | `long`              | `iceberg.long-type`=`TIME`        
                   | Stores microseconds from midnight.                         
                             |
+| **`timestamp`**    | `timestamp`         |                                   
                   | [1]                                                        
                             |
+| **`timestamptz`**  | `timestamp_instant` |                                   
                   | [1]                                                        
                             |
+| **`string`**       | `string`            |                                   
                   | ORC `varchar` and `char` would also map to **`string`**.   
                             |
+| **`uuid`**         | `binary`            | `iceberg.binary-type`=`UUID`      
                   |                                                            
                             |
+| **`fixed(L)`**     | `binary`            | `iceberg.binary-type`=`FIXED` & 
`iceberg.length`=`L` | The length would not be checked by the ORC reader and 
should be checked by the adapter. |
+| **`binary`**       | `binary`            |                                   
                   |                                                            
                             |
+| **`struct`**       | `struct`            |                                   
                   |                                                            
                             |
+| **`list`**         | `array`             |                                   
                   |                                                            
                             |
+| **`map`**          | `map`               |                                   
                   |                                                            
                             |
 
+Notes:
+
1. ORC's 
[TimestampColumnVector](https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.html)
 consists of a time field (milliseconds since epoch) and a nanos field 
(nanoseconds within the second). Hence the milliseconds within the second are 
reported twice: once in the time field and again in the nanos field. The read 
adapter should only use milliseconds within the second from one of these 
fields. The write adapter should also report mill [...]
 
 One of the interesting challenges with this is how to map Iceberg’s schema 
evolution (id based) on to ORC’s (name based). In theory, we could use 
Iceberg’s column ids as the column and field names, but that would suck from a 
user’s point of view. 
 
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 47bb94b..b965c6d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -146,8 +146,10 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
         return row.getDecimal(ord, schema.getPrecision(), 
schema.getScale()).toString();
       case DATE:
         return "\"" + new DateWritable(row.getInt(ord)) + "\"";
-      case TIMESTAMP:
-        return "\"" + new Timestamp(row.getLong(ord)) + "\"";
+      case TIMESTAMP_INSTANT:
+        Timestamp ts = new Timestamp((row.getLong(ord) / 1_000_000) * 1_000); 
// initialize with seconds
+        ts.setNanos((int) (row.getLong(ord) % 1_000_000) * 1_000); // add the 
rest (millis to nanos)
+        return "\"" + ts + "\"";
       case STRUCT:
         return rowToString(row.getStruct(ord, schema.getChildren().size()), 
schema);
       case LIST: {
@@ -381,7 +383,7 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
     }
   }
 
-  private static class TimestampConverter implements Converter {
+  private static class TimestampTzConverter implements Converter {
 
     private long convert(TimestampColumnVector vector, int row) {
       // compute microseconds past 1970.
@@ -693,8 +695,8 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
         return new FloatConverter();
       case DOUBLE:
         return new DoubleConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
+      case TIMESTAMP_INSTANT:
+        return new TimestampTzConverter();
       case DECIMAL:
         if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
           return new Decimal18Converter(schema.getPrecision(), 
schema.getScale());
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 21b896b..c27f958 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -204,7 +204,7 @@ public class SparkOrcWriter implements 
OrcValueWriter<InternalRow> {
     }
   }
 
-  static class TimestampConverter implements Converter {
+  static class TimestampTzConverter implements Converter {
     @Override
     public void addValue(int rowId, int column, SpecializedGetters data,
                          ColumnVector output) {
@@ -391,8 +391,8 @@ public class SparkOrcWriter implements 
OrcValueWriter<InternalRow> {
         return schema.getPrecision() <= 18 ?
             new Decimal18Converter(schema) :
             new Decimal38Converter(schema);
-      case TIMESTAMP:
-        return new TimestampConverter();
+      case TIMESTAMP_INSTANT:
+        return new TimestampTzConverter();
       case STRUCT:
         return new StructConverter(schema);
       case LIST:
diff --git a/versions.lock b/versions.lock
index 3120ccd..93ec82f 100644
--- a/versions.lock
+++ b/versions.lock
@@ -149,7 +149,7 @@ org.apache.httpcomponents:httpclient:4.5.6 (4 constraints: 
573134dd)
 org.apache.httpcomponents:httpcore:4.4.10 (3 constraints: d327f763)
 org.apache.ivy:ivy:2.4.0 (3 constraints: 0826dbf1)
 org.apache.orc:orc-core:1.6.2 (3 constraints: ba1d17ad)
-org.apache.orc:orc-mapreduce:1.5.5 (1 constraints: c30cc227)
+org.apache.orc:orc-mapreduce:1.6.2 (1 constraints: c30cc227)
 org.apache.orc:orc-shims:1.6.2 (1 constraints: 3f0aeabc)
 org.apache.parquet:parquet-avro:1.11.0 (1 constraints: 35052c3b)
 org.apache.parquet:parquet-column:1.11.0 (3 constraints: 9429f2ca)
diff --git a/versions.props b/versions.props
index d8b2d39..a22c05f 100644
--- a/versions.props
+++ b/versions.props
@@ -3,7 +3,7 @@ com.google.guava:guava = 28.0-jre
 org.apache.avro:avro = 1.9.2
 org.apache.hadoop:* = 2.7.3
 org.apache.hive:hive-metastore = 2.3.6
-org.apache.orc:orc-core = 1.6.2
+org.apache.orc:* = 1.6.2
 org.apache.parquet:parquet-avro = 1.11.0
 org.apache.spark:spark-hive_2.11 = 2.4.4
 org.apache.spark:spark-avro_2.11 = 2.4.4

Reply via email to