This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 602c2b2dbe Flink 1.20: Update Flink to use planned Avro reads (#11386)
602c2b2dbe is described below
commit 602c2b2dbecb81d7d84940f988579add7ffd1030
Author: JB Onofré <[email protected]>
AuthorDate: Tue Oct 29 17:54:24 2024 +0100
Flink 1.20: Update Flink to use planned Avro reads (#11386)
---
.../apache/iceberg/flink/data/FlinkAvroReader.java | 12 ++++
...AvroReader.java => FlinkPlannedAvroReader.java} | 83 ++++++++++++++--------
.../iceberg/flink/data/FlinkValueReaders.java | 32 +++++++++
.../flink/source/RowDataFileScanTaskReader.java | 4 +-
.../flink/data/TestFlinkAvroReaderWriter.java | 2 +-
.../iceberg/flink/data/TestRowProjection.java | 2 +-
6 files changed, 101 insertions(+), 34 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index 8640495973..baae91dd18 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -37,16 +37,28 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+/**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+@Deprecated
public class FlinkAvroReader implements DatumReader<RowData>,
SupportsRowPosition {
private final Schema readSchema;
private final ValueReader<RowData> reader;
private Schema fileSchema = null;
+ /**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+ @Deprecated
public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema
readSchema) {
this(expectedSchema, readSchema, ImmutableMap.of());
}
+ /**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+ @Deprecated
@SuppressWarnings("unchecked")
public FlinkAvroReader(
org.apache.iceberg.Schema expectedSchema, Schema readSchema,
Map<Integer, ?> constants) {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
similarity index 61%
copy from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
copy to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
index 8640495973..b7a81752d4 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
@@ -28,42 +28,51 @@ import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.AvroWithPartnerVisitor;
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
-public class FlinkAvroReader implements DatumReader<RowData>,
SupportsRowPosition {
+public class FlinkPlannedAvroReader implements DatumReader<RowData>,
SupportsRowPosition {
- private final Schema readSchema;
- private final ValueReader<RowData> reader;
- private Schema fileSchema = null;
+ private final Types.StructType expectedType;
+ private final Map<Integer, ?> idToConstant;
+ private ValueReader<RowData> reader;
- public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema
readSchema) {
- this(expectedSchema, readSchema, ImmutableMap.of());
+ public static FlinkPlannedAvroReader create(org.apache.iceberg.Schema
schema) {
+ return create(schema, ImmutableMap.of());
}
- @SuppressWarnings("unchecked")
- public FlinkAvroReader(
- org.apache.iceberg.Schema expectedSchema, Schema readSchema,
Map<Integer, ?> constants) {
- this.readSchema = readSchema;
- this.reader =
- (ValueReader<RowData>)
- AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new
ReadBuilder(constants));
+ public static FlinkPlannedAvroReader create(
+ org.apache.iceberg.Schema schema, Map<Integer, ?> constants) {
+ return new FlinkPlannedAvroReader(schema, constants);
+ }
+
+ private FlinkPlannedAvroReader(
+ org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> constants) {
+ this.expectedType = expectedSchema.asStruct();
+ this.idToConstant = constants;
}
@Override
- public void setSchema(Schema newFileSchema) {
- this.fileSchema = Schema.applyAliases(newFileSchema, readSchema);
+ @SuppressWarnings("unchecked")
+ public void setSchema(Schema fileSchema) {
+ this.reader =
+ (ValueReader<RowData>)
+ AvroWithPartnerVisitor.visit(
+ expectedType,
+ fileSchema,
+ new ReadBuilder(idToConstant),
+ AvroWithPartnerVisitor.FieldIDAccessors.get());
}
@Override
public RowData read(RowData reuse, Decoder decoder) throws IOException {
- return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema,
reader, reuse);
+ return reader.read(decoder, reuse);
}
@Override
@@ -73,7 +82,7 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
}
}
- private static class ReadBuilder extends
AvroSchemaWithTypeVisitor<ValueReader<?>> {
+ private static class ReadBuilder extends AvroWithPartnerVisitor<Type,
ValueReader<?>> {
private final Map<Integer, ?> idToConstant;
private ReadBuilder(Map<Integer, ?> idToConstant) {
@@ -81,39 +90,47 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
}
@Override
- public ValueReader<?> record(
- Types.StructType expected, Schema record, List<String> names,
List<ValueReader<?>> fields) {
- return FlinkValueReaders.struct(fields, expected.asStructType(),
idToConstant);
+ public ValueReader<?> record(Type partner, Schema record,
List<ValueReader<?>> fieldReaders) {
+ if (partner == null) {
+ return ValueReaders.skipStruct(fieldReaders);
+ }
+
+ Types.StructType expected = partner.asStructType();
+ List<Pair<Integer, ValueReader<?>>> readPlan =
+ ValueReaders.buildReadPlan(expected, record, fieldReaders,
idToConstant);
+
+ // TODO: should this pass expected so that struct.get can reuse
containers?
+ return FlinkValueReaders.struct(readPlan, expected.fields().size());
}
@Override
- public ValueReader<?> union(Type expected, Schema union,
List<ValueReader<?>> options) {
+ public ValueReader<?> union(Type partner, Schema union,
List<ValueReader<?>> options) {
return ValueReaders.union(options);
}
@Override
- public ValueReader<?> array(
- Types.ListType expected, Schema array, ValueReader<?> elementReader) {
+ public ValueReader<?> array(Type partner, Schema array, ValueReader<?>
elementReader) {
return FlinkValueReaders.array(elementReader);
}
@Override
- public ValueReader<?> map(
- Types.MapType expected, Schema map, ValueReader<?> keyReader,
ValueReader<?> valueReader) {
+ public ValueReader<?> arrayMap(
+ Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?>
valueReader) {
return FlinkValueReaders.arrayMap(keyReader, valueReader);
}
@Override
- public ValueReader<?> map(Types.MapType expected, Schema map,
ValueReader<?> valueReader) {
+ public ValueReader<?> map(Type partner, Schema map, ValueReader<?>
valueReader) {
return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
}
@Override
- public ValueReader<?> primitive(Type.PrimitiveType expected, Schema
primitive) {
+ public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
case "date":
+ // Flink uses the same representation
return ValueReaders.ints();
case "time-micros":
@@ -136,7 +153,7 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
return FlinkValueReaders.uuids();
default:
- throw new IllegalArgumentException("Unknown logical type: " +
logicalType);
+ throw new IllegalArgumentException("Unknown logical type: " +
logicalType.getName());
}
}
@@ -146,10 +163,16 @@ public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPositio
case BOOLEAN:
return ValueReaders.booleans();
case INT:
+ if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+ return ValueReaders.intsAsLongs();
+ }
return ValueReaders.ints();
case LONG:
return ValueReaders.longs();
case FLOAT:
+ if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+ return ValueReaders.floatsAsDoubles();
+ }
return ValueReaders.floats();
case DOUBLE:
return ValueReaders.doubles();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
index 32f6c3a2cc..0c6ff24111 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
public class FlinkValueReaders {
@@ -86,6 +87,10 @@ public class FlinkValueReaders {
return new MapReader(keyReader, valueReader);
}
+ static ValueReader<RowData> struct(List<Pair<Integer, ValueReader<?>>>
readPlan, int numFields) {
+ return new PlannedStructReader(readPlan, numFields);
+ }
+
static ValueReader<RowData> struct(
List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?>
idToConstant) {
return new StructReader(readers, struct, idToConstant);
@@ -282,6 +287,33 @@ public class FlinkValueReaders {
}
}
+ private static class PlannedStructReader extends
ValueReaders.PlannedStructReader<RowData> {
+ private final int numFields;
+
+ private PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan,
int numFields) {
+ super(readPlan);
+ this.numFields = numFields;
+ }
+
+ @Override
+ protected RowData reuseOrCreate(Object reuse) {
+ if (reuse instanceof GenericRowData && ((GenericRowData)
reuse).getArity() == numFields) {
+ return (RowData) reuse;
+ }
+ return new GenericRowData(numFields);
+ }
+
+ @Override
+ protected Object get(RowData struct, int pos) {
+ return null;
+ }
+
+ @Override
+ protected void set(RowData struct, int pos, Object value) {
+ ((GenericRowData) struct).setField(pos, value);
+ }
+ }
+
private static class StructReader extends ValueReaders.StructReader<RowData>
{
private final int numFields;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 88364f4e87..9c75a5e0f0 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -35,9 +35,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkSourceFilter;
import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
import org.apache.iceberg.flink.data.FlinkParquetReaders;
+import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.io.CloseableIterable;
@@ -154,7 +154,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
.reuseContainers()
.project(schema)
.split(task.start(), task.length())
- .createReaderFunc(readSchema -> new FlinkAvroReader(schema,
readSchema, idToConstant));
+ .createReaderFunc(readSchema ->
FlinkPlannedAvroReader.create(schema, idToConstant));
if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
index a1039d27d8..2b9e8694b6 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
@@ -91,7 +91,7 @@ public class TestFlinkAvroReaderWriter extends DataTest {
try (CloseableIterable<RowData> reader =
Avro.read(Files.localInput(recordsFile))
.project(schema)
- .createReaderFunc(FlinkAvroReader::new)
+ .createResolvingReader(FlinkPlannedAvroReader::create)
.build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index 7dd4e8759c..3b6cf0c58f 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -64,7 +64,7 @@ public class TestRowProjection {
Iterable<RowData> records =
Avro.read(Files.localInput(file))
.project(readSchema)
- .createReaderFunc(FlinkAvroReader::new)
+ .createResolvingReader(FlinkPlannedAvroReader::create)
.build();
return Iterables.getOnlyElement(records);