rdblue commented on a change in pull request #1818:
URL: https://github.com/apache/iceberg/pull/1818#discussion_r530005447
##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -51,39 +60,149 @@ protected BaseTaskWriter(PartitionSpec spec, FileFormat
format, FileAppenderFact
this.targetFileSize = targetFileSize;
}
+ protected PartitionSpec spec() {
+ return spec;
+ }
+
+ protected FileAppenderFactory<T> appenderFactory() {
+ return appenderFactory;
+ }
+
@Override
public void abort() throws IOException {
close();
// clean up files created by this writer
- Tasks.foreach(completedFiles)
+ Tasks.foreach(Iterables.concat(completedFiles, completedDeletes))
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
}
@Override
- public DataFile[] complete() throws IOException {
+ public WriterResult complete() throws IOException {
close();
- return completedFiles.toArray(new DataFile[0]);
+ return WriterResult.builder()
+ .addDataFiles(completedFiles)
+ .addDeleteFiles(completedDeletes)
+ .build();
+ }
+
+ protected abstract class BaseDeltaWriter implements Closeable {
+ private final RollingFileWriter dataWriter;
+
+ private final boolean enableEqDelete;
+ private RollingEqDeleteWriter eqDeleteWriter = null;
+ private SortedPosDeleteWriter<T> posDeleteWriter = null;
+ private StructLikeMap<FilePos> insertedRowMap = null;
+
+ public BaseDeltaWriter(PartitionKey partition, List<Integer>
equalityFieldIds, Schema schema) {
+ this.dataWriter = new RollingFileWriter(partition);
+
+ this.enableEqDelete = equalityFieldIds != null &&
!equalityFieldIds.isEmpty();
+ if (enableEqDelete) {
+ this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+ this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory,
fileFactory, format, partition);
+
+ Schema deleteSchema = TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds));
+ this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+ }
+ }
+
+ protected abstract StructLike asKey(T row);
+
+ protected abstract StructLike asCopiedKey(T row);
+
+ public void write(T row) throws IOException {
+ if (enableEqDelete) {
+ FilePos filePos = FilePos.create(dataWriter.currentPath(),
dataWriter.currentRows());
+
+ StructLike copiedKey = asCopiedKey(row);
+ // Adding a pos-delete to replace the old filePos.
+ FilePos previous = insertedRowMap.put(copiedKey, filePos);
+ if (previous != null) {
+ posDeleteWriter.delete(previous.path, previous.rowOffset, null /*
TODO set non-nullable row*/);
+ }
+ }
+
+ dataWriter.write(row);
+ }
+
+ public void delete(T row) throws IOException {
+ Preconditions.checkState(enableEqDelete, "Could not accept equality
deletion.");
+
+ StructLike key = asKey(row);
+ FilePos previous = insertedRowMap.remove(key);
+
+ if (previous != null) {
+ posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO
set non-nullable row */);
+ }
+
+ eqDeleteWriter.write(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Moving the completed data files into task writer's completedFiles
automatically.
+ dataWriter.close();
Review comment:
Minor: `dataWriter` should be set to `null` after it is closed, so that it can be
garbage collected and so that any further call to `write` fails.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]