This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d565b222a Core, Data: Implementation of AvroFormatModel (#15254)
6d565b222a is described below

commit 6d565b222a885c0e8db6c4ed2c80ef2b7c478061
Author: pvary <[email protected]>
AuthorDate: Fri Feb 13 18:12:33 2026 +0100

    Core, Data: Implementation of AvroFormatModel (#15254)
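
    AvroFormatModel plugs Avro into the FormatModel API so that writers and
    readers are obtained through FormatModelRegistry rather than by calling
    Avro.write/Avro.read directly. A minimal sketch of the intended round
    trip for generic Records, adapted from the tests in this commit
    (outputFile is a placeholder EncryptedOutputFile, inputFile an InputFile,
    and schema/record are supplied by the caller):

        // write generic Records into an Avro data file
        DataWriter<Record> writer =
            FormatModelRegistry.dataWriteBuilder(FileFormat.AVRO, Record.class, outputFile)
                .schema(schema)
                .spec(PartitionSpec.unpartitioned())
                .build();
        try (writer) {
          writer.write(record);
        }

        // read the records back; projection uses the Iceberg schema
        try (CloseableIterable<Record> reader =
            FormatModelRegistry.readBuilder(FileFormat.AVRO, Record.class, inputFile)
                .project(schema)
                .build()) {
          reader.forEach(System.out::println);
        }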
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |   7 +-
 .../org/apache/iceberg/avro/AvroFormatModel.java   | 277 +++++++++++++++++++++
 .../org/apache/iceberg/deletes/PositionDelete.java |   5 +
 .../iceberg/formats/FormatModelRegistry.java       |   9 +-
 .../apache/iceberg/data/GenericFormatModels.java   |  40 +++
 .../org/apache/iceberg/data/DataTestHelpers.java   |   8 +
 .../iceberg/data/TestGenericFormatModels.java      | 204 +++++++++++++++
 7 files changed, 541 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 6c7edc25b6..4a5136f58e 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -182,8 +182,7 @@ public class Avro {
     }
 
     // supposed to always be a private method used strictly by data and delete write builders
-    private WriteBuilder createContextFunc(
-        Function<Map<String, String>, Context> newCreateContextFunc) {
+    WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
       this.createContextFunc = newCreateContextFunc;
       return this;
     }
@@ -217,7 +216,7 @@ public class Avro {
           overwrite);
     }
 
-    private static class Context {
+    static class Context {
       private final CodecFactory codec;
 
       private Context(CodecFactory codec) {
@@ -568,7 +567,7 @@ public class Avro {
   }
 
   /** A {@link DatumWriter} implementation that wraps another to produce position deletes. */
-  private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
+  static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
     private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
     private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
new file mode 100644
index 0000000000..e0fcf89526
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.BaseFormatModel;
+import org.apache.iceberg.formats.ModelWriteBuilder;
+import org.apache.iceberg.formats.ReadBuilder;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class AvroFormatModel<D, S>
+    extends BaseFormatModel<D, S, DatumWriter<D>, DatumReader<D>, Schema> {
+
+  public static <D> AvroFormatModel<PositionDelete<D>, Void> forPositionDeletes() {
+    return new AvroFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null);
+  }
+
+  public static <D, S> AvroFormatModel<D, S> create(
+      Class<D> type,
+      Class<S> schemaType,
+      WriterFunction<DatumWriter<D>, S, Schema> writerFunction,
+      ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+    return new AvroFormatModel<>(type, schemaType, writerFunction, readerFunction);
+  }
+
+  private AvroFormatModel(
+      Class<D> type,
+      Class<S> schemaType,
+      WriterFunction<DatumWriter<D>, S, Schema> writerFunction,
+      ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+    super(type, schemaType, writerFunction, readerFunction);
+  }
+
+  @Override
+  public FileFormat format() {
+    return FileFormat.AVRO;
+  }
+
+  @Override
+  public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
+    return new WriteBuilderWrapper<>(outputFile, writerFunction());
+  }
+
+  @Override
+  public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
+    return new ReadBuilderWrapper<>(inputFile, readerFunction());
+  }
+
+  private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
+    private final Avro.WriteBuilder internal;
+    private final WriterFunction<DatumWriter<D>, S, Schema> writerFunction;
+    private org.apache.iceberg.Schema schema;
+    private S engineSchema;
+    private FileContent content;
+
+    private WriteBuilderWrapper(
+        EncryptedOutputFile outputFile, WriterFunction<DatumWriter<D>, S, Schema> writerFunction) {
+      this.internal = Avro.write(outputFile.encryptingOutputFile());
+      this.writerFunction = writerFunction;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> schema(org.apache.iceberg.Schema newSchema) {
+      this.schema = newSchema;
+      internal.schema(newSchema);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
+      this.engineSchema = newSchema;
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> set(String property, String value) {
+      internal.set(property, value);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+      internal.setAll(properties);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> meta(String property, String value) {
+      internal.meta(property, value);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> meta(Map<String, String> properties) {
+      internal.meta(properties);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> content(FileContent newContent) {
+      this.content = newContent;
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
+      internal.metricsConfig(metricsConfig);
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> overwrite() {
+      internal.overwrite();
+      return this;
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
+      throw new UnsupportedOperationException("Avro does not support file encryption keys");
+    }
+
+    @Override
+    public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
+      throw new UnsupportedOperationException("Avro does not support AAD 
prefix");
+    }
+
+    @Override
+    public FileAppender<D> build() throws IOException {
+      switch (content) {
+        case DATA:
+          internal.createContextFunc(Avro.WriteBuilder.Context::dataContext);
+          internal.createWriterFunc(
+              avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
+          break;
+        case EQUALITY_DELETES:
+          internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
+          internal.createWriterFunc(
+              avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
+          break;
+        case POSITION_DELETES:
+          Preconditions.checkState(
+              schema == null,
+              "Invalid schema: %s. Position deletes with schema are not 
supported by the API.",
+              schema);
+          Preconditions.checkState(
+              engineSchema == null,
+              "Invalid engineSchema: %s. Position deletes with schema are not 
supported by the API.",
+              engineSchema);
+
+          internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
+          internal.createWriterFunc(unused -> new Avro.PositionDatumWriter());
+          internal.schema(DeleteSchemaUtil.pathPosSchema());
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown file content: " + content);
+      }
+
+      return internal.build();
+    }
+  }
+
+  private static class ReadBuilderWrapper<D, S> implements ReadBuilder<D, S> {
+    private final Avro.ReadBuilder internal;
+    private final ReaderFunction<DatumReader<D>, S, Schema> readerFunction;
+    private S engineSchema;
+    private Map<Integer, ?> idToConstant = ImmutableMap.of();
+
+    private ReadBuilderWrapper(
+        InputFile inputFile, ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+      this.internal = Avro.read(inputFile);
+      this.readerFunction = readerFunction;
+    }
+
+    @Override
+    public ReadBuilder<D, S> split(long newStart, long newLength) {
+      internal.split(newStart, newLength);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> project(org.apache.iceberg.Schema schema) {
+      internal.project(schema);
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> engineProjection(S schema) {
+      this.engineSchema = schema;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
+      // Filtering is not supported in Avro reader, so case sensitivity does not matter
+      // This is not an error since filtering is best-effort.
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> filter(Expression filter) {
+      // Filtering is not supported in Avro reader
+      // This is not an error since filtering is best-effort.
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> set(String key, String value) {
+      // Configuration is not used for Avro reader creation
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> reuseContainers() {
+      internal.reuseContainers();
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
+      throw new UnsupportedOperationException("Batch reading is not supported 
in Avro reader");
+    }
+
+    @Override
+    public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
+      this.idToConstant = newIdToConstant;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder<D, S> withNameMapping(org.apache.iceberg.mapping.NameMapping nameMapping) {
+      internal.withNameMapping(nameMapping);
+      return this;
+    }
+
+    @Override
+    public CloseableIterable<D> build() {
+      // The file schema is passed directly to the DatumReader by the Avro read path, so null is
+      // passed here
+      return internal
+          .createResolvingReader(
+              icebergSchema -> readerFunction.read(icebergSchema, null, engineSchema, idToConstant))
+          .build();
+    }
+  }
+}
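
Note: the writer and reader hooks used above have the shapes
writerFunction.write(icebergSchema, avroSchema, engineSchema) and
readerFunction.read(icebergSchema, fileSchema, engineSchema, idToConstant),
as seen in the two build() methods. A hedged sketch of an engine-specific
model built on these hooks (EngineRow, EngineSchema, EngineDatumWriter, and
EngineDatumReader are invented names, not part of this commit):

    // hypothetical engine integration: the functions must return Avro
    // DatumWriter/DatumReader implementations for the engine's row type;
    // note that the read path passes null for the Avro file schema
    AvroFormatModel<EngineRow, EngineSchema> model =
        AvroFormatModel.create(
            EngineRow.class,
            EngineSchema.class,
            (icebergSchema, avroSchema, engineSchema) ->
                new EngineDatumWriter(avroSchema, engineSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                new EngineDatumReader(icebergSchema, engineSchema, idToConstant));
    FormatModelRegistry.register(model);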
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index b202e7fcf3..c3b6cbaa9b 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -31,6 +31,11 @@ public class PositionDelete<R> implements StructLike {
 
   private PositionDelete() {}
 
+  @SuppressWarnings("unchecked")
+  public static <T> Class<PositionDelete<T>> deleteClass() {
+    return (Class<PositionDelete<T>>) (Class<?>) PositionDelete.class;
+  }
+
   public PositionDelete<R> set(CharSequence newPath, long newPos) {
     this.path = newPath;
     this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index 91301b95dc..b9adafdbc2 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -54,7 +54,8 @@ public final class FormatModelRegistry {
 
   private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
   // The list of classes which are used for registering the reader and writer builders
-  private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of();
+  private static final List<String> CLASSES_TO_REGISTER =
+      ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
 
   // Format models indexed by file format and object model class
   private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
@@ -169,12 +170,10 @@ public final class FormatModelRegistry {
    * @param outputFile destination for the written data
    * @return a configured delete write builder for creating a {@link PositionDeleteWriter}
    */
-  @SuppressWarnings("unchecked")
   public static <D> FileWriterBuilder<PositionDeleteWriter<D>, ?> positionDeleteWriteBuilder(
       FileFormat format, EncryptedOutputFile outputFile) {
-    Class<PositionDelete<D>> deleteClass =
-        (Class<PositionDelete<D>>) (Class<?>) PositionDelete.class;
-    FormatModel<PositionDelete<D>, ?> model = FormatModelRegistry.modelFor(format, deleteClass);
+    FormatModel<PositionDelete<D>, ?> model =
+        FormatModelRegistry.modelFor(format, PositionDelete.deleteClass());
     return FileWriterBuilderImpl.forPositionDelete(model, outputFile);
   }
 
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
new file mode 100644
index 0000000000..e2e13861ca
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
+import org.apache.iceberg.formats.FormatModelRegistry;
+
+public class GenericFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            Record.class,
+            Void.class,
+            (icebergSchema, fileSchema, engineSchema) -> DataWriter.create(fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                PlannedDataReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());
+  }
+
+  private GenericFormatModels() {}
+}
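
GenericFormatModels is the class name added to CLASSES_TO_REGISTER in the
FormatModelRegistry hunk above, so its models are presumably registered
reflectively when the registry initializes; the static register() method
can also be called explicitly, for example from test setup:

    // explicit registration; redundant when the registry has already
    // loaded org.apache.iceberg.data.GenericFormatModels from its
    // CLASSES_TO_REGISTER list
    GenericFormatModels.register();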
diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
index fc8d47680b..f2e2b4e7fa 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -31,6 +31,14 @@ import org.apache.iceberg.variants.VariantTestUtil;
 public class DataTestHelpers {
   private DataTestHelpers() {}
 
+  public static void assertEquals(
+      Types.StructType struct, List<Record> expected, List<Record> actual) {
+    assertThat(actual).hasSize(expected.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      assertEquals(struct, expected.get(i), actual.get(i));
+    }
+  }
+
   public static void assertEquals(Types.StructType struct, Record expected, Record actual) {
     assertEquals(struct, expected, actual, null, -1);
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
new file mode 100644
index 0000000000..ca3dda30ab
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestGenericFormatModels {
+  private static final List<Record> TEST_RECORDS =
+      RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
+
+  private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
+
+  @TempDir protected Path temp;
+
+  private InMemoryFileIO fileIO;
+  private EncryptedOutputFile encryptedFile;
+
+  @BeforeEach
+  public void before() {
+    this.fileIO = new InMemoryFileIO();
+    this.encryptedFile =
+        EncryptedFiles.encryptedOutput(
+            fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
+  }
+
+  @AfterEach
+  public void after() throws IOException {
+    fileIO.deleteFile(encryptedFile.encryptingOutputFile());
+    this.encryptedFile = null;
+    if (fileIO != null) {
+      fileIO.close();
+    }
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  public void testDataWriterRoundTrip(FileFormat fileFormat) throws IOException {
+    FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataFile dataFile;
+    DataWriter<Record> writer =
+        writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      for (Record record : TEST_RECORDS) {
+        writer.write(record);
+      }
+    }
+
+    dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    // Verify the file content by reading it back
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(TestBase.SCHEMA)
+            .reuseContainers()
+            .build()) {
+      readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, Record::copy));
+    }
+
+    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
+    FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DeleteFile deleteFile;
+    EqualityDeleteWriter<Record> writer =
+        writerBuilder
+            .schema(TestBase.SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(3)
+            .build();
+    try (writer) {
+      for (Record record : TEST_RECORDS) {
+        writer.write(record);
+      }
+    }
+
+    deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
+
+    // Verify the file content by reading it back
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(TestBase.SCHEMA)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
+    Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS);
+
+    FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
+        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
+
+    PositionDelete<Record> delete1 = PositionDelete.create();
+    delete1.set("data-file-1.parquet", 0L);
+
+    PositionDelete<Record> delete2 = PositionDelete.create();
+    delete2.set("data-file-1.parquet", 1L);
+
+    List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, delete2);
+
+    DeleteFile deleteFile;
+    PositionDeleteWriter<Record> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      for (PositionDelete<Record> delete : positionDeletes) {
+        writer.write(delete);
+      }
+    }
+
+    deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(2);
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+
+    // Verify the file content by reading it back
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(positionDeleteSchema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    List<Record> expected =
+        ImmutableList.of(
+            GenericRecord.create(positionDeleteSchema)
+                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", 
DELETE_FILE_POS.name(), 0L),
+            GenericRecord.create(positionDeleteSchema)
+                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", 
DELETE_FILE_POS.name(), 1L));
+
+    DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords);
+  }
+}
