stevenzwu commented on code in PR #12774:
URL: https://github.com/apache/iceberg/pull/12774#discussion_r2098442003


##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
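+  /** Sets multiple writer configuration properties at once. See {@link #set(String, String)}. */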
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the writer does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the writer
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the engine-specific schema for the input data records.

Review Comment:
   We changed the method name to `dataSchema`, but `engine` is still referenced in many places (args and Javadoc) in this class. Maybe they can be renamed to `data` as well?
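   For illustration only, the remaining references could read roughly like this after the rename (a sketch, not necessarily the PR's final wording):
   
      /**
       * Sets the data schema for the input records.
       *
       * @param newDataSchema the native schema representation of the input data (e.g. Spark or Flink types)
       * @return this builder for method chaining
       */
      B dataSchema(E newDataSchema);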



##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */

Review Comment:
   nit: is `fileSchema` a little more clear?
   
   So the assumption is that the file writer would convert the Iceberg schema to the underlying file schema (like Parquet's `MessageType`)?
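   On that second question: for Parquet, the Iceberg `Schema` is indeed translated to the file-level `MessageType`. A minimal sketch of that conversion using the existing `ParquetSchemaUtil` helper (the class and method around it are hypothetical; the exact code path in this PR may differ):
   
      import org.apache.iceberg.Schema;
      import org.apache.iceberg.parquet.ParquetSchemaUtil;
      import org.apache.parquet.schema.MessageType;
   
      class SchemaConversionSketch {
        // Translates the Iceberg schema set via schema(...) into the Parquet file schema.
        static MessageType toFileSchema(Schema icebergSchema) {
          return ParquetSchemaUtil.convert(icebergSchema, "table");
        }
      }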
   



##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+/**
+ * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
+ *
+ * <p>This unified implementation serves as a backend for multiple specialized content writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriteBuilder} for creating data files
+ *   <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files
+ *   <li>{@link PositionDeleteWriteBuilder} for creating position delete files
+ * </ul>
+ *
+ * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with
+ * content-specific functionality. When building a writer, the implementation configures the
+ * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate
+ * specialized writer for the requested content type.
+ *
+ * @param <C> the concrete builder type for method chaining
+ * @param <W> the type of the wrapped format-specific writer builder
+ * @param <E> the engine-specific schema type required by the writer
+ */
+@SuppressWarnings("unchecked")
+class ContentFileWriteBuilderImpl<
+        C extends ContentFileWriteBuilderImpl<C, W, E>, W extends WriteBuilder<W, E>, E>
+    implements DataWriteBuilder<C, E>,
+        EqualityDeleteWriteBuilder<C, E>,
+        PositionDeleteWriteBuilder<C, E> {
+  private final org.apache.iceberg.io.WriteBuilder<W, E> writeBuilder;
+  private final String location;
+  private final FileFormat format;
+  private PartitionSpec spec = null;
+  private StructLike partition = null;
+  private EncryptionKeyMetadata keyMetadata = null;
+  private SortOrder sortOrder = null;
+  private Schema rowSchema = null;
+  private int[] equalityFieldIds = null;
+
+  ContentFileWriteBuilderImpl(
+      org.apache.iceberg.io.WriteBuilder<W, E> writeBuilder, String location, FileFormat format) {
+    this.writeBuilder = writeBuilder;
+    this.location = location;
+    this.format = format;
+  }
+
+  @Override
+  public C schema(Schema newSchema) {
+    writeBuilder.schema(newSchema);
+    return (C) this;
+  }
+
+  @Override
+  public C dataSchema(E engineSchema) {
+    writeBuilder.dataSchema(engineSchema);
+    return (C) this;
+  }
+
+  @Override
+  public C set(String property, String value) {
+    writeBuilder.set(property, value);
+    return (C) this;
+  }
+
+  @Override
+  public C set(Map<String, String> properties) {
+    properties.forEach(writeBuilder::set);
+    return (C) this;
+  }
+
+  @Override
+  public C meta(String property, String value) {
+    writeBuilder.meta(property, value);
+    return (C) this;
+  }
+
+  @Override
+  public C meta(Map<String, String> properties) {
+    properties.forEach(writeBuilder::meta);
+    return (C) this;
+  }
+
+  @Override
+  public C metricsConfig(MetricsConfig newMetricsConfig) {
+    writeBuilder.metricsConfig(newMetricsConfig);
+    return (C) this;
+  }
+
+  @Override
+  public C overwrite() {
+    writeBuilder.overwrite();
+    return (C) this;
+  }
+
+  @Override
+  public C fileEncryptionKey(ByteBuffer encryptionKey) {
+    writeBuilder.fileEncryptionKey(encryptionKey);
+    return (C) this;
+  }
+
+  @Override
+  public C aadPrefix(ByteBuffer aadPrefix) {
+    writeBuilder.aadPrefix(aadPrefix);
+    return (C) this;
+  }
+
+  @Override
+  public C rowSchema(Schema newSchema) {
+    this.rowSchema = newSchema;
+    return (C) this;
+  }
+
+  @Override
+  public C equalityFieldIds(List<Integer> fieldIds) {
+    this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+    return (C) this;
+  }
+
+  @Override
+  public C equalityFieldIds(int... fieldIds) {
+    this.equalityFieldIds = fieldIds;
+    return (C) this;
+  }
+
+  @Override
+  public C spec(PartitionSpec newSpec) {
+    this.spec = newSpec;
+    return (C) this;
+  }
+
+  @Override
+  public C partition(StructLike newPartition) {
+    this.partition = newPartition;
+    return (C) this;
+  }
+
+  @Override
+  public C keyMetadata(EncryptionKeyMetadata metadata) {
+    this.keyMetadata = metadata;
+    return (C) this;
+  }
+
+  @Override
+  public C sortOrder(SortOrder newSortOrder) {
+    this.sortOrder = newSortOrder;
+    return (C) this;
+  }
+
+  @Override
+  public <D> DataWriter<D> dataWriter() throws IOException {
+    Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+    Preconditions.checkArgument(
+        spec.isUnpartitioned() || partition != null,
+        "Partition must not be null when creating data writer for partitioned spec");
+
+    return new DataWriter<>(
+        writeBuilder.build(), format, location, spec, partition, keyMetadata, sortOrder);
+  }
+
+  @Override
+  public <D> EqualityDeleteWriter<D> equalityDeleteWriter() throws IOException {
+    Preconditions.checkState(
+        rowSchema != null, "Cannot create equality delete file without a schema");
+    Preconditions.checkState(
+        equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+    Preconditions.checkArgument(
+        spec != null, "Spec must not be null when creating equality delete writer");
+    Preconditions.checkArgument(
+        spec.isUnpartitioned() || partition != null,
+        "Partition must not be null for partitioned writes");
+
+    return new EqualityDeleteWriter<>(
+        writeBuilder
+            .schema(rowSchema)
+            .meta("delete-type", "equality")

Review Comment:
   I know this is the practice in existing code, but these constants have been repeated in multiple classes (for each file format). Should we define some shared constants in the `iceberg-data` module?
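   A possible shape for such shared constants in `iceberg-data` (class and field names below are hypothetical):
   
      package org.apache.iceberg.data;
   
      /** Hypothetical holder for the delete-file metadata keys/values repeated per format. */
      final class DeleteFileMetadata {
        static final String DELETE_TYPE = "delete-type";
        static final String EQUALITY = "equality";
        static final String POSITION = "position";
        static final String DELETE_FIELD_IDS = "delete-field-ids";
   
        private DeleteFileMetadata() {}
      }
   
   The `.meta("delete-type", "equality")` call above would then become `.meta(DeleteFileMetadata.DELETE_TYPE, DeleteFileMetadata.EQUALITY)`.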



##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
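+  /** Sets multiple writer configuration properties at once. See {@link #set(String, String)}. */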
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the writer does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the writer
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the engine-specific schema for the input data records.
+   *
+   * <p>This method is necessary when the mapping between engine types and Iceberg types is not
+   * one-to-one. For example, when multiple engine types could map to the same Iceberg type, or when
+   * schema metadata beyond the structure is needed to properly interpret the data.
+   *
+   * <p>While the Iceberg schema defines the expected output structure, the engine schema provides
+   * the exact input format details needed for proper type conversion.
+   *
+   * @param newEngineSchema the native schema representation from the engine (Spark, Flink, etc.)
+   * @return this builder for method chaining
+   */
+  B dataSchema(E newEngineSchema);
+
+  /** Finalizes the configuration and builds the {@link FileAppender}. */
+  <D> FileAppender<D> build() throws IOException;

Review Comment:
   `D` is not defined. Should `D` be added to the class generic type params?
   
   If we use Flink as an example, `E` would be Flink `RowType` and `D` would be `RowData`?
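   One way the missing type parameter could be threaded through, sketched with the Flink example from the comment (illustrative only, not the PR's final shape):
   
      import java.io.IOException;
      import org.apache.iceberg.io.FileAppender;
   
      // Sketch: D lifted from the build() method to the interface type parameters.
      public interface WriteBuilder<B extends WriteBuilder<B, E, D>, E, D> {
        B dataSchema(E newDataSchema);
   
        FileAppender<D> build() throws IOException;
      }
   
   For Flink, `E` would be `RowType` and `D` would be `RowData`, i.e. a builder implementing `WriteBuilder<FlinkWriteBuilder, RowType, RowData>` (the builder name is hypothetical).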
   



##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+/**
+ * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
+ *
+ * <p>This unified implementation serves as a backend for multiple specialized content writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriteBuilder} for creating data files
+ *   <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files
+ *   <li>{@link PositionDeleteWriteBuilder} for creating position delete files
+ * </ul>
+ *
+ * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with
+ * content-specific functionality. When building a writer, the implementation configures the
+ * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate
+ * specialized writer for the requested content type.
+ *
+ * @param <C> the concrete builder type for method chaining
+ * @param <W> the type of the wrapped format-specific writer builder
+ * @param <E> the engine-specific schema type required by the writer
+ */
+@SuppressWarnings("unchecked")
+class ContentFileWriteBuilderImpl<
+        C extends ContentFileWriteBuilderImpl<C, W, E>, W extends WriteBuilder<W, E>, E>
+    implements DataWriteBuilder<C, E>,

Review Comment:
   Does it make sense to have these 3 additional interfaces? Can they be merged into the `ContentFileWriteBuilder` interface if they aren't needed outside the data module?
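   If the three interfaces stay internal to `iceberg-data`, the merged interface could look roughly like this (a sketch, abridged from the methods quoted above; `positionDeleteWriter()` is assumed from the truncated hunk):
   
      import java.io.IOException;
      import org.apache.iceberg.Schema;
      import org.apache.iceberg.deletes.EqualityDeleteWriter;
      import org.apache.iceberg.deletes.PositionDeleteWriter;
      import org.apache.iceberg.io.DataWriter;
   
      // Hypothetical merged interface exposing all three writer kinds directly.
      interface ContentFileWriteBuilder<B extends ContentFileWriteBuilder<B, E>, E> {
        B schema(Schema newSchema);
   
        B rowSchema(Schema newSchema); // delete-specific configuration
   
        B equalityFieldIds(int... fieldIds); // equality-delete-specific configuration
   
        <D> DataWriter<D> dataWriter() throws IOException;
   
        <D> EqualityDeleteWriter<D> equalityDeleteWriter() throws IOException;
   
        <D> PositionDeleteWriter<D> positionDeleteWriter() throws IOException;
      }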



##########
core/src/main/java/org/apache/iceberg/io/ReadBuilder.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.mapping.NameMapping;
+
+/**
+ * Builder interface for creating file readers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link ReadBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>source file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ * </ul>
+ *
+ * <p>The {@link ReadBuilder} follows the builder pattern to configure and create {@link
+ * CloseableIterable} instances that read data from source files. Configuration options include
+ * schema projection, predicate filtering, record batching, and encryption settings.
+ *
+ * <p>This interface is directly exposed to users for parameterizing readers.
+ *
+ * @param <B> the concrete builder type for method chaining
+ */
+public interface ReadBuilder<B extends ReadBuilder<B>> {
+  /** The configuration key for the batch size in the case of vectorized reads. */
+  String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";
+
+  /**
+   * Restricts the read to the given range: [start, start + length).
+   *
+   * @param newStart the start position for this read
+   * @param newLength the length of the range this read should scan
+   */
+  B split(long newStart, long newLength);
+
+  /** Read only the given columns. */
+  B project(Schema newSchema);
+
+  /**
+   * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary
+   * records. Some readers might not be able to filter some parts of the expression. In this case
+   * the reader might return unfiltered or partially filtered rows. It is the caller's
+   * responsibility to apply the filter again.
+   *
+   * @param newFilter the filter to set
+   * @param filterCaseSensitive whether the filtering is case-sensitive or not
+   */
+  default B filter(Expression newFilter, boolean filterCaseSensitive) {

Review Comment:
   Should we break `caseSensitive` out into a separate method and remove this one? That seems to be the approach in the existing code.
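   The split suggested here would mirror the existing builders (e.g. `Parquet.ReadBuilder`), roughly:
   
      // Sketch: separate configuration calls instead of a combined overload.
      B filter(Expression newFilter);
   
      /** Whether filtering is case-sensitive; assumed to default to true, matching existing readers. */
      B caseSensitive(boolean newCaseSensitive);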



##########
data/src/main/java/org/apache/iceberg/data/PositionDeleteWriteBuilder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+
+/**
+ * A specialized builder for creating position-based delete file writers.
+ *
+ * <p>This builder extends the generic {@link ContentFileWriteBuilder} interface with functionality
+ * specific to creating {@link PositionDeleteWriter} instances.
+ *
+ * <p>The builder provides methods to configure the schema for the row data that might be included
+ * with the position deletes through {@link #rowSchema(Schema)}, enabling optional preservation of
+ * deleted record content.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type required by the writer for data conversion
+ */
+public interface PositionDeleteWriteBuilder<B extends PositionDeleteWriteBuilder<B, E>, E>
+    extends ContentFileWriteBuilder<B, E> {
+  /** Sets the row schema for the delete writers. */
+  B rowSchema(Schema newSchema);

Review Comment:
      * <p>If {@link #rowSchema(Schema)} is configured, the position delete records should include the
      * content of the deleted rows. These row values should match the engine schema specified via
      * {@link #dataSchema(Object)} and will be converted to the target Iceberg schema defined by
      * {@link #rowSchema(Schema)}.
   
   Maybe move the Javadoc from the method below up to this method. Also, do we just need a boolean flag? Otherwise, we need to validate that `rowSchema` matches `dataSchema`.
   
   I guess this is also obsolete with the V3 deletion vectors, where the content of the deleted rows is never included.
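   For reference, the boolean-flag alternative could be as small as this (the method name is hypothetical); it would let the writer derive the row schema instead of validating `rowSchema` against `dataSchema`:
   
      /** Hypothetical replacement for rowSchema(Schema): include deleted row content or not. */
      B writeDeletedRowContent(boolean enabled);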



##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilder.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.WriteBuilder;
+
+/**
+ * A generic builder interface for creating specialized file writers in the Iceberg ecosystem.
+ *
+ * <p>This builder provides a unified configuration API for generating various types of content
+ * writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriter} for creating data files with table records
+ *   <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records
+ *   <li>{@link PositionDeleteWriter} for creating files with position-based delete records
+ * </ul>
+ *
+ * <p>Each concrete implementation configures the underlying file format writer while adding
+ * content-specific metadata and behaviors.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type required by the writer for data conversion
+ */
+interface ContentFileWriteBuilder<B extends ContentFileWriteBuilder<B, E>, E> {

Review Comment:
   This interface is highly redundant with the new `WriteBuilder` from the core module. Maybe extract some base interface, like a `WriterBuilderBase`. The base interface would need to live in core so that `WriteBuilder` can use it.
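   A rough cut of that extraction (the `WriterBuilderBase` name comes from the comment; the method set is an assumption based on the overlap quoted above):
   
      package org.apache.iceberg.io;
   
      import java.nio.ByteBuffer;
      import org.apache.iceberg.MetricsConfig;
      import org.apache.iceberg.Schema;
   
      /** Sketch: configuration shared by WriteBuilder and ContentFileWriteBuilder, living in core. */
      public interface WriterBuilderBase<B extends WriterBuilderBase<B, E>, E> {
        B schema(Schema newSchema);
   
        B set(String property, String value);
   
        B meta(String property, String value);
   
        B metricsConfig(MetricsConfig newMetricsConfig);
   
        B overwrite();
   
        B fileEncryptionKey(ByteBuffer encryptionKey);
   
        B aadPrefix(ByteBuffer aadPrefix);
   
        B dataSchema(E newDataSchema);
      }
   
   `WriteBuilder` would then extend it and add `build()`, while `ContentFileWriteBuilder` adds the content-specific methods (`spec`, `partition`, `sortOrder`, etc.).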



##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
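+  /** Sets multiple writer configuration properties at once. See {@link #set(String, String)}. */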
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the writer does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the writer
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {

Review Comment:
   nit: I see both `aadPrefix` and `fileAADPrefiix` are used in existing code. Maybe `fileAADPrefix` is a little more appealing to the eye and avoids confusion with the typo. It is also more consistent with the method naming `fileEncryptionKey` above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

