rdblue commented on a change in pull request #1572:
URL: https://github.com/apache/iceberg/pull/1572#discussion_r506674021



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
##########
@@ -480,178 +481,95 @@ protected ArrayData buildList(ReusableArrayData list) {
     }
 
     @Override
-    protected ReusableMapData newMapData(MapData reuse) {
+    protected FlinkReusableMapData newMapData(MapData reuse) {
       this.readPos = 0;
-      this.writePos = 0;
 
-      if (reuse instanceof ReusableMapData) {
-        return (ReusableMapData) reuse;
+      if (reuse instanceof FlinkReusableMapData) {
+        return (FlinkReusableMapData) reuse;
       } else {
-        return new ReusableMapData();
+        return new FlinkReusableMapData();
       }
     }
 
     @Override
     @SuppressWarnings("unchecked")
-    protected Map.Entry<K, V> getPair(ReusableMapData map) {
-      Map.Entry<K, V> kv = nullEntry;
-      if (readPos < map.capacity()) {
-        entry.set((K) map.keys.values[readPos], (V) 
map.values.values[readPos]);
-        kv = entry;
-      }
-
+    protected Map.Entry<K, V> getPair(FlinkReusableMapData map) {
+      Map.Entry<K, V> kv = map.getRaw(readPos, entry, nullEntry);
       readPos += 1;
 
       return kv;
     }
 
     @Override
-    protected void addPair(ReusableMapData map, K key, V value) {
-      if (writePos >= map.capacity()) {
-        map.grow();
-      }
-
-      map.keys.values[writePos] = key;
-      map.values.values[writePos] = value;
-
-      writePos += 1;
+    protected void addPair(FlinkReusableMapData map, K key, V value) {
+      map.addPair(key, value);
     }
 
     @Override
-    protected MapData buildMap(ReusableMapData map) {
-      map.setNumElements(writePos);
+    protected MapData buildMap(FlinkReusableMapData map) {
       return map;
     }
   }
 
-  private static class RowDataReader extends 
ParquetValueReaders.StructReader<RowData, GenericRowData> {
-    private final int numFields;
-
-    RowDataReader(List<Type> types, List<ParquetValueReader<?>> readers) {
-      super(types, readers);
-      this.numFields = readers.size();
-    }
-
-    @Override
-    protected GenericRowData newStructData(RowData reuse) {
-      if (reuse instanceof GenericRowData) {
-        return (GenericRowData) reuse;
-      } else {
-        return new GenericRowData(numFields);
-      }
-    }
-
-    @Override
-    protected Object getField(GenericRowData intermediate, int pos) {
-      return intermediate.getField(pos);
-    }
-
-    @Override
-    protected RowData buildStruct(GenericRowData struct) {
-      return struct;
-    }
+  private static class FlinkReusableMapData implements ReusableMapData, 
MapData {

Review comment:
       Was the order of classes changed? It looks like that may be why there 
are more changes in this diff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to