rdblue commented on code in PR #12298: URL: https://github.com/apache/iceberg/pull/12298#discussion_r2766553386
########## core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.formats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +abstract class FileWriterBuilderImpl<W 
extends FileWriter<D, ?>, D, S> + implements FileWriterBuilder<W, S> { + private final ModelWriteBuilder<D, S> modelWriteBuilder; + private final String location; + private final FileFormat format; + private Schema schema = null; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + + /** Creates a builder for {@link DataWriter} instances for writing data files. */ + static <D, S> FileWriterBuilder<DataWriter<D>, S> forDataFile( + FormatModel<D, S> model, EncryptedOutputFile outputFile) { + return new DataFileWriterBuilder<>(model, outputFile); + } + + /** + * Creates a builder for {@link EqualityDeleteWriter} instances for writing equality delete files. + */ + static <D, S> FileWriterBuilder<EqualityDeleteWriter<D>, S> forEqualityDelete( + FormatModel<D, S> model, EncryptedOutputFile outputFile) { + return new EqualityDeleteWriterBuilder<>(model, outputFile); + } + + /** + * Creates a builder for {@link PositionDeleteWriter} instances for writing position delete files. 
+ */ + static <D, S> FileWriterBuilder<PositionDeleteWriter<D>, S> forPositionDelete( + FormatModel<PositionDelete<D>, S> model, EncryptedOutputFile outputFile) { + return new PositionDeleteWriterBuilder<>(model, outputFile); + } + + private FileWriterBuilderImpl( + FormatModel<D, S> model, EncryptedOutputFile outputFile, FileContent content) { + this.modelWriteBuilder = model.writeBuilder(outputFile).content(content); + this.location = outputFile.encryptingOutputFile().location(); + this.format = model.format(); + } + + @Override + public FileWriterBuilderImpl<W, D, S> set(String property, String value) { + modelWriteBuilder.set(property, value); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> meta(String property, String value) { + modelWriteBuilder.meta(property, value); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> metricsConfig(MetricsConfig metricsConfig) { + modelWriteBuilder.metricsConfig(metricsConfig); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> overwrite() { + modelWriteBuilder.overwrite(); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> withFileEncryptionKey(ByteBuffer encryptionKey) { + modelWriteBuilder.withFileEncryptionKey(encryptionKey); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> withAADPrefix(ByteBuffer aadPrefix) { + modelWriteBuilder.withAADPrefix(aadPrefix); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> schema(Schema newSchema) { + modelWriteBuilder.schema(newSchema); + this.schema = newSchema; + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> engineSchema(S newSchema) { + modelWriteBuilder.engineSchema(newSchema); + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> spec(PartitionSpec newSpec) { + this.spec = newSpec; + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> partition(StructLike newPartition) { + 
this.partition = newPartition; + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> keyMetadata(EncryptionKeyMetadata newKeyMetadata) { + this.keyMetadata = newKeyMetadata; + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> sortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return this; + } + + @Override + public FileWriterBuilderImpl<W, D, S> equalityFieldIds(int... fieldIds) { + throw new UnsupportedOperationException( + "Equality field ids not supported for this writer type"); + } + + ModelWriteBuilder<D, S> modelWriteBuilder() { + return modelWriteBuilder; + } + + String location() { + return location; + } + + FileFormat format() { + return format; + } + + Schema schema() { + return schema; + } + + PartitionSpec spec() { + return spec; + } + + StructLike partition() { + return partition; + } + + EncryptionKeyMetadata keyMetadata() { + return keyMetadata; + } + + SortOrder sortOrder() { + return sortOrder; + } + + /** Builder for creating {@link DataWriter} instances for writing data files. */ + private static class DataFileWriterBuilder<D, S> + extends FileWriterBuilderImpl<DataWriter<D>, D, S> { + + private DataFileWriterBuilder(FormatModel<D, S> model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.DATA); + } + + @Override + public DataWriter<D> build() throws IOException { + Preconditions.checkState(schema() != null, "Invalid schema for data writer: null"); + Preconditions.checkArgument(spec() != null, "Invalid partition spec for data writer: null"); + Preconditions.checkArgument( + spec().isUnpartitioned() || partition() != null, + "Invalid partition, does not match spec: %s", + spec()); + + return new DataWriter<>( + modelWriteBuilder().build(), + format(), + location(), + spec(), + partition(), + keyMetadata(), + sortOrder()); + } + } + + /** + * Builder for creating {@link EqualityDeleteWriter} instances for writing equality delete files. 
+ */ + private static class EqualityDeleteWriterBuilder<D, S> + extends FileWriterBuilderImpl<EqualityDeleteWriter<D>, D, S> { + + private int[] equalityFieldIds = null; + + private EqualityDeleteWriterBuilder(FormatModel<D, S> model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.EQUALITY_DELETES); + } + + @Override + public EqualityDeleteWriterBuilder<D, S> equalityFieldIds(int... fieldIds) { + this.equalityFieldIds = fieldIds; + return this; + } + + @Override + public EqualityDeleteWriter<D> build() throws IOException { + Preconditions.checkState(schema() != null, "Invalid schema for equality delete writer: null"); + Preconditions.checkState( + equalityFieldIds != null, "Invalid delete field ids for equality delete writer: null"); + Preconditions.checkArgument( + spec() != null, "Invalid partition spec for equality delete writer: null"); + Preconditions.checkArgument( + spec().isUnpartitioned() || partition() != null, + "Invalid partition, does not match spec: %s", + spec()); + + return new EqualityDeleteWriter<>( + modelWriteBuilder() + .schema(schema()) + .meta("delete-type", "equality") + .meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))) + .build(), + format(), + location(), + spec(), + partition(), + keyMetadata(), + sortOrder(), + equalityFieldIds); + } + } + + /** + * Builder for creating {@link PositionDeleteWriter} instances for writing position delete files. 
+ */ + private static class PositionDeleteWriterBuilder<D, S> + extends FileWriterBuilderImpl<PositionDeleteWriter<D>, PositionDelete<D>, S> { + + private PositionDeleteWriterBuilder( + FormatModel<PositionDelete<D>, S> model, EncryptedOutputFile outputFile) { + super(model, outputFile, FileContent.POSITION_DELETES); + } + + @Override + public PositionDeleteWriter<D> build() throws IOException { + Preconditions.checkArgument( + spec() != null, "Invalid partition spec for position delete writer: null"); + Preconditions.checkArgument( + spec().isUnpartitioned() || partition() != null, + "Invalid partition, does not match spec: %s", + spec()); + + return new PositionDeleteWriter<>( + new PositionDeleteFileAppender<>( + modelWriteBuilder().meta("delete-type", "position").build()), + format(), + location(), + spec(), + partition(), + keyMetadata()); + } + + private static class PositionDeleteFileAppender<T> implements FileAppender<StructLike> { Review Comment: Because of this and some of the suppressions (like "rawtypes"), I took a deeper look into the type params in this class. I was able to avoid needing this class by updating `PositionDeleteWriter` so that its `FileAppender` is parameterized by `PositionDelete<T>` rather than `StructLike` (which is a parent of `PositionDelete`). 
Here's the diff: ```diff diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index a8af5e9d0f..6fcd772d59 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -51,7 +51,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De private static final Set<Integer> FILE_AND_POS_FIELD_IDS = ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId()); - private final FileAppender<StructLike> appender; + private final FileAppender<PositionDelete<T>> appender; private final FileFormat format; private final String location; private final PartitionSpec spec; @@ -61,7 +61,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De private DeleteFile deleteFile = null; public PositionDeleteWriter( - FileAppender<StructLike> appender, + FileAppender<PositionDelete<T>> appender, FileFormat format, String location, PartitionSpec spec, ``` Also, I looked into consolidating as much as possible into the parent class and I think that validations are cleaner if they are put in a `validate` method on the parent. However, there was an issue with the type param `D` for `PositionDeleteWriterBuilder`, where `PositionDeleteWriter` needs to be constructed in the `PositionDeleteWriterBuilder` class because `D` in `FileWriterBuilderImpl` is `D=PositionDelete<T>` and there is no way to identify `T` in the parent class. That made me keep this model of implementing the `build` methods in the child classes. I also think it's less code to use `protected` instance fields rather than the getter methods. It seems slightly cleaner, but I'm fine if you don't like the change and want to keep the getters. 
Here's the diff for the other changes: ```diff diff --git a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java index 85c7464069..e79161bd7c 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java +++ b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java @@ -20,13 +20,11 @@ package org.apache.iceberg.formats; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -38,21 +36,11 @@ import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> implements FileWriterBuilder<W, S> { - private final ModelWriteBuilder<D, S> modelWriteBuilder; - private final String location; - private final FileFormat format; - private Schema schema = null; - private PartitionSpec spec = null; - private StructLike partition = null; - private EncryptionKeyMetadata keyMetadata = null; - private SortOrder sortOrder = null; - /** Creates a builder for {@link DataWriter} instances for writing data files. 
*/ static <D, S> FileWriterBuilder<DataWriter<D>, S> forDataFile( FormatModel<D, S> model, EncryptedOutputFile outputFile) { @@ -75,8 +63,20 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> return new PositionDeleteWriterBuilder<>(model, outputFile); } + private final FileContent content; + protected final ModelWriteBuilder<D, S> modelWriteBuilder; + protected final String location; + protected final FileFormat format; + protected Schema schema = null; + protected PartitionSpec spec = null; + protected StructLike partition = null; + protected EncryptionKeyMetadata keyMetadata = null; + protected SortOrder sortOrder = null; + protected int[] equalityFieldIds = null; + private FileWriterBuilderImpl( FormatModel<D, S> model, EncryptedOutputFile outputFile, FileContent content) { + this.content = content; this.modelWriteBuilder = model.writeBuilder(outputFile).content(content); this.location = outputFile.encryptingOutputFile().location(); this.format = model.format(); @@ -157,40 +157,27 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> @Override public FileWriterBuilderImpl<W, D, S> equalityFieldIds(int... 
fieldIds) { - throw new UnsupportedOperationException( - "Equality field ids not supported for this writer type"); - } - - ModelWriteBuilder<D, S> modelWriteBuilder() { - return modelWriteBuilder; - } - - String location() { - return location; - } - - FileFormat format() { - return format; - } - - Schema schema() { - return schema; - } - - PartitionSpec spec() { - return spec; - } + if (content != FileContent.EQUALITY_DELETES) { + throw new UnsupportedOperationException( + "Equality field ids not supported for this writer type"); + } - StructLike partition() { - return partition; - } + this.equalityFieldIds = fieldIds; - EncryptionKeyMetadata keyMetadata() { - return keyMetadata; + return this; } - SortOrder sortOrder() { - return sortOrder; + protected void validate() { + Preconditions.checkState( + content != FileContent.EQUALITY_DELETES || equalityFieldIds != null, + "Invalid delete field ids for equality delete writer: null"); + Preconditions.checkState( + content == FileContent.POSITION_DELETES || schema != null, "Invalid schema: null"); + Preconditions.checkArgument(spec != null, "Invalid partition spec: null"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Invalid partition, does not match spec: %s", + spec); } /** Builder for creating {@link DataWriter} instances for writing data files. 
*/ @@ -203,21 +190,9 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> @Override public DataWriter<D> build() throws IOException { - Preconditions.checkState(schema() != null, "Invalid schema for data writer: null"); - Preconditions.checkArgument(spec() != null, "Invalid partition spec for data writer: null"); - Preconditions.checkArgument( - spec().isUnpartitioned() || partition() != null, - "Invalid partition, does not match spec: %s", - spec()); - + validate(); return new DataWriter<>( - modelWriteBuilder().build(), - format(), - location(), - spec(), - partition(), - keyMetadata(), - sortOrder()); + modelWriteBuilder.build(), format, location, spec, partition, keyMetadata, sortOrder); } } @@ -227,33 +202,16 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> private static class EqualityDeleteWriterBuilder<D, S> extends FileWriterBuilderImpl<EqualityDeleteWriter<D>, D, S> { - private int[] equalityFieldIds = null; - private EqualityDeleteWriterBuilder(FormatModel<D, S> model, EncryptedOutputFile outputFile) { super(model, outputFile, FileContent.EQUALITY_DELETES); } - @Override - public EqualityDeleteWriterBuilder<D, S> equalityFieldIds(int... 
fieldIds) { - this.equalityFieldIds = fieldIds; - return this; - } - @Override public EqualityDeleteWriter<D> build() throws IOException { - Preconditions.checkState(schema() != null, "Invalid schema for equality delete writer: null"); - Preconditions.checkState( - equalityFieldIds != null, "Invalid delete field ids for equality delete writer: null"); - Preconditions.checkArgument( - spec() != null, "Invalid partition spec for equality delete writer: null"); - Preconditions.checkArgument( - spec().isUnpartitioned() || partition() != null, - "Invalid partition, does not match spec: %s", - spec()); - + validate(); return new EqualityDeleteWriter<>( - modelWriteBuilder() - .schema(schema()) + modelWriteBuilder + .schema(schema) .meta("delete-type", "equality") .meta( "delete-field-ids", @@ -261,12 +219,12 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> .mapToObj(Objects::toString) .collect(Collectors.joining(", "))) .build(), - format(), - location(), - spec(), - partition(), - keyMetadata(), - sortOrder(), + format, + location, + spec, + partition, + keyMetadata, + sortOrder, equalityFieldIds); } } @@ -284,55 +242,14 @@ abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S> @Override public PositionDeleteWriter<D> build() throws IOException { - Preconditions.checkArgument( - spec() != null, "Invalid partition spec for position delete writer: null"); - Preconditions.checkArgument( - spec().isUnpartitioned() || partition() != null, - "Invalid partition, does not match spec: %s", - spec()); - + validate(); return new PositionDeleteWriter<>( - new PositionDeleteFileAppender<>( - modelWriteBuilder().meta("delete-type", "position").build()), - format(), - location(), - spec(), - partition(), - keyMetadata()); - } - - private static class PositionDeleteFileAppender<T> implements FileAppender<StructLike> { - private final FileAppender<PositionDelete<T>> appender; - - PositionDeleteFileAppender(FileAppender<PositionDelete<T>> 
appender) { - this.appender = appender; - } - - @SuppressWarnings("unchecked") - @Override - public void add(StructLike positionDelete) { - appender.add((PositionDelete<T>) positionDelete); - } - - @Override - public Metrics metrics() { - return appender.metrics(); - } - - @Override - public long length() { - return appender.length(); - } - - @Override - public void close() throws IOException { - appender.close(); - } - - @Override - public List<Long> splitOffsets() { - return appender.splitOffsets(); - } + modelWriteBuilder.meta("delete-type", "position").build(), + format, + location, + spec, + partition, + keyMetadata); } } } ``` I think this is a bit better and removes some of the casting needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
