rdblue commented on a change in pull request #1237:
URL: https://github.com/apache/iceberg/pull/1237#discussion_r460561369
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReadBuilder(fileSchema, idToConstant));
+ } else {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new FallbackReadBuilder(fileSchema, idToConstant));
+ }
+ }
+
+ private static class FallbackReadBuilder extends ReadBuilder {
+ FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ super(type, idToConstant);
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the top level matches by ID, but the remaining IDs are missing
+ return super.struct(expected, message, fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the expected struct is ignored because nested fields are never found
when the file schema has no field IDs
+ List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
+ fieldReaders.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) -
1;
+ newFields.add(ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ types.add(fieldType);
+ }
+
+ return new RowDataReader(types, newFields);
+ }
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private final MessageType type;
+ private final Map<Integer, ?> idToConstant;
+
+ ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ this.type = type;
+ this.idToConstant = idToConstant;
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ return struct(expected, message.asGroupType(), fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType expected, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // match the expected struct's order
+ Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+ Map<Integer, Type> typesById = Maps.newHashMap();
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ if (fieldType.getId() != null) {
+ int id = fieldType.getId().intValue();
+ readersById.put(id, ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ typesById.put(id, fieldType);
+ }
+ }
+
+ List<Types.NestedField> expectedFields = expected != null ?
+ expected.fields() : ImmutableList.of();
+ List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(
+ expectedFields.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(expectedFields.size());
+ for (Types.NestedField field : expectedFields) {
+ int id = field.fieldId();
+ if (idToConstant.containsKey(id)) {
+ // containsKey is used because the constant may be null
+
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+ types.add(null);
+ } else {
+ ParquetValueReader<?> reader = readersById.get(id);
+ if (reader != null) {
+ reorderedFields.add(reader);
+ types.add(typesById.get(id));
+ } else {
+ reorderedFields.add(ParquetValueReaders.nulls());
+ types.add(null);
+ }
+ }
+ }
+
+ return new RowDataReader(types, reorderedFields);
+ }
+
+ @Override
+ public ParquetValueReader<?> list(Types.ListType expectedList, GroupType
array,
+ ParquetValueReader<?> elementReader) {
+ GroupType repeated = array.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+ Type elementType = repeated.getType(0);
+ int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) -
1;
+
+ return new ArrayReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(elementType, elementD, elementReader));
+ }
+
+ @Override
+ public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+ ParquetValueReader<?> keyReader,
+ ParquetValueReader<?> valueReader) {
+ GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+ String[] repeatedPath = currentPath();
+
+ int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+ int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+ Type keyType = repeatedKeyValue.getType(0);
+ int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+ Type valueType = repeatedKeyValue.getType(1);
+ int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+ return new MapReader<>(repeatedD, repeatedR,
+ ParquetValueReaders.option(keyType, keyD, keyReader),
+ ParquetValueReaders.option(valueType, valueD, valueReader));
+ }
+
+ @Override
+ public ParquetValueReader<?>
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+ PrimitiveType primitive) {
+ ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+ if (primitive.getOriginalType() != null) {
+ switch (primitive.getOriginalType()) {
+ case ENUM:
+ case JSON:
+ case UTF8:
+ return new StringReader(desc);
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ case TIME_MICROS:
Review comment:
According to the table in #1215, a time value should be in milliseconds.
Because Parquet stores TIME_MICROS values in microseconds, this case will need a
converter.
@JingsongLi, how should we handle lossy conversions to Flink types?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]