This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6739efe ORC: Implement TestGenericData and fix related issues (#778)
6739efe is described below
commit 6739efe0fc701ed8b40ef0880c58d4d26fc5a451
Author: Shardul Mahadik <[email protected]>
AuthorDate: Tue Mar 31 14:54:30 2020 -0700
ORC: Implement TestGenericData and fix related issues (#778)
---
.../apache/iceberg/data/orc/GenericOrcReader.java | 152 ++++++++++++++++---
.../apache/iceberg/data/orc/GenericOrcWriter.java | 168 ++++++++++++++++++---
.../apache/iceberg/data/orc/TestGenericData.java | 130 ++++++++++++++++
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 2 +-
.../java/org/apache/iceberg/orc/ORCSchemaUtil.java | 67 +++++---
.../org/apache/iceberg/orc/OrcFileAppender.java | 2 +-
site/docs/spec.md | 41 ++---
.../apache/iceberg/spark/data/SparkOrcReader.java | 12 +-
.../apache/iceberg/spark/data/SparkOrcWriter.java | 6 +-
versions.lock | 2 +-
versions.props | 2 +-
11 files changed, 485 insertions(+), 99 deletions(-)
diff --git
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
index e5f0619..88c407d 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -23,13 +23,22 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
@@ -53,6 +62,9 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
private final List<TypeDescription> columns;
private final Converter[] converters;
+ private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
private GenericOrcReader(Schema expectedSchema, TypeDescription readSchema) {
this.schema = expectedSchema;
this.columns = readSchema.getChildren();
@@ -70,8 +82,7 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
return newConverters;
}
- public static OrcValueReader<Record> buildReader(Schema expectedSchema,
- TypeDescription fileSchema)
{
+ public static OrcValueReader<Record> buildReader(Schema expectedSchema,
TypeDescription fileSchema) {
return new GenericOrcReader(expectedSchema, fileSchema);
}
@@ -136,6 +147,30 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
}
+ private static class TimeConverter implements Converter<LocalTime> {
+ @Override
+ public LocalTime convert(ColumnVector vector, int row) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
+ return null;
+ } else {
+ return LocalTime.ofNanoOfDay(((LongColumnVector)
vector).vector[rowIndex] * 1_000);
+ }
+ }
+ }
+
+ private static class DateConverter implements Converter<LocalDate> {
+ @Override
+ public LocalDate convert(ColumnVector vector, int row) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
+ return null;
+ } else {
+ return EPOCH_DAY.plusDays((int) ((LongColumnVector)
vector).vector[rowIndex]);
+ }
+ }
+ }
+
private static class LongConverter implements Converter<Long> {
@Override
public Long convert(ColumnVector vector, int row) {
@@ -172,14 +207,13 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
}
- private static class TimestampConverter implements Converter<Long> {
- private Long convert(TimestampColumnVector vector, int row) {
- // compute microseconds past 1970.
- return (vector.time[row] / 1000) * 1_000_000 + vector.nanos[row] / 1000;
+ private static class TimestampTzConverter implements
Converter<OffsetDateTime> {
+ private OffsetDateTime convert(TimestampColumnVector vector, int row) {
+ return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000),
vector.nanos[row]).atOffset(ZoneOffset.UTC);
}
@Override
- public Long convert(ColumnVector vector, int row) {
+ public OffsetDateTime convert(ColumnVector vector, int row) {
int rowIndex = vector.isRepeating ? 0 : row;
if (!vector.noNulls && vector.isNull[rowIndex]) {
return null;
@@ -189,7 +223,25 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
}
- private static class BinaryConverter implements Converter<byte[]> {
+ private static class TimestampConverter implements Converter<LocalDateTime> {
+
+ private LocalDateTime convert(TimestampColumnVector vector, int row) {
+ return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000),
vector.nanos[row]).atOffset(ZoneOffset.UTC)
+ .toLocalDateTime();
+ }
+
+ @Override
+ public LocalDateTime convert(ColumnVector vector, int row) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
+ return null;
+ } else {
+ return convert((TimestampColumnVector) vector, rowIndex);
+ }
+ }
+ }
+
+ private static class FixedConverter implements Converter<byte[]> {
@Override
public byte[] convert(ColumnVector vector, int row) {
int rowIndex = vector.isRepeating ? 0 : row;
@@ -197,22 +249,53 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
return null;
} else {
BytesColumnVector bytesVector = (BytesColumnVector) vector;
- return Arrays.copyOfRange(bytesVector.vector[rowIndex],
- bytesVector.start[rowIndex],
+ return Arrays.copyOfRange(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex],
bytesVector.start[rowIndex] + bytesVector.length[rowIndex]);
}
}
}
+ private static class BinaryConverter implements Converter<ByteBuffer> {
+ @Override
+ public ByteBuffer convert(ColumnVector vector, int row) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
+ return null;
+ } else {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return ByteBuffer.wrap(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
+ }
+ }
+ }
+
+ private static class UUIDConverter implements Converter<UUID> {
+ @Override
+ public UUID convert(ColumnVector vector, int row) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
+ return null;
+ } else {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex],
+ bytesVector.length[rowIndex]);
+ long mostSigBits = buf.getLong();
+ long leastSigBits = buf.getLong();
+ return new UUID(mostSigBits, leastSigBits);
+ }
+ }
+ }
+
private static class StringConverter implements Converter<String> {
@Override
public String convert(ColumnVector vector, int row) {
- BinaryConverter converter = new BinaryConverter();
- byte[] byteData = converter.convert(vector, row);
- if (byteData == null) {
+ int rowIndex = vector.isRepeating ? 0 : row;
+ if (!vector.noNulls && vector.isNull[rowIndex]) {
return null;
+ } else {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return new String(bytesVector.vector[rowIndex],
bytesVector.start[rowIndex], bytesVector.length[rowIndex],
+ StandardCharsets.UTF_8);
}
- return new String(byteData, StandardCharsets.UTF_8);
}
}
@@ -223,8 +306,8 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
if (!vector.noNulls && vector.isNull[rowIndex]) {
return null;
} else {
- return ((DecimalColumnVector) vector).vector[rowIndex]
- .getHiveDecimal().bigDecimalValue();
+ DecimalColumnVector cv = (DecimalColumnVector) vector;
+ return
cv.vector[rowIndex].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
}
}
}
@@ -283,10 +366,9 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
final int offset = (int) vector.offsets[row];
final int length = (int) vector.lengths[row];
- // serialize the keys
- Map<String, Object> map = Maps.newHashMapWithExpectedSize(length);
+ Map<Object, Object> map = Maps.newHashMapWithExpectedSize(length);
for (int c = 0; c < length; ++c) {
- String key = String.valueOf(keyConvert.convert(vector.keys, offset +
c));
+ Object key = keyConvert.convert(vector.keys, offset + c);
Object value = valueConvert.convert(vector.values, offset + c);
map.put(key, value);
}
@@ -341,8 +423,7 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
}
}
- private static Converter buildConverter(final Types.NestedField icebergField,
- final TypeDescription schema) {
+ private static Converter buildConverter(final Types.NestedField
icebergField, final TypeDescription schema) {
switch (schema.getCategory()) {
case BOOLEAN:
return new BooleanConverter();
@@ -351,20 +432,45 @@ public class GenericOrcReader implements
OrcValueReader<Record> {
case SHORT:
return new ShortConverter();
case DATE:
+ return new DateConverter();
case INT:
return new IntConverter();
case LONG:
- return new LongConverter();
+ String longAttributeValue =
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+ ORCSchemaUtil.LongType longType = longAttributeValue == null ?
ORCSchemaUtil.LongType.LONG :
+ ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+ switch (longType) {
+ case TIME:
+ return new TimeConverter();
+ case LONG:
+ return new LongConverter();
+ default:
+ throw new IllegalStateException("Unhandled Long type found in ORC
type attribute: " + longType);
+ }
case FLOAT:
return new FloatConverter();
case DOUBLE:
return new DoubleConverter();
case TIMESTAMP:
return new TimestampConverter();
+ case TIMESTAMP_INSTANT:
+ return new TimestampTzConverter();
case DECIMAL:
return new DecimalConverter();
case BINARY:
- return new BinaryConverter();
+ String binaryAttributeValue =
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+ ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ?
ORCSchemaUtil.BinaryType.BINARY :
+ ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+ switch (binaryType) {
+ case UUID:
+ return new UUIDConverter();
+ case FIXED:
+ return new FixedConverter();
+ case BINARY:
+ return new BinaryConverter();
+ default:
+ throw new IllegalStateException("Unhandled Binary type found in
ORC type attribute: " + binaryType);
+ }
case STRING:
case CHAR:
case VARCHAR:
diff --git
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 78c6f0d..d6f2025 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -22,10 +22,20 @@ package org.apache.iceberg.data.orc;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
@@ -40,10 +50,10 @@ import
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-/**
- */
public class GenericOrcWriter implements OrcValueWriter<Record> {
private final Converter[] converters;
+ private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
private GenericOrcWriter(TypeDescription schema) {
this.converters = buildConverters(schema);
@@ -151,6 +161,23 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
}
}
+ static class TimeConverter implements Converter<LocalTime> {
+ @Override
+ public Class<LocalTime> getJavaClass() {
+ return LocalTime.class;
+ }
+
+ public void addValue(int rowId, LocalTime data, ColumnVector output) {
+ if (data == null) {
+ output.noNulls = false;
+ output.isNull[rowId] = true;
+ } else {
+ output.isNull[rowId] = false;
+ ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000;
+ }
+ }
+ }
+
static class LongConverter implements Converter<Long> {
@Override
public Class<Long> getJavaClass() {
@@ -224,7 +251,44 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
}
}
- static class BytesConverter implements Converter<byte[]> {
+ static class BytesConverter implements Converter<ByteBuffer> {
+ @Override
+ public Class<ByteBuffer> getJavaClass() {
+ return ByteBuffer.class;
+ }
+
+ public void addValue(int rowId, ByteBuffer data, ColumnVector output) {
+ if (data == null) {
+ output.noNulls = false;
+ output.isNull[rowId] = true;
+ } else {
+ output.isNull[rowId] = false;
+ ((BytesColumnVector) output).setRef(rowId, data.array(), 0,
data.array().length);
+ }
+ }
+ }
+
+ static class UUIDConverter implements Converter<UUID> {
+ @Override
+ public Class<UUID> getJavaClass() {
+ return UUID.class;
+ }
+
+ public void addValue(int rowId, UUID data, ColumnVector output) {
+ if (data == null) {
+ output.noNulls = false;
+ output.isNull[rowId] = true;
+ } else {
+ output.isNull[rowId] = false;
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ buffer.putLong(data.getMostSignificantBits());
+ buffer.putLong(data.getLeastSignificantBits());
+ ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0,
buffer.array().length);
+ }
+ }
+ }
+
+ static class FixedConverter implements Converter<byte[]> {
@Override
public Class<byte[]> getJavaClass() {
return byte[].class;
@@ -237,30 +301,67 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
output.isNull[rowId] = true;
} else {
output.isNull[rowId] = false;
- // getBinary always makes a copy, so we don't need to worry about it
- // being changed behind our back.
((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
}
}
}
- static class TimestampConverter implements Converter<Long> {
+ static class DateConverter implements Converter<LocalDate> {
@Override
- public Class<Long> getJavaClass() {
- return Long.class;
+ public Class<LocalDate> getJavaClass() {
+ return LocalDate.class;
}
@Override
- public void addValue(int rowId, Long data, ColumnVector output) {
+ public void addValue(int rowId, LocalDate data, ColumnVector output) {
+ if (data == null) {
+ output.noNulls = false;
+ output.isNull[rowId] = true;
+ } else {
+ output.isNull[rowId] = false;
+ ((LongColumnVector) output).vector[rowId] =
ChronoUnit.DAYS.between(EPOCH_DAY, data);
+ }
+ }
+ }
+
+ static class TimestampTzConverter implements Converter<OffsetDateTime> {
+ @Override
+ public Class<OffsetDateTime> getJavaClass() {
+ return OffsetDateTime.class;
+ }
+
+ @Override
+ public void addValue(int rowId, OffsetDateTime data, ColumnVector output) {
if (data == null) {
output.noNulls = false;
output.isNull[rowId] = true;
} else {
output.isNull[rowId] = false;
TimestampColumnVector cv = (TimestampColumnVector) output;
- long micros = data;
- cv.time[rowId] = micros / 1_000; // millis
- cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
+ cv.time[rowId] = data.toInstant().toEpochMilli(); // millis
+ cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos
to only keep microsecond precision
+ }
+ }
+ }
+
+ static class TimestampConverter implements Converter<LocalDateTime> {
+
+ @Override
+ public Class<LocalDateTime> getJavaClass() {
+ return LocalDateTime.class;
+ }
+
+ @Override
+ public void addValue(int rowId, LocalDateTime data, ColumnVector output) {
+ if (data == null) {
+ output.noNulls = false;
+ output.isNull[rowId] = true;
+ } else {
+ output.isNull[rowId] = false;
+ TimestampColumnVector cv = (TimestampColumnVector) output;
+ cv.setIsUTC(true);
+ cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); //
millis
+ cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos
to only keep microsecond precision
}
}
}
@@ -286,7 +387,7 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
} else {
output.isNull[rowId] = false;
((DecimalColumnVector) output).vector[rowId]
- .setFromLongAndScale(data.longValueExact(), scale);
+ .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
}
}
}
@@ -308,7 +409,7 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
output.isNull[rowId] = true;
} else {
output.isNull[rowId] = false;
- ((DecimalColumnVector)
output).vector[rowId].set(HiveDecimal.create(data));
+ ((DecimalColumnVector)
output).vector[rowId].set(HiveDecimal.create(data, false));
}
}
}
@@ -402,10 +503,10 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
output.isNull[rowId] = true;
} else {
output.isNull[rowId] = false;
- Map<String, Object> map = (Map<String, Object>) data;
- List<String> keys = Lists.newArrayListWithExpectedSize(map.size());
+ Map<Object, Object> map = (Map<Object, Object>) data;
+ List<Object> keys = Lists.newArrayListWithExpectedSize(map.size());
List<Object> values = Lists.newArrayListWithExpectedSize(map.size());
- for (Map.Entry<String, ?> entry : map.entrySet()) {
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
keys.add(entry.getKey());
values.add(entry.getValue());
}
@@ -436,26 +537,49 @@ public class GenericOrcWriter implements
OrcValueWriter<Record> {
case SHORT:
return new ShortConverter();
case DATE:
+ return new DateConverter();
case INT:
return new IntConverter();
case LONG:
- return new LongConverter();
+ String longAttributeValue =
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+ ORCSchemaUtil.LongType longType = longAttributeValue == null ?
ORCSchemaUtil.LongType.LONG :
+ ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+ switch (longType) {
+ case TIME:
+ return new TimeConverter();
+ case LONG:
+ return new LongConverter();
+ default:
+ throw new IllegalStateException("Unhandled Long type found in ORC
type attribute: " + longType);
+ }
case FLOAT:
return new FloatConverter();
case DOUBLE:
return new DoubleConverter();
case BINARY:
- return new BytesConverter();
+ String binaryAttributeValue =
schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+ ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ?
ORCSchemaUtil.BinaryType.BINARY :
+ ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+ switch (binaryType) {
+ case UUID:
+ return new UUIDConverter();
+ case FIXED:
+ return new FixedConverter();
+ case BINARY:
+ return new BytesConverter();
+ default:
+ throw new IllegalStateException("Unhandled Binary type found in
ORC type attribute: " + binaryType);
+ }
case STRING:
case CHAR:
case VARCHAR:
return new StringConverter();
case DECIMAL:
- return schema.getPrecision() <= 18 ?
- new Decimal18Converter(schema) :
- new Decimal38Converter(schema);
+ return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) :
new Decimal38Converter(schema);
case TIMESTAMP:
return new TimestampConverter();
+ case TIMESTAMP_INSTANT:
+ return new TimestampTzConverter();
case STRUCT:
return new StructConverter(schema);
case LIST:
diff --git
a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
new file mode 100644
index 0000000..670b455
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTest;
+import org.apache.iceberg.data.DataTestHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestGenericData extends DataTest {
+
+ @Override
+ protected void writeAndValidate(Schema schema) throws IOException {
+ List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
+
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+ .schema(schema)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .build()) {
+ for (Record rec : expected) {
+ writer.add(rec);
+ }
+ }
+
+ List<Record> rows;
+ try (CloseableIterable<Record> reader =
ORC.read(Files.localInput(testFile))
+ .project(schema)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema,
fileSchema))
+ .build()) {
+ rows = Lists.newArrayList(reader);
+ }
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i),
rows.get(i));
+ }
+ }
+
+ @Test
+ public void writeAndValidateTimestamps() throws IOException {
+ Schema timestampSchema = new Schema(
+ required(1, "tsTzCol", Types.TimestampType.withZone()),
+ required(2, "tsCol", Types.TimestampType.withoutZone())
+ );
+
+ // Write using America/New_York timezone
+ TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
+ GenericRecord record1 = GenericRecord.create(timestampSchema);
+ record1.setField("tsTzCol",
OffsetDateTime.parse("2017-01-16T17:10:34-08:00"));
+ record1.setField("tsCol", LocalDateTime.parse("1970-01-01T00:01:00"));
+ GenericRecord record2 = GenericRecord.create(timestampSchema);
+ record2.setField("tsTzCol",
OffsetDateTime.parse("2017-05-16T17:10:34-08:00"));
+ record2.setField("tsCol", LocalDateTime.parse("1970-05-01T00:01:00"));
+ GenericRecord record3 = GenericRecord.create(timestampSchema);
+ record3.setField("tsTzCol",
OffsetDateTime.parse("1935-01-16T17:10:34-08:00"));
+ record3.setField("tsCol", LocalDateTime.parse("1935-01-01T00:01:00"));
+ GenericRecord record4 = GenericRecord.create(timestampSchema);
+ record4.setField("tsTzCol",
OffsetDateTime.parse("1935-05-16T17:10:34-08:00"));
+ record4.setField("tsCol", LocalDateTime.parse("1935-05-01T00:01:00"));
+
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+ .schema(timestampSchema)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .build()) {
+ writer.add(record1);
+ writer.add(record2);
+ writer.add(record3);
+ writer.add(record4);
+ }
+
+ // Read using Asia/Kolkata timezone
+ TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata"));
+ List<Record> rows;
+ try (CloseableIterable<Record> reader =
ORC.read(Files.localInput(testFile))
+ .project(timestampSchema)
+ .createReaderFunc(fileSchema ->
GenericOrcReader.buildReader(timestampSchema, fileSchema))
+ .build()) {
+ rows = Lists.newArrayList(reader);
+ }
+
+ Assert.assertEquals(OffsetDateTime.parse("2017-01-17T01:10:34Z"),
rows.get(0).getField("tsTzCol"));
+ Assert.assertEquals(LocalDateTime.parse("1970-01-01T00:01:00"),
rows.get(0).getField("tsCol"));
+ Assert.assertEquals(OffsetDateTime.parse("2017-05-17T01:10:34Z"),
rows.get(1).getField("tsTzCol"));
+ Assert.assertEquals(LocalDateTime.parse("1970-05-01T00:01:00"),
rows.get(1).getField("tsCol"));
+ Assert.assertEquals(OffsetDateTime.parse("1935-01-17T01:10:34Z"),
rows.get(2).getField("tsTzCol"));
+ Assert.assertEquals(LocalDateTime.parse("1935-01-01T00:01:00"),
rows.get(2).getField("tsCol"));
+ Assert.assertEquals(OffsetDateTime.parse("1935-05-17T01:10:34Z"),
rows.get(3).getField("tsTzCol"));
+ Assert.assertEquals(LocalDateTime.parse("1935-05-01T00:01:00"),
rows.get(3).getField("tsCol"));
+ }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 7a3e0ce..7236a3c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -183,7 +183,7 @@ public class ORC {
}
static Reader newFileReader(InputFile file, Configuration config) {
- ReaderOptions readerOptions = OrcFile.readerOptions(config);
+ ReaderOptions readerOptions =
OrcFile.readerOptions(config).useUTCTimestamp(true);
if (file instanceof HadoopInputFile) {
readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 558650a..3af6f58 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -41,12 +41,12 @@ import org.apache.orc.TypeDescription;
*/
public final class ORCSchemaUtil {
- private enum BinaryType {
+ public enum BinaryType {
UUID, FIXED, BINARY
}
- private enum IntegerType {
- TIME, INTEGER
+ public enum LongType {
+ TIME, LONG
}
private static class OrcField {
@@ -70,20 +70,27 @@ public final class ORCSchemaUtil {
private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
- private static final String ICEBERG_BINARY_TYPE_ATTRIBUTE =
"iceberg.binary-type";
- private static final String ICEBERG_INTEGER_TYPE_ATTRIBUTE =
"iceberg.integer-type";
+ /**
+ * The name of the ORC {@link TypeDescription} attribute indicating the
Iceberg type corresponding to an
+ * ORC binary type. The values for this attribute are denoted in {@code
BinaryType}.
+ */
+ public static final String ICEBERG_BINARY_TYPE_ATTRIBUTE =
"iceberg.binary-type";
+ /**
+ * The name of the ORC {@link TypeDescription} attribute indicating the
Iceberg type corresponding to an
+ * ORC long type. The values for this attribute are denoted in {@code
LongType}.
+ */
+ public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
private static final String ICEBERG_FIELD_LENGTH = "iceberg.length";
private static final ImmutableMap<Type.TypeID, TypeDescription.Category>
TYPE_MAPPING =
ImmutableMap.<Type.TypeID, TypeDescription.Category>builder()
.put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN)
.put(Type.TypeID.INTEGER, TypeDescription.Category.INT)
- .put(Type.TypeID.TIME, TypeDescription.Category.INT)
.put(Type.TypeID.LONG, TypeDescription.Category.LONG)
+ .put(Type.TypeID.TIME, TypeDescription.Category.LONG)
.put(Type.TypeID.FLOAT, TypeDescription.Category.FLOAT)
.put(Type.TypeID.DOUBLE, TypeDescription.Category.DOUBLE)
.put(Type.TypeID.DATE, TypeDescription.Category.DATE)
- .put(Type.TypeID.TIMESTAMP, TypeDescription.Category.TIMESTAMP)
.put(Type.TypeID.STRING, TypeDescription.Category.STRING)
.put(Type.TypeID.UUID, TypeDescription.Category.BINARY)
.put(Type.TypeID.FIXED, TypeDescription.Category.BINARY)
@@ -112,14 +119,14 @@ public final class ORCSchemaUtil {
break;
case INTEGER:
orcType = TypeDescription.createInt();
- orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE,
IntegerType.INTEGER.toString());
break;
case TIME:
- orcType = TypeDescription.createInt();
- orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE,
IntegerType.TIME.toString());
+ orcType = TypeDescription.createLong();
+ orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE,
LongType.TIME.toString());
break;
case LONG:
orcType = TypeDescription.createLong();
+ orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE,
LongType.LONG.toString());
break;
case FLOAT:
orcType = TypeDescription.createFloat();
@@ -131,7 +138,12 @@ public final class ORCSchemaUtil {
orcType = TypeDescription.createDate();
break;
case TIMESTAMP:
- orcType = TypeDescription.createTimestamp();
+ Types.TimestampType tsType = (Types.TimestampType) type;
+ if (tsType.shouldAdjustToUTC()) {
+ orcType = TypeDescription.createTimestampInstant();
+ } else {
+ orcType = TypeDescription.createTimestamp();
+ }
break;
case STRING:
orcType = TypeDescription.createString();
@@ -355,7 +367,14 @@ public final class ORCSchemaUtil {
}
private static boolean isSameType(TypeDescription orcType, Type icebergType)
{
- return Objects.equals(TYPE_MAPPING.get(icebergType.typeId()),
orcType.getCategory());
+ if (icebergType.typeId() == Type.TypeID.TIMESTAMP) {
+ Types.TimestampType tsType = (Types.TimestampType) icebergType;
+ return Objects.equals(
+ tsType.shouldAdjustToUTC() ?
TypeDescription.Category.TIMESTAMP_INSTANT : TypeDescription.Category.TIMESTAMP,
+ orcType.getCategory());
+ } else {
+ return Objects.equals(TYPE_MAPPING.get(icebergType.typeId()),
orcType.getCategory());
+ }
}
private static Optional<Integer> icebergID(TypeDescription orcType) {
@@ -390,19 +409,18 @@ public final class ORCSchemaUtil {
case BYTE:
case SHORT:
case INT:
- IntegerType integerType = IntegerType.valueOf(
- orcType.getAttributeValue(ICEBERG_INTEGER_TYPE_ATTRIBUTE)
- );
- switch (integerType) {
+ return getIcebergType(icebergID, name, Types.IntegerType.get(),
isRequired);
+ case LONG:
+ String longAttributeValue =
orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
+ LongType longType = longAttributeValue == null ? LongType.LONG :
LongType.valueOf(longAttributeValue);
+ switch (longType) {
case TIME:
return getIcebergType(icebergID, name, Types.TimeType.get(),
isRequired);
- case INTEGER:
- return getIcebergType(icebergID, name, Types.IntegerType.get(),
isRequired);
+ case LONG:
+ return getIcebergType(icebergID, name, Types.LongType.get(),
isRequired);
default:
- throw new IllegalStateException("Invalid Integer type found in ORC
type attribute");
+ throw new IllegalStateException("Invalid Long type found in ORC
type attribute");
}
- case LONG:
- return getIcebergType(icebergID, name, Types.LongType.get(),
isRequired);
case FLOAT:
return getIcebergType(icebergID, name, Types.FloatType.get(),
isRequired);
case DOUBLE:
@@ -412,8 +430,9 @@ public final class ORCSchemaUtil {
case VARCHAR:
return getIcebergType(icebergID, name, Types.StringType.get(),
isRequired);
case BINARY:
- BinaryType binaryType = BinaryType.valueOf(
- orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE));
+ String binaryAttributeValue =
orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
+ BinaryType binaryType = binaryAttributeValue == null ?
BinaryType.BINARY :
+ BinaryType.valueOf(binaryAttributeValue);
switch (binaryType) {
case UUID:
return getIcebergType(icebergID, name, Types.UUIDType.get(),
isRequired);
@@ -428,6 +447,8 @@ public final class ORCSchemaUtil {
case DATE:
return getIcebergType(icebergID, name, Types.DateType.get(),
isRequired);
case TIMESTAMP:
+ return getIcebergType(icebergID, name,
Types.TimestampType.withoutZone(), isRequired);
+ case TIMESTAMP_INSTANT:
return getIcebergType(icebergID, name, Types.TimestampType.withZone(),
isRequired);
case DECIMAL:
return getIcebergType(icebergID, name,
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 99e3f51..78c8acf 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -67,7 +67,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema);
this.batch = orcSchema.createRowBatch(this.batchSize);
- OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(conf).useUTCTimestamp(true);
if (file instanceof HadoopOutputFile) {
options.fileSystem(((HadoopOutputFile) file).getFileSystem());
}
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 8a7e7a0..df1c0c4 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -486,26 +486,29 @@ Lists must use the [3-level
representation](https://github.com/apache/parquet-fo
**Data Type Mappings**
-| Type | ORC type | Notes
|
-|--------------------|-------------|-----------------------------------------------------------------------------------------|
-| **`boolean`** | `boolean` |
|
-| **`int`** | `int` | ORC `tinyint` and `smallint` would also
map to **`int`**. |
-| **`long`** | `long` |
|
-| **`float`** | `float` |
|
-| **`double`** | `double` |
|
-| **`decimal(P,S)`** | `decimal` |
|
-| **`date`** | `date` |
|
-| **`time`** | `int` | Stores microseconds from midnight.
|
-| **`timestamp`** | `timestamp` |
|
-| **`timestamptz`** | `struct` | We should add this to ORC’s type model
(ORC-294). |
-| **`string`** | `string` | ORC `varchar` and `char` would also map
to **`string`**. |
-| **`uuid`** | `binary` |
|
-| **`fixed(L)`** | `binary` | The length would not be checked by the
ORC reader and should be checked by the adapter. |
-| **`binary`** | `binary` |
|
-| **`struct`** | `struct` | ORC `uniontype` would also map to
**`struct`**. |
-| **`list`** | `array` |
|
-| **`map`** | `map` |
|
+| Type | ORC type | ORC type attributes
| Notes
|
+|--------------------|---------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------|
+| **`boolean`** | `boolean` |
|
|
+| **`int`** | `int` |
| ORC `tinyint` and `smallint` would also map to **`int`**.
|
+| **`long`** | `long` |
|
|
+| **`float`** | `float` |
|
|
+| **`double`** | `double` |
|
|
+| **`decimal(P,S)`** | `decimal` |
|
|
+| **`date`** | `date` |
|
|
+| **`time`** | `long` | `iceberg.long-type`=`TIME`
| Stores microseconds from midnight.
|
+| **`timestamp`** | `timestamp` |
| [1]
|
+| **`timestamptz`** | `timestamp_instant` |
| [1]
|
+| **`string`** | `string` |
| ORC `varchar` and `char` would also map to **`string`**.
|
+| **`uuid`** | `binary` | `iceberg.binary-type`=`UUID`
|
|
+| **`fixed(L)`** | `binary` | `iceberg.binary-type`=`FIXED` &
`iceberg.length`=`L` | The length would not be checked by the ORC reader and
should be checked by the adapter. |
+| **`binary`** | `binary` |
|
|
+| **`struct`** | `struct` |
|
|
+| **`list`** | `array` |
|
|
+| **`map`** | `map` |
|
|
+Notes:
+
+1. ORC's
[TimestampColumnVector](https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.html)
consists of a time field (milliseconds since epoch) and a nanos field
(nanoseconds within the second). Hence the milliseconds within the second are
reported twice; once in the time field and again in the nanos field. The read
adapter should only use milliseconds within the second from one of these
fields. The write adapter should also report mill [...]
One of the interesting challenges with this is how to map Iceberg’s schema
evolution (id based) onto ORC’s (name based). In theory, we could use
Iceberg’s column ids as the column and field names, but that would be a poor
experience from a user’s point of view.
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 47bb94b..b965c6d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -146,8 +146,10 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
return row.getDecimal(ord, schema.getPrecision(),
schema.getScale()).toString();
case DATE:
return "\"" + new DateWritable(row.getInt(ord)) + "\"";
- case TIMESTAMP:
- return "\"" + new Timestamp(row.getLong(ord)) + "\"";
+ case TIMESTAMP_INSTANT:
+ Timestamp ts = new Timestamp((row.getLong(ord) / 1_000_000) * 1_000);
// initialize with seconds
+ ts.setNanos((int) (row.getLong(ord) % 1_000_000) * 1_000); // add the
rest (millis to nanos)
+ return "\"" + ts + "\"";
case STRUCT:
return rowToString(row.getStruct(ord, schema.getChildren().size()),
schema);
case LIST: {
@@ -381,7 +383,7 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
}
}
- private static class TimestampConverter implements Converter {
+ private static class TimestampTzConverter implements Converter {
private long convert(TimestampColumnVector vector, int row) {
// compute microseconds past 1970.
@@ -693,8 +695,8 @@ public class SparkOrcReader implements
OrcValueReader<InternalRow> {
return new FloatConverter();
case DOUBLE:
return new DoubleConverter();
- case TIMESTAMP:
- return new TimestampConverter();
+ case TIMESTAMP_INSTANT:
+ return new TimestampTzConverter();
case DECIMAL:
if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
return new Decimal18Converter(schema.getPrecision(),
schema.getScale());
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 21b896b..c27f958 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -204,7 +204,7 @@ public class SparkOrcWriter implements
OrcValueWriter<InternalRow> {
}
}
- static class TimestampConverter implements Converter {
+ static class TimestampTzConverter implements Converter {
@Override
public void addValue(int rowId, int column, SpecializedGetters data,
ColumnVector output) {
@@ -391,8 +391,8 @@ public class SparkOrcWriter implements
OrcValueWriter<InternalRow> {
return schema.getPrecision() <= 18 ?
new Decimal18Converter(schema) :
new Decimal38Converter(schema);
- case TIMESTAMP:
- return new TimestampConverter();
+ case TIMESTAMP_INSTANT:
+ return new TimestampTzConverter();
case STRUCT:
return new StructConverter(schema);
case LIST:
diff --git a/versions.lock b/versions.lock
index 3120ccd..93ec82f 100644
--- a/versions.lock
+++ b/versions.lock
@@ -149,7 +149,7 @@ org.apache.httpcomponents:httpclient:4.5.6 (4 constraints:
573134dd)
org.apache.httpcomponents:httpcore:4.4.10 (3 constraints: d327f763)
org.apache.ivy:ivy:2.4.0 (3 constraints: 0826dbf1)
org.apache.orc:orc-core:1.6.2 (3 constraints: ba1d17ad)
-org.apache.orc:orc-mapreduce:1.5.5 (1 constraints: c30cc227)
+org.apache.orc:orc-mapreduce:1.6.2 (1 constraints: c30cc227)
org.apache.orc:orc-shims:1.6.2 (1 constraints: 3f0aeabc)
org.apache.parquet:parquet-avro:1.11.0 (1 constraints: 35052c3b)
org.apache.parquet:parquet-column:1.11.0 (3 constraints: 9429f2ca)
diff --git a/versions.props b/versions.props
index d8b2d39..a22c05f 100644
--- a/versions.props
+++ b/versions.props
@@ -3,7 +3,7 @@ com.google.guava:guava = 28.0-jre
org.apache.avro:avro = 1.9.2
org.apache.hadoop:* = 2.7.3
org.apache.hive:hive-metastore = 2.3.6
-org.apache.orc:orc-core = 1.6.2
+org.apache.orc:* = 1.6.2
org.apache.parquet:parquet-avro = 1.11.0
org.apache.spark:spark-hive_2.11 = 2.4.4
org.apache.spark:spark-avro_2.11 = 2.4.4