This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2874fc46e8 Data, Spark, Flink: Add null engineSchema fallback for 
format model writers (#15688)
2874fc46e8 is described below

commit 2874fc46e8b307b93e2ebb1a849fd7365fb3cb31
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Mar 20 17:19:20 2026 +0530

    Data, Spark, Flink: Add null engineSchema fallback for format model writers 
(#15688)
---
 .../apache/iceberg/data/BaseFormatModelTests.java  | 94 +++++++++++++++++-----
 .../apache/iceberg/flink/data/FlinkAvroWriter.java |  5 ++
 .../iceberg/flink/data/FlinkFormatModels.java      |  6 +-
 .../apache/iceberg/flink/data/FlinkOrcWriter.java  |  4 +-
 .../iceberg/flink/data/FlinkParquetWriters.java    |  8 ++
 .../apache/iceberg/flink/data/FlinkAvroWriter.java |  5 ++
 .../iceberg/flink/data/FlinkFormatModels.java      |  6 +-
 .../apache/iceberg/flink/data/FlinkOrcWriter.java  |  4 +-
 .../iceberg/flink/data/FlinkParquetWriters.java    |  8 ++
 .../apache/iceberg/flink/data/FlinkAvroWriter.java |  5 ++
 .../iceberg/flink/data/FlinkFormatModels.java      |  6 +-
 .../apache/iceberg/flink/data/FlinkOrcWriter.java  |  4 +-
 .../iceberg/flink/data/FlinkParquetWriters.java    |  8 ++
 .../apache/iceberg/spark/data/SparkAvroWriter.java |  5 ++
 .../iceberg/spark/source/SparkFormatModels.java    |  3 +-
 .../apache/iceberg/spark/data/SparkAvroWriter.java |  5 ++
 .../iceberg/spark/source/SparkFormatModels.java    |  3 +-
 .../apache/iceberg/spark/data/SparkAvroWriter.java |  5 ++
 .../iceberg/spark/source/SparkFormatModels.java    |  3 +-
 .../apache/iceberg/spark/data/SparkAvroWriter.java |  5 ++
 .../iceberg/spark/source/SparkFormatModels.java    |  3 +-
 21 files changed, 157 insertions(+), 38 deletions(-)

diff --git 
a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java 
b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
index e9a7f5e0c3..dd563925a3 100644
--- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -127,17 +127,36 @@ public abstract class BaseFormatModelTests<T> {
     assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
     assertThat(dataFile.format()).isEqualTo(fileFormat);
 
-    // Read back and verify
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(schema)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
+    readAndAssertGenericRecords(fileFormat, schema, genericRecords);
+  }
+
+  /** Write with engine type T without explicit engineSchema, read with 
Generic Record */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testDataWriterEngineWriteWithoutEngineSchema(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<DataWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), 
encryptedFile);
+
+    DataWriter<T> writer = 
writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
     }
 
-    DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, 
readRecords);
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    readAndAssertGenericRecords(fileFormat, schema, genericRecords);
   }
 
   @ParameterizedTest
@@ -212,17 +231,45 @@ public abstract class BaseFormatModelTests<T> {
     assertThat(deleteFile.format()).isEqualTo(fileFormat);
     assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
 
-    // Read back and verify
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(schema)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
+    readAndAssertGenericRecords(fileFormat, schema, genericRecords);
+  }
+
+  /**
+   * Write equality deletes with engine type T without explicit engineSchema, 
read with Generic
+   * Record
+   */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testEqualityDeleteWriterEngineWriteWithoutEngineSchema(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<EqualityDeleteWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, 
engineType(), encryptedFile);
+
+    EqualityDeleteWriter<T> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
     }
 
-    DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, 
readRecords);
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+    readAndAssertGenericRecords(fileFormat, schema, genericRecords);
   }
 
   @ParameterizedTest
@@ -304,17 +351,20 @@ public abstract class BaseFormatModelTests<T> {
     assertThat(deleteFile.recordCount()).isEqualTo(2);
     assertThat(deleteFile.format()).isEqualTo(fileFormat);
 
-    // Read back and verify
+    readAndAssertGenericRecords(fileFormat, positionDeleteSchema, records);
+  }
+
+  private void readAndAssertGenericRecords(
+      FileFormat fileFormat, Schema schema, List<Record> expected) throws 
IOException {
     InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
     List<Record> readRecords;
     try (CloseableIterable<Record> reader =
         FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(positionDeleteSchema)
+            .project(schema)
             .build()) {
       readRecords = ImmutableList.copyOf(reader);
     }
-
-    DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), records, 
readRecords);
+    DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords);
   }
 
   private List<T> convertToEngineRecords(List<Record> records, Schema schema) {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements 
MetricsAwareDatumWriter<RowData> {
     this.rowType = rowType;
   }
 
+  public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType 
engineSchema) {
+    this(engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
         ParquetFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) ->
-                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            FlinkParquetWriters::buildWriter,
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkParquetReaders.buildReader(icebergSchema, fileSchema, 
idToConstant)));
 
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
         AvroFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new FlinkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
   }
 
   public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema 
iSchema) {
-    return new FlinkOrcWriter(rowType, iSchema);
+    return new FlinkOrcWriter(
+        rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
   }
 
   @Override
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 5c90252723..8de42411cd 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -37,7 +37,9 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -66,6 +68,12 @@ import org.apache.parquet.schema.Type;
 public class FlinkParquetWriters {
   private FlinkParquetWriters() {}
 
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, RowType engineSchema) {
+    return buildWriter(
+        engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema), type);
+  }
+
   @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, 
MessageType type) {
     return (ParquetValueWriter<T>)
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements 
MetricsAwareDatumWriter<RowData> {
     this.rowType = rowType;
   }
 
+  public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType 
engineSchema) {
+    this(engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
         ParquetFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) ->
-                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            FlinkParquetWriters::buildWriter,
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkParquetReaders.buildReader(icebergSchema, fileSchema, 
idToConstant)));
 
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
         AvroFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new FlinkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
   }
 
   public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema 
iSchema) {
-    return new FlinkOrcWriter(rowType, iSchema);
+    return new FlinkOrcWriter(
+        rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
   }
 
   @Override
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 5c90252723..8de42411cd 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -37,7 +37,9 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -66,6 +68,12 @@ import org.apache.parquet.schema.Type;
 public class FlinkParquetWriters {
   private FlinkParquetWriters() {}
 
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, RowType engineSchema) {
+    return buildWriter(
+        engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema), type);
+  }
+
   @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, 
MessageType type) {
     return (ParquetValueWriter<T>)
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 66ed95792e..8741958392 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
@@ -43,6 +44,10 @@ public class FlinkAvroWriter implements 
MetricsAwareDatumWriter<RowData> {
     this.rowType = rowType;
   }
 
+  public FlinkAvroWriter(org.apache.iceberg.Schema icebergSchema, RowType 
engineSchema) {
+    this(engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index 0026c8a302..dd713b0dce 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -31,8 +31,7 @@ public class FlinkFormatModels {
         ParquetFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) ->
-                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            FlinkParquetWriters::buildWriter,
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkParquetReaders.buildReader(icebergSchema, fileSchema, 
idToConstant)));
 
@@ -40,7 +39,8 @@ public class FlinkFormatModels {
         AvroFormatModel.create(
             RowData.class,
             RowType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new FlinkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 6a31accffd..a467d84833 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,7 +46,8 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
   }
 
   public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema 
iSchema) {
-    return new FlinkOrcWriter(rowType, iSchema);
+    return new FlinkOrcWriter(
+        rowType != null ? rowType : FlinkSchemaUtil.convert(iSchema), iSchema);
   }
 
   @Override
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index cc40013fa1..df51a25d21 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -45,7 +45,9 @@ import org.apache.flink.table.types.logical.VariantType;
 import org.apache.flink.types.variant.BinaryVariant;
 import org.apache.flink.types.variant.Variant;
 import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.FlinkRowData;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -80,6 +82,12 @@ import org.apache.parquet.schema.Type;
 public class FlinkParquetWriters {
   private FlinkParquetWriters() {}
 
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, RowType engineSchema) {
+    return buildWriter(
+        engineSchema != null ? engineSchema : 
FlinkSchemaUtil.convert(icebergSchema), type);
+  }
+
   @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(LogicalType schema, 
MessageType type) {
     return (ParquetValueWriter<T>)
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 04dfd46a18..186439c58c 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements 
MetricsAwareDatumWriter<InternalRow> {
     this.dsSchema = dsSchema;
   }
 
+  public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType 
dsSchema) {
+    this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
         AvroFormatModel.create(
             InternalRow.class,
             StructType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 04dfd46a18..186439c58c 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements 
MetricsAwareDatumWriter<InternalRow> {
     this.dsSchema = dsSchema;
   }
 
+  public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType 
dsSchema) {
+    this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
         AvroFormatModel.create(
             InternalRow.class,
             StructType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 4946ac1031..fc7d64743d 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements 
MetricsAwareDatumWriter<InternalRow> {
     this.dsSchema = dsSchema;
   }
 
+  public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType 
dsSchema) {
+    this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
         AvroFormatModel.create(
             InternalRow.class,
             StructType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 4946ac1031..fc7d64743d 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.avro.MetricsAwareDatumWriter;
 import org.apache.iceberg.avro.ValueWriter;
 import org.apache.iceberg.avro.ValueWriters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataType;
@@ -46,6 +47,10 @@ public class SparkAvroWriter implements 
MetricsAwareDatumWriter<InternalRow> {
     this.dsSchema = dsSchema;
   }
 
+  public SparkAvroWriter(org.apache.iceberg.Schema icebergSchema, StructType 
dsSchema) {
+    this(dsSchema != null ? dsSchema : SparkSchemaUtil.convert(icebergSchema));
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setSchema(Schema schema) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index a1e8a82b4d..23fbe54a4b 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -40,7 +40,8 @@ public class SparkFormatModels {
         AvroFormatModel.create(
             InternalRow.class,
             StructType.class,
-            (icebergSchema, fileSchema, engineSchema) -> new 
SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkAvroWriter(icebergSchema, engineSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                 SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
 

Reply via email to