rdblue commented on a change in pull request #1266:
URL: https://github.com/apache/iceberg/pull/1266#discussion_r469438023



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -19,64 +19,678 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.flink.types.Row;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.parquet.BaseParquetReaders;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders extends BaseParquetReaders<Row> {
+class FlinkParquetReaders {
+  private FlinkParquetReaders() {
+  }
 
-  private static final FlinkParquetReaders INSTANCE = new 
FlinkParquetReaders();
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
 
-  private FlinkParquetReaders() {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<RowData> buildReader(Schema expectedSchema,
+                                                        MessageType fileSchema,
+                                                        Map<Integer, ?> 
idToConstant) {
+    return (ParquetValueReader<RowData>) 
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+        new ReadBuilder(fileSchema, idToConstant)
+    );
+  }
+
+  private static class ReadBuilder extends 
TypeWithSchemaVisitor<ParquetValueReader<?>> {
+    private final MessageType type;
+    private final Map<Integer, ?> idToConstant;
+
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      this.type = type;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ParquetValueReader<RowData> message(Types.StructType expected, 
MessageType message,
+                                               List<ParquetValueReader<?>> 
fieldReaders) {
+      return struct(expected, message.asGroupType(), fieldReaders);
+    }
+
+    @Override
+    public ParquetValueReader<RowData> struct(Types.StructType expected, 
GroupType struct,
+                                              List<ParquetValueReader<?>> 
fieldReaders) {
+      // match the expected struct's order
+      Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
+      Map<Integer, Type> typesById = Maps.newHashMap();
+      List<Type> fields = struct.getFields();
+      for (int i = 0; i < fields.size(); i += 1) {
+        Type fieldType = fields.get(i);
+        int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
+      }
+
+      List<Types.NestedField> expectedFields = expected != null ?
+          expected.fields() : ImmutableList.of();
+      List<ParquetValueReader<?>> reorderedFields = 
Lists.newArrayListWithExpectedSize(
+          expectedFields.size());
+      List<Type> types = 
Lists.newArrayListWithExpectedSize(expectedFields.size());
+      for (Types.NestedField field : expectedFields) {
+        int id = field.fieldId();
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
+      }
+
+      return new RowDataReader(types, reorderedFields);
+    }
+
+    @Override
+    public ParquetValueReader<?> list(Types.ListType expectedList, GroupType 
array,
+                                      ParquetValueReader<?> elementReader) {
+      GroupType repeated = array.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type elementType = repeated.getType(0);
+      int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 
1;
+
+      return new ArrayReader<>(repeatedD, repeatedR, 
ParquetValueReaders.option(elementType, elementD, elementReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> map(Types.MapType expectedMap, GroupType map,
+                                     ParquetValueReader<?> keyReader,
+                                     ParquetValueReader<?> valueReader) {
+      GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
+      String[] repeatedPath = currentPath();
+
+      int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
+      int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
+
+      Type keyType = repeatedKeyValue.getType(0);
+      int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
+      Type valueType = repeatedKeyValue.getType(1);
+      int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
+
+      return new MapReader<>(repeatedD, repeatedR,
+          ParquetValueReaders.option(keyType, keyD, keyReader),
+          ParquetValueReaders.option(valueType, valueD, valueReader));
+    }
+
+    @Override
+    public ParquetValueReader<?> 
primitive(org.apache.iceberg.types.Type.PrimitiveType expected,
+                                           PrimitiveType primitive) {
+      ColumnDescriptor desc = type.getColumnDescription(currentPath());
+
+      if (primitive.getOriginalType() != null) {
+        switch (primitive.getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+            return new StringReader(desc);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            if (expected != null && expected.typeId() == 
Types.LongType.get().typeId()) {
+              return new ParquetValueReaders.IntAsLongReader(desc);
+            } else {
+              return new ParquetValueReaders.UnboxedReader<>(desc);
+            }
+          case TIME_MICROS:
+            return new LossyMicrosToMillisTimeReader(desc);
+          case DATE:
+          case INT_64:
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          case TIMESTAMP_MICROS:
+            return new TimestampMicrosReader(desc);
+          case DECIMAL:
+            DecimalLogicalTypeAnnotation decimal = 
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new BinaryDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT64:
+                return new LongDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              case INT32:
+                return new IntegerDecimalReader(desc, decimal.getPrecision(), 
decimal.getScale());
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          case BSON:
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      }
+
+      switch (primitive.getPrimitiveTypeName()) {
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          return new ParquetValueReaders.ByteArrayReader(desc);
+        case INT32:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.LONG) {
+            return new ParquetValueReaders.IntAsLongReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case FLOAT:
+          if (expected != null && expected.typeId() == 
org.apache.iceberg.types.Type.TypeID.DOUBLE) {
+            return new ParquetValueReaders.FloatAsDoubleReader(desc);
+          } else {
+            return new ParquetValueReaders.UnboxedReader<>(desc);
+          }
+        case BOOLEAN:
+        case INT64:
+        case DOUBLE:
+          return new ParquetValueReaders.UnboxedReader<>(desc);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+      }
+    }
+  }
+
+  private static class BinaryDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    BinaryDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      Binary binary = column.nextBinary();
+      BigDecimal bigDecimal = new BigDecimal(new 
BigInteger(binary.getBytes()), scale);
+      // TODO: need a unit test to write-read-validate decimal via 
FlinkParquetWrite/Reader
+      return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+    }
+  }
+
+  private static class IntegerDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    IntegerDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextInteger(), precision, 
scale);
+    }
   }
 
-  public static ParquetValueReader<Row> buildReader(Schema expectedSchema, 
MessageType fileSchema) {
-    return INSTANCE.createReader(expectedSchema, fileSchema);
+  private static class LongDecimalReader extends 
ParquetValueReaders.PrimitiveReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    LongDecimalReader(ColumnDescriptor desc, int precision, int scale) {
+      super(desc);
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData read(DecimalData ignored) {
+      return DecimalData.fromUnscaledLong(column.nextLong(), precision, scale);
+    }
   }
 
-  @Override
-  protected ParquetValueReader<Row> createStructReader(List<Type> types,
-                                                       
List<ParquetValueReader<?>> fieldReaders,
-                                                       Types.StructType 
structType) {
-    return new RowReader(types, fieldReaders, structType);
+  private static class TimestampMicrosReader extends 
ParquetValueReaders.UnboxedReader<TimestampData> {
+    TimestampMicrosReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      long micros = readLong();
+      return TimestampData.fromEpochMillis(Math.floorDiv(micros, 1_000),
+          (int) Math.floorMod(micros, 1_000) * 1_000);
+    }
+
+    @Override
+    public long readLong() {
+      return column.nextLong();
+    }
+  }
+
+  private static class StringReader extends 
ParquetValueReaders.PrimitiveReader<StringData> {
+    StringReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public StringData read(StringData ignored) {
+      Binary binary = column.nextBinary();
+      ByteBuffer buffer = binary.toByteBuffer();
+      if (buffer.hasArray()) {
+        return StringData.fromBytes(
+            buffer.array(), buffer.arrayOffset() + buffer.position(), 
buffer.remaining());
+      } else {
+        return StringData.fromBytes(binary.getBytes());
+      }
+    }
+  }
+
+  private static class LossyMicrosToMillisTimeReader extends 
ParquetValueReaders.PrimitiveReader<Integer> {
+    LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Integer read(Integer reuse) {
+      // Discard microseconds since Flink uses millisecond unit for TIME type.
+      return (int) Math.floorDiv(column.nextLong(), 1000);
+    }
+  }
+
+  private static class ArrayReader<E> extends 
ParquetValueReaders.RepeatedReader<ArrayData, ReusableArrayData, E> {
+    private int readPos = 0;
+    private int writePos = 0;
+
+    ArrayReader(int definitionLevel, int repetitionLevel, 
ParquetValueReader<E> reader) {
+      super(definitionLevel, repetitionLevel, reader);
+    }
+
+    @Override
+    protected ReusableArrayData newListData(ArrayData reuse) {
+      this.readPos = 0;
+      this.writePos = 0;
+
+      if (reuse instanceof ReusableArrayData) {
+        return (ReusableArrayData) reuse;
+      } else {
+        return new ReusableArrayData();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected E getElement(ReusableArrayData list) {
+      E value = null;
+      if (readPos < list.capacity()) {
+        value = (E) list.values[readPos];
+      }
+
+      readPos += 1;
+
+      return value;
+    }
+
+    @Override
+    protected void addElement(ReusableArrayData reused, E element) {
+      if (writePos >= reused.capacity()) {
+        reused.grow();
+      }
+
+      reused.values[writePos] = element;
+
+      writePos += 1;
+    }
+
+    @Override
+    protected ArrayData buildList(ReusableArrayData list) {
+      list.setNumElements(writePos);
+      return list;
+    }
+  }
+
+  private static class MapReader<K, V> extends
+      ParquetValueReaders.RepeatedKeyValueReader<MapData, ReusableMapData, K, 
V> {
+    private int readPos = 0;
+    private int writePos = 0;
+
+    private final ParquetValueReaders.ReusableEntry<K, V> entry = new 
ParquetValueReaders.ReusableEntry<>();
+    private final ParquetValueReaders.ReusableEntry<K, V> nullEntry = new 
ParquetValueReaders.ReusableEntry<>();
+
+    MapReader(int definitionLevel, int repetitionLevel,
+              ParquetValueReader<K> keyReader, ParquetValueReader<V> 
valueReader) {
+      super(definitionLevel, repetitionLevel, keyReader, valueReader);
+    }
+
+    @Override
+    protected ReusableMapData newMapData(MapData reuse) {
+      this.readPos = 0;
+      this.writePos = 0;
+
+      if (reuse instanceof ReusableMapData) {
+        return (ReusableMapData) reuse;
+      } else {
+        return new ReusableMapData();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Map.Entry<K, V> getPair(ReusableMapData map) {
+      Map.Entry<K, V> kv = nullEntry;
+      if (readPos < map.capacity()) {
+        entry.set((K) map.keys.values[readPos], (V) 
map.values.values[readPos]);
+        kv = entry;
+      }
+
+      readPos += 1;
+
+      return kv;
+    }
+
+    @Override
+    protected void addPair(ReusableMapData map, K key, V value) {
+      if (writePos >= map.capacity()) {
+        map.grow();
+      }
+
+      map.keys.values[writePos] = key;
+      map.values.values[writePos] = value;
+
+      writePos += 1;
+    }
+
+    @Override
+    protected MapData buildMap(ReusableMapData map) {
+      map.setNumElements(writePos);
+      return map;
+    }
   }
 
-  private static class RowReader extends ParquetValueReaders.StructReader<Row, 
Row> {
-    private final Types.StructType structType;
+  private static class RowDataReader extends 
ParquetValueReaders.StructReader<RowData, GenericRowData> {
+    private final int numFields;
 
-    RowReader(List<Type> types, List<ParquetValueReader<?>> readers, 
Types.StructType struct) {
+    RowDataReader(List<Type> types, List<ParquetValueReader<?>> readers) {
       super(types, readers);
-      this.structType = struct;
+      this.numFields = readers.size();
     }
 
     @Override
-    protected Row newStructData(Row reuse) {
-      if (reuse != null) {
-        return reuse;
+    protected GenericRowData newStructData(RowData reuse) {
+      if (reuse instanceof GenericRowData) {
+        return (GenericRowData) reuse;
       } else {
-        return new Row(structType.fields().size());
+        return new GenericRowData(numFields);
       }
     }
 
     @Override
-    protected Object getField(Row row, int pos) {
-      return row.getField(pos);
+    protected Object getField(GenericRowData intermediate, int pos) {
+      return intermediate.getField(pos);
+    }
+
+    @Override
+    protected RowData buildStruct(GenericRowData struct) {
+      return struct;
+    }
+
+    @Override
+    protected void set(GenericRowData row, int pos, Object value) {
+      row.setField(pos, value);
+    }
+
+    @Override
+    protected void setNull(GenericRowData row, int pos) {
+      row.setField(pos, null);
+    }
+
+    @Override
+    protected void setBoolean(GenericRowData row, int pos, boolean value) {
+      row.setField(pos, value);
+    }
+
+    @Override
+    protected void setInteger(GenericRowData row, int pos, int value) {
+      row.setField(pos, value);
+    }
+
+    @Override
+    protected void setLong(GenericRowData row, int pos, long value) {
+      row.setField(pos, value);
     }
 
     @Override
-    protected Row buildStruct(Row row) {
-      return row;
+    protected void setFloat(GenericRowData row, int pos, float value) {
+      row.setField(pos, value);
     }
 
     @Override
-    protected void set(Row row, int pos, Object value) {
+    protected void setDouble(GenericRowData row, int pos, double value) {
       row.setField(pos, value);
     }
   }
+
+  private static class ReusableMapData implements MapData {

Review comment:
       I think we can use the same ReusableMapData class for both Spark and 
Flink, since they are so similar. Some of the data access methods in the 
concrete classes would need to be implemented in a concrete class the 
engine-specific interface, but the main parts could be shared. And I think that 
would mean we could share significant parts of the reader classes as well.
   
   I don't think we need to do this refactor now, but we should try to 
deduplicate this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to