rdblue commented on code in PR #12298:
URL: https://github.com/apache/iceberg/pull/12298#discussion_r2766553386


##########
core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.formats;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S>
+    implements FileWriterBuilder<W, S> {
+  private final ModelWriteBuilder<D, S> modelWriteBuilder;
+  private final String location;
+  private final FileFormat format;
+  private Schema schema = null;
+  private PartitionSpec spec = null;
+  private StructLike partition = null;
+  private EncryptionKeyMetadata keyMetadata = null;
+  private SortOrder sortOrder = null;
+
+  /** Creates a builder for {@link DataWriter} instances for writing data 
files. */
+  static <D, S> FileWriterBuilder<DataWriter<D>, S> forDataFile(
+      FormatModel<D, S> model, EncryptedOutputFile outputFile) {
+    return new DataFileWriterBuilder<>(model, outputFile);
+  }
+
+  /**
+   * Creates a builder for {@link EqualityDeleteWriter} instances for writing 
equality delete files.
+   */
+  static <D, S> FileWriterBuilder<EqualityDeleteWriter<D>, S> 
forEqualityDelete(
+      FormatModel<D, S> model, EncryptedOutputFile outputFile) {
+    return new EqualityDeleteWriterBuilder<>(model, outputFile);
+  }
+
+  /**
+   * Creates a builder for {@link PositionDeleteWriter} instances for writing 
position delete files.
+   */
+  static <D, S> FileWriterBuilder<PositionDeleteWriter<D>, S> 
forPositionDelete(
+      FormatModel<PositionDelete<D>, S> model, EncryptedOutputFile outputFile) 
{
+    return new PositionDeleteWriterBuilder<>(model, outputFile);
+  }
+
+  private FileWriterBuilderImpl(
+      FormatModel<D, S> model, EncryptedOutputFile outputFile, FileContent 
content) {
+    this.modelWriteBuilder = model.writeBuilder(outputFile).content(content);
+    this.location = outputFile.encryptingOutputFile().location();
+    this.format = model.format();
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> set(String property, String value) {
+    modelWriteBuilder.set(property, value);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> meta(String property, String value) {
+    modelWriteBuilder.meta(property, value);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> metricsConfig(MetricsConfig 
metricsConfig) {
+    modelWriteBuilder.metricsConfig(metricsConfig);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> overwrite() {
+    modelWriteBuilder.overwrite();
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> withFileEncryptionKey(ByteBuffer 
encryptionKey) {
+    modelWriteBuilder.withFileEncryptionKey(encryptionKey);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> withAADPrefix(ByteBuffer aadPrefix) {
+    modelWriteBuilder.withAADPrefix(aadPrefix);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> schema(Schema newSchema) {
+    modelWriteBuilder.schema(newSchema);
+    this.schema = newSchema;
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> engineSchema(S newSchema) {
+    modelWriteBuilder.engineSchema(newSchema);
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> spec(PartitionSpec newSpec) {
+    this.spec = newSpec;
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> partition(StructLike newPartition) {
+    this.partition = newPartition;
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> keyMetadata(EncryptionKeyMetadata 
newKeyMetadata) {
+    this.keyMetadata = newKeyMetadata;
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> sortOrder(SortOrder newSortOrder) {
+    this.sortOrder = newSortOrder;
+    return this;
+  }
+
+  @Override
+  public FileWriterBuilderImpl<W, D, S> equalityFieldIds(int... fieldIds) {
+    throw new UnsupportedOperationException(
+        "Equality field ids not supported for this writer type");
+  }
+
+  ModelWriteBuilder<D, S> modelWriteBuilder() {
+    return modelWriteBuilder;
+  }
+
+  String location() {
+    return location;
+  }
+
+  FileFormat format() {
+    return format;
+  }
+
+  Schema schema() {
+    return schema;
+  }
+
+  PartitionSpec spec() {
+    return spec;
+  }
+
+  StructLike partition() {
+    return partition;
+  }
+
+  EncryptionKeyMetadata keyMetadata() {
+    return keyMetadata;
+  }
+
+  SortOrder sortOrder() {
+    return sortOrder;
+  }
+
+  /** Builder for creating {@link DataWriter} instances for writing data 
files. */
+  private static class DataFileWriterBuilder<D, S>
+      extends FileWriterBuilderImpl<DataWriter<D>, D, S> {
+
+    private DataFileWriterBuilder(FormatModel<D, S> model, EncryptedOutputFile 
outputFile) {
+      super(model, outputFile, FileContent.DATA);
+    }
+
+    @Override
+    public DataWriter<D> build() throws IOException {
+      Preconditions.checkState(schema() != null, "Invalid schema for data 
writer: null");
+      Preconditions.checkArgument(spec() != null, "Invalid partition spec for 
data writer: null");
+      Preconditions.checkArgument(
+          spec().isUnpartitioned() || partition() != null,
+          "Invalid partition, does not match spec: %s",
+          spec());
+
+      return new DataWriter<>(
+          modelWriteBuilder().build(),
+          format(),
+          location(),
+          spec(),
+          partition(),
+          keyMetadata(),
+          sortOrder());
+    }
+  }
+
+  /**
+   * Builder for creating {@link EqualityDeleteWriter} instances for writing 
equality delete files.
+   */
+  private static class EqualityDeleteWriterBuilder<D, S>
+      extends FileWriterBuilderImpl<EqualityDeleteWriter<D>, D, S> {
+
+    private int[] equalityFieldIds = null;
+
+    private EqualityDeleteWriterBuilder(FormatModel<D, S> model, 
EncryptedOutputFile outputFile) {
+      super(model, outputFile, FileContent.EQUALITY_DELETES);
+    }
+
+    @Override
+    public EqualityDeleteWriterBuilder<D, S> equalityFieldIds(int... fieldIds) 
{
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    @Override
+    public EqualityDeleteWriter<D> build() throws IOException {
+      Preconditions.checkState(schema() != null, "Invalid schema for equality 
delete writer: null");
+      Preconditions.checkState(
+          equalityFieldIds != null, "Invalid delete field ids for equality 
delete writer: null");
+      Preconditions.checkArgument(
+          spec() != null, "Invalid partition spec for equality delete writer: 
null");
+      Preconditions.checkArgument(
+          spec().isUnpartitioned() || partition() != null,
+          "Invalid partition, does not match spec: %s",
+          spec());
+
+      return new EqualityDeleteWriter<>(
+          modelWriteBuilder()
+              .schema(schema())
+              .meta("delete-type", "equality")
+              .meta(
+                  "delete-field-ids",
+                  IntStream.of(equalityFieldIds)
+                      .mapToObj(Objects::toString)
+                      .collect(Collectors.joining(", ")))
+              .build(),
+          format(),
+          location(),
+          spec(),
+          partition(),
+          keyMetadata(),
+          sortOrder(),
+          equalityFieldIds);
+    }
+  }
+
+  /**
+   * Builder for creating {@link PositionDeleteWriter} instances for writing 
position delete files.
+   */
+  private static class PositionDeleteWriterBuilder<D, S>
+      extends FileWriterBuilderImpl<PositionDeleteWriter<D>, 
PositionDelete<D>, S> {
+
+    private PositionDeleteWriterBuilder(
+        FormatModel<PositionDelete<D>, S> model, EncryptedOutputFile 
outputFile) {
+      super(model, outputFile, FileContent.POSITION_DELETES);
+    }
+
+    @Override
+    public PositionDeleteWriter<D> build() throws IOException {
+      Preconditions.checkArgument(
+          spec() != null, "Invalid partition spec for position delete writer: 
null");
+      Preconditions.checkArgument(
+          spec().isUnpartitioned() || partition() != null,
+          "Invalid partition, does not match spec: %s",
+          spec());
+
+      return new PositionDeleteWriter<>(
+          new PositionDeleteFileAppender<>(
+              modelWriteBuilder().meta("delete-type", "position").build()),
+          format(),
+          location(),
+          spec(),
+          partition(),
+          keyMetadata());
+    }
+
+    private static class PositionDeleteFileAppender<T> implements 
FileAppender<StructLike> {

Review Comment:
   Because of this and some of the suppressions (like "rawtypes"), I took a 
deeper look into the type params in this class.
   
   I was able to avoid needing this class by updating `PositionDeleteWriter` so 
that its `FileAppender` is parameterized by `PositionDelete<T>` rather than 
`StructLike` (which is a parent of `PositionDelete`). Here's the diff:
   
   ```diff
   diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
   index a8af5e9d0f..6fcd772d59 100644
   --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
   +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
   @@ -51,7 +51,7 @@ public class PositionDeleteWriter<T> implements 
FileWriter<PositionDelete<T>, De
      private static final Set<Integer> FILE_AND_POS_FIELD_IDS =
          ImmutableSet.of(DELETE_FILE_PATH.fieldId(), 
DELETE_FILE_POS.fieldId());
    
   -  private final FileAppender<StructLike> appender;
   +  private final FileAppender<PositionDelete<T>> appender;
      private final FileFormat format;
      private final String location;
      private final PartitionSpec spec;
   @@ -61,7 +61,7 @@ public class PositionDeleteWriter<T> implements 
FileWriter<PositionDelete<T>, De
      private DeleteFile deleteFile = null;
    
      public PositionDeleteWriter(
   -      FileAppender<StructLike> appender,
   +      FileAppender<PositionDelete<T>> appender,
          FileFormat format,
          String location,
          PartitionSpec spec,
   ```
   
   Also, I looked into consolidating as much as possible into the parent class 
and I think that validations are cleaner if they are put in a `validate` method 
on the parent. However, there was an issue with the type param `D` for 
`PositionDeleteWriteBuilder`, where `PositionDeleteWriter` needs to be 
constructed in the `PositionDeleteWriterBuilder` class because `D` in 
`FileWriterBuilderImpl` is `D=PositionDelete<T>` and there is no way to 
identify `T` in the parent class. That made me keep this model of implementing 
the `build` methods in the child classes.
   
   I also think it's less code to use `protected` instance fields rather than 
the getter methods. It seems slightly cleaner, but I'm fine if you don't like 
the change and want to keep the getters. Here's the diff for the other changes:
   
   ```diff
   diff --git 
a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java 
b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java
   index 85c7464069..e79161bd7c 100644
   --- 
a/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java
   +++ 
b/core/src/main/java/org/apache/iceberg/formats/FileWriterBuilderImpl.java
   @@ -20,13 +20,11 @@ package org.apache.iceberg.formats;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
   -import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import org.apache.iceberg.FileContent;
    import org.apache.iceberg.FileFormat;
   -import org.apache.iceberg.Metrics;
    import org.apache.iceberg.MetricsConfig;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
   @@ -38,21 +36,11 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.encryption.EncryptionKeyMetadata;
    import org.apache.iceberg.io.DataWriter;
   -import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.io.FileWriter;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    
    abstract class FileWriterBuilderImpl<W extends FileWriter<D, ?>, D, S>
        implements FileWriterBuilder<W, S> {
   -  private final ModelWriteBuilder<D, S> modelWriteBuilder;
   -  private final String location;
   -  private final FileFormat format;
   -  private Schema schema = null;
   -  private PartitionSpec spec = null;
   -  private StructLike partition = null;
   -  private EncryptionKeyMetadata keyMetadata = null;
   -  private SortOrder sortOrder = null;
   -
      /** Creates a builder for {@link DataWriter} instances for writing data 
files. */
      static <D, S> FileWriterBuilder<DataWriter<D>, S> forDataFile(
          FormatModel<D, S> model, EncryptedOutputFile outputFile) {
   @@ -75,8 +63,20 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
        return new PositionDeleteWriterBuilder<>(model, outputFile);
      }
    
   +  private final FileContent content;
   +  protected final ModelWriteBuilder<D, S> modelWriteBuilder;
   +  protected final String location;
   +  protected final FileFormat format;
   +  protected Schema schema = null;
   +  protected PartitionSpec spec = null;
   +  protected StructLike partition = null;
   +  protected EncryptionKeyMetadata keyMetadata = null;
   +  protected SortOrder sortOrder = null;
   +  protected int[] equalityFieldIds = null;
   +
      private FileWriterBuilderImpl(
          FormatModel<D, S> model, EncryptedOutputFile outputFile, FileContent 
content) {
   +    this.content = content;
        this.modelWriteBuilder = 
model.writeBuilder(outputFile).content(content);
        this.location = outputFile.encryptingOutputFile().location();
        this.format = model.format();
   @@ -157,40 +157,27 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
    
      @Override
      public FileWriterBuilderImpl<W, D, S> equalityFieldIds(int... fieldIds) {
   -    throw new UnsupportedOperationException(
   -        "Equality field ids not supported for this writer type");
   -  }
   -
   -  ModelWriteBuilder<D, S> modelWriteBuilder() {
   -    return modelWriteBuilder;
   -  }
   -
   -  String location() {
   -    return location;
   -  }
   -
   -  FileFormat format() {
   -    return format;
   -  }
   -
   -  Schema schema() {
   -    return schema;
   -  }
   -
   -  PartitionSpec spec() {
   -    return spec;
   -  }
   +    if (content != FileContent.EQUALITY_DELETES) {
   +      throw new UnsupportedOperationException(
   +          "Equality field ids not supported for this writer type");
   +    }
    
   -  StructLike partition() {
   -    return partition;
   -  }
   +    this.equalityFieldIds = fieldIds;
    
   -  EncryptionKeyMetadata keyMetadata() {
   -    return keyMetadata;
   +    return this;
      }
    
   -  SortOrder sortOrder() {
   -    return sortOrder;
   +  protected void validate() {
   +    Preconditions.checkState(
   +        content != FileContent.EQUALITY_DELETES || equalityFieldIds != null,
   +        "Invalid delete field ids for equality delete writer: null");
   +    Preconditions.checkState(
   +        content == FileContent.POSITION_DELETES || schema != null, "Invalid 
schema: null");
   +    Preconditions.checkArgument(spec != null, "Invalid partition spec: 
null");
   +    Preconditions.checkArgument(
   +        spec.isUnpartitioned() || partition != null,
   +        "Invalid partition, does not match spec: %s",
   +        spec);
      }
    
      /** Builder for creating {@link DataWriter} instances for writing data 
files. */
   @@ -203,21 +190,9 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
    
        @Override
        public DataWriter<D> build() throws IOException {
   -      Preconditions.checkState(schema() != null, "Invalid schema for data 
writer: null");
   -      Preconditions.checkArgument(spec() != null, "Invalid partition spec 
for data writer: null");
   -      Preconditions.checkArgument(
   -          spec().isUnpartitioned() || partition() != null,
   -          "Invalid partition, does not match spec: %s",
   -          spec());
   -
   +      validate();
          return new DataWriter<>(
   -          modelWriteBuilder().build(),
   -          format(),
   -          location(),
   -          spec(),
   -          partition(),
   -          keyMetadata(),
   -          sortOrder());
   +          modelWriteBuilder.build(), format, location, spec, partition, 
keyMetadata, sortOrder);
        }
      }
   
   @@ -227,33 +202,16 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
      private static class EqualityDeleteWriterBuilder<D, S>
          extends FileWriterBuilderImpl<EqualityDeleteWriter<D>, D, S> {
    
   -    private int[] equalityFieldIds = null;
   -
        private EqualityDeleteWriterBuilder(FormatModel<D, S> model, 
EncryptedOutputFile outputFile) {
          super(model, outputFile, FileContent.EQUALITY_DELETES);
        }
    
   -    @Override
   -    public EqualityDeleteWriterBuilder<D, S> equalityFieldIds(int... 
fieldIds) {
   -      this.equalityFieldIds = fieldIds;
   -      return this;
   -    }
   -
        @Override
        public EqualityDeleteWriter<D> build() throws IOException {
   -      Preconditions.checkState(schema() != null, "Invalid schema for 
equality delete writer: null");
   -      Preconditions.checkState(
   -          equalityFieldIds != null, "Invalid delete field ids for equality 
delete writer: null");
   -      Preconditions.checkArgument(
   -          spec() != null, "Invalid partition spec for equality delete 
writer: null");
   -      Preconditions.checkArgument(
   -          spec().isUnpartitioned() || partition() != null,
   -          "Invalid partition, does not match spec: %s",
   -          spec());
   -
   +      validate();
          return new EqualityDeleteWriter<>(
   -          modelWriteBuilder()
   -              .schema(schema())
   +          modelWriteBuilder
   +              .schema(schema)
                  .meta("delete-type", "equality")
                  .meta(
                      "delete-field-ids",
   @@ -261,12 +219,12 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
                          .mapToObj(Objects::toString)
                          .collect(Collectors.joining(", ")))
                  .build(),
   -          format(),
   -          location(),
   -          spec(),
   -          partition(),
   -          keyMetadata(),
   -          sortOrder(),
   +          format,
   +          location,
   +          spec,
   +          partition,
   +          keyMetadata,
   +          sortOrder,
              equalityFieldIds);
        }
      }
   @@ -284,55 +242,14 @@ abstract class FileWriterBuilderImpl<W extends 
FileWriter<D, ?>, D, S>
    
        @Override
        public PositionDeleteWriter<D> build() throws IOException {
   -      Preconditions.checkArgument(
   -          spec() != null, "Invalid partition spec for position delete 
writer: null");
   -      Preconditions.checkArgument(
   -          spec().isUnpartitioned() || partition() != null,
   -          "Invalid partition, does not match spec: %s",
   -          spec());
   -
   +      validate();
          return new PositionDeleteWriter<>(
   -          new PositionDeleteFileAppender<>(
   -              modelWriteBuilder().meta("delete-type", "position").build()),
   -          format(),
   -          location(),
   -          spec(),
   -          partition(),
   -          keyMetadata());
   -    }
   -
   -    private static class PositionDeleteFileAppender<T> implements 
FileAppender<StructLike> {
   -      private final FileAppender<PositionDelete<T>> appender;
   -
   -      PositionDeleteFileAppender(FileAppender<PositionDelete<T>> appender) {
   -        this.appender = appender;
   -      }
   -
   -      @SuppressWarnings("unchecked")
   -      @Override
   -      public void add(StructLike positionDelete) {
   -        appender.add((PositionDelete<T>) positionDelete);
   -      }
   -
   -      @Override
   -      public Metrics metrics() {
   -        return appender.metrics();
   -      }
   -
   -      @Override
   -      public long length() {
   -        return appender.length();
   -      }
   -
   -      @Override
   -      public void close() throws IOException {
   -        appender.close();
   -      }
   -
   -      @Override
   -      public List<Long> splitOffsets() {
   -        return appender.splitOffsets();
   -      }
   +          modelWriteBuilder.meta("delete-type", "position").build(),
   +          format,
   +          location,
   +          spec,
   +          partition,
   +          keyMetadata);
        }
      }
    }
   ```
   
   I think this is a bit better and removes some of the casting needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to