stevenzwu commented on code in PR #12774:
URL: https://github.com/apache/iceberg/pull/12774#discussion_r2334137025


##########
core/src/main/java/org/apache/iceberg/io/FormatModel.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.FormatModelRegistry;
+import org.apache.iceberg.deletes.PositionDelete;
+
+/**
+ * Interface that provides a unified abstraction for converting between data file formats and
+ * input/output data representations.
+ *
+ * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and
+ * expected input/output data structures, optimizing performance through direct conversion without
+ * intermediate representations. File format implementations handle the low-level parsing details
+ * while the object model determines the in-memory representation used for the parsed data.
+ * Together, these provide a consistent API for consuming data files while optimizing for specific
+ * processing engines.
+ *
+ * <p>Iceberg provides some built-in object models and processing engines can implement custom
+ * object models to integrate with Iceberg's file reading and writing capabilities.
+ *
+ * @param <D> output type used for reading data, and input type for writing data and deletes
+ * @param <S> the type of the schema for the input/output data
+ */
+public interface FormatModel<D, S> {
+  /** The file format which is read/written by the object model. */
+  FileFormat format();
+
+  /**
+   * Returns the unique identifier for the object model implementation processed by this factory.
+   *
+   * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act
+   * as a contract specifying the expected data structures for both reading (converting file formats
+   * into output objects) and writing (converting input objects into file formats). This ensures
+   * proper integration between Iceberg's storage layer and processing engines.
+   *
+   * <p>Processing engines can define their own object models by implementing this interface and
+   * using their own model name. They can register these models with Iceberg by using the {@link
+   * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with
+   * Iceberg's file format handlers.
+   *
+   * @return string identifier for this model implementation

Review Comment:
   `string identifier` is no longer accurate. Same for line 49 above.
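
   For instance, since the method now returns a `Class<D>` token rather than a string, the tag could read along these lines (a wording sketch only, not the PR's text):
   ```
   /**
    * ...
    * @return the Java class of the object model's in-memory data type
    */
   Class<D> type();
   ```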



##########
core/src/main/java/org/apache/iceberg/data/FormatModelRegistry.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FormatModel;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.ReadBuilder;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A registry that manages file-format-specific readers and writers through a unified object model
+ * factory interface.
+ *
+ * <p>This registry provides access to {@link ReadBuilder}s for data consumption and various writer
+ * builders:
+ *
+ * <ul>
+ *   <li>{@link WriteBuilder} for basic file writing,
+ *   <li>{@link DataWriteBuilder} for data files,
+ *   <li>{@link EqualityDeleteWriteBuilder} for equality deletes,
+ *   <li>{@link PositionDeleteWriteBuilder} for position deletes.
+ * </ul>
+ *
+ * The appropriate builder is selected based on {@link FileFormat} and object model name.
+ *
+ * <p>{@link FormatModel} objects are registered through {@link #register(FormatModel)} and used for
+ * creating readers and writers. Read builders are returned directly from the factory. Write
+ * builders may be wrapped in specialized content file writer implementations depending on the
+ * requested builder type.
+ */
+public final class FormatModelRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
+  // The list of classes which are used for registering the reader and writer builders
+  private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of();
+
+  private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> FORMAT_MODELS =
+      Maps.newConcurrentMap();
+
+  private FormatModelRegistry() {}
+
+  /**
+   * Registers an {@link FormatModel} in this registry.
+   *
+   * <p>The {@link FormatModel} creates readers and writers for a specific combinations of file
+   * format (Parquet, ORC, Avro) and object model (for example: "generic", "spark", "flink", etc.).
+   * Registering custom factories allows integration of new data processing engines for the
+   * supported file formats with Iceberg's file access mechanisms.
+   *
+   * <p>Each factory must be uniquely identified by its combination of file format and object model
+   * name. This uniqueness constraint prevents ambiguity when selecting factories for read and write
+   * operations.
+   *
+   * @param formatModel the factory implementation to register
+   * @throws IllegalArgumentException if a factory is already registered for the combination of
+   *     {@link FormatModel#format()} and {@link FormatModel#type()} ()}
+   */
+  public static void register(FormatModel<?, ?> formatModel) {
+    Pair<FileFormat, Class<?>> key = Pair.of(formatModel.format(), formatModel.type());
+    Preconditions.checkArgument(

Review Comment:
   this is a static util class. Clients (like Flink) may call the register method again during restart/retry. Wondering if it is a good idea to fail on a registration override? `InternalData` doesn't fail on repeated registration.
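
   A minimal sketch of a tolerant variant (assuming the `FORMAT_MODELS` concurrent map and `LOG` logger declared in this class):
   ```
   public static void register(FormatModel<?, ?> formatModel) {
     Pair<FileFormat, Class<?>> key = Pair.of(formatModel.format(), formatModel.type());
     // Overwrite instead of throwing, so repeated registration during restart/retry is harmless
     FormatModel<?, ?> previous = FORMAT_MODELS.put(key, formatModel);
     if (previous != null && previous != formatModel) {
       LOG.warn("Replaced format model for {}", key);
     }
   }
   ```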



##########
core/src/main/java/org/apache/iceberg/io/FormatModel.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.FormatModelRegistry;
+import org.apache.iceberg.deletes.PositionDelete;
+
+/**
+ * Interface that provides a unified abstraction for converting between data file formats and
+ * input/output data representations.
+ *
+ * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and
+ * expected input/output data structures, optimizing performance through direct conversion without
+ * intermediate representations. File format implementations handle the low-level parsing details
+ * while the object model determines the in-memory representation used for the parsed data.
+ * Together, these provide a consistent API for consuming data files while optimizing for specific
+ * processing engines.
+ *
+ * <p>Iceberg provides some built-in object models and processing engines can implement custom
+ * object models to integrate with Iceberg's file reading and writing capabilities.
+ *
+ * @param <D> output type used for reading data, and input type for writing data and deletes
+ * @param <S> the type of the schema for the input/output data
+ */
+public interface FormatModel<D, S> {
+  /** The file format which is read/written by the object model. */
+  FileFormat format();
+
+  /**
+   * Returns the unique identifier for the object model implementation processed by this factory.
+   *
+   * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act
+   * as a contract specifying the expected data structures for both reading (converting file formats
+   * into output objects) and writing (converting input objects into file formats). This ensures
+   * proper integration between Iceberg's storage layer and processing engines.
+   *
+   * <p>Processing engines can define their own object models by implementing this interface and
+   * using their own model name. They can register these models with Iceberg by using the {@link
+   * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with
+   * Iceberg's file format handlers.
+   *
+   * @return string identifier for this model implementation
+   */
+  Class<D> type();
+
+  /**
+   * Creates a writer builder for standard data files.
+   *
+   * <p>The returned {@link WriteBuilder} configures and creates a writer that converts input
+   * objects into the file format supported by this factory. The builder allows for configuration of
+   * various writing aspects like schema, metrics collection, compression, encryption.
+   *
+   * <p>The builder follows the fluent pattern for configuring writer properties and ultimately
+   * creates a {@link FileAppender} for writing the files.
+   *
+   * @param outputFile destination for the written data
+   * @return configured writer builder
+   */
+  WriteBuilder<D, S> writeBuilder(OutputFile outputFile);
+
+  /**
+   * Creates a function that converts {@link PositionDelete} objects into the format-specific
+   * objects which is used for writing position deletes
+   *
+   * @param schema of the position delete row content
+   * @return a function that converts {@link PositionDelete} objects into the format-specific
+   *     objects.
+   */
+  Function<PositionDelete<D>, D> positionDeleteConverter(Schema schema);
+
+  /**
+   * Creates a file reader builder for the specified input file.
+   *
+   * <p>The returned {@link ReadBuilder} configures and creates a reader that converts data from the

Review Comment:
   nit: similar comment; the Javadoc here can just be in the `ReadBuilder` interface



##########
core/src/main/java/org/apache/iceberg/io/FormatModel.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.FormatModelRegistry;
+import org.apache.iceberg.deletes.PositionDelete;
+
+/**
+ * Interface that provides a unified abstraction for converting between data file formats and
+ * input/output data representations.
+ *
+ * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and
+ * expected input/output data structures, optimizing performance through direct conversion without
+ * intermediate representations. File format implementations handle the low-level parsing details
+ * while the object model determines the in-memory representation used for the parsed data.
+ * Together, these provide a consistent API for consuming data files while optimizing for specific
+ * processing engines.
+ *
+ * <p>Iceberg provides some built-in object models and processing engines can implement custom
+ * object models to integrate with Iceberg's file reading and writing capabilities.
+ *
+ * @param <D> output type used for reading data, and input type for writing data and deletes
+ * @param <S> the type of the schema for the input/output data
+ */
+public interface FormatModel<D, S> {
+  /** The file format which is read/written by the object model. */
+  FileFormat format();
+
+  /**
+   * Returns the unique identifier for the object model implementation processed by this factory.
+   *
+   * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act
+   * as a contract specifying the expected data structures for both reading (converting file formats
+   * into output objects) and writing (converting input objects into file formats). This ensures
+   * proper integration between Iceberg's storage layer and processing engines.
+   *
+   * <p>Processing engines can define their own object models by implementing this interface and
+   * using their own model name. They can register these models with Iceberg by using the {@link
+   * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with
+   * Iceberg's file format handlers.
+   *
+   * @return string identifier for this model implementation
+   */
+  Class<D> type();
+
+  /**
+   * Creates a writer builder for standard data files.
+   *
+   * <p>The returned {@link WriteBuilder} configures and creates a writer that converts input

Review Comment:
   I am wondering if most of the Javadoc here should be moved to the `WriteBuilder` interface?



##########
core/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilder.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+
+/**
+ * A generic builder interface for creating specialized file writers in the Iceberg ecosystem.
+ *
+ * <p>This builder provides a unified configuration API for generating various types of content
+ * writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriter} for creating data files with table records
+ *   <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records
+ *   <li>{@link PositionDeleteWriter} for creating files with position-based delete records
+ * </ul>
+ *
+ * <p>Each concrete implementation configures the underlying file format writer while adding
+ * content-specific metadata and behaviors.
+ *
+ * @param <B> the concrete builder type for method chaining
+ */
+interface ContentFileWriteBuilder<B extends ContentFileWriteBuilder<B, S>, S> {

Review Comment:
   I am wondering if these classes should just be put into the `io` sub package?
   
   I remember a previous discussion on the `data` module, but this is still in the `core` module.



##########
core/src/main/java/org/apache/iceberg/io/FormatModel.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.FormatModelRegistry;
+import org.apache.iceberg.deletes.PositionDelete;
+
+/**
+ * Interface that provides a unified abstraction for converting between data file formats and
+ * input/output data representations.
+ *
+ * <p>{@link FormatModel} serves as a bridge between storage formats ({@link FileFormat}) and
+ * expected input/output data structures, optimizing performance through direct conversion without
+ * intermediate representations. File format implementations handle the low-level parsing details
+ * while the object model determines the in-memory representation used for the parsed data.
+ * Together, these provide a consistent API for consuming data files while optimizing for specific
+ * processing engines.
+ *
+ * <p>Iceberg provides some built-in object models and processing engines can implement custom
+ * object models to integrate with Iceberg's file reading and writing capabilities.
+ *
+ * @param <D> output type used for reading data, and input type for writing data and deletes
+ * @param <S> the type of the schema for the input/output data
+ */
+public interface FormatModel<D, S> {
+  /** The file format which is read/written by the object model. */
+  FileFormat format();
+
+  /**
+   * Returns the unique identifier for the object model implementation processed by this factory.
+   *
+   * <p>The model names (for example: "generic", "spark", "spark-vectorized", "flink", "arrow") act
+   * as a contract specifying the expected data structures for both reading (converting file formats
+   * into output objects) and writing (converting input objects into file formats). This ensures
+   * proper integration between Iceberg's storage layer and processing engines.
+   *
+   * <p>Processing engines can define their own object models by implementing this interface and
+   * using their own model name. They can register these models with Iceberg by using the {@link
+   * FormatModelRegistry}. This allows custom data representations to be seamlessly integrated with
+   * Iceberg's file format handlers.
+   *
+   * @return string identifier for this model implementation
+   */
+  Class<D> type();
+
+  /**
+   * Creates a writer builder for standard data files.

Review Comment:
   nit: `standard` is unnecessary and probably a bit confusing



##########
core/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
+ *
+ * <p>This unified implementation serves as a backend for multiple specialized content writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriteBuilder} for creating data files
+ *   <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files
+ *   <li>{@link PositionDeleteWriteBuilder} for creating position delete files
+ * </ul>
+ *
+ * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with
+ * content-specific functionality. When building a writer, the implementation configures the
+ * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate
+ * specialized writer for the requested content type.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <D> the type of data records the writer will accept
+ */
+@SuppressWarnings("unchecked")
+abstract class ContentFileWriteBuilderImpl<B extends ContentFileWriteBuilder<B, S>, D, S>
+    implements ContentFileWriteBuilder<B, S> {
+  private final WriteBuilder<D, S> writeBuilder;
+  private final String location;
+  private final FileFormat format;
+  private PartitionSpec spec = null;
+  private StructLike partition = null;
+  private EncryptionKeyMetadata keyMetadata = null;
+  private SortOrder sortOrder = null;
+
+  static <D, S> DataWriteBuilder<D, S> forDataFile(
+      WriteBuilder<D, S> writeBuilder, String location, FileFormat format) {
+    return new DataFileWriteBuilder<>(writeBuilder, location, format);
+  }
+
+  static <D, S> EqualityDeleteWriteBuilder<D, S> forEqualityDelete(
+      WriteBuilder<D, S> writeBuilder, String location, FileFormat format) {
+    return new EqualityDeleteFileWriteBuilder<>(writeBuilder, location, format);
+  }
+
+  static <D, S> PositionDeleteWriteBuilder<D, S> forPositionDelete(
+      WriteBuilder<D, S> writeBuilder,
+      String location,
+      FileFormat format,
+      Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter) {
+    return new PositionDeleteFileWriteBuilder<>(
+        writeBuilder, location, format, positionDeleteConverter);
+  }
+
+  private ContentFileWriteBuilderImpl(
+      WriteBuilder<D, S> writeBuilder, String location, FileFormat format) {
+    this.writeBuilder = writeBuilder;
+    this.location = location;
+    this.format = format;
+  }
+
+  @Override
+  public B schema(Schema schema) {
+    writeBuilder.schema(schema);
+    return self();
+  }
+
+  @Override
+  public B inputSchema(S schema) {
+    writeBuilder.inputSchema(schema);
+    return self();
+  }
+
+  @Override
+  public B set(String property, String value) {
+    writeBuilder.set(property, value);
+    return self();
+  }
+
+  @Override
+  public B meta(String property, String value) {
+    writeBuilder.meta(property, value);
+    return self();
+  }
+
+  @Override
+  public B metricsConfig(MetricsConfig metricsConfig) {
+    writeBuilder.metricsConfig(metricsConfig);
+    return self();
+  }
+
+  @Override
+  public B overwrite() {
+    writeBuilder.overwrite();
+    return self();
+  }
+
+  @Override
+  public B fileEncryptionKey(ByteBuffer encryptionKey) {
+    writeBuilder.fileEncryptionKey(encryptionKey);
+    return self();
+  }
+
+  @Override
+  public B fileAADPrefix(ByteBuffer aadPrefix) {
+    writeBuilder.fileAADPrefix(aadPrefix);
+    return self();
+  }
+
+  @Override
+  public B spec(PartitionSpec newSpec) {
+    this.spec = newSpec;
+    return self();
+  }
+
+  @Override
+  public B partition(StructLike newPartition) {
+    this.partition = newPartition;
+    return self();
+  }
+
+  @Override
+  public B keyMetadata(EncryptionKeyMetadata newKeyMetadata) {
+    this.keyMetadata = newKeyMetadata;
+    return self();
+  }
+
+  @Override
+  public B sortOrder(SortOrder newSortOrder) {
+    this.sortOrder = newSortOrder;
+    return self();
+  }
+
+  private static class DataFileWriteBuilder<D, S>
+      extends ContentFileWriteBuilderImpl<DataWriteBuilder<D, S>, D, S>
+      implements DataWriteBuilder<D, S> {
+    private DataFileWriteBuilder(
+        WriteBuilder<D, S> writeBuilder, String location, FileFormat format) {
+      super(writeBuilder, location, format);
+    }
+
+    @Override
+    public DataFileWriteBuilder<D, S> self() {
+      return this;
+    }
+
+    @Override
+    public DataWriter<D> build() throws IOException {
+      Preconditions.checkArgument(super.spec != null, "Cannot create data writer without spec");
+      Preconditions.checkArgument(
+          super.spec.isUnpartitioned() || super.partition != null,
+          "Partition must not be null when creating data writer for partitioned spec");
+
+      return new DataWriter<>(
+          super.writeBuilder.build(),
+          super.format,
+          super.location,
+          super.spec,
+          super.partition,
+          super.keyMetadata,
+          super.sortOrder);
+    }
+  }
+
+  private static class EqualityDeleteFileWriteBuilder<D, S>
+      extends ContentFileWriteBuilderImpl<EqualityDeleteWriteBuilder<D, S>, D, S>
+      implements EqualityDeleteWriteBuilder<D, S> {
+    private Schema rowSchema = null;
+    private int[] equalityFieldIds = null;
+
+    private EqualityDeleteFileWriteBuilder(
+        WriteBuilder<D, S> writeBuilder, String location, FileFormat format) {
+      super(writeBuilder, location, format);
+    }
+
+    @Override
+    public EqualityDeleteFileWriteBuilder<D, S> self() {
+      return this;
+    }
+
+    @Override
+    public EqualityDeleteFileWriteBuilder<D, S> rowSchema(Schema schema) {
+      this.rowSchema = schema;
+      return this;
+    }
+
+    @Override
+    public EqualityDeleteFileWriteBuilder<D, S> equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    @Override
+    public EqualityDeleteWriter<D> build() throws IOException {
+      Preconditions.checkState(
+          rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(
+          equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkArgument(
+          super.spec != null, "Spec must not be null when creating equality 
delete writer");
+      Preconditions.checkArgument(
+          super.spec.isUnpartitioned() || super.partition != null,
+          "Partition must not be null for partitioned writes");
+
+      return new EqualityDeleteWriter<>(
+          super.writeBuilder
+              .schema(rowSchema)
+              .meta("delete-type", "equality")
+              .meta(
+                  "delete-field-ids",
+                  IntStream.of(equalityFieldIds)
+                      .mapToObj(Objects::toString)
+                      .collect(Collectors.joining(", ")))
+              .build(),
+          super.format,
+          super.location,
+          super.spec,
+          super.partition,
+          super.keyMetadata,
+          super.sortOrder,
+          equalityFieldIds);
+    }
+  }
+
+  private static class PositionDeleteFileWriteBuilder<D, S>
+      extends ContentFileWriteBuilderImpl<PositionDeleteWriteBuilder<D, S>, D, S>
+      implements PositionDeleteWriteBuilder<D, S> {
+    private final Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter;
+    private Schema rowSchema = null;
+
+    private PositionDeleteFileWriteBuilder(
+        WriteBuilder<D, S> writeBuilder,
+        String location,
+        FileFormat format,
+        Function<Schema, Function<PositionDelete<D>, D>> positionDeleteConverter) {
+      super(writeBuilder, location, format);
+      this.positionDeleteConverter = positionDeleteConverter;
+    }
+
+    @Override
+    public PositionDeleteFileWriteBuilder<D, S> self() {
+      return this;
+    }
+
+    @Override
+    public PositionDeleteFileWriteBuilder<D, S> rowSchema(Schema schema) {
+      this.rowSchema = schema;
+      return this;
+    }
+
+    @Override
+    public PositionDeleteWriter<D> build() throws IOException {
+      Preconditions.checkArgument(
+          positionDeleteConverter != null,
+          "Positional delete converter must not be null when creating position 
delete writer");
+      Preconditions.checkArgument(
+          super.spec != null, "Spec must not be null when creating position 
delete writer");
+      Preconditions.checkArgument(
+          super.spec.isUnpartitioned() || super.partition != null,
+          "Partition must not be null for partitioned writes");
+
+      return new PositionDeleteWriter<D>(
+          (FileAppender)
+              new ConvertingFileAppender<>(
+                  super.writeBuilder
+                      .meta("delete-type", "position")
+                      .schema(DeleteSchemaUtil.posDeleteSchema(rowSchema))
+                      .build(),
+                  positionDeleteConverter.apply(rowSchema),
+                  rowSchema),
+          super.format,
+          super.location,
+          super.spec,
+          super.partition,
+          super.keyMetadata);
+    }
+  }
+
+  private static class ConvertingFileAppender<D> implements FileAppender<PositionDelete<D>> {

Review Comment:
   nit: is `PositionDeleteFileAppender` more informative to capture the purpose of this class?
   
   If we prefer to call this `ConvertingFileAppender`, we should avoid the `PositionDelete<D>` type reference. There could be two generic types <S, T>.
   
   e.g.
   ```
   private static class ConvertingFileAppender<S, T> implements FileAppender<S> {
   
       ConvertingFileAppender(
           FileAppender<T> appender, Function<S, T> converter, Schema rowSchema)
   ```
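
   A fuller sketch of that generic shape (assuming `FileAppender`'s abstract `add`, `metrics`, `length`, and `close` methods, and leaving out the `rowSchema` plumbing):
   ```
   private static class ConvertingFileAppender<S, T> implements FileAppender<S> {
     private final FileAppender<T> appender;
     private final Function<S, T> converter;

     ConvertingFileAppender(FileAppender<T> appender, Function<S, T> converter) {
       this.appender = appender;
       this.converter = converter;
     }

     @Override
     public void add(S datum) {
       // Convert the incoming record into the delegate's type before appending
       appender.add(converter.apply(datum));
     }

     @Override
     public Metrics metrics() {
       return appender.metrics();
     }

     @Override
     public long length() {
       return appender.length();
     }

     @Override
     public void close() throws IOException {
       appender.close();
     }
   }
   ```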



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
