chenjunjiedada commented on a change in pull request #1237:
URL: https://github.com/apache/iceberg/pull/1237#discussion_r462275653



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, idToConstant));
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, idToConstant));
+    }
+  }
+
+  private static class FallbackReadBuilder extends ReadBuilder {
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      return super.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // the expected struct is ignored because nested fields are never found 
when the
+      List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 
1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case TIME_MICROS:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case INT_64:
+          case TIMESTAMP_MICROS:
+            return new TimestampMicroReader(desc);

Review comment:
       Done.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, idToConstant));
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, idToConstant));
+    }
+  }
+
+  private static class FallbackReadBuilder extends ReadBuilder {
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      return super.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // the expected struct is ignored because nested fields are never found 
when the
+      List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 
1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case TIME_MICROS:

Review comment:
       Done. FYI, in #1266 I discard the microsecond part and keep only millisecond precision.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, idToConstant));
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, idToConstant));
+    }
+  }
+
+  private static class FallbackReadBuilder extends ReadBuilder {
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      return super.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // the expected struct is ignored because nested fields are never found 
when the
+      List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 
1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case TIME_MICROS:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case INT_64:
+          case TIMESTAMP_MICROS:
+            return new TimestampMicroReader(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new BytesReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new BytesReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+
+    protected MessageType type() {
+      return type;
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, 
scale);
+    }
+  }
+
+  private static class LongDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
+  }
+
+  private static class TimestampMicroReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampMicroReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      return TimestampData.fromEpochMillis(readLong() / 1000);

Review comment:
       Done.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
##########
@@ -19,38 +19,439 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.util.Iterator;
 import java.util.List;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
-public class FlinkParquetWriters extends BaseParquetWriter<Row> {
+public class FlinkParquetWriters {
+  private FlinkParquetWriters() {
+  }
 
-  private static final FlinkParquetWriters INSTANCE = new 
FlinkParquetWriters();
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, 
MessageType type) {
+    return (ParquetValueWriter<T>) ParquetWithFlinkSchemaVisitor.visit(schema, 
type, new WriteBuilder(type));
+  }
 
-  private FlinkParquetWriters() {
+  private static class WriteBuilder extends 
ParquetWithFlinkSchemaVisitor<ParquetValueWriter<?>> {
+    private final MessageType type;
+
+    WriteBuilder(MessageType type) {
+      this.type = type;
+    }
+
+    @Override
+    public ParquetValueWriter<?> message(RowType sStruct, MessageType message, 
List<ParquetValueWriter<?>> fields) {
+      return struct(sStruct, message.asGroupType(), fields);
+    }
+
+    @Override
+    public ParquetValueWriter<?> struct(RowType sStruct, GroupType struct,
+                                        List<ParquetValueWriter<?>> 
fieldWriters) {
+      List<Type> fields = struct.getFields();
+      List<RowField> flinkFields = sStruct.getFields();
+      List<ParquetValueWriter<?>> writers = 
Lists.newArrayListWithExpectedSize(fieldWriters.size());
+      List<LogicalType> flinkTypes = Lists.newArrayList();
+      for (int i = 0; i < fields.size(); i += 1) {
+        writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
+        flinkTypes.add(flinkFields.get(i).getType());
+      }
+
+      return new RowDataWriter(writers, flinkTypes);
+    }
+
+    @Override
+    public ParquetValueWriter<?> list(ArrayType sArray, GroupType array, 
ParquetValueWriter<?> elementWriter) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new ArrayDataWriter<>(repeatedD, repeatedR,
+          newOption(repeated.getType(0), elementWriter),
+          sArray.getElementType());
+    }
+
+    @Override
+    public ParquetValueWriter<?> map(MapType sMap, GroupType map,
+                                     ParquetValueWriter<?> keyWriter, 
ParquetValueWriter<?> valueWriter) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath);
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath);
+
+      return new MapDataWriter<>(repeatedD, repeatedR,
+          newOption(repeatedKeyValue.getType(0), keyWriter),
+          newOption(repeatedKeyValue.getType(1), valueWriter),
+          sMap.getKeyType(), sMap.getValueType());
+    }
+
+
+    private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type 
fieldType, ParquetValueWriter<?> writer) {
+      int maxD = type.getMaxDefinitionLevel(path(fieldType.getName()));
+      return ParquetValueWriters.option(fieldType, maxD, writer);
+    }
+
+    @Override
+    public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType 
primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return strings(desc);
+          case DATE:
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case TIME_MICROS:

Review comment:
       Will update this in the writer-side PR.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,714 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+public class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, idToConstant));
+    } else {
+      return (ParquetValueReader<RowData>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, idToConstant));
+    }
+  }
+
+  private static class FallbackReadBuilder extends ReadBuilder {
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      // the top level matches by ID, but the remaining IDs are missing
+      return super.struct(expected, message, fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType ignored, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // the expected struct is ignored because nested fields are never found 
when the
+      List<ParquetValueReader<?>> newFields = 
Lists.newArrayListWithExpectedSize(
+          fieldReaders.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(fieldReaders.size());
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 
1;
+        newFields.add(ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+        types.add(fieldType);
+      }
+
+      return new RowDataReader(types, newFields);
+    }
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<?> message(Types.StructType expected, 
MessageType message,
+                                         List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<?> struct(Types.StructType expected, GroupType 
struct,
+                                        List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case TIME_MICROS:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case INT_64:
+          case TIMESTAMP_MICROS:
+            return new TimestampMicroReader(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new BytesReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new BytesReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+
+    protected MessageType type() {
+      return type;
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, 
scale);
+    }
+  }
+
+  private static class LongDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
+  }
+
+  private static class TimestampMicroReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampMicroReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      return TimestampData.fromEpochMillis(readLong() / 1000);
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
+  }
+
+  private static class StringReader extends 
ParquetValueReaders.PrimitiveReader<StringData> {
+    StringReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public StringData read(StringData ignored) {
+      Binary binary = column.nextBinary();
+      ByteBuffer buffer = binary.toByteBuffer();
+      if (buffer.hasArray()) {
+        return StringData.fromBytes(
+            buffer.array(), buffer.arrayOffset() + buffer.position(), 
buffer.remaining());
+      } else {
+        return StringData.fromBytes(binary.getBytes());
+      }
+    }
   }
 
-  public static ParquetValueReader<Row> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
-    return INSTANCE.createReader(expectedSchema, fileSchema);
+  private static class BytesReader extends 
ParquetValueReaders.PrimitiveReader<byte[]> {

Review comment:
       Done.

##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
##########
@@ -41,34 +47,193 @@
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private void testCorrectness(Schema schema, int numRecords, Iterable<Row> 
iterable) throws IOException {
+  private void testCorrectness(Schema schema, int numRecords, 
Iterable<RowData> iterable) throws IOException {
     File testFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", testFile.delete());
 
-    try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
+    try (FileAppender<RowData> writer = 
Parquet.write(Files.localOutput(testFile))
         .schema(schema)
-        .createWriterFunc(FlinkParquetWriters::buildWriter)
+        .createWriterFunc(msgType -> 
FlinkParquetWriters.buildWriter(FlinkSchemaUtil.convert(schema), msgType))
         .build()) {
       writer.addAll(iterable);
     }
 
-    try (CloseableIterable<Row> reader = 
Parquet.read(Files.localInput(testFile))
+    try (CloseableIterable<RowData> reader = 
Parquet.read(Files.localInput(testFile))
         .project(schema)
         .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, 
type))
         .build()) {
-      Iterator<Row> expected = iterable.iterator();
-      Iterator<Row> rows = reader.iterator();
+      Iterator<RowData> expected = iterable.iterator();
+      Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecords; i += 1) {
         Assert.assertTrue("Should have expected number of rows", 
rows.hasNext());
-        Assert.assertEquals(expected.next(), rows.next());
+        assertRowData(schema.asStruct(), expected.next(), rows.next());
       }
       Assert.assertFalse("Should not have extra rows", rows.hasNext());
     }
   }
 
+  private void assertRowData(Type type, RowData expected, RowData actual) {

Review comment:
       Done.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
##########
@@ -88,20 +102,151 @@ public Row next() {
     };
   }
 
+  private static Iterable<RowData> generateRowData(Schema schema, int 
numRecords,
+                                                   
Supplier<RandomRowDataGenerator> supplier) {
+    return () -> new Iterator<RowData>() {
+      private final RandomRowDataGenerator generator = supplier.get();
+      private int count = 0;
+
+      @Override
+      public boolean hasNext() {
+        return count < numRecords;
+      }
+
+      @Override
+      public RowData next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        ++count;
+        return (RowData) TypeUtil.visit(schema, generator);
+      }
+    };
+  }
+
+  public static Iterable<RowData> generateRowData(Schema schema, int 
numRecords, long seed) {
+    return generateRowData(schema, numRecords, () -> new 
RandomRowDataGenerator(seed));
+  }
+
   public static Iterable<Row> generate(Schema schema, int numRecords, long 
seed) {
     return generateData(schema, numRecords, () -> new 
RandomRowGenerator(seed));
   }
 
-  public static Iterable<Row> generateFallbackData(Schema schema, int 
numRecords, long seed, long numDictRows) {
-    return generateData(schema, numRecords, () -> new FallbackGenerator(seed, 
numDictRows));
+  public static Iterable<RowData> generateFallbackData(Schema schema, int 
numRecords, long seed, long numDictRows) {
+    return generateRowData(schema, numRecords, () -> new 
FallbackGenerator(seed, numDictRows));
   }
 
-  public static Iterable<Row> generateDictionaryEncodableData(Schema schema, 
int numRecords, long seed) {
-    return generateData(schema, numRecords, () -> new 
DictionaryEncodedGenerator(seed));
+  public static Iterable<RowData> generateDictionaryEncodableData(Schema 
schema, int numRecords, long seed) {
+    return generateRowData(schema, numRecords, () -> new 
DictionaryEncodedGenerator(seed));
   }
 
-  private static class RandomRowGenerator extends 
RandomGenericData.RandomDataGenerator<Row> {
+  private static class RandomRowDataGenerator extends 
TypeUtil.CustomOrderSchemaVisitor<Object> {

Review comment:
       Done. The test now writes generic records to Parquet files, reads them back 
through the Flink Parquet reader, and compares each `Record` with the resulting `RowData`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to