This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 17b5ca5  Spark: Fix int96 timestamps, add end-to-end test (#1323)
17b5ca5 is described below

commit 17b5ca54560a99f5c8832a844896b64ff3d505ab
Author: Gustavo Torres <[email protected]>
AuthorDate: Tue Aug 11 18:33:39 2020 -0700

    Spark: Fix int96 timestamps, add end-to-end test (#1323)
---
 .../iceberg/data/parquet/BaseParquetReaders.java   |  6 +--
 .../apache/iceberg/parquet/MessageTypeToType.java  |  2 +
 .../iceberg/spark/data/TestSparkParquetReader.java | 46 +++++++++++++++++++++-
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 4b7d3a1..710c771 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -356,7 +356,7 @@ public abstract class BaseParquetReaders<T> {
     }
   }
 
-  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
     private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     private TimestampInt96Reader(ColumnDescriptor desc) {
@@ -364,14 +364,14 @@ public abstract class BaseParquetReaders<T> {
     }
 
     @Override
-    public LocalDateTime read(LocalDateTime reuse) {
+    public OffsetDateTime read(OffsetDateTime reuse) {
       final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
       final long timeOfDayNanos = byteBuffer.getLong();
       final int julianDay = byteBuffer.getInt();
 
       return Instant
              .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
-              .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+              .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC);
     }
   }
 
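For context, a Parquet INT96 timestamp packs twelve little-endian bytes: eight bytes of nanoseconds within the day followed by four bytes of Julian day number. Below is a minimal standalone sketch of the decoding the reader above performs; the Int96Decode class, decodeInt96 helper, and main driver are illustrative, not part of this commit.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

public class Int96Decode {
  // Julian day number of the Unix epoch (1970-01-01), as in the reader above.
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Decodes a 12-byte little-endian INT96 value: 8 bytes of nanos-of-day,
  // then 4 bytes of Julian day, mirroring TimestampInt96Reader.read.
  static OffsetDateTime decodeInt96(ByteBuffer buf) {
    buf.order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();
    int julianDay = buf.getInt();
    return Instant
        .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
        .plusNanos(timeOfDayNanos)
        .atOffset(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    // nanos-of-day = 0 and Julian day = 2_440_588 decode to the Unix epoch.
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(0L).putInt(2_440_588);
    buf.flip();
    System.out.println(decodeInt96(buf));  // prints 1970-01-01T00:00Z
  }
}

Returning OffsetDateTime (rather than LocalDateTime) matches the schema change below, which maps INT96 to a timestamp with zone.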
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
index 43c2cd2..069e505 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
@@ -156,6 +156,8 @@ class MessageTypeToType extends ParquetTypeVisitor<Type> {
         return Types.DoubleType.get();
       case FIXED_LEN_BYTE_ARRAY:
         return Types.FixedType.ofLength(primitive.getTypeLength());
+      case INT96:
+        return Types.TimestampType.withZone();
       case BINARY:
         return Types.BinaryType.get();
     }
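With the INT96 case in place, a Parquet footer schema containing an INT96 column, which previously had no mapping in this switch, converts to an Iceberg timestamp with zone (timestamptz). A short sketch, assuming ParquetSchemaUtil.convert(MessageType) assigns fallback field ids to a schema without ids; the Int96SchemaDemo class and the literal schema string are illustrative.

import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class Int96SchemaDemo {
  public static void main(String[] args) {
    // A Spark-written Parquet schema with an INT96 timestamp column.
    MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message spark_schema { optional int96 ts; }");
    // The visitor above now maps the INT96 column to timestamptz.
    Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema);
    System.out.println(icebergSchema.asStruct());
  }
}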
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 642a8de..03d234c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -26,14 +26,23 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetWriteAdapter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
@@ -95,6 +104,29 @@ public class TestSparkParquetReader extends AvroDataTest {
     }
   }
 
+  protected Table tableFromInputFile(InputFile inputFile, Schema schema) throws IOException {
+    HadoopTables tables = new HadoopTables();
+    Table table =
+        tables.create(
+            schema,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(),
+            temp.newFolder().getCanonicalPath());
+
+    table
+        .newAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withFormat(FileFormat.PARQUET)
+                .withInputFile(inputFile)
+                .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault()))
+                .withFileSizeInBytes(inputFile.getLength())
+                .build())
+        .commit();
+
+    return table;
+  }
+
   @Test
   public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
     String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
@@ -120,9 +152,21 @@ public class TestSparkParquetReader extends AvroDataTest {
       writer.addAll(rows);
     }
 
-    List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
+    InputFile parquetInputFile = Files.localInput(outputFilePath);
+    List<InternalRow> readRows = rowsFromFile(parquetInputFile, schema);
     Assert.assertEquals(rows.size(), readRows.size());
     Assert.assertThat(readRows, CoreMatchers.is(rows));
+
+    // Now we try to import that file as an Iceberg table to make sure Iceberg can read
+    // Int96 end to end.
+    Table int96Table = tableFromInputFile(parquetInputFile, schema);
+    List<Record> tableRecords = Lists.newArrayList(IcebergGenerics.read(int96Table).build());
+
+    Assert.assertEquals(rows.size(), tableRecords.size());
+
+    for (int i = 0; i < tableRecords.size(); i++) {
+      GenericsHelpers.assertEqualsUnsafe(schema.asStruct(), tableRecords.get(i), rows.get(i));
+    }
   }
 
   /**
