openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468397133
##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be
null");
+ Function<Schema, DatumWriter<?>> writerFunc;
+ if (createWriterFunc != null) {
+ writerFunc = createWriterFunc;
+ } else {
+ writerFunc = GenericAvroWriter::new;
+ }
+
// add the Iceberg schema to keyValueMetadata
meta("iceberg.schema", SchemaParser.toJson(schema));
return new AvroFileAppender<>(
- AvroSchemaUtil.convert(schema, name), file, createWriterFunc,
codec(), metadata, overwrite);
+ AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(),
metadata, overwrite);
+ }
+ }
+
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+ private org.apache.iceberg.Schema rowSchema;
+ private PartitionSpec spec;
+ private StructLike partition;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ setAll(table.properties());
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.meta(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(Map<String, String> properties) {
+ appenderBuilder.meta(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(Function<Schema,
DatumWriter<?>> writerFunction) {
+ this.createWriterFunc = writerFunction;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema
newRowSchema) {
+ this.rowSchema = newRowSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws
IOException {
+ Preconditions.checkState(equalityFieldIds != null, "Cannot create
equality delete file without delete field ids");
+
+ set("delete-type", "equality");
+ set("delete-field-ids", IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ if (createWriterFunc != null) {
+ appenderBuilder.createWriterFunc(createWriterFunc);
+ }
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.AVRO, location, spec, partition,
keyMetadata, equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() throws
IOException {
+ Preconditions.checkState(equalityFieldIds == null, "Cannot create
position delete file using delete field ids");
+
+ set("delete-type", "position");
+
+ // the appender uses the row schema wrapped with position fields
+ appenderBuilder.schema(new org.apache.iceberg.Schema(
+ MetadataColumns.DELETE_FILE_PATH,
+ MetadataColumns.DELETE_FILE_POS,
+ NestedField.optional(
+ MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row",
rowSchema.asStruct(),
+ MetadataColumns.DELETE_FILE_ROW_DOC)));
+
+ if (createWriterFunc != null) {
+ appenderBuilder.createWriterFunc(
+ avroSchema -> new
PositionDatumWriter<>(createWriterFunc.apply(avroSchema)));
+ }
+
+ return new PositionDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.AVRO, location, spec, partition,
keyMetadata, rowSchema.asStruct());
+ }
+ }
+
+ /**
+ * A {@link DatumWriter} implementation that wraps another to produce
position delete rows.
+ *
+ * @param <D> the type of datum written as a deleted row
+ */
+ private static class PositionDatumWriter<D> implements
DatumWriter<PositionDelete<D>> {
+ private static final ValueWriter<Object> PATH_WRITER =
ValueWriters.strings();
+ private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
+ private final DatumWriter<D> rowWriter;
+
+ private PositionDatumWriter(DatumWriter<D> rowWriter) {
+ this.rowWriter = rowWriter;
+ }
+
+ @Override
+ public void setSchema(Schema schema) {
+ rowWriter.setSchema(schema.getField("row").schema());
Review comment:
I misunderstood here: although the row field is an optional field,
`schema.getField("row")` won't be null — it returns the optional row field.
Please ignore this comment.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]