openinx commented on a change in pull request #1266:
URL: https://github.com/apache/iceberg/pull/1266#discussion_r464871472
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ return (ParquetValueReader<RowData>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReadBuilder(fileSchema, idToConstant)
+ );
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
+
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ this.type = type;
+ this.idToConstant = idToConstant;
+ }
+
+ @Override
+ public ParquetValueReader<RowData> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
Review comment:
nit: could you reformat this code? It seems we are missing the indent.
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ return (ParquetValueReader<RowData>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReadBuilder(fileSchema, idToConstant)
+ );
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
+
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ this.type = type;
+ this.idToConstant = idToConstant;
+ }
+
+ @Override
+ public ParquetValueReader<RowData> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ return struct(expected, message.asGroupType(), fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<RowData> struct(Types.StructType expected,
GroupType struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
Review comment:
nit: indent ?
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +93,46 @@ public Row next() {
};
}
+ private static Iterable<Record> generateIcebergGenerics(Schema schema, int
numRecords,
+
Supplier<RandomGenericData.RandomDataGenerator<Record>> supplier) {
+ return () -> new Iterator<Record>() {
+ private final RandomGenericData.RandomDataGenerator<Record> generator =
supplier.get();
+ private int count = 0;
+
+ @Override
+ public boolean hasNext() {
+ return count < numRecords;
+ }
+
+ @Override
+ public Record next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ ++count;
+ return (Record) TypeUtil.visit(schema, generator);
+ }
+ };
+ }
+
+
+ public static Iterable<Record> generateRecords(Schema schema, int
numRecords, long seed) {
+ return RandomGenericData.generate(schema, numRecords, seed);
+ }
+
public static Iterable<Row> generate(Schema schema, int numRecords, long
seed) {
return generateData(schema, numRecords, () -> new
RandomRowGenerator(seed));
}
- public static Iterable<Row> generateFallbackData(Schema schema, int
numRecords, long seed, long numDictRows) {
- return generateData(schema, numRecords, () -> new FallbackGenerator(seed,
numDictRows));
+ public static Iterable<Record> generateFallbackRecords(Schema schema, int
numRecords, long seed, long numDictRows) {
+ return generateIcebergGenerics(schema, numRecords, () -> new
FallbackGenerator(seed, numDictRows));
}
- public static Iterable<Row> generateDictionaryEncodableData(Schema schema,
int numRecords, long seed) {
- return generateData(schema, numRecords, () -> new
DictionaryEncodedGenerator(seed));
+ public static Iterable<Record> generateDictionaryEncodableRecords(Schema
schema, int numRecords, long seed) {
Review comment:
ditto.
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
##########
@@ -35,51 +36,49 @@
import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;
-public class TestFlinkParquetReaderWriter {
+public class TestFlinkParquetReader {
Review comment:
It would be better to extend `org.apache.iceberg.data.DataTest`, because it
provides more test cases that `COMPLEX_SCHEMA` does not cover. (I used
`COMPLEX_SCHEMA` before because I didn't know there was a better testing
method.)
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,719 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
builder);
+ } else {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new FallbackReadBuilder(builder));
+ }
+ }
+
+ private static class FallbackReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private MessageType type;
+ private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+ FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the top level matches by ID, but the remaining IDs are missing
+ this.type = message;
+ return builder.struct(expected, message, fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the expected struct is ignored because nested fields are never found
when the IDs are missing
+ List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
+ fieldReaders.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ newFields.add(ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ types.add(fieldType);
+ }
+
+ return new RowDataReader(types, newFields);
+ }
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
+
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ this.type = type;
+ this.idToConstant = idToConstant;
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ return struct(expected, message.asGroupType(), fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType expected, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // match the expected struct's order
+ Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+ Map<Integer, Type> typesById = Maps.newHashMap();
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ if (fieldType.getId() != null) {
+ int id = fieldType.getId().intValue();
+ readersById.put(id, ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ typesById.put(id, fieldType);
+ }
+ }
+
+ List<Types.NestedField> expectedFields = expected != null ?
+ expected.fields() : ImmutableList.of();
+ List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(
+ expectedFields.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(expectedFields.size());
+ for (Types.NestedField field : expectedFields) {
+ int id = field.fieldId();
+ if (idToConstant.containsKey(id)) {
+ // containsKey is used because the constant may be null
+
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ types.add(null);
+ } else {
+ ParquetValueReader<?> reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(ParquetValueReaders.nulls());
+ types.add(null);
+ }
+ }
+ }
+
+ return new RowDataReader(types, reorderedFields);
+ }
+
+ @Override
+ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType
array,
+ ParquetValueReader<?> elementReader) {
+ GroupType repeated = array.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+ Type elementType = repeated.getType(0);
+ int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) -
1;
+
+ return new ArrayReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(elementType, elementD, elementReader));
+ }
+
+ @Override
+ public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+ ParquetValueReader<?> keyReader,
+ ParquetValueReader<?> valueReader) {
+ GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+ Type keyType = repeatedKeyValue.getType(0);
+ int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+ Type valueType = repeatedKeyValue.getType(1);
+ int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+ return new MapReader<>(repeatedD, repeatedR,
+ ParquetValueReaders.option(keyType, keyD, keyReader),
+ ParquetValueReaders.option(valueType, valueD, valueReader));
+ }
+
+ @Override
+ public ParquetValueReader<?>
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+ PrimitiveType primitive) {
+ ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+ if (primitive.getOriginalType() != null) {
+ switch (primitive.getOriginalType()) {
+ case ENUM:
+ case JSON:
+ case UTF8:
+ return new StringReader(desc);
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ case DATE:
+ if (expected != null && expected.typeId() ==
Types.LongType.get().typeId()) {
+ return new ParquetValueReaders.IntAsLongReader(desc);
+ } else {
+ return new ParquetValueReaders.UnboxedReader<>(desc);
+ }
+ case TIME_MICROS:
+ return new TimeMillisReader(desc);
+ case INT_64:
+ return new ParquetValueReaders.UnboxedReader<>(desc);
+ case TIMESTAMP_MICROS:
+ return new TimestampMicroReader(desc);
+ case DECIMAL:
+ DecimalLogicalTypeAnnotation decimal =
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return new BinaryDecimalReader(desc, decimal.getScale());
+ case INT64:
+ return new LongDecimalReader(desc, decimal.getPrecision(),
decimal.getScale());
+ case INT32:
+ return new IntegerDecimalReader(desc, decimal.getPrecision(),
decimal.getScale());
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported base type for decimal: " +
primitive.getPrimitiveTypeName());
+ }
+ case BSON:
+ return new ParquetValueReaders.ByteArrayReader(desc);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported logical type: " + primitive.getOriginalType());
+ }
+ }
+
+ switch (primitive.getPrimitiveTypeName()) {
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ return new ParquetValueReaders.ByteArrayReader(desc);
+ case INT32:
+ if (expected != null && expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.LONG) {
+ return new ParquetValueReaders.IntAsLongReader(desc);
+ } else {
+ return new ParquetValueReaders.UnboxedReader<>(desc);
+ }
+ case FLOAT:
+ if (expected != null && expected.typeId() ==
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+ return new ParquetValueReaders.FloatAsDoubleReader(desc);
+ } else {
+ return new ParquetValueReaders.UnboxedReader<>(desc);
+ }
+ case BOOLEAN:
+ case INT64:
+ case DOUBLE:
+ return new ParquetValueReaders.UnboxedReader<>(desc);
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
primitive);
+ }
+ }
+ }
+
+ private static class BinaryDecimalReader extends
ParquetValueReaders.PrimitiveReader<DecimalData> {
+ private final int scale;
+
+ BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+ super(desc);
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData read(DecimalData ignored) {
+ Binary binary = column.nextBinary();
+ BigDecimal bigDecimal = new BigDecimal(new
BigInteger(binary.getBytes()), scale);
+ return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(),
scale);
+ }
+ }
+
+ private static class IntegerDecimalReader extends
ParquetValueReaders.PrimitiveReader<DecimalData> {
+ private final int precision;
+ private final int scale;
+
+ IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+ super(desc);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData read(DecimalData ignored) {
+ return DecimalData.fromUnscaledLong(column.nextInteger(), precision,
scale);
+ }
+ }
+
+ private static class LongDecimalReader extends
ParquetValueReaders.PrimitiveReader<DecimalData> {
+ private final int precision;
+ private final int scale;
+
+ LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+ super(desc);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData read(DecimalData ignored) {
+ return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+ }
}
- public static ParquetValueReader<Row> buildReader(Schema expectedSchema,
MessageType fileSchema) {
- return INSTANCE.createReader(expectedSchema, fileSchema);
+ private static class TimestampMicroReader extends
ParquetValueReaders.UnboxedReader<TimestampData> {
+ TimestampMicroReader(ColumnDescriptor desc) {
+ super(desc);
+ }
+
+ @Override
+ public TimestampData read(TimestampData ignored) {
+ long value = readLong();
+ return TimestampData.fromInstant(Instant.ofEpochSecond(value / 1000_000,
(value % 1000_000) * 1000));
+ }
+
+ @Override
+ public long readLong() {
+ return column.nextLong();
+ }
+ }
+
+ private static class StringReader extends
ParquetValueReaders.PrimitiveReader<StringData> {
+ StringReader(ColumnDescriptor desc) {
+ super(desc);
+ }
+
+ @Override
+ public StringData read(StringData ignored) {
+ Binary binary = column.nextBinary();
+ ByteBuffer buffer = binary.toByteBuffer();
+ if (buffer.hasArray()) {
+ return StringData.fromBytes(
+ buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
+ } else {
+ return StringData.fromBytes(binary.getBytes());
+ }
+ }
}
- @Override
- protected ParquetValueReader<Row> createStructReader(List<Type> types,
-
List<ParquetValueReader<?>> fieldReaders,
- Types.StructType
structType) {
- return new RowReader(types, fieldReaders, structType);
+ private static class TimeMillisReader extends
ParquetValueReaders.PrimitiveReader<Integer> {
+ TimeMillisReader(ColumnDescriptor desc) {
+ super(desc);
+ }
+
+ @Override
+ public Integer read(Integer reuse) {
+ // Flink only supports millisecond, so we discard microseconds in
millisecond
+ return (int) column.nextLong() / 1000;
+ }
}
- private static class RowReader extends ParquetValueReaders.StructReader<Row,
Row> {
- private final Types.StructType structType;
+ private static class ArrayReader<E> extends
ParquetValueReaders.RepeatedReader<ArrayData, ReusableArrayData, E> {
+ private int readPos = 0;
+ private int writePos = 0;
+
+ ArrayReader(int definitionLevel, int repetitionLevel,
ParquetValueReader<E> reader) {
+ super(definitionLevel, repetitionLevel, reader);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected ReusableArrayData newListData(ArrayData reuse) {
+ this.readPos = 0;
+ this.writePos = 0;
+
+ if (reuse instanceof ReusableArrayData) {
+ return (ReusableArrayData) reuse;
+ } else {
+ return new ReusableArrayData();
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected E getElement(ReusableArrayData list) {
+ E value = null;
+ if (readPos < list.capacity()) {
+ value = (E) list.values[readPos];
+ }
+
+ readPos += 1;
+
+ return value;
+ }
+
+ @Override
+ protected void addElement(ReusableArrayData reused, E element) {
+ if (writePos >= reused.capacity()) {
+ reused.grow();
+ }
+
+ reused.values[writePos] = element;
+
+ writePos += 1;
+ }
+
+ @Override
+ protected ArrayData buildList(ReusableArrayData list) {
+ list.setNumElements(writePos);
+ return list;
+ }
+ }
+
+ private static class MapReader<K, V> extends
+ ParquetValueReaders.RepeatedKeyValueReader<MapData, ReusableMapData, K,
V> {
+ private int readPos = 0;
+ private int writePos = 0;
+
+ private final ParquetValueReaders.ReusableEntry<K, V> entry = new
ParquetValueReaders.ReusableEntry<>();
+ private final ParquetValueReaders.ReusableEntry<K, V> nullEntry = new
ParquetValueReaders.ReusableEntry<>();
+
+ MapReader(int definitionLevel, int repetitionLevel,
+ ParquetValueReader<K> keyReader, ParquetValueReader<V>
valueReader) {
+ super(definitionLevel, repetitionLevel, keyReader, valueReader);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected ReusableMapData newMapData(MapData reuse) {
+ this.readPos = 0;
+ this.writePos = 0;
+
+ if (reuse instanceof ReusableMapData) {
+ return (ReusableMapData) reuse;
+ } else {
+ return new ReusableMapData();
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Map.Entry<K, V> getPair(ReusableMapData map) {
+ Map.Entry<K, V> kv = nullEntry;
+ if (readPos < map.capacity()) {
+ entry.set((K) map.keys.values[readPos], (V)
map.values.values[readPos]);
+ kv = entry;
+ }
+
+ readPos += 1;
+
+ return kv;
+ }
+
+ @Override
+ protected void addPair(ReusableMapData map, K key, V value) {
+ if (writePos >= map.capacity()) {
+ map.grow();
+ }
+
+ map.keys.values[writePos] = key;
+ map.values.values[writePos] = value;
- RowReader(List<Type> types, List<ParquetValueReader<?>> readers,
Types.StructType struct) {
+ writePos += 1;
+ }
+
+ @Override
+ protected MapData buildMap(ReusableMapData map) {
+ map.setNumElements(writePos);
+ return map;
+ }
+ }
+
+ private static class RowDataReader extends
ParquetValueReaders.StructReader<RowData, GenericRowData> {
+ private final int numFields;
+
+ RowDataReader(List<Type> types, List<ParquetValueReader<?>> readers) {
super(types, readers);
- this.structType = struct;
+ this.numFields = readers.size();
}
@Override
- protected Row newStructData(Row reuse) {
- if (reuse != null) {
- return reuse;
+ protected GenericRowData newStructData(RowData reuse) {
+ if (reuse instanceof GenericRowData) {
+ return (GenericRowData) reuse;
} else {
- return new Row(structType.fields().size());
+ return new GenericRowData(numFields);
}
}
@Override
- protected Object getField(Row row, int pos) {
- return row.getField(pos);
+ protected Object getField(GenericRowData intermediate, int pos) {
+ return intermediate.getField(pos);
+ }
+
+ @Override
+ protected RowData buildStruct(GenericRowData struct) {
+ return struct;
+ }
+
+ @Override
+ protected void set(GenericRowData row, int pos, Object value) {
+ row.setField(pos, value);
+ }
+
+ @Override
+ protected void setNull(GenericRowData row, int pos) {
+ row.setField(pos, null);
}
@Override
- protected Row buildStruct(Row row) {
- return row;
+ protected void setBoolean(GenericRowData row, int pos, boolean value) {
+ row.setField(pos, value);
+ }
+
+ @Override
+ protected void setInteger(GenericRowData row, int pos, int value) {
+ row.setField(pos, value);
+ }
+
+ @Override
+ protected void setLong(GenericRowData row, int pos, long value) {
+ row.setField(pos, value);
}
@Override
- protected void set(Row row, int pos, Object value) {
+ protected void setFloat(GenericRowData row, int pos, float value) {
row.setField(pos, value);
}
+
+ @Override
+ protected void setDouble(GenericRowData row, int pos, double value) {
+ row.setField(pos, value);
+ }
+ }
+
+ private static class ReusableMapData implements MapData {
+ private final ReusableArrayData keys;
+ private final ReusableArrayData values;
+
+ private int numElements;
+
+ private ReusableMapData() {
+ this.keys = new ReusableArrayData();
+ this.values = new ReusableArrayData();
+ }
+
+ private void grow() {
+ keys.grow();
+ values.grow();
+ }
+
+ private int capacity() {
+ return keys.capacity();
+ }
+
+ public void setNumElements(int numElements) {
+ this.numElements = numElements;
+ keys.setNumElements(numElements);
+ values.setNumElements(numElements);
+ }
+
+ @Override
+ public int size() {
+ return numElements;
+ }
+
+ @Override
+ public ReusableArrayData keyArray() {
+ return keys;
+ }
+
+ @Override
+ public ReusableArrayData valueArray() {
+ return values;
+ }
+ }
+
+ private static class ReusableArrayData implements ArrayData {
+ private static final Object[] EMPTY = new Object[0];
+
+ private Object[] values = EMPTY;
+ private int numElements = 0;
+
+ private void grow() {
+ if (values.length == 0) {
+ this.values = new Object[20];
+ } else {
+ Object[] old = values;
+ this.values = new Object[old.length << 2];
Review comment:
I still think we don't need to grow the array to four times its size;
doubling the array capacity is enough.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +93,46 @@ public Row next() {
};
}
+ private static Iterable<Record> generateIcebergGenerics(Schema schema, int
numRecords,
+
Supplier<RandomGenericData.RandomDataGenerator<Record>> supplier) {
+ return () -> new Iterator<Record>() {
+ private final RandomGenericData.RandomDataGenerator<Record> generator =
supplier.get();
+ private int count = 0;
+
+ @Override
+ public boolean hasNext() {
+ return count < numRecords;
+ }
+
+ @Override
+ public Record next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ ++count;
+ return (Record) TypeUtil.visit(schema, generator);
+ }
+ };
+ }
+
+
+ public static Iterable<Record> generateRecords(Schema schema, int
numRecords, long seed) {
Review comment:
ditto
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +93,46 @@ public Row next() {
};
}
+ private static Iterable<Record> generateIcebergGenerics(Schema schema, int
numRecords,
+
Supplier<RandomGenericData.RandomDataGenerator<Record>> supplier) {
+ return () -> new Iterator<Record>() {
+ private final RandomGenericData.RandomDataGenerator<Record> generator =
supplier.get();
+ private int count = 0;
+
+ @Override
+ public boolean hasNext() {
+ return count < numRecords;
+ }
+
+ @Override
+ public Record next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ ++count;
+ return (Record) TypeUtil.visit(schema, generator);
+ }
+ };
+ }
+
+
+ public static Iterable<Record> generateRecords(Schema schema, int
numRecords, long seed) {
+ return RandomGenericData.generate(schema, numRecords, seed);
+ }
+
public static Iterable<Row> generate(Schema schema, int numRecords, long
seed) {
return generateData(schema, numRecords, () -> new
RandomRowGenerator(seed));
}
- public static Iterable<Row> generateFallbackData(Schema schema, int
numRecords, long seed, long numDictRows) {
- return generateData(schema, numRecords, () -> new FallbackGenerator(seed,
numDictRows));
+ public static Iterable<Record> generateFallbackRecords(Schema schema, int
numRecords, long seed, long numDictRows) {
Review comment:
ditto.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +93,46 @@ public Row next() {
};
}
+ private static Iterable<Record> generateIcebergGenerics(Schema schema, int
numRecords,
Review comment:
How about moving this method into
`org.apache.iceberg.data.RandomGenericData`? Then we wouldn't need to make
RandomRecordGenerator public.
BTW, we could integrate the `generateIcebergGenerics` method with
`RandomGenericData#generate`.
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public class ParquetWithFlinkSchemaVisitor<T> {
Review comment:
For this PR (Flink Parquet reader), it seems we don't need this writer
visitor.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/TestHelpers.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+
+public class TestHelpers {
+ private TestHelpers() {}
+
+ private static final OffsetDateTime EPOCH =
Instant.ofEpochMilli(0L).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ public static void assertRowData(Type type, Record expected, RowData actual)
{
+ if (expected == null && actual == null) {
+ return;
+ }
+
+ List<Type> types = new ArrayList<>();
+ for (Types.NestedField field : type.asStructType().fields()) {
+ types.add(field.type());
+ }
+
+ for (int i = 0; i < types.size(); i += 1) {
+ if (expected.get(i) == null) {
+ Assert.assertTrue(actual.isNullAt(i));
+ continue;
+ }
+ Type.TypeID typeId = types.get(i).typeId();
+ Object value = expected.get(i);
+ switch (typeId) {
+ case BOOLEAN:
+ Assert.assertEquals("boolean value should be equal", value,
actual.getBoolean(i));
+ break;
+ case INTEGER:
+ Assert.assertEquals("int value should be equal", value,
actual.getInt(i));
+ break;
+ case LONG:
+ Assert.assertEquals("long value should be equal", value,
actual.getLong(i));
+ break;
+ case FLOAT:
+ Assert.assertEquals("float value should be equal", value,
actual.getFloat(i));
+ break;
+ case DOUBLE:
+ Assert.assertEquals("double should be equal", value,
actual.getDouble(i));
+ break;
+ case STRING:
+ Assert.assertTrue("Should expect a CharSequence", value instanceof
CharSequence);
+ Assert.assertEquals("string should be equal", String.valueOf(value),
actual.getString(i).toString());
+ break;
+ case DATE:
+ Assert.assertTrue("Should expect a Date", value instanceof
LocalDate);
+ LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, actual.getInt(i));
+ Assert.assertEquals("date should be equal", value, date);
+ break;
+ case TIME:
+ Assert.assertTrue("Should expect a LocalTime", value instanceof
LocalTime);
+ int milliseconds = (int) (((LocalTime) value).toNanoOfDay() /
1000_000);
+ Assert.assertEquals("time millis should be equal", milliseconds,
actual.getInt(i));
+ break;
+ case TIMESTAMP:
+ if (((Types.TimestampType)
type.asPrimitiveType()).shouldAdjustToUTC()) {
+ Assert.assertTrue("Should expect a OffsetDataTime", value
instanceof OffsetDateTime);
+ OffsetDateTime ts = (OffsetDateTime) value;
+ Assert.assertEquals("OffsetDataTime should be equal",
ts.toLocalDateTime(),
+ actual.getTimestamp(i, 6).toLocalDateTime());
+ } else {
+ Assert.assertTrue("Should expect a LocalDataTime", value
instanceof LocalDateTime);
+ LocalDateTime ts = (LocalDateTime) value;
+ Assert.assertEquals("LocalDataTime should be equal", ts,
+ actual.getTimestamp(i, 6).toLocalDateTime());
+ }
+ break;
+ case FIXED:
+ Assert.assertTrue("Should expect byte[]", value instanceof byte[]);
+ Assert.assertArrayEquals("binary should be equal", (byte[]) value,
actual.getBinary(i));
+ break;
+ case BINARY:
+ Assert.assertTrue("Should expect a ByteBuffer", value instanceof
ByteBuffer);
+ Assert.assertArrayEquals("binary should be equal", ((ByteBuffer)
value).array(), actual.getBinary(i));
+ break;
+ case DECIMAL:
Review comment:
It seems `assertRowData` and `assertArrayValues` share the same primitive
value assertions; could they be extracted into a common method?
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,719 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
builder);
+ } else {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new FallbackReadBuilder(builder));
+ }
+ }
+
+ private static class FallbackReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private MessageType type;
+ private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+ FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the top level matches by ID, but the remaining IDs are missing
+ this.type = message;
+ return builder.struct(expected, message, fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the expected struct is ignored because nested fields are never found
when the IDs are missing
+ List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
+ fieldReaders.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ newFields.add(ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ types.add(fieldType);
+ }
+
+ return new RowDataReader(types, newFields);
+ }
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
Review comment:
It would be better to create an issue to address this; once the readers and
writers are merged, we can do the refactor under that issue.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]