stevenzwu commented on code in PR #12774:
URL: https://github.com/apache/iceberg/pull/12774#discussion_r2098442003

##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the reader does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the reader
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the engine-specific schema for the input data records.

Review Comment:
   we changed the method name to `dataSchema`, but `engine` is still referenced in many places (arg names and Javadoc) in this class. Maybe they can be renamed to `data` as well?

##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */

Review Comment:
   nit: is `fileSchema` a little more clear? So the assumption is that the file writer would convert the Iceberg schema to the underlying file schema (like Parquet `MessageType`)?

##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+/**
+ * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
+ *
+ * <p>This unified implementation serves as a backend for multiple specialized content writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriteBuilder} for creating data files
+ *   <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files
+ *   <li>{@link PositionDeleteWriteBuilder} for creating position delete files
+ * </ul>
+ *
+ * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with
+ * content-specific functionality. When building a writer, the implementation configures the
+ * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate
+ * specialized writer for the requested content type.
+ *
+ * @param <C> the concrete builder type for method chaining
+ * @param <W> the type of the wrapped format-specific writer builder
+ * @param <E> the engine-specific schema type required by the writer
+ */
+@SuppressWarnings("unchecked")
+class ContentFileWriteBuilderImpl<
+        C extends ContentFileWriteBuilderImpl<C, W, E>, W extends WriteBuilder<W, E>, E>
+    implements DataWriteBuilder<C, E>,
+        EqualityDeleteWriteBuilder<C, E>,
+        PositionDeleteWriteBuilder<C, E> {
+  private final org.apache.iceberg.io.WriteBuilder<W, E> writeBuilder;
+  private final String location;
+  private final FileFormat format;
+  private PartitionSpec spec = null;
+  private StructLike partition = null;
+  private EncryptionKeyMetadata keyMetadata = null;
+  private SortOrder sortOrder = null;
+  private Schema rowSchema = null;
+  private int[] equalityFieldIds = null;
+
+  ContentFileWriteBuilderImpl(
+      org.apache.iceberg.io.WriteBuilder<W, E> writeBuilder, String location, FileFormat format) {
+    this.writeBuilder = writeBuilder;
+    this.location = location;
+    this.format = format;
+  }
+
+  @Override
+  public C schema(Schema newSchema) {
+    writeBuilder.schema(newSchema);
+    return (C) this;
+  }
+
+  @Override
+  public C dataSchema(E engineSchema) {
+    writeBuilder.dataSchema(engineSchema);
+    return (C) this;
+  }
+
+  @Override
+  public C set(String property, String value) {
+    writeBuilder.set(property, value);
+    return (C) this;
+  }
+
+  @Override
+  public C set(Map<String, String> properties) {
+    properties.forEach(writeBuilder::set);
+    return (C) this;
+  }
+
+  @Override
+  public C meta(String property, String value) {
+    writeBuilder.meta(property, value);
+    return (C) this;
+  }
+
+  @Override
+  public C meta(Map<String, String> properties) {
+    properties.forEach(writeBuilder::meta);
+    return (C) this;
+  }
+
+  @Override
+  public C metricsConfig(MetricsConfig newMetricsConfig) {
+    writeBuilder.metricsConfig(newMetricsConfig);
+    return (C) this;
+  }
+
+  @Override
+  public C overwrite() {
+    writeBuilder.overwrite();
+    return (C) this;
+  }
+
+  @Override
+  public C fileEncryptionKey(ByteBuffer encryptionKey) {
+    writeBuilder.fileEncryptionKey(encryptionKey);
+    return (C) this;
+  }
+
+  @Override
+  public C aadPrefix(ByteBuffer aadPrefix) {
+    writeBuilder.aadPrefix(aadPrefix);
+    return (C) this;
+  }
+
+  @Override
+  public C rowSchema(Schema newSchema) {
+    this.rowSchema = newSchema;
+    return (C) this;
+  }
+
+  @Override
+  public C equalityFieldIds(List<Integer> fieldIds) {
+    this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+    return (C) this;
+  }
+
+  @Override
+  public C equalityFieldIds(int... fieldIds) {
+    this.equalityFieldIds = fieldIds;
+    return (C) this;
+  }
+
+  @Override
+  public C spec(PartitionSpec newSpec) {
+    this.spec = newSpec;
+    return (C) this;
+  }
+
+  @Override
+  public C partition(StructLike newPartition) {
+    this.partition = newPartition;
+    return (C) this;
+  }
+
+  @Override
+  public C keyMetadata(EncryptionKeyMetadata metadata) {
+    this.keyMetadata = metadata;
+    return (C) this;
+  }
+
+  @Override
+  public C sortOrder(SortOrder newSortOrder) {
+    this.sortOrder = newSortOrder;
+    return (C) this;
+  }
+
+  @Override
+  public <D> DataWriter<D> dataWriter() throws IOException {
+    Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+    Preconditions.checkArgument(
+        spec.isUnpartitioned() || partition != null,
+        "Partition must not be null when creating data writer for partitioned spec");
+
+    return new DataWriter<>(
+        writeBuilder.build(), format, location, spec, partition, keyMetadata, sortOrder);
+  }
+
+  @Override
+  public <D> EqualityDeleteWriter<D> equalityDeleteWriter() throws IOException {
+    Preconditions.checkState(
+        rowSchema != null, "Cannot create equality delete file without a schema");
+    Preconditions.checkState(
+        equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+    Preconditions.checkArgument(
+        spec != null, "Spec must not be null when creating equality delete writer");
+    Preconditions.checkArgument(
+        spec.isUnpartitioned() || partition != null,
+        "Partition must not be null for partitioned writes");
+
+    return new EqualityDeleteWriter<>(
+        writeBuilder
+            .schema(rowSchema)
+            .meta("delete-type", "equality")

Review Comment:
   I know this is the practice of the existing code, but these constants have been repeated in multiple classes (for each file format). Should we define some constants in the `iceberg-data` module?

##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the reader does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the reader
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the engine-specific schema for the input data records.
+   *
+   * <p>This method is necessary when the mapping between engine types and Iceberg types is not
+   * one-to-one. For example, when multiple engine types could map to the same Iceberg type, or when
+   * schema metadata beyond the structure is needed to properly interpret the data.
+   *
+   * <p>While the Iceberg schema defines the expected output structure, the engine schema provides
+   * the exact input format details needed for proper type conversion.
+   *
+   * @param newEngineSchema the native schema representation from the engine (Spark, Flink, etc.)
+   * @return this builder for method chaining
+   */
+  B dataSchema(E newEngineSchema);
+
+  /** Finalizes the configuration and builds the {@link FileAppender}. */
+  <D> FileAppender<D> build() throws IOException;

Review Comment:
   `D` is not defined. Should `D` be added to the class generic type params? If we use Flink as an example, `E` would be Flink `RowType` and `D` would be `RowData`?

##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilderImpl.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.WriteBuilder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+/**
+ * An internal implementation that handles all {@link ContentFileWriteBuilder} interface variants.
+ *
+ * <p>This unified implementation serves as a backend for multiple specialized content writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriteBuilder} for creating data files
+ *   <li>{@link EqualityDeleteWriteBuilder} for creating equality delete files
+ *   <li>{@link PositionDeleteWriteBuilder} for creating position delete files
+ * </ul>
+ *
+ * <p>The implementation delegates to a format-specific {@link WriteBuilder} while enriching it with
+ * content-specific functionality. When building a writer, the implementation configures the
+ * underlying builder and calls its {@link WriteBuilder#build()} method to create the appropriate
+ * specialized writer for the requested content type.
+ *
+ * @param <C> the concrete builder type for method chaining
+ * @param <W> the type of the wrapped format-specific writer builder
+ * @param <E> the engine-specific schema type required by the writer
+ */
+@SuppressWarnings("unchecked")
+class ContentFileWriteBuilderImpl<
+        C extends ContentFileWriteBuilderImpl<C, W, E>, W extends WriteBuilder<W, E>, E>
+    implements DataWriteBuilder<C, E>,

Review Comment:
   Does it make sense to have those 3 additional interfaces? Can they be merged into the `ContentFileWriteBuilder` interface if these 3 builder interfaces aren't needed outside the data module?

##########
core/src/main/java/org/apache/iceberg/io/ReadBuilder.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.mapping.NameMapping;
+
+/**
+ * Builder interface for creating file readers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link ReadBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>source file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ * </ul>
+ *
+ * <p>The {@link ReadBuilder} follows the builder pattern to configure and create {@link
+ * CloseableIterable} instances that read data from source files. Configuration options include
+ * schema projection, predicate filtering, record batching, and encryption settings.
+ *
+ * <p>This interface is directly exposed to users for parameterizing readers.
+ *
+ * @param <B> the concrete builder type for method chaining
+ */
+public interface ReadBuilder<B extends ReadBuilder<B>> {
+  /** The configuration key for the batch size in the case of vectorized reads. */
+  String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";
+
+  /**
+   * Restricts the read to the given range: [start, start + length).
+   *
+   * @param newStart the start position for this read
+   * @param newLength the length of the range this read should scan
+   */
+  B split(long newStart, long newLength);
+
+  /** Read only the given columns. */
+  B project(Schema newSchema);
+
+  /**
+   * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary
+   * records. Some readers might not be able to filter some parts of the expression. In this case
+   * the reader might return unfiltered or partially filtered rows. It is the caller's
+   * responsibility to apply the filter again.
+   *
+   * @param newFilter the filter to set
+   * @param filterCaseSensitive whether the filtering is case-sensitive or not
+   */
+  default B filter(Expression newFilter, boolean filterCaseSensitive) {

Review Comment:
   Should we break `caseSensitive` out into a separate method and remove this one? That seems to be the approach in the existing code.

##########
data/src/main/java/org/apache/iceberg/data/PositionDeleteWriteBuilder.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+
+/**
+ * A specialized builder for creating position-based delete file writers.
+ *
+ * <p>This builder extends the generic {@link ContentFileWriteBuilder} interface with functionality
+ * specific to creating {@link PositionDeleteWriter} instances.
+ *
+ * <p>The builder provides methods to configure the schema for the row data that might be included
+ * with the position deletes through {@link #rowSchema(Schema)}, enabling optional preservation of
+ * deleted record content.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type required by the writer for data conversion
+ */
+public interface PositionDeleteWriteBuilder<B extends PositionDeleteWriteBuilder<B, E>, E>
+    extends ContentFileWriteBuilder<B, E> {
+  /** Sets the row schema for the delete writers. */
+  B rowSchema(Schema newSchema);

Review Comment:
   * <p>If {@link #rowSchema(Schema)} is configured, the position delete records should include the
   * content of the deleted rows. These row values should match the engine schema specified via
   * {@link #dataSchema(Object)} and will be converted to the target Iceberg schema defined by
   * {@link #rowSchema(Schema)}.
   
   Maybe move the Javadoc from the method below to this method. Do we just need a boolean flag? Otherwise, we need to validate that `rowSchema` matches `dataSchema`. I guess this is also obsolete with the V3 delete vector, where the content of the deleted rows is never included.

##########
data/src/main/java/org/apache/iceberg/data/ContentFileWriteBuilder.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.WriteBuilder;
+
+/**
+ * A generic builder interface for creating specialized file writers in the Iceberg ecosystem.
+ *
+ * <p>This builder provides a unified configuration API for generating various types of content
+ * writers:
+ *
+ * <ul>
+ *   <li>{@link DataWriter} for creating data files with table records
+ *   <li>{@link EqualityDeleteWriter} for creating files with equality-based delete records
+ *   <li>{@link PositionDeleteWriter} for creating files with position-based delete records
+ * </ul>
+ *
+ * <p>Each concrete implementation configures the underlying file format writer while adding
+ * content-specific metadata and behaviors.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type required by the writer for data conversion
+ */
+interface ContentFileWriteBuilder<B extends ContentFileWriteBuilder<B, E>, E> {

Review Comment:
   this interface is highly redundant with the new `WriteBuilder` from the core module. Maybe extract some base interface, like a `WriterBuilderBase`. The base interface would need to live in core so that the `WriteBuilder` can use it.

##########
core/src/main/java/org/apache/iceberg/io/WriteBuilder.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+
+/**
+ * Builder interface for creating file writers across supported data file formats. Each {@link
+ * FileAccessFactory} implementation provides appropriate {@link WriteBuilder} instances based on:
+ *
+ * <ul>
+ *   <li>target file format (Parquet, Avro, ORC)
+ *   <li>engine-specific object representation (spark, flink, generic, etc.)
+ *   <li>content type ({@link FileContent#DATA}, {@link FileContent#EQUALITY_DELETES}, {@link
+ *       FileContent#POSITION_DELETES})
+ * </ul>
+ *
+ * The {@link WriteBuilder} follows the builder pattern to configure and create {@link FileAppender}
+ * instances that write data to the target output files.
+ *
+ * @param <B> the concrete builder type for method chaining
+ * @param <E> engine-specific schema type for the input data records
+ */
+public interface WriteBuilder<B extends WriteBuilder<B, E>, E> {
+  /** Set the file schema. */
+  B schema(Schema newSchema);
+
+  /**
+   * Set a writer configuration property which affects the writer behavior.
+   *
+   * @param property a writer config property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B set(String property, String value);
+
+  default B set(Map<String, String> properties) {
+    properties.forEach(this::set);
+    return (B) this;
+  }
+
+  /**
+   * Set a file metadata property in the created file.
+   *
+   * @param property a file metadata property name
+   * @param value config value
+   * @return this for method chaining
+   */
+  B meta(String property, String value);
+
+  /** Sets the metrics configuration used for collecting column metrics for the created file. */
+  B metricsConfig(MetricsConfig newMetricsConfig);
+
+  /** Overwrite the file if it already exists. By default, overwrite is disabled. */
+  B overwrite();
+
+  /**
+   * Overwrite the file if it already exists. The default value is <code>false</code>.
+   *
+   * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility.
+   *     Use {@link #overwrite()} instead.
+   */
+  @Deprecated
+  B overwrite(boolean enabled);
+
+  /**
+   * Sets the encryption key used for writing the file. If the reader does not support encryption,
+   * then an exception should be thrown.
+   */
+  default B fileEncryptionKey(ByteBuffer encryptionKey) {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Sets the additional authentication data (aad) prefix used for writing the file. If the reader
+   * does not support encryption, then an exception should be thrown.
+   */
+  default B aadPrefix(ByteBuffer aadPrefix) {

Review Comment:
   nit: I see both `aadPrefix` and `fileAADPrefix` are used in the existing code. Maybe `fileAADPrefix` is a little more appealing to the eye and avoids confusion from typos. It is also more consistent with the method naming above, `fileEncryptionKey`.
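
To make the `WriterBuilderBase` extraction suggested above concrete, here is a minimal sketch of what the shared base interface could look like. This is an illustration only: the `WriterBuilderBase` name, its exact method set, and the `fileAADPrefix` spelling are assumptions drawn from the review comments, not code from the PR.

```java
package org.apache.iceberg.io;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;

// Hypothetical base interface in core that both WriteBuilder (core) and
// ContentFileWriteBuilder (data) could extend, declaring the common
// configuration methods once.
public interface WriterBuilderBase<B extends WriterBuilderBase<B, E>, E> {
  /** Set the Iceberg schema of the created file. */
  B schema(Schema newSchema);

  /** Set a writer configuration property which affects the writer behavior. */
  B set(String property, String value);

  /** Bulk variant of {@link #set(String, String)}. */
  @SuppressWarnings("unchecked")
  default B set(Map<String, String> properties) {
    properties.forEach(this::set);
    return (B) this;
  }

  /** Set a file metadata property in the created file. */
  B meta(String property, String value);

  /** Set the metrics configuration used for collecting column metrics. */
  B metricsConfig(MetricsConfig newMetricsConfig);

  /** Overwrite the file if it already exists. */
  B overwrite();

  /** Set the file encryption key; formats without encryption support reject this. */
  default B fileEncryptionKey(ByteBuffer encryptionKey) {
    throw new UnsupportedOperationException("Not supported");
  }

  /** Set the AAD prefix, using the fileAADPrefix spelling suggested above. */
  default B fileAADPrefix(ByteBuffer aadPrefix) {
    throw new UnsupportedOperationException("Not supported");
  }

  /** Set the engine-specific (data) schema of the input records. */
  B dataSchema(E newDataSchema);
}
```

Under this sketch, `WriteBuilder` would keep only the format-level `build()` method, while `ContentFileWriteBuilder` would keep only the content-level additions (`spec`, `partition`, `keyMetadata`, `sortOrder`), with both extending `WriterBuilderBase<B, E>`.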