openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468398258
##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be
null");
+ Function<Schema, DatumWriter<?>> writerFunc;
+ if (createWriterFunc != null) {
+ writerFunc = createWriterFunc;
+ } else {
+ writerFunc = GenericAvroWriter::new;
+ }
+
// add the Iceberg schema to keyValueMetadata
meta("iceberg.schema", SchemaParser.toJson(schema));
return new AvroFileAppender<>(
- AvroSchemaUtil.convert(schema, name), file, createWriterFunc,
codec(), metadata, overwrite);
+ AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(),
metadata, overwrite);
+ }
+ }
+
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+ private org.apache.iceberg.Schema rowSchema;
+ private PartitionSpec spec;
+ private StructLike partition;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ setAll(table.properties());
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.meta(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(Map<String, String> properties) {
+ appenderBuilder.meta(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(Function<Schema,
DatumWriter<?>> writerFunction) {
+ this.createWriterFunc = writerFunction;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema
newRowSchema) {
+ this.rowSchema = newRowSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws
IOException {
+ Preconditions.checkState(equalityFieldIds != null, "Cannot create
equality delete file without delete field ids");
+
+ set("delete-type", "equality");
+ set("delete-field-ids", IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ if (createWriterFunc != null) {
+ appenderBuilder.createWriterFunc(createWriterFunc);
+ }
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.AVRO, location, spec, partition,
keyMetadata, equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() throws
IOException {
+ Preconditions.checkState(equalityFieldIds == null, "Cannot create
position delete file using delete field ids");
+
+ set("delete-type", "position");
+
+ // the appender uses the row schema wrapped with position fields
+ appenderBuilder.schema(new org.apache.iceberg.Schema(
+ MetadataColumns.DELETE_FILE_PATH,
+ MetadataColumns.DELETE_FILE_POS,
+ NestedField.optional(
+ MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row",
rowSchema.asStruct(),
+ MetadataColumns.DELETE_FILE_ROW_DOC)));
Review comment:
It seems the whole schema construction could be extracted into a separate static method (taking `rowSchema` as an argument); I noticed the unit test also duplicates this schema construction.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]