openinx commented on a change in pull request #1266:
URL: https://github.com/apache/iceberg/pull/1266#discussion_r466798700



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
+              return new TimestampTzReader(desc);
+            } else {
+              return new TimestampReader(desc);
+            }
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);

Review comment:
       Here we pass `bigDecimal.precision()` to `DecimalData.fromBigDecimal`, which I think is incorrect, because the precision expected by `DecimalData` is the precision of the SQL decimal type; it uses this value to do `ROUND` etc. You can see the comments in `DecimalData`:
   
   ```java
        // The semantics of the fields are as follows:
        //  - `precision` and `scale` represent the precision and scale of SQL decimal type
        //  - If `decimalVal` is set, it represents the whole decimal value
        //  - Otherwise, the decimal value is longVal/(10^scale).
        //
        // Note that the (precision, scale) must be correct.
        // if precision > MAX_COMPACT_PRECISION,
        //   `decimalVal` represents the value. `longVal` is undefined
        // otherwise, (longVal, scale) represents the value
        //   `decimalVal` may be set and cached
   
        final int precision;
        final int scale;
   ``` 
   
   I think we need to pass the decimal type's precision and scale, and then use `DecimalData.fromUnscaledBytes(binary.getBytes(), precision, scale);` to construct the `DecimalData`.
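   
   Roughly what I have in mind, as a sketch (assuming the builder also passes `decimal.getPrecision()` down to this reader):
   
   ```java
   private static class BinaryDecimalReader extends ParquetValueReaders.PrimitiveReader<DecimalData> {
     private final int precision;
     private final int scale;
   
     BinaryDecimalReader(ColumnDescriptor desc, int precision, int scale) {
       super(desc);
       this.precision = precision;
       this.scale = scale;
     }
   
     @Override
     public DecimalData read(DecimalData ignored) {
       Binary binary = column.nextBinary();
       // use the declared precision/scale of the DECIMAL type, not the precision of this particular value
       return DecimalData.fromUnscaledBytes(binary.getBytes(), precision, scale);
     }
   }
   ```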
   

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
+              return new TimestampTzReader(desc);
+            } else {
+              return new TimestampReader(desc);
+            }
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, 
scale);
+    }
+  }
+
+  private static class LongDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
+  }
+
+  private static class TimestampTzReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampTzReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long value = readLong();
+      return 
TimestampData.fromLocalDateTime(Instant.ofEpochSecond(Math.floorDiv(value, 
1000_000),
+          Math.floorMod(value, 1000_000) * 1000)
+          .atOffset(ZoneOffset.UTC)
+          .toLocalDateTime());
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
   }
 
-  public static ParquetValueReader<Row> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
-    return INSTANCE.createReader(expectedSchema, fileSchema);
+  private static class TimestampReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long value = readLong();
+      return 
TimestampData.fromInstant(Instant.ofEpochSecond(Math.floorDiv(value, 1000_000),
+          Math.floorMod(value, 1000_000) * 1000));
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
   }
 
-  @Override
-  protected ParquetValueReader<Row> createStructReader(List<Type> types,
-                                                       
List<ParquetValueReader<?>> fieldReaders,
-                                                       Types.StructType 
structType) {
-    return new RowReader(types, fieldReaders, structType);
+  private static class StringReader extends 
ParquetValueReaders.PrimitiveReader<StringData> {
+    StringReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public StringData read(StringData ignored) {
+      Binary binary = column.nextBinary();
+      ByteBuffer buffer = binary.toByteBuffer();
+      if (buffer.hasArray()) {
+        return StringData.fromBytes(
+            buffer.array(), buffer.arrayOffset() + buffer.position(), 
buffer.remaining());
+      } else {
+        return StringData.fromBytes(binary.getBytes());
+      }
+    }
+  }
+
+  private static class LossyMicrosToMillisTimeReader extends 
ParquetValueReaders.PrimitiveReader<Integer> {
+    LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Integer read(Integer reuse) {
+      // Discard microseconds since Flink uses millisecond unit for TIME type.
+      return (int) Math.floorDiv(column.nextLong(), 1000);
+    }
+  }
+
+  private static class ArrayReader<E> extends 
ParquetValueReaders.RepeatedReader<ArrayData, ReusableArrayData, E> {
+    private int readPos = 0;
+    private int writePos = 0;
+
+    ArrayReader(int definitionLevel, int repetitionLevel, 
ParquetValueReader<E> reader) {
+      super(definitionLevel, repetitionLevel, reader);
+    }
+
+    @Override
+    protected ReusableArrayData newListData(ArrayData reuse) {
+      this.readPos = 0;
+      this.writePos = 0;
+
+      if (reuse instanceof ReusableArrayData) {
+        return (ReusableArrayData) reuse;
+      } else {
+        return new ReusableArrayData();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected E getElement(ReusableArrayData list) {
+      E value = null;
+      if (readPos < list.capacity()) {
+        value = (E) list.values[readPos];
+      }
+
+      readPos += 1;

Review comment:
       Q: in the case `readPos >= list.capacity()`, it seems we don't need to increment `readPos`? My understanding is that `getElement` should return null in that case, meaning we will read the value without a provided reuse element. Then we also shouldn't move `readPos`, because we don't actually reuse any object from the list.
   
   Of course, the logic is still correct if we increment `readPos` here, but it seems to leave less chance to reuse the objects in the array.
   
   What do you think?
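   
   A minimal sketch of the variant I'm suggesting (only advance the cursor when an element is actually handed out for reuse):
   
   ```java
       @Override
       @SuppressWarnings("unchecked")
       protected E getElement(ReusableArrayData list) {
         if (readPos < list.capacity()) {
           // hand back a previously allocated element and advance the cursor
           return (E) list.values[readPos++];
         }
         // past the reusable region: return null and leave readPos untouched
         return null;
       }
   ```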

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
+              return new TimestampTzReader(desc);
+            } else {
+              return new TimestampReader(desc);
+            }
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);

Review comment:
       BTW, would it be possible to add a few unit tests to cover this?
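   
   For example, a small hedged illustration of the precision mismatch (made-up values, not tied to any existing test helper):
   
   ```java
   // a DECIMAL(10, 2) column holding 3.45
   BigDecimal value = new BigDecimal(new BigInteger("345"), 2);   // value precision is 3
   
   DecimalData fromValuePrecision = DecimalData.fromBigDecimal(value, value.precision(), 2);
   DecimalData fromTypePrecision =
       DecimalData.fromUnscaledBytes(new BigInteger("345").toByteArray(), 10, 2);
   
   // fromValuePrecision.precision() == 3, but the column type declares precision 10;
   // fromTypePrecision.precision() == 10, matching DECIMAL(10, 2)
   ```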

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
+              return new TimestampTzReader(desc);
+            } else {
+              return new TimestampReader(desc);
+            }
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), 
scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, 
scale);
+    }
+  }
+
+  private static class LongDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
+  }
+
+  private static class TimestampTzReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampTzReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long value = readLong();
+      return 
TimestampData.fromLocalDateTime(Instant.ofEpochSecond(Math.floorDiv(value, 
1000_000),
+          Math.floorMod(value, 1000_000) * 1000)
+          .atOffset(ZoneOffset.UTC)
+          .toLocalDateTime());

Review comment:
       Is this correct? My understanding is that a timestamp with time zone doesn't need to be converted to a `LocalDateTime` (because it carries its own time zone), while a timestamp without time zone does.
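   
   In other words, roughly (a sketch assuming the with-time-zone case should keep point-in-time semantics via `TimestampData.fromInstant`; not a final patch):
   
   ```java
       // timestamp with time zone: keep the instant as-is, no detour through LocalDateTime
       @Override
       public TimestampData read(TimestampData ignored) {
         long micros = readLong();
         return TimestampData.fromInstant(Instant.ofEpochSecond(
             Math.floorDiv(micros, 1_000_000),
             Math.floorMod(micros, 1_000_000) * 1_000));
       }
   ```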

##########
File path: data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
##########
@@ -55,6 +59,14 @@ private RandomGenericData() {}
     return records;
   }
 
+  public static Iterable<Record> generateFallbackRecords(Schema schema, int 
numRecords, long seed, long numDictRows) {
+    return generateRecords(schema, numRecords, new FallbackGenerator(seed, 
numDictRows));

Review comment:
       I think we'd better create the records lazily here and return an `Iterable<Record>` (similar to Spark's [RandomData](https://github.com/apache/iceberg/blob/edb8d7759d74418daa677a8c39420f29e28edb66/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java#L68)), because when we want to generate lots of records, a lazy iterable makes it much harder to hit an OOM.
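   
   A rough sketch of the lazy shape I have in mind (assuming the generator is applied via `TypeUtil.visit` as in the existing eager path; names and wiring are illustrative):
   
   ```java
   // imports assumed: java.util.Iterator, java.util.NoSuchElementException, org.apache.iceberg.types.TypeUtil
   public static Iterable<Record> generateFallbackRecords(Schema schema, int numRecords,
                                                          long seed, long numDictRows) {
     // each iterator produces records on demand instead of materializing a List<Record> up front
     return () -> new Iterator<Record>() {
       private final FallbackGenerator generator = new FallbackGenerator(seed, numDictRows);
       private int count = 0;
   
       @Override
       public boolean hasNext() {
         return count < numRecords;
       }
   
       @Override
       public Record next() {
         if (!hasNext()) {
           throw new NoSuchElementException();
         }
         count += 1;
         return (Record) TypeUtil.visit(schema, generator);
       }
     };
   }
   ```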

##########
File path: data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
##########
@@ -46,7 +46,11 @@
   private RandomGenericData() {}
 
   public static List<Record> generate(Schema schema, int numRecords, long 
seed) {
-    RandomRecordGenerator generator = new RandomRecordGenerator(seed);
+    return generateRecords(schema, numRecords, new 
RandomRecordGenerator(seed));
+  }
+
+  public static List<Record> generateRecords(Schema schema, int numRecords,

Review comment:
       We usually don't make this method public, because `RandomRecordGenerator` is a private static class, so callers outside this class can't actually use this method anyway.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:

Review comment:
       The `DATE` type should always be read as an integer, right? To me, it seems more reasonable to move the `DATE` case down to where `INT_64` is handled, because both will always use `UnboxedReader`.
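
       A minimal sketch of the rearrangement suggested above (hypothetical, not part of the patch): `DATE` would share the plain `UnboxedReader` branch with `INT_64`, and the int-to-long promotion check would stay limited to the true integer cases.

          // hypothetical rearrangement of the switch in ReadBuilder.primitive()
          case INT_8:
          case INT_16:
          case INT_32:
            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
              return new ParquetValueReaders.IntAsLongReader(desc);
            } else {
              return new ParquetValueReaders.UnboxedReader<>(desc);
            }
          case DATE:
          case INT_64:
            // DATE is backed by a plain int32 and is never promoted to long,
            // so it can take the unboxed path unconditionally
            return new ParquetValueReaders.UnboxedReader<>(desc);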

##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,701 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> idToConstant) {
+    return (ParquetValueReader<RowData>) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, MessageType message,
+                                               List<ParquetValueReader<?>> fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType struct,
+                                              List<ParquetValueReader<?>> fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+          case DATE:
+            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            if (((Types.TimestampType) expected).shouldAdjustToUTC()) {
+              return new TimestampTzReader(desc);
+            } else {
+              return new TimestampReader(desc);
+            }
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int scale) {
+      super(desc);
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale);
+      return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, scale);
+    }
+  }
+
+  private static class LongDecimalReader extends ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
+  }
+
+  private static class TimestampTzReader extends ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampTzReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long value = readLong();
+      return TimestampData.fromLocalDateTime(Instant.ofEpochSecond(Math.floorDiv(value, 1000_000),
+          Math.floorMod(value, 1000_000) * 1000)
+          .atOffset(ZoneOffset.UTC)
+          .toLocalDateTime());
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
   }
 
-  public static ParquetValueReader<Row> buildReader(Schema expectedSchema, MessageType fileSchema) {
-    return INSTANCE.createReader(expectedSchema, fileSchema);
+  private static class TimestampReader extends ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long value = readLong();
+      return TimestampData.fromInstant(Instant.ofEpochSecond(Math.floorDiv(value, 1000_000),
+          Math.floorMod(value, 1000_000) * 1000));
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
   }
 
-  @Override
-  protected ParquetValueReader<Row> createStructReader(List<Type> types,
-                                                       List<ParquetValueReader<?>> fieldReaders,
-                                                       Types.StructType structType) {
-    return new RowReader(types, fieldReaders, structType);
+  private static class StringReader extends ParquetValueReaders.PrimitiveReader<StringData> {
+    StringReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public StringData read(StringData ignored) {
+      Binary binary = column.nextBinary();
+      ByteBuffer buffer = binary.toByteBuffer();
+      if (buffer.hasArray()) {
+        return StringData.fromBytes(
+            buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+      } else {
+        return StringData.fromBytes(binary.getBytes());
+      }
+    }
+  }
+
+  private static class LossyMicrosToMillisTimeReader extends ParquetValueReaders.PrimitiveReader<Integer> {
+    LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Integer read(Integer reuse) {
+      // Discard microseconds since Flink uses millisecond unit for TIME type.
+      return (int) Math.floorDiv(column.nextLong(), 1000);
+    }
+  }
+
+  private static class ArrayReader<E> extends ParquetValueReaders.RepeatedReader<ArrayData, ReusableArrayData, E> {
+    private int readPos = 0;
+    private int writePos = 0;
+
+    ArrayReader(int definitionLevel, int repetitionLevel, ParquetValueReader<E> reader) {
+      super(definitionLevel, repetitionLevel, reader);
+    }
+
+    @Override
+    protected ReusableArrayData newListData(ArrayData reuse) {
+      this.readPos = 0;
+      this.writePos = 0;
+
+      if (reuse instanceof ReusableArrayData) {
+        return (ReusableArrayData) reuse;
+      } else {
+        return new ReusableArrayData();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected E getElement(ReusableArrayData list) {
+      E value = null;
+      if (readPos < list.capacity()) {
+        value = (E) list.values[readPos];
+      }
+
+      readPos += 1;
+
+      return value;
+    }
+
+    @Override
+    protected void addElement(ReusableArrayData reused, E element) {
+      if (writePos >= reused.capacity()) {
+        reused.grow();
+      }
+
+      reused.values[writePos] = element;
+
+      writePos += 1;
+    }
+
+    @Override
+    protected ArrayData buildList(ReusableArrayData list) {
+      list.setNumElements(writePos);
+      return list;
+    }
   }
 
-  private static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
-    private final Types.StructType structType;
+  private static class MapReader<K, V> extends
+      ParquetValueReaders.RepeatedKeyValueReader<MapData, ReusableMapData, K, V> {
+    private int readPos = 0;
+    private int writePos = 0;
+
+    private final ParquetValueReaders.ReusableEntry<K, V> entry = new ParquetValueReaders.ReusableEntry<>();
+    private final ParquetValueReaders.ReusableEntry<K, V> nullEntry = new ParquetValueReaders.ReusableEntry<>();
+
+    MapReader(int definitionLevel, int repetitionLevel,
+              ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
+      super(definitionLevel, repetitionLevel, keyReader, valueReader);
+    }
+
+    @Override
+    protected ReusableMapData newMapData(MapData reuse) {
+      this.readPos = 0;
+      this.writePos = 0;
+
+      if (reuse instanceof ReusableMapData) {
+        return (ReusableMapData) reuse;
+      } else {
+        return new ReusableMapData();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Map.Entry<K, V> getPair(ReusableMapData map) {
+      Map.Entry<K, V> kv = nullEntry;
+      if (readPos < map.capacity()) {
+        entry.set((K) map.keys.values[readPos], (V) map.values.values[readPos]);
+        kv = entry;
+      }
+
+      readPos += 1;

Review comment:
       Same question as above.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/data/TestHelpers.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+
+public class TestHelpers {
+  private TestHelpers() {}
+
+  private static final OffsetDateTime EPOCH = Instant.ofEpochMilli(0L).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  public static void assertRowData(Type type, Record expectedRecord, RowData actualRowData) {
+    if (expectedRecord == null && actualRowData == null) {
+      return;
+    }
+
+    Assert.assertTrue("expected Record and actual RowData should be both null or not null",
+        expectedRecord != null && actualRowData != null);
+
+    List<Type> types = new ArrayList<>();

Review comment:
       nit: use `Lists.newArrayList()` instead.
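
       A minimal sketch of the change this nit asks for (the `Lists` factory from the relocated Guava package is already imported in this file):

           // before (as written in the patch)
           List<Type> types = new ArrayList<>();

           // after the nit
           List<Type> types = Lists.newArrayList();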




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
