openinx commented on a change in pull request #3250:
URL: https://github.com/apache/iceberg/pull/3250#discussion_r730835792
##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +245,157 @@ public DataWriteBuilder withSortOrder(SortOrder
newSortOrder) {
}
}
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private BiFunction<Schema, TypeDescription, OrcRowWriter<?>>
createWriterFunc = null;
+ private Schema rowSchema = null;
+ private PartitionSpec spec = null;
+ private StructLike partition = null;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+ private SortOrder sortOrder;
+ private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ withSpec(table.spec());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.metadata(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(BiFunction<Schema,
TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+ this.createWriterFunc = newWriterFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(Schema newSchema) {
+ this.rowSchema = newSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public DeleteWriteBuilder transformPaths(Function<CharSequence, ?>
newPathTransformFunc) {
+ this.pathTransformFunc = newPathTransformFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+ this.sortOrder = newSortOrder;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() {
+ Preconditions.checkState(rowSchema != null, "Cannot create equality
delete file without a schema`");
+ Preconditions.checkState(equalityFieldIds != null, "Cannot create
equality delete file without delete field ids");
+ Preconditions.checkState(createWriterFunc != null,
+ "Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "equality");
+ meta("delete-field-ids", IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ appenderBuilder.createWriterFunc(createWriterFunc);
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.ORC, location, spec, partition,
keyMetadata,
+ sortOrder, equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() {
+ Preconditions.checkState(equalityFieldIds == null, "Cannot create
position delete file using delete field ids");
+ Preconditions.checkArgument(spec != null, "Spec must not be null when
creating position delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "position");
+
+ Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(rowSchema);
+ appenderBuilder.schema(deleteSchema);
+
+ if (createWriterFunc != null) {
+ appenderBuilder.createWriterFunc((schema, typeDescription) ->
+
GenericOrcWriters.positionDelete(createWriterFunc.apply(deleteSchema,
typeDescription), pathTransformFunc));
Review comment:
It looks like it's
[here](https://github.com/apache/iceberg/blob/1b920e2945d6fc48661735e884e4f1919e9cdf86/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L489)
where we set the `rowSchema` by default, whereas in fact we shouldn't use
`table.schema()` as the `rowSchema` by default when building the Parquet
posDeleteWriter. I would suggest using the following to construct the PositionWriter:
```java
PositionDeleteWriter<?> writer = Parquet.writeDeletes(out)
.withSpec(table.spec())
.setAll(table.properties())
.metricsConfig(MetricsConfig.forTable(table))
.withPartition(partition)
.overwrite()
.buildPositionWriter();
```
And if people do plan to use `forTable(table)` to construct the position
writer, then the `Preconditions.checkArgument(rowSchema == null ||
createWriterFunc != null)` check will remind the devs to either add a
`createWriterFunc` or fall back to the separate setters.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]