This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 602c2b2dbe Flink 1.20: Update Flink to use planned Avro reads (#11386)
602c2b2dbe is described below

commit 602c2b2dbecb81d7d84940f988579add7ffd1030
Author: JB Onofré <[email protected]>
AuthorDate: Tue Oct 29 17:54:24 2024 +0100

    Flink 1.20: Update Flink to use planned Avro reads (#11386)
---
 .../apache/iceberg/flink/data/FlinkAvroReader.java | 12 ++++
 ...AvroReader.java => FlinkPlannedAvroReader.java} | 83 ++++++++++++++--------
 .../iceberg/flink/data/FlinkValueReaders.java      | 32 +++++++++
 .../flink/source/RowDataFileScanTaskReader.java    |  4 +-
 .../flink/data/TestFlinkAvroReaderWriter.java      |  2 +-
 .../iceberg/flink/data/TestRowProjection.java      |  2 +-
 6 files changed, 101 insertions(+), 34 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index 8640495973..baae91dd18 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -37,16 +37,28 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+/**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+@Deprecated
 public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<RowData> reader;
   private Schema fileSchema = null;
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
     this(expectedSchema, readSchema, ImmutableMap.of());
   }
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   @SuppressWarnings("unchecked")
   public FlinkAvroReader(
       org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
similarity index 61%
copy from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
copy to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
index 8640495973..b7a81752d4 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
@@ -28,42 +28,51 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.AvroWithPartnerVisitor;
 import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.data.avro.DecoderResolver;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 
-public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
+public class FlinkPlannedAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
-  private final Schema readSchema;
-  private final ValueReader<RowData> reader;
-  private Schema fileSchema = null;
+  private final Types.StructType expectedType;
+  private final Map<Integer, ?> idToConstant;
+  private ValueReader<RowData> reader;
 
-  public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
-    this(expectedSchema, readSchema, ImmutableMap.of());
+  public static FlinkPlannedAvroReader create(org.apache.iceberg.Schema schema) {
+    return create(schema, ImmutableMap.of());
   }
 
-  @SuppressWarnings("unchecked")
-  public FlinkAvroReader(
-      org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
-    this.readSchema = readSchema;
-    this.reader =
-        (ValueReader<RowData>)
-            AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder(constants));
+  public static FlinkPlannedAvroReader create(
+      org.apache.iceberg.Schema schema, Map<Integer, ?> constants) {
+    return new FlinkPlannedAvroReader(schema, constants);
+  }
+
+  private FlinkPlannedAvroReader(
+      org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> constants) {
+    this.expectedType = expectedSchema.asStruct();
+    this.idToConstant = constants;
   }
 
   @Override
-  public void setSchema(Schema newFileSchema) {
-    this.fileSchema = Schema.applyAliases(newFileSchema, readSchema);
+  @SuppressWarnings("unchecked")
+  public void setSchema(Schema fileSchema) {
+    this.reader =
+        (ValueReader<RowData>)
+            AvroWithPartnerVisitor.visit(
+                expectedType,
+                fileSchema,
+                new ReadBuilder(idToConstant),
+                AvroWithPartnerVisitor.FieldIDAccessors.get());
   }
 
   @Override
   public RowData read(RowData reuse, Decoder decoder) throws IOException {
-    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
+    return reader.read(decoder, reuse);
   }
 
   @Override
@@ -73,7 +82,7 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
     }
   }
 
-  private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+  private static class ReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;
 
     private ReadBuilder(Map<Integer, ?> idToConstant) {
@@ -81,39 +90,47 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
     }
 
     @Override
-    public ValueReader<?> record(
-        Types.StructType expected, Schema record, List<String> names, List<ValueReader<?>> fields) {
-      return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant);
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldReaders) {
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldReaders);
+      }
+
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);
+
+      // TODO: should this pass expected so that struct.get can reuse containers?
+      return FlinkValueReaders.struct(readPlan, expected.fields().size());
     }
 
     @Override
-    public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
+    public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
       return ValueReaders.union(options);
     }
 
     @Override
-    public ValueReader<?> array(
-        Types.ListType expected, Schema array, ValueReader<?> elementReader) {
+    public ValueReader<?> array(Type partner, Schema array, ValueReader<?> elementReader) {
       return FlinkValueReaders.array(elementReader);
     }
 
     @Override
-    public ValueReader<?> map(
-        Types.MapType expected, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
+    public ValueReader<?> arrayMap(
+        Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
       return FlinkValueReaders.arrayMap(keyReader, valueReader);
     }
 
     @Override
-    public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
+    public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader) {
       return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
     }
 
     @Override
-    public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
+    public ValueReader<?> primitive(Type partner, Schema primitive) {
       LogicalType logicalType = primitive.getLogicalType();
       if (logicalType != null) {
         switch (logicalType.getName()) {
           case "date":
+            // Flink uses the same representation
             return ValueReaders.ints();
 
           case "time-micros":
@@ -136,7 +153,7 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
             return FlinkValueReaders.uuids();
 
           default:
-            throw new IllegalArgumentException("Unknown logical type: " + logicalType);
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType.getName());
         }
       }
 
@@ -146,10 +163,16 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
         case BOOLEAN:
           return ValueReaders.booleans();
         case INT:
+          if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
           return ValueReaders.ints();
         case LONG:
           return ValueReaders.longs();
         case FLOAT:
+          if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
           return ValueReaders.floats();
         case DOUBLE:
           return ValueReaders.doubles();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
index 32f6c3a2cc..0c6ff24111 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 
 public class FlinkValueReaders {
 
@@ -86,6 +87,10 @@ public class FlinkValueReaders {
     return new MapReader(keyReader, valueReader);
   }
 
+  static ValueReader<RowData> struct(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+    return new PlannedStructReader(readPlan, numFields);
+  }
+
   static ValueReader<RowData> struct(
       List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
     return new StructReader(readers, struct, idToConstant);
@@ -282,6 +287,33 @@ public class FlinkValueReaders {
     }
   }
 
+  private static class PlannedStructReader extends ValueReaders.PlannedStructReader<RowData> {
+    private final int numFields;
+
+    private PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+      super(readPlan);
+      this.numFields = numFields;
+    }
+
+    @Override
+    protected RowData reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericRowData && ((GenericRowData) reuse).getArity() == numFields) {
+        return (RowData) reuse;
+      }
+      return new GenericRowData(numFields);
+    }
+
+    @Override
+    protected Object get(RowData struct, int pos) {
+      return null;
+    }
+
+    @Override
+    protected void set(RowData struct, int pos, Object value) {
+      ((GenericRowData) struct).setField(pos, value);
+    }
+  }
+
   private static class StructReader extends ValueReaders.StructReader<RowData> {
     private final int numFields;
 
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 88364f4e87..9c75a5e0f0 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -35,9 +35,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkAvroReader;
 import org.apache.iceberg.flink.data.FlinkOrcReader;
 import org.apache.iceberg.flink.data.FlinkParquetReaders;
+import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
@@ -154,7 +154,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
             .reuseContainers()
             .project(schema)
             .split(task.start(), task.length())
-            .createReaderFunc(readSchema -> new FlinkAvroReader(schema, readSchema, idToConstant));
+            .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
 
     if (nameMapping != null) {
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
index a1039d27d8..2b9e8694b6 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -91,7 +91,7 @@ public class TestFlinkAvroReaderWriter extends DataTest {
     try (CloseableIterable<RowData> reader =
         Avro.read(Files.localInput(recordsFile))
             .project(schema)
-            .createReaderFunc(FlinkAvroReader::new)
+            .createResolvingReader(FlinkPlannedAvroReader::create)
             .build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index 7dd4e8759c..3b6cf0c58f 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -64,7 +64,7 @@ public class TestRowProjection {
     Iterable<RowData> records =
         Avro.read(Files.localInput(file))
             .project(readSchema)
-            .createReaderFunc(FlinkAvroReader::new)
+            .createResolvingReader(FlinkPlannedAvroReader::create)
             .build();
 
     return Iterables.getOnlyElement(records);

Reply via email to