stevenzwu commented on code in PR #12774: URL: https://github.com/apache/iceberg/pull/12774#discussion_r2334137025
########## core/src/main/java/org/apache/iceberg/io/FormatModel.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.util.function.Function; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.FormatModelRegistry; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations. + * + * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + * <p>Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities. + * + * @param <D> output type used for reading data, and input type for writing data and deletes + * @param <S> the type of the schema for the input/output data + */ +public interface FormatModel<D, S> { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Returns the unique identifier for the object model implementation processed by this factory. + * + * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act + * as a contract specifying the expected data structures for both reading (converting file formats + * into output objects) and writing (converting input objects into file formats). This ensures + * proper integration between Iceberg's storage layer and processing engines. + * + * <p>Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return string identifier for this model implementation Review Comment: `string identifier` is no longer accurate. Same for line 49 above.
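For example, the return tag could describe the class instead — the wording below is only a sketch, not taken from the PR:

```java
/**
 * ...
 * @return the Java class of the data objects produced and consumed by this model
 */
Class<D> type();
```

########## core/src/main/java/org/apache/iceberg/data/FormatModelRegistry.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.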
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FormatModel; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.io.WriteBuilder; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A registry that manages file-format-specific readers and writers through a unified object model + * factory interface. + * + * <p>This registry provides access to {@link ReadBuilder}s for data consumption and various writer + * builders: + * + * <ul> + * <li>{@link WriteBuilder} for basic file writing, + * <li>{@link DataWriteBuilder} for data files, + * <li>{@link EqualityDeleteWriteBuilder} for equality deletes, + * <li>{@link PositionDeleteWriteBuilder} for position deletes. + * </ul> + * + * The appropriate builder is selected based on {@link FileFormat} and object model name. + * + * <p>{@link FormatModel} objects are registered through {@link #register(FormatModel)} and used for + * creating readers and writers. Read builders are returned directly from the factory. Write + * builders may be wrapped in specialized content file writer implementations depending on the + * requested builder type. + */ +public final class FormatModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> FORMAT_MODELS = + Maps.newConcurrentMap(); + + private FormatModelRegistry() {} + + /** + * Registers a {@link FormatModel} in this registry. + * + * <p>The {@link FormatModel} creates readers and writers for a specific combination of file + * format (Parquet, ORC, Avro) and object model (for example: "generic", "spark", "flink", etc.).
+ * Registering custom factories allows new data processing engines to integrate with + * Iceberg's file access mechanisms for the supported file formats. + * + * <p>Each factory must be uniquely identified by its combination of file format and object model + * name. This uniqueness constraint prevents ambiguity when selecting factories for read and write + * operations. + * + * @param formatModel the factory implementation to register + * @throws IllegalArgumentException if a factory is already registered for the combination of + * {@link FormatModel#format()} and {@link FormatModel#type()} + */ + public static void register(FormatModel<?, ?> formatModel) { + Pair<FileFormat, Class<?>> key = Pair.of(formatModel.format(), formatModel.type()); + Preconditions.checkArgument( Review Comment: this is a static util class. Clients (like Flink) may call the register method again during restart/retry. I'm wondering if it is a good idea to fail on registration override; InternalData doesn't fail on repeated registration.
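For illustration, a tolerant variant could look like the sketch below. It reuses the `FORMAT_MODELS` map and `Pair` key from this diff; the log-and-replace behavior is modeled on InternalData's tolerance of repeated registration, not on anything in this PR:

```java
// Hypothetical alternative: replace instead of failing, and warn on override.
// The map and key construction come from this diff; only the duplicate
// handling differs from the PR.
public static void register(FormatModel<?, ?> formatModel) {
  Pair<FileFormat, Class<?>> key = Pair.of(formatModel.format(), formatModel.type());
  FormatModel<?, ?> previous = FORMAT_MODELS.put(key, formatModel);
  if (previous != null && previous != formatModel) {
    LOG.warn(
        "Replaced format model for format {} and type {}: {} -> {}",
        key.first(),
        key.second(),
        previous.getClass().getName(),
        formatModel.getClass().getName());
  }
}
```

########## core/src/main/java/org/apache/iceberg/io/FormatModel.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.util.function.Function; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.FormatModelRegistry; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations. + * + * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + * <p>Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities. + * + * @param <D> output type used for reading data, and input type for writing data and deletes + * @param <S> the type of the schema for the input/output data + */ +public interface FormatModel<D, S> { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Returns the unique identifier for the object model implementation processed by this factory.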
+ * + * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act + * as a contract specifying the expected data structures for both reading (converting file formats + * into output objects) and writing (converting input objects into file formats). This ensures + * proper integration between Iceberg's storage layer and processing engines. + * + * <p>Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return string identifier for this model implementation + */ + Class<D> type(); + + /** + * Creates a writer builder for standard data files. + * + * <p>The returned {@link WriteBuilder} configures and creates a writer that converts input + * objects into the file format supported by this factory. The builder allows for configuration of + * various writing aspects like schema, metrics collection, compression, encryption. + * + * <p>The builder follows the fluent pattern for configuring writer properties and ultimately + * creates a {@link FileAppender} for writing the files. + * + * @param outputFile destination for the written data + * @return configured writer builder + */ + WriteBuilder<D, S> writeBuilder(OutputFile outputFile); + + /** + * Creates a function that converts {@link PositionDelete} objects into the format-specific + * objects that are used for writing position deletes. + * + * @param schema of the position delete row content + * @return a function that converts {@link PositionDelete} objects into the format-specific + * objects. + */ + Function<PositionDelete<D>, D> positionDeleteConverter(Schema schema); + + /** + * Creates a file reader builder for the specified input file. + * + * <p>The returned {@link ReadBuilder} configures and creates a reader that converts data from the Review Comment: nit: similar comment — this Javadoc can just be in the ReadBuilder interface ########## core/src/main/java/org/apache/iceberg/io/FormatModel.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.util.function.Function; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.FormatModelRegistry; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations.
+ * + * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + * <p>Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities. + * + * @param <D> output type used for reading data, and input type for writing data and deletes + * @param <S> the type of the schema for the input/output data + */ +public interface FormatModel<D, S> { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Returns the unique identifier for the object model implementation processed by this factory. + * + * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act + * as a contract specifying the expected data structures for both reading (converting file formats + * into output objects) and writing (converting input objects into file formats). This ensures + * proper integration between Iceberg's storage layer and processing engines. + * + * <p>Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return string identifier for this model implementation + */ + Class<D> type(); + + /** + * Creates a writer builder for standard data files. + * + * <p>The returned {@link WriteBuilder} configures and creates a writer that converts input Review Comment: I am wondering if most of the Javadoc here should be moved to the `WriteBuilder` interface? ########## core/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilder.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; + +/** + * A generic builder interface for creating specialized file writers in the Iceberg ecosystem. + * + * <p>This builder provides a unified configuration API for generating various types of content + * writers: + * + * <ul> + * <li>{@link DataWriter} for creating data files with table records + * <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records + * <li>{@link PositionDeleteWriter} for creating files with position-based delete records + * </ul> + * + * <p>Each concrete implementation configures the underlying file format writer while adding + * content-specific metadata and behaviors. + * + * @param <B> the concrete builder type for method chaining + */ +interface ContentFileWriteBuilder<B extends ContentFileWriteBuilder<B, S>, S> { Review Comment: I am wondering if these classes should just be put into the `io` sub-package? I remember a previous discussion on the `data` module, but this is still the `core` module. ########## core/src/main/java/org/apache/iceberg/io/FormatModel.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import java.util.function.Function; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.FormatModelRegistry; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * Interface that provides a unified abstraction for converting between data file formats and + * input/output data representations. + * + * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and + * expected input/output data structures, optimizing performance through direct conversion without + * intermediate representations. File format implementations handle the low-level parsing details + * while the object model determines the in-memory representation used for the parsed data. + * Together, these provide a consistent API for consuming data files while optimizing for specific + * processing engines. + * + * <p>Iceberg provides some built-in object models and processing engines can implement custom + * object models to integrate with Iceberg's file reading and writing capabilities.
+ * + * @param <D> output type used for reading data, and input type for writing data and deletes + * @param <S> the type of the schema for the input/output data + */ +public interface FormatModel<D, S> { + /** The file format which is read/written by the object model. */ + FileFormat format(); + + /** + * Returns the unique identifier for the object model implementation processed by this factory. + * + * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act + * as a contract specifying the expected data structures for both reading (converting file formats + * into output objects) and writing (converting input objects into file formats). This ensures + * proper integration between Iceberg's storage layer and processing engines. + * + * <p>Processing engines can define their own object models by implementing this interface and + * using their own model name. They can register these models with Iceberg by using the {@link + * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with + * Iceberg's file format handlers. + * + * @return string identifier for this model implementation + */ + Class<D> type(); + + /** + * Creates a writer builder for standard data files. Review Comment: nit: `standard` is unnecessary and probably a bit confusing ########## core/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java: ########## @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.WriteBuilder; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants. 
+ * + * <p>This unified implementation serves as a backend for multiple specialized content writers: + * + * <ul> + * <li>{@link DataWriteBuilder} for creating data files + * <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files + * <li>{@link PositionDeleteWriteBuilder} for creating position delete files + * </ul> + * + * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with + * content-specific functionality. When building a writer, the implementation configures the + * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate + * specialized writer for the requested content type. + * + * @param <B> the concrete builder type for method chaining + * @param <D> the type of data records the writer will accept + */ +@SuppressWarnings("unchecked") +abstract class ContentFileWriteBuilderImpl<B extends ContentFileWriteBuilder<B, S>, D, S> + implements ContentFileWriteBuilder<B, S> { + private final WriteBuilder<D, S> writeBuilder; + private final String location; + private final FileFormat format; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + + static <D, S> DataWriteBuilder<D, S> forDataFile( + WriteBuilder<D, S> writeBuilder, String location, FileFormat format) { + return new DataFileWriteBuilder<>(writeBuilder, location, format); + } + + static <D, S> EqualityDeleteWriteBuilder<D, S> forEqualityDelete( + WriteBuilder<D, S> writeBuilder, String location, FileFormat format) { + return new EqualityDeleteFileWriteBuilder<>(writeBuilder, location, format); + } + + static <D, S> PositionDeleteWriteBuilder<D, S> forPositionDelete( + WriteBuilder<D, S> writeBuilder, + String location, + FileFormat format, + Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter) { + return new PositionDeleteFileWriteBuilder<>( + writeBuilder, location, format, positionDeleteConverter); + } + + private ContentFileWriteBuilderImpl( + WriteBuilder<D, S> writeBuilder, String location, FileFormat format) { + this.writeBuilder = writeBuilder; + this.location = location; + this.format = format; + } + + @Override + public B schema(Schema schema) { + writeBuilder.schema(schema); + return self(); + } + + @Override + public B inputSchema(S schema) { + writeBuilder.inputSchema(schema); + return self(); + } + + @Override + public B set(String property, String value) { + writeBuilder.set(property, value); + return self(); + } + + @Override + public B meta(String property, String value) { + writeBuilder.meta(property, value); + return self(); + } + + @Override + public B metricsConfig(MetricsConfig metricsConfig) { + writeBuilder.metricsConfig(metricsConfig); + return self(); + } + + @Override + public B overwrite() { + writeBuilder.overwrite(); + return self(); + } + + @Override + public B fileEncryptionKey(ByteBuffer encryptionKey) { + writeBuilder.fileEncryptionKey(encryptionKey); + return self(); + } + + @Override + public B fileAADPrefix(ByteBuffer aadPrefix) { + writeBuilder.fileAADPrefix(aadPrefix); + return self(); + } + + @Override + public B spec(PartitionSpec newSpec) { + this.spec = newSpec; + return self(); + } + + @Override + public B partition(StructLike newPartition) { + this.partition = newPartition; + return self(); + } + + @Override + public B keyMetadata(EncryptionKeyMetadata newKeyMetadata) { + this.keyMetadata = newKeyMetadata; + return self(); + } + + @Override + public B 
sortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return self(); + } + + private static class DataFileWriteBuilder<D, S> + extends ContentFileWriteBuilderImpl<DataWriteBuilder<D, S>, D, S> + implements DataWriteBuilder<D, S> { + private DataFileWriteBuilder( + WriteBuilder<D, S> writeBuilder, String location, FileFormat format) { + super(writeBuilder, location, format); + } + + @Override + public DataFileWriteBuilder<D, S> self() { + return this; + } + + @Override + public DataWriter<D> build() throws IOException { + Preconditions.checkArgument(super.spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null when creating data writer for partitioned spec"); + + return new DataWriter<>( + super.writeBuilder.build(), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata, + super.sortOrder); + } + } + + private static class EqualityDeleteFileWriteBuilder<D, S> + extends ContentFileWriteBuilderImpl<EqualityDeleteWriteBuilder<D, S>, D, S> + implements EqualityDeleteWriteBuilder<D, S> { + private Schema rowSchema = null; + private int[] equalityFieldIds = null; + + private EqualityDeleteFileWriteBuilder( + WriteBuilder<D, S> writeBuilder, String location, FileFormat format) { + super(writeBuilder, location, format); + } + + @Override + public EqualityDeleteFileWriteBuilder<D, S> self() { + return this; + } + + @Override + public EqualityDeleteFileWriteBuilder<D, S> rowSchema(Schema schema) { + this.rowSchema = schema; + return this; + } + + @Override + public EqualityDeleteFileWriteBuilder<D, S> equalityFieldIds(int... fieldIds) { + this.equalityFieldIds = fieldIds; + return this; + } + + @Override + public EqualityDeleteWriter<D> build() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); + Preconditions.checkArgument( + super.spec != null, "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null for partitioned writes"); + + return new EqualityDeleteWriter<>( + super.writeBuilder + .schema(rowSchema) + .meta("delete-type", "equality") + .meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))) + .build(), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata, + super.sortOrder, + equalityFieldIds); + } + } + + private static class PositionDeleteFileWriteBuilder<D, S> + extends ContentFileWriteBuilderImpl<PositionDeleteWriteBuilder<D, S>, D, S> + implements PositionDeleteWriteBuilder<D, S> { + private final Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter; + private Schema rowSchema = null; + + private PositionDeleteFileWriteBuilder( + WriteBuilder<D, S> writeBuilder, + String location, + FileFormat format, + Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter) { + super(writeBuilder, location, format); + this.positionDeleteConverter = positionDeleteConverter; + } + + @Override + public PositionDeleteFileWriteBuilder<D, S> self() { + return this; + } + + @Override + public PositionDeleteFileWriteBuilder<D, S> rowSchema(Schema schema) { + this.rowSchema = schema; + 
return this; + } + + @Override + public PositionDeleteWriter<D> build() throws IOException { + Preconditions.checkArgument( + positionDeleteConverter != null, + "Positional delete converter must not be null when creating position delete writer"); + Preconditions.checkArgument( + super.spec != null, "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument( + super.spec.isUnpartitioned() || super.partition != null, + "Partition must not be null for partitioned writes"); + + return new PositionDeleteWriter<D>( + (FileAppender) + new ConvertingFileAppender<>( + super.writeBuilder + .meta("delete-type", "position") + .schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)) + .build(), + positionDeleteConverter.apply(rowSchema), + rowSchema), + super.format, + super.location, + super.spec, + super.partition, + super.keyMetadata); + } + } + + private static class ConvertingFileAppender<D> implements FileAppender<PositionDelete<D>> { Review Comment: nit: is `PositionDeleteFileAppender` more informative to capture the purpose of this class? If we prefer to call this `ConvertingFileAppender`, we should avoid the `PositionDelete<D>` type reference. There could be two generic types <S, T>, e.g.:
```
private static class ConvertingFileAppender<S, T> implements FileAppender<S> {
  ConvertingFileAppender(
      FileAppender<T> appender, Function<S, T> converter, Schema rowSchema)
```
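Fleshed out, the two-type-parameter version might look like the sketch below. The delegated methods are just the existing `FileAppender` contract; I dropped the `rowSchema` constructor argument since this minimal version doesn't use it:

```java
// Sketch of a fully generic converting appender: converts S to T on add()
// and delegates everything else to the wrapped appender.
private static class ConvertingFileAppender<S, T> implements FileAppender<S> {
  private final FileAppender<T> appender;
  private final Function<S, T> converter;

  ConvertingFileAppender(FileAppender<T> appender, Function<S, T> converter) {
    this.appender = appender;
    this.converter = converter;
  }

  @Override
  public void add(S datum) {
    appender.add(converter.apply(datum));
  }

  @Override
  public Metrics metrics() {
    return appender.metrics();
  }

  @Override
  public long length() {
    return appender.length();
  }

  @Override
  public List<Long> splitOffsets() {
    return appender.splitOffsets();
  }

  @Override
  public void close() throws IOException {
    appender.close();
  }
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org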