chenjunjiedada commented on a change in pull request #1266:
URL: https://github.com/apache/iceberg/pull/1266#discussion_r465498843
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,719 @@
package org.apache.iceberg.flink.data;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+ private FlinkParquetReaders() {
+ }
- private static final FlinkParquetReaders INSTANCE = new
FlinkParquetReaders();
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
MessageType fileSchema) {
+ return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+ }
- private FlinkParquetReaders() {
+ @SuppressWarnings("unchecked")
+ public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?>
idToConstant) {
+ ReadBuilder builder = new ReadBuilder(fileSchema, idToConstant);
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
builder);
+ } else {
+ return (ParquetValueReader<RowData>)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new FallbackReadBuilder(builder));
+ }
+ }
+
+ private static class FallbackReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ private MessageType type;
+ private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
+
+ FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public ParquetValueReader<?> message(Types.StructType expected,
MessageType message,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the top level matches by ID, but the remaining IDs are missing
+ this.type = message;
+ return builder.struct(expected, message, fieldReaders);
+ }
+
+ @Override
+ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType
struct,
+ List<ParquetValueReader<?>>
fieldReaders) {
+ // the expected struct is ignored because nested fields are never found
when the IDs are missing
+ List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(
+ fieldReaders.size());
+ List<Type> types =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+ List<Type> fields = struct.getFields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Type fieldType = fields.get(i);
+ int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+ newFields.add(ParquetValueReaders.option(fieldType, fieldD,
fieldReaders.get(i)));
+ types.add(fieldType);
+ }
+
+ return new RowDataReader(types, newFields);
+ }
+ }
+
+ private static class ReadBuilder extends
TypeWithSchemaVisitor<ParquetValueReader<?>> {
Review comment:
Thanks @JingsongLi for your input. I will take another look when we start
that task.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]