This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 17b5ca5 Spark: Fix int96 timestamps, add end-to-end test (#1323)
17b5ca5 is described below
commit 17b5ca54560a99f5c8832a844896b64ff3d505ab
Author: Gustavo Torres <[email protected]>
AuthorDate: Tue Aug 11 18:33:39 2020 -0700
Spark: Fix int96 timestamps, add end-to-end test (#1323)
---
.../iceberg/data/parquet/BaseParquetReaders.java | 6 +--
.../apache/iceberg/parquet/MessageTypeToType.java | 2 +
.../iceberg/spark/data/TestSparkParquetReader.java | 46 +++++++++++++++++++++-
3 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 4b7d3a1..710c771 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -356,7 +356,7 @@ public abstract class BaseParquetReaders<T> {
}
}
- private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+ private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
private TimestampInt96Reader(ColumnDescriptor desc) {
@@ -364,14 +364,14 @@ public abstract class BaseParquetReaders<T> {
}
@Override
- public LocalDateTime read(LocalDateTime reuse) {
+ public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();
return Instant
.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
- .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+ .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC);
}
}
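For context on the conversion above: a Parquet INT96 timestamp is 12 little-endian bytes, the first 8 holding nanoseconds within the day and the last 4 the Julian day number; 2,440,588 is the Julian day of the Unix epoch (1970-01-01). A minimal standalone sketch of the same decoding (the class and method names here are illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.time.Instant;
    import java.time.OffsetDateTime;
    import java.time.ZoneOffset;
    import java.util.concurrent.TimeUnit;

    public class Int96DecodeSketch {
      // Julian day number of the Unix epoch, 1970-01-01.
      private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

      // Decodes one 12-byte INT96 value into a UTC OffsetDateTime,
      // mirroring TimestampInt96Reader.read above.
      public static OffsetDateTime decode(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong(); // bytes 0-7: nanos within the day
        int julianDay = buf.getInt();        // bytes 8-11: Julian day number
        return Instant
            .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
            .plusNanos(timeOfDayNanos)
            .atOffset(ZoneOffset.UTC);
      }
    }

Returning OffsetDateTime rather than LocalDateTime is what lines the value up with Iceberg's timestamptz representation, matching the MessageTypeToType change below.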
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
index 43c2cd2..069e505 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java
@@ -156,6 +156,8 @@ class MessageTypeToType extends ParquetTypeVisitor<Type> {
return Types.DoubleType.get();
case FIXED_LEN_BYTE_ARRAY:
return Types.FixedType.ofLength(primitive.getTypeLength());
+ case INT96:
+ return Types.TimestampType.withZone();
case BINARY:
return Types.BinaryType.get();
}
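INT96 carries no logical-type annotation in Parquet, and the visitor previously had no case for it, so converting the schema of a Spark-written INT96 file failed; since the value encodes an absolute instant (UTC Julian day plus nanos-of-day), timestamp with zone is the faithful mapping. A quick way to observe the new mapping, sketched under the assumption that ParquetSchemaUtil.convert drives this visitor:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.ParquetSchemaUtil;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class Int96MappingSketch {
      public static void main(String[] args) {
        // A Parquet schema with one INT96 column, as Spark writes timestamps.
        MessageType parquetSchema = MessageTypeParser.parseMessageType(
            "message spark_schema { optional int96 ts; }");
        Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema);
        // With this patch the INT96 column converts to timestamptz.
        System.out.println(icebergSchema.findField("ts").type());
      }
    }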
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 642a8de..03d234c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -26,14 +26,23 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetWriteAdapter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
@@ -95,6 +104,29 @@ public class TestSparkParquetReader extends AvroDataTest {
}
}
+ protected Table tableFromInputFile(InputFile inputFile, Schema schema) throws IOException {
+ HadoopTables tables = new HadoopTables();
+ Table table =
+ tables.create(
+ schema,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(),
+ temp.newFolder().getCanonicalPath());
+
+ table
+ .newAppend()
+ .appendFile(
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withFormat(FileFormat.PARQUET)
+ .withInputFile(inputFile)
+ .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault()))
+ .withFileSizeInBytes(inputFile.getLength())
+ .build())
+ .commit();
+
+ return table;
+ }
+
@Test
public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
@@ -120,9 +152,21 @@ public class TestSparkParquetReader extends AvroDataTest {
writer.addAll(rows);
}
- List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
+ InputFile parquetInputFile = Files.localInput(outputFilePath);
+ List<InternalRow> readRows = rowsFromFile(parquetInputFile, schema);
Assert.assertEquals(rows.size(), readRows.size());
Assert.assertThat(readRows, CoreMatchers.is(rows));
+
+ // Now we try to import that file as an Iceberg table to make sure Iceberg can read
+ // Int96 end to end.
+ Table int96Table = tableFromInputFile(parquetInputFile, schema);
+ List<Record> tableRecords = Lists.newArrayList(IcebergGenerics.read(int96Table).build());
+
+ Assert.assertEquals(rows.size(), tableRecords.size());
+
+ for (int i = 0; i < tableRecords.size(); i++) {
+ GenericsHelpers.assertEqualsUnsafe(schema.asStruct(), tableRecords.get(i), rows.get(i));
+ }
}
/**
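The hunk elides the part of the test that writes the file, which (judging from the imports) goes through ParquetWriteAdapter with Spark's native Parquet write support so that timestamps land on disk as INT96. Outside this test harness, the same kind of file can be produced with a plain Spark session; a hedged sketch, where the output path and session settings are illustrative:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class WriteInt96Sketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("int96-sketch")
            // Ask Spark to write timestamps in the legacy INT96 layout.
            .config("spark.sql.parquet.outputTimestampType", "INT96")
            .getOrCreate();

        Dataset<Row> df = spark.sql("SELECT timestamp '2020-08-11 18:33:39' AS ts");
        df.write().parquet("/tmp/parquet_int96"); // illustrative output path

        spark.stop();
      }
    }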