This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2874fc46e8 Data, Spark, Flink: Add null engineSchema fallback for format model writers (#15688)
2874fc46e8 is described below
commit 2874fc46e8b307b93e2ebb1a849fd7365fb3cb31
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Mar 20 17:19:20 2026 +0530
Data, Spark, Flink: Add null engineSchema fallback for format model writers (#15688)
---
.../apache/iceberg/data/BaseFormatModelTests.java | 94 +++++++++++++++++-----
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 5 ++
.../iceberg/flink/data/FlinkFormatModels.java | 6 +-
.../apache/iceberg/flink/data/FlinkOrcWriter.java | 4 +-
.../iceberg/flink/data/FlinkParquetWriters.java | 8 ++
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 5 ++
.../iceberg/flink/data/FlinkFormatModels.java | 6 +-
.../apache/iceberg/flink/data/FlinkOrcWriter.java | 4 +-
.../iceberg/flink/data/FlinkParquetWriters.java | 8 ++
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 5 ++
.../iceberg/flink/data/FlinkFormatModels.java | 6 +-
.../apache/iceberg/flink/data/FlinkOrcWriter.java | 4 +-
.../iceberg/flink/data/FlinkParquetWriters.java | 8 ++
.../apache/iceberg/spark/data/SparkAvroWriter.java | 5 ++
.../iceberg/spark/source/SparkFormatModels.java | 3 +-
.../apache/iceberg/spark/data/SparkAvroWriter.java | 5 ++
.../iceberg/spark/source/SparkFormatModels.java | 3 +-
.../apache/iceberg/spark/data/SparkAvroWriter.java | 5 ++
.../iceberg/spark/source/SparkFormatModels.java | 3 +-
.../apache/iceberg/spark/data/SparkAvroWriter.java | 5 ++
.../iceberg/spark/source/SparkFormatModels.java | 3 +-
21 files changed, 157 insertions(+), 38 deletions(-)
diff --git
a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
index e9a7f5e0c3..dd563925a3 100644
--- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -127,17 +127,36 @@ public abstract class BaseFormatModelTests<T> {
assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
assertThat(dataFile.format()).isEqualTo(fileFormat);
- // Read back and verify
- InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
- List<Record> readRecords;
- try (CloseableIterable<Record> reader =
- FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
- .project(schema)
- .build()) {
- readRecords = ImmutableList.copyOf(reader);
+ readAndAssertGenericRecords(fileFormat, schema, genericRecords);
+ }
+
+ /** Write with engine type T without explicit engineSchema, read with Generic Record */
+ @ParameterizedTest
+ @FieldSource("FORMAT_AND_GENERATOR")
+ void testDataWriterEngineWriteWithoutEngineSchema(
+ FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+ Schema schema = dataGenerator.schema();
+ FileWriterBuilder<DataWriter<T>, Object> writerBuilder =
+ FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile);
+
+ DataWriter<T> writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+ try (writer) {
+ for (T record : engineRecords) {
+ writer.write(record);
+ }
}
- DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, readRecords);
+ DataFile dataFile = writer.toDataFile();
+
+ assertThat(dataFile).isNotNull();
+ assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
+ assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+ readAndAssertGenericRecords(fileFormat, schema, genericRecords);
}
@ParameterizedTest
@@ -212,17 +231,45 @@ public abstract class BaseFormatModelTests<T> {
assertThat(deleteFile.format()).isEqualTo(fileFormat);
assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
- // Read back and verify
- InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
- List<Record> readRecords;
- try (CloseableIterable<Record> reader =
- FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
- .project(schema)
- .build()) {
- readRecords = ImmutableList.copyOf(reader);
+ readAndAssertGenericRecords(fileFormat, schema, genericRecords);
+ }
+
+ /**
+ * Write equality deletes with engine type T without explicit engineSchema, read with Generic
+ * Record
+ */
+ @ParameterizedTest
+ @FieldSource("FORMAT_AND_GENERATOR")
+ void testEqualityDeleteWriterEngineWriteWithoutEngineSchema(
+ FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+ Schema schema = dataGenerator.schema();
+ FileWriterBuilder<EqualityDeleteWriter<T>, Object> writerBuilder =
+ FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, engineType(), encryptedFile);
+
+ EqualityDeleteWriter<T> writer =
+ writerBuilder
+ .schema(schema)
+ .spec(PartitionSpec.unpartitioned())
+ .equalityFieldIds(1)
+ .build();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+ try (writer) {
+ for (T record : engineRecords) {
+ writer.write(record);
+ }
}
- DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, readRecords);
+ DeleteFile deleteFile = writer.toDeleteFile();
+
+ assertThat(deleteFile).isNotNull();
+ assertThat(deleteFile.recordCount()).isEqualTo(engineRecords.size());
+ assertThat(deleteFile.format()).isEqualTo(fileFormat);
+ assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+ readAndAssertGenericRecords(fileFormat, schema, genericRecords);
}
@ParameterizedTest
@@ -304,17 +351,20 @@ public abstract class BaseFormatModelTests<T> {
assertThat(deleteFile.recordCount()).isEqualTo(2);
assertThat(deleteFile.format()).isEqualTo(fileFormat);
- // Read back and verify
+ readAndAssertGenericRecords(fileFormat, positionDeleteSchema, records);
+ }
+
+ private void readAndAssertGenericRecords(
+ FileFormat fileFormat, Schema schema, List<Record> expected) throws IOException {
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
List<Record> readRecords;
try (CloseableIterable<Record> reader =
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
- .project(positionDeleteSchema)
+ .project(schema)
.build()) {
readRecords = ImmutableList.copyOf(reader);
}
-
- DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), records, readRecords);
+ DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords);
}
private List<T> convertToEngineRecords(List<Record> records, Schema schema) {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements
MetricsAwareDatumWriter<RowData> {
this.rowType = rowType;
}
+ public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType engineSchema) {
+ this(engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
ParquetFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) ->
- FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+ FlinkParquetWriters::buildWriter,
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant)));
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
AvroFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new FlinkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
}
public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema
iSchema) {
- return new FlinkOrcWriter(rowType, iSchema);
+ return new FlinkOrcWriter(
+ rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
}
@Override
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 5c90252723..8de42411cd 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -37,7 +37,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -66,6 +68,12 @@ import org.apache.parquet.schema.Type;
public class FlinkParquetWriters {
private FlinkParquetWriters() {}
+ public static <T> ParquetValueWriter<T> buildWriter(
+ Schema icebergSchema, MessageType type, RowType engineSchema) {
+ return buildWriter(
+ engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema), type);
+ }
+
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema,
MessageType type) {
return (ParquetValueWriter<T>)
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements
MetricsAwareDatumWriter<RowData> {
this.rowType = rowType;
}
+ public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType engineSchema) {
+ this(engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
ParquetFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) ->
- FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+ FlinkParquetWriters::buildWriter,
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant)));
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
AvroFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new FlinkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
}
public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema
iSchema) {
- return new FlinkOrcWriter(rowType, iSchema);
+ return new FlinkOrcWriter(
+ rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
}
@Override
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 5c90252723..8de42411cd 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -37,7 +37,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -66,6 +68,12 @@ import org.apache.parquet.schema.Type;
public class FlinkParquetWriters {
private FlinkParquetWriters() {}
+ public static <T> ParquetValueWriter<T> buildWriter(
+ Schema icebergSchema, MessageType type, RowType engineSchema) {
+ return buildWriter(
+ engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema), type);
+ }
+
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema,
MessageType type) {
return (ParquetValueWriter<T>)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements
MetricsAwareDatumWriter<RowData> {
this.rowType = rowType;
}
+ public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType engineSchema) {
+ this(engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
ParquetFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) ->
- FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+ FlinkParquetWriters::buildWriter,
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant)));
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
AvroFormatModel.create(
RowData.class,
RowType.class,
- (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new FlinkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
}
public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema
iSchema) {
- return new FlinkOrcWriter(rowType, iSchema);
+ return new FlinkOrcWriter(
+ rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
}
@Override
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index cc40013fa1..df51a25d21 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -45,7 +45,9 @@ import org.apache.flink.table.types.logical.VariantType;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.flink.types.variant.Variant;
import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -80,6 +82,12 @@ import org.apache.parquet.schema.Type;
public class FlinkParquetWriters {
private FlinkParquetWriters() {}
+ public static <T> ParquetValueWriter<T> buildWriter(
+ Schema icebergSchema, MessageType type, RowType engineSchema) {
+ return buildWriter(
+ engineSchema != null ? engineSchema : FlinkSchemaUtil.convert(icebergSchema), type);
+ }
+
@SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema,
MessageType type) {
return (ParquetValueWriter<T>)
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 04dfd46a18..186439c58c 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements
MetricsAwareDatumWriter<InternalRow> {
this.dsSchema = dsSchema;
}
+ public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType dsSchema) {
+ this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
AvroFormatModel.create(
InternalRow.class,
StructType.class,
- (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new SparkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 04dfd46a18..186439c58c 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements
MetricsAwareDatumWriter<InternalRow> {
this.dsSchema = dsSchema;
}
+ public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType dsSchema) {
+ this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
AvroFormatModel.create(
InternalRow.class,
StructType.class,
- (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new SparkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 4946ac1031..fc7d64743d 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements
MetricsAwareDatumWriter<InternalRow> {
this.dsSchema = dsSchema;
}
+ public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType dsSchema) {
+ this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
AvroFormatModel.create(
InternalRow.class,
StructType.class,
- (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new SparkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 4946ac1031..fc7d64743d 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements
MetricsAwareDatumWriter<InternalRow> {
this.dsSchema = dsSchema;
}
+ public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType dsSchema) {
+ this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+ }
+
@Override
@SuppressWarnings("unchecked")
public void setSchema(Schema schema) {
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
AvroFormatModel.create(
InternalRow.class,
StructType.class,
- (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema) ->
+ new SparkAvroWriter(icebergSchema, engineSchema),
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkPlannedAvroReader.create(icebergSchema, idToConstant)));