This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6d565b222a Core, Data: Implementation of AvroFormatModel (#15254)
6d565b222a is described below
commit 6d565b222a885c0e8db6c4ed2c80ef2b7c478061
Author: pvary <[email protected]>
AuthorDate: Fri Feb 13 18:12:33 2026 +0100
Core, Data: Implementation of AvroFormatModel (#15254)
---
.../main/java/org/apache/iceberg/avro/Avro.java | 7 +-
.../org/apache/iceberg/avro/AvroFormatModel.java | 277 +++++++++++++++++++++
.../org/apache/iceberg/deletes/PositionDelete.java | 5 +
.../iceberg/formats/FormatModelRegistry.java | 9 +-
.../apache/iceberg/data/GenericFormatModels.java | 40 +++
.../org/apache/iceberg/data/DataTestHelpers.java | 8 +
.../iceberg/data/TestGenericFormatModels.java | 204 +++++++++++++++
7 files changed, 541 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 6c7edc25b6..4a5136f58e 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -182,8 +182,7 @@ public class Avro {
}
// supposed to always be a private method used strictly by data and delete write builders
- private WriteBuilder createContextFunc(
- Function<Map<String, String>, Context> newCreateContextFunc) {
+ WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
@@ -217,7 +216,7 @@ public class Avro {
overwrite);
}
- private static class Context {
+ static class Context {
private final CodecFactory codec;
private Context(CodecFactory codec) {
@@ -568,7 +567,7 @@ public class Avro {
}
/** A {@link DatumWriter} implementation that wraps another to produce position deletes. */
- private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
+ static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
new file mode 100644
index 0000000000..e0fcf89526
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.BaseFormatModel;
+import org.apache.iceberg.formats.ModelWriteBuilder;
+import org.apache.iceberg.formats.ReadBuilder;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class AvroFormatModel<D, S>
+ extends BaseFormatModel<D, S, DatumWriter<D>, DatumReader<D>, Schema> {
+
+ public static <D> AvroFormatModel<PositionDelete<D>, Void> forPositionDeletes() {
+ return new AvroFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null);
+ }
+
+ public static <D, S> AvroFormatModel<D, S> create(
+ Class<D> type,
+ Class<S> schemaType,
+ WriterFunction<DatumWriter<D>, S, Schema> writerFunction,
+ ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+ return new AvroFormatModel<>(type, schemaType, writerFunction, readerFunction);
+ }
+
+ private AvroFormatModel(
+ Class<D> type,
+ Class<S> schemaType,
+ WriterFunction<DatumWriter<D>, S, Schema> writerFunction,
+ ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+ super(type, schemaType, writerFunction, readerFunction);
+ }
+
+ @Override
+ public FileFormat format() {
+ return FileFormat.AVRO;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
+ return new WriteBuilderWrapper<>(outputFile, writerFunction());
+ }
+
+ @Override
+ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
+ return new ReadBuilderWrapper<>(inputFile, readerFunction());
+ }
+
+ private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
+ private final Avro.WriteBuilder internal;
+ private final WriterFunction<DatumWriter<D>, S, Schema> writerFunction;
+ private org.apache.iceberg.Schema schema;
+ private S engineSchema;
+ private FileContent content;
+
+ private WriteBuilderWrapper(
+ EncryptedOutputFile outputFile, WriterFunction<DatumWriter<D>, S, Schema> writerFunction) {
+ this.internal = Avro.write(outputFile.encryptingOutputFile());
+ this.writerFunction = writerFunction;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> schema(org.apache.iceberg.Schema newSchema) {
+ this.schema = newSchema;
+ internal.schema(newSchema);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
+ this.engineSchema = newSchema;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> set(String property, String value) {
+ internal.set(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+ internal.setAll(properties);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> meta(String property, String value) {
+ internal.meta(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> meta(Map<String, String> properties) {
+ internal.meta(properties);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> content(FileContent newContent) {
+ this.content = newContent;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
+ internal.metricsConfig(metricsConfig);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> overwrite() {
+ internal.overwrite();
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
+ throw new UnsupportedOperationException("Avro does not support file encryption keys");
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
+ throw new UnsupportedOperationException("Avro does not support AAD
prefix");
+ }
+
+ @Override
+ public FileAppender<D> build() throws IOException {
+ switch (content) {
+ case DATA:
+ internal.createContextFunc(Avro.WriteBuilder.Context::dataContext);
+ internal.createWriterFunc(
+ avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
+ break;
+ case EQUALITY_DELETES:
+ internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(
+ avroSchema -> writerFunction.write(schema, avroSchema, engineSchema));
+ break;
+ case POSITION_DELETES:
+ Preconditions.checkState(
+ schema == null,
+ "Invalid schema: %s. Position deletes with schema are not
supported by the API.",
+ schema);
+ Preconditions.checkState(
+ engineSchema == null,
+ "Invalid engineSchema: %s. Position deletes with schema are not
supported by the API.",
+ engineSchema);
+
+ internal.createContextFunc(Avro.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(unused -> new Avro.PositionDatumWriter());
+ internal.schema(DeleteSchemaUtil.pathPosSchema());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown file content: " + content);
+ }
+
+ return internal.build();
+ }
+ }
+
+ private static class ReadBuilderWrapper<D, S> implements ReadBuilder<D, S> {
+ private final Avro.ReadBuilder internal;
+ private final ReaderFunction<DatumReader<D>, S, Schema> readerFunction;
+ private S engineSchema;
+ private Map<Integer, ?> idToConstant = ImmutableMap.of();
+
+ private ReadBuilderWrapper(
+ InputFile inputFile, ReaderFunction<DatumReader<D>, S, Schema> readerFunction) {
+ this.internal = Avro.read(inputFile);
+ this.readerFunction = readerFunction;
+ }
+
+ @Override
+ public ReadBuilder<D, S> split(long newStart, long newLength) {
+ internal.split(newStart, newLength);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> project(org.apache.iceberg.Schema schema) {
+ internal.project(schema);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> engineProjection(S schema) {
+ this.engineSchema = schema;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
+ // Filtering is not supported in Avro reader, so case sensitivity does not matter
+ // This is not an error since filtering is best-effort.
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> filter(Expression filter) {
+ // Filtering is not supported in Avro reader
+ // This is not an error since filtering is best-effort.
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> set(String key, String value) {
+ // Configuration is not used for Avro reader creation
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> reuseContainers() {
+ internal.reuseContainers();
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
+ throw new UnsupportedOperationException("Batch reading is not supported
in Avro reader");
+ }
+
+ @Override
+ public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
+ this.idToConstant = newIdToConstant;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> withNameMapping(org.apache.iceberg.mapping.NameMapping nameMapping) {
+ internal.withNameMapping(nameMapping);
+ return this;
+ }
+
+ @Override
+ public CloseableIterable<D> build() {
+ // The file schema is passed directly to the DatumReader by the Avro read path, so null is
+ // passed here
+ return internal
+ .createResolvingReader(
+ icebergSchema -> readerFunction.read(icebergSchema, null, engineSchema, idToConstant))
+ .build();
+ }
+ }
+}
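WriteBuilderWrapper#build() above dispatches on FileContent, and the position-delete branch rejects user schemas, pinning the file schema to DeleteSchemaUtil.pathPosSchema(). A minimal sketch of that branch in use, following testPositionDeleteWriterRoundTrip below (encryptedFile stands in for the test's in-memory output):

    // No schema() call: the path/pos schema is fixed by the format model.
    PositionDeleteWriter<Record> writer =
        FormatModelRegistry.positionDeleteWriteBuilder(FileFormat.AVRO, encryptedFile)
            .spec(PartitionSpec.unpartitioned())
            .build();
    try (writer) {
      writer.write(PositionDelete.<Record>create().set("data-file-1.parquet", 0L));
    }
    DeleteFile deleteFile = writer.toDeleteFile();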
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index b202e7fcf3..c3b6cbaa9b 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -31,6 +31,11 @@ public class PositionDelete<R> implements StructLike {
private PositionDelete() {}
+ @SuppressWarnings("unchecked")
+ public static <T> Class<PositionDelete<T>> deleteClass() {
+ return (Class<PositionDelete<T>>) (Class<?>) PositionDelete.class;
+ }
+
public PositionDelete<R> set(CharSequence newPath, long newPos) {
this.path = newPath;
this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index 91301b95dc..b9adafdbc2 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -54,7 +54,8 @@ public final class FormatModelRegistry {
private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
// The list of classes which are used for registering the reader and writer builders
- private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of();
+ private static final List<String> CLASSES_TO_REGISTER =
+ ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
@@ -169,12 +170,10 @@ public final class FormatModelRegistry {
* @param outputFile destination for the written data
* @return a configured delete write builder for creating a {@link PositionDeleteWriter}
*/
- @SuppressWarnings("unchecked")
public static <D> FileWriterBuilder<PositionDeleteWriter<D>, ?> positionDeleteWriteBuilder(
FileFormat format, EncryptedOutputFile outputFile) {
- Class<PositionDelete<D>> deleteClass =
- (Class<PositionDelete<D>>) (Class<?>) PositionDelete.class;
- FormatModel<PositionDelete<D>, ?> model = FormatModelRegistry.modelFor(format, deleteClass);
+ FormatModel<PositionDelete<D>, ?> model =
+ FormatModelRegistry.modelFor(format, PositionDelete.deleteClass());
return FileWriterBuilderImpl.forPositionDelete(model, outputFile);
}
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
new file mode 100644
index 0000000000..e2e13861ca
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
+import org.apache.iceberg.formats.FormatModelRegistry;
+
+public class GenericFormatModels {
+ public static void register() {
+ FormatModelRegistry.register(
+ AvroFormatModel.create(
+ Record.class,
+ Void.class,
+ (icebergSchema, fileSchema, engineSchema) -> DataWriter.create(fileSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ PlannedDataReader.create(icebergSchema, idToConstant)));
+
+ FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());
+ }
+
+ private GenericFormatModels() {}
+}
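GenericFormatModels binds the generic Record model with Void as the engine-schema type. An engine integration would supply its own types through the same hooks; a hypothetical sketch (EngineRow, EngineSchema, EngineDatumWriter, and EngineDatumReader are placeholders, not part of this commit):

    // Hypothetical registration mirroring GenericFormatModels.register().
    FormatModelRegistry.register(
        AvroFormatModel.create(
            EngineRow.class,
            EngineSchema.class,
            // writer factory: (Iceberg schema, Avro file schema, engine schema) -> DatumWriter<EngineRow>
            (icebergSchema, fileSchema, engineSchema) ->
                new EngineDatumWriter(fileSchema, engineSchema),
            // reader factory: also receives idToConstant for metadata-column constants
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                new EngineDatumReader(icebergSchema, engineSchema, idToConstant)));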
diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
index fc8d47680b..f2e2b4e7fa 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -31,6 +31,14 @@ import org.apache.iceberg.variants.VariantTestUtil;
public class DataTestHelpers {
private DataTestHelpers() {}
+ public static void assertEquals(
+ Types.StructType struct, List<Record> expected, List<Record> actual) {
+ assertThat(actual).hasSize(expected.size());
+ for (int i = 0; i < expected.size(); i += 1) {
+ assertEquals(struct, expected.get(i), actual.get(i));
+ }
+ }
+
public static void assertEquals(Types.StructType struct, Record expected, Record actual) {
assertEquals(struct, expected, actual, null, -1);
}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
new file mode 100644
index 0000000000..ca3dda30ab
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestGenericFormatModels {
+ private static final List<Record> TEST_RECORDS =
+ RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
+
+ private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
+
+ @TempDir protected Path temp;
+
+ private InMemoryFileIO fileIO;
+ private EncryptedOutputFile encryptedFile;
+
+ @BeforeEach
+ public void before() {
+ this.fileIO = new InMemoryFileIO();
+ this.encryptedFile =
+ EncryptedFiles.encryptedOutput(
+ fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ fileIO.deleteFile(encryptedFile.encryptingOutputFile());
+ this.encryptedFile = null;
+ if (fileIO != null) {
+ fileIO.close();
+ }
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ public void testDataWriterRoundTrip(FileFormat fileFormat) throws IOException {
+ FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
+ FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+ DataFile dataFile;
+ DataWriter<Record> writer =
+ writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build();
+ try (writer) {
+ for (Record record : TEST_RECORDS) {
+ writer.write(record);
+ }
+ }
+
+ dataFile = writer.toDataFile();
+
+ assertThat(dataFile).isNotNull();
+ assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size());
+ assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+ // Verify the file content by reading it back
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<Record> readRecords;
+ try (CloseableIterable<Record> reader =
+ FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+ .project(TestBase.SCHEMA)
+ .reuseContainers()
+ .build()) {
+ readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, Record::copy));
+ }
+
+ DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
+ FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
+ FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+ DeleteFile deleteFile;
+ EqualityDeleteWriter<Record> writer =
+ writerBuilder
+ .schema(TestBase.SCHEMA)
+ .spec(PartitionSpec.unpartitioned())
+ .equalityFieldIds(3)
+ .build();
+ try (writer) {
+ for (Record record : TEST_RECORDS) {
+ writer.write(record);
+ }
+ }
+
+ deleteFile = writer.toDeleteFile();
+
+ assertThat(deleteFile).isNotNull();
+ assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size());
+ assertThat(deleteFile.format()).isEqualTo(fileFormat);
+ assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
+
+ // Verify the file content by reading it back
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<Record> readRecords;
+ try (CloseableIterable<Record> reader =
+ FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+ .project(TestBase.SCHEMA)
+ .build()) {
+ readRecords = ImmutableList.copyOf(reader);
+ }
+
+ DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
+ Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS);
+
+ FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
+ FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
+
+ PositionDelete<Record> delete1 = PositionDelete.create();
+ delete1.set("data-file-1.parquet", 0L);
+
+ PositionDelete<Record> delete2 = PositionDelete.create();
+ delete2.set("data-file-1.parquet", 1L);
+
+ List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, delete2);
+
+ DeleteFile deleteFile;
+ PositionDeleteWriter<Record> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+ try (writer) {
+ for (PositionDelete<Record> delete : positionDeletes) {
+ writer.write(delete);
+ }
+ }
+
+ deleteFile = writer.toDeleteFile();
+
+ assertThat(deleteFile).isNotNull();
+ assertThat(deleteFile.recordCount()).isEqualTo(2);
+ assertThat(deleteFile.format()).isEqualTo(fileFormat);
+
+ // Verify the file content by reading it back
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<Record> readRecords;
+ try (CloseableIterable<Record> reader =
+ FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+ .project(positionDeleteSchema)
+ .build()) {
+ readRecords = ImmutableList.copyOf(reader);
+ }
+
+ List<Record> expected =
+ ImmutableList.of(
+ GenericRecord.create(positionDeleteSchema)
+ .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 0L),
+ GenericRecord.create(positionDeleteSchema)
+ .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 1L));
+
+ DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords);
+ }
+}