aokolnychyi commented on a change in pull request #2935:
URL: https://github.com/apache/iceberg/pull/2935#discussion_r682880852



##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();

Review comment:
       This seems to be unused. I think it is needed for query engine integrations to transform `CharSequence` into internal string representations. It should probably be used in the positional delete writer?
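   For context, engine integrations would use this hook to convert paths into their internal string type. A Spark integration, for instance, would do something like the following (a sketch, not code from this PR; `outputFile` is assumed to be an `OutputFile`):

```java
import org.apache.spark.unsafe.types.UTF8String;

// Spark keeps strings as UTF8String internally, so the file path column of a
// position delete must be converted before it reaches Spark's ORC value writers.
ORC.DeleteWriteBuilder builder = ORC.writeDeletes(outputFile)
    .transformPaths(path -> UTF8String.fromString(path.toString()));
```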

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.config(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      // TODO: keep full metrics for position delete file columns
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);

Review comment:
       Do we plan to introduce write table properties for ORC like we have for Parquet, e.g. for controlling the stripe size?
   
   I think our ORC and Parquet implementations are inconsistent here. We can probably set some ORC props directly in table properties and they will be applied, but then we have no way to configure delete writers separately.
   
   @rdblue @omalley @pvary @edgarRd @shardulm94, any particular reason why we 
don't have table properties to configure ORC writes?
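   Right now the only per-writer escape hatch seems to be raw ORC config keys through the builder, e.g. (a sketch; whether `set` forwards `orc.stripe.size` to the underlying ORC writer is an assumption here):

```java
// Hypothetical usage: pass a native ORC key through the builder's config map
// instead of a dedicated Iceberg table property like Parquet has.
ORC.DeleteWriteBuilder builder = ORC.writeDeletes(outputFile)
    .forTable(table)
    .set("orc.stripe.size", String.valueOf(64L * 1024 * 1024));  // 64 MiB stripes
```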

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();

Review comment:
       Can we use the introduced delete writers in `BaseWriterFactory` and adapt `TestWriterFactory` too? That way we can make sure deletes work as expected.
   
   We have Flink and Spark writer factories right now. It should be trivial to consume the new ORC writers there.
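   For illustration, the ORC branch in such a factory could look roughly like this; the `GenericOrcWriter.buildWriter(Schema, TypeDescription)` overload and the parameter list are assumptions:

```java
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.orc.ORC;

// Minimal sketch of how a writer factory could consume the new builder.
static EqualityDeleteWriter<Record> newOrcEqualityDeleteWriter(
    Table table, EncryptedOutputFile file, Schema deleteRowSchema,
    PartitionSpec spec, StructLike partition, int... equalityFieldIds) throws IOException {
  return ORC.writeDeletes(file.encryptingOutputFile())
      .setAll(table.properties())
      .rowSchema(deleteRowSchema)
      .equalityFieldIds(equalityFieldIds)
      .createWriterFunc(GenericOrcWriter::buildWriter)  // assumed two-arg overload
      .withSpec(spec)
      .withPartition(partition)
      .withKeyMetadata(file.keyMetadata())
      .buildEqualityWriter();
}
```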

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.config(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      // TODO: keep full metrics for position delete file columns
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {

Review comment:
       nit: do we need to declare `IOException` here?

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.config(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      // TODO: keep full metrics for position delete file columns
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata,
+          sortOrder, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {

Review comment:
       nit: do we need to declare `IOException` here?

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.config(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      // TODO: keep full metrics for position delete file columns
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata,
+          sortOrder, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "position");
+
+      Schema deleteSchema =
+          rowSchema == null ? DeleteSchemaUtil.pathPosSchema() : DeleteSchemaUtil.posDeleteSchema(rowSchema);

Review comment:
       I think you can use `DeleteSchemaUtil.posDeleteSchema` directly. It will 
check if `rowSchema` is null for you. Then this will fit on a single line.
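   Concretely, assuming `posDeleteSchema` falls back to the path/pos-only schema when its argument is null:

```java
// Suggested simplification: let DeleteSchemaUtil handle the null check.
Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(rowSchema);
```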

##########
File path: orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -531,6 +543,34 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
     }
   }
 
+  private static class DeleteWriter implements OrcRowWriter<PositionDelete<Record>> {

Review comment:
       I think the naming has to be a bit more specific as this is a position 
delete writer.
   Also, it should probably accept a path transformation function.
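   Something along these lines, where the class name is a hypothetical suggestion:

```java
// Hypothetical skeleton: a position-delete-specific name plus a path transform hook.
private static class PositionDeleteRecordWriter implements OrcRowWriter<PositionDelete<Record>> {
  private final OrcRowWriter<Record> rowWriter;
  private final Function<CharSequence, ?> pathTransformFunc;

  PositionDeleteRecordWriter(OrcRowWriter<Record> rowWriter,
                             Function<CharSequence, ?> pathTransformFunc) {
    this.rowWriter = rowWriter;
    this.pathTransformFunc = pathTransformFunc;
  }

  // write(...) would apply pathTransformFunc to delete.path() before delegating
  // the path/pos/row struct to rowWriter.
}
```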

##########
File path: orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -531,6 +543,34 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
     }
   }
 
+  private static class DeleteWriter implements OrcRowWriter<PositionDelete<Record>> {
+    private final OrcRowWriter<Record> rowWriter;
+    private final Schema deleteSchema;
+    private final GenericRecord record;
+
+    DeleteWriter(Schema deleteSchema, OrcRowWriter rowWriter) {
+      this.deleteSchema = deleteSchema;

Review comment:
       Do we need to pass `deleteSchema` here?
   I think it is only used to construct `rowWriter`, which we could build in `ORC#buildPositionWriter` instead.
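   Something like this at the call site (a sketch; the factory method and names are assumptions):

```java
// In ORC#buildPositionWriter: build the row writer from the delete schema here,
// so the position delete writer itself never needs to hold the schema.
appenderBuilder.createWriterFunc((schema, typeDescription) ->
    new PositionDeleteRecordWriter(
        GenericOrcWriter.buildWriter(deleteSchema, typeDescription), pathTransformFunc));
```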

##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.fromProperties(table.properties()));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.config(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      // TODO: keep full metrics for position delete file columns
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata,
+          sortOrder, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(spec != null,
+          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "position");
+
+      Schema deleteSchema =
+          rowSchema == null ? DeleteSchemaUtil.pathPosSchema() : DeleteSchemaUtil.posDeleteSchema(rowSchema);
+      appenderBuilder.schema(deleteSchema);
+
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc((schema, typeDescription) -> {
+          OrcRowWriter<?> writer = createWriterFunc.apply(deleteSchema, typeDescription);

Review comment:
       I am a bit confused here. I'd probably align the design and logic with 
`PositionDeleteStructWriter` in `Parquet`.
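   For reference, the Parquet builder wraps the engine's struct writer roughly like this (quoted from memory, so treat the details as approximate):

```java
// Parquet's position delete path: wrap the struct writer produced by the engine's
// writer function so it can apply pathTransformFunc to the file path column.
appenderBuilder.createWriterFunc(parquetSchema -> {
  ParquetValueWriter<?> writer = createWriterFunc.apply(parquetSchema);
  if (writer instanceof StructWriter) {
    return new PositionDeleteStructWriter<T>((StructWriter<?>) writer, pathTransformFunc);
  } else {
    throw new UnsupportedOperationException("Cannot wrap writer for position deletes: " + writer.getClass());
  }
});
```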



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


