openinx commented on a change in pull request #2935:
URL: https://github.com/apache/iceberg/pull/2935#discussion_r715367320
##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -223,6 +232,161 @@ public DataWriteBuilder withSortOrder(SortOrder
newSortOrder) {
}
}
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private BiFunction<Schema, TypeDescription, OrcRowWriter<?>>
createWriterFunc = null;
+ private Schema rowSchema = null;
+ private PartitionSpec spec = null;
+ private StructLike partition = null;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+ private SortOrder sortOrder;
+ private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ withSpec(table.spec());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.fromProperties(table.properties()));
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.config(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.metadata(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ // TODO: keep full metrics for position delete file columns
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(BiFunction<Schema,
TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+ this.createWriterFunc = newWriterFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(Schema newSchema) {
+ this.rowSchema = newSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public DeleteWriteBuilder transformPaths(Function<CharSequence, ?>
newPathTransformFunc) {
+ this.pathTransformFunc = newPathTransformFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+ this.sortOrder = newSortOrder;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws
IOException {
+ Preconditions.checkState(rowSchema != null, "Cannot create equality
delete file without a schema`");
+ Preconditions.checkState(equalityFieldIds != null, "Cannot create
equality delete file without delete field ids");
+ Preconditions.checkState(createWriterFunc != null,
+ "Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "equality");
+ meta("delete-field-ids", IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ appenderBuilder.createWriterFunc(createWriterFunc);
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.ORC, location, spec, partition,
keyMetadata,
+ sortOrder, equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() throws
IOException {
+ Preconditions.checkState(equalityFieldIds == null, "Cannot create
position delete file using delete field ids");
+ Preconditions.checkArgument(spec != null,
+ "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "position");
+
+ Schema deleteSchema =
+ rowSchema == null ? DeleteSchemaUtil.pathPosSchema() :
DeleteSchemaUtil.posDeleteSchema(rowSchema);
+ appenderBuilder.schema(deleteSchema);
+
+ if (createWriterFunc != null) {
+ appenderBuilder.createWriterFunc((schema, typeDescription) -> {
+ OrcRowWriter<?> writer = createWriterFunc.apply(deleteSchema,
typeDescription);
Review comment:
@pvary @aokolnychyi, the reason we need to introduce a
`pathTransformFunc` is that Parquet exposes an API to load columns by
column description rather than by column id. I think this
[comment](https://github.com/apache/iceberg/pull/1836#discussion_r531474478)
provides the full background.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]