rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798761578
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
}
}
+ private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+ private final PositionDeltaWriter<InternalRow> delegate;
+ private final FileIO io;
+ private final PartitionSpec dataSpec;
+ private final Map<Integer, PartitionSpec> specs;
+ private final InternalRowWrapper deletePartitionRowWrapper;
+ private final Map<Integer, StructProjection> deletePartitionProjections;
+ private final int specIdOrdinal;
+ private final int partitionOrdinal;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+
+ private boolean closed = false;
+
+ UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+ OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory, Context context) {
+
+ Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+ ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+
+ ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+
+ ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+ writerFactory, deleteFileFactory, table.io(),
+ context.deleteFileFormat(), context.targetDeleteFileSize());
+
+ this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+ this.io = table.io();
+ this.dataSpec = table.spec();
+ this.specs = table.specs();
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+ this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+ this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+ this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+ this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+ this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+ }
+
+ @Override
+ public void delete(InternalRow meta, InternalRow id) throws IOException {
+ int specId = meta.getInt(specIdOrdinal);
+ PartitionSpec spec = specs.get(specId);
+
+ InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+ StructProjection partitionProjection = deletePartitionProjections.get(specId);
+ partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+ String file = id.getString(fileOrdinal);
+ long position = id.getLong(positionOrdinal);
+ delegate.delete(file, position, spec, partitionProjection);
+ }
+
+ @Override
+ public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+ delete(meta, id);
+ delegate.update(row, dataSpec, null);
+ }
+
+ @Override
+ public void insert(InternalRow row) throws IOException {
+ delegate.insert(row, dataSpec, null);
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ close();
+
+ WriteResult result = delegate.result();
+ return new DeltaTaskCommit(result);
+ }
+
+ @Override
+ public void abort() throws IOException {
+ close();
+
+ WriteResult result = delegate.result();
+ cleanFiles(io, Arrays.asList(result.dataFiles()));
+ cleanFiles(io, Arrays.asList(result.deleteFiles()));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ delegate.close();
+ this.closed = true;
+ }
+ }
+ }
+
+ private static class PartitionedDeltaWriter extends BaseDeltaWriter {
+ private final PositionDeltaWriter<InternalRow> delegate;
+ private final FileIO io;
+ private final PartitionSpec dataSpec;
+ private final Map<Integer, PartitionSpec> specs;
+ private final InternalRowWrapper deletePartitionRowWrapper;
+ private final Map<Integer, StructProjection> deletePartitionProjections;
+ private final PartitionKey dataPartitionKey;
+ private final InternalRowWrapper internalRowDataWrapper;
+ private final int specIdOrdinal;
+ private final int partitionOrdinal;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+
+ private boolean closed = false;
+
+ PartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+ OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory, Context context) {
+
+ Preconditions.checkArgument(table.spec().isPartitioned(), "Data spec must be partitioned");
+
+ PartitioningWriter<InternalRow, DataWriteResult> insertWriter;
+ if (context.fanoutWriterEnabled()) {
+ insertWriter = new FanoutDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+ } else {
+ insertWriter = new ClusteredDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+ }
+
+ // always use a separate fanout data writer for updates as they may be out of order
Review comment:
If I remember correctly, this is because we sort by partition, file, and pos, but the
correct output order depends on the new data. So we always delete with clustering, but
the updated rows may no longer conform to the original clustering. Right?
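
Not from the PR itself, just to make the ordering argument concrete: below is a
hypothetical, self-contained sketch (every class and method name in it is made up for
illustration, not Iceberg API). The incoming rows are clustered by the existing row's
partition, so the position deletes can go through a writer that expects clustered input,
while the updated row's partition is computed from the new values and can switch back
and forth, which is what a fanout-style writer tolerates.

// Hypothetical sketch, not the PR code: models why position deletes can use a
// clustered writer while the rows written by UPDATEs need a fanout writer.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeltaRoutingSketch {

  // Expects all entries for a partition to arrive together; a partition that
  // shows up again after the writer moved on means the input was not clustered.
  static class ClusteredWriter {
    private final List<String> finished = new ArrayList<>();
    private String current = null;

    void write(String partition, String entry) {
      if (!partition.equals(current)) {
        if (finished.contains(partition)) {
          throw new IllegalStateException("Input not clustered by partition: " + partition);
        }
        if (current != null) {
          finished.add(current);
        }
        current = partition;
      }
      // append entry to the open file for the current partition (omitted)
    }
  }

  // Keeps one open file per partition, so the input order does not matter.
  static class FanoutWriter {
    private final Map<String, List<String>> openFiles = new HashMap<>();

    void write(String partition, String entry) {
      openFiles.computeIfAbsent(partition, p -> new ArrayList<>()).add(entry);
    }
  }

  private final ClusteredWriter deleteWriter = new ClusteredWriter();
  private final FanoutWriter updateWriter = new FanoutWriter();

  // An UPDATE deletes the old position (clustered by the metadata partition of the
  // existing row) and writes the new row (partitioned by the new values, hence fanout).
  void update(String metaPartition, String file, long pos, String newPartition, String newRow) {
    deleteWriter.write(metaPartition, file + ":" + pos);
    updateWriter.write(newPartition, newRow);
  }

  public static void main(String[] args) {
    DeltaRoutingSketch writer = new DeltaRoutingSketch();
    // Input is sorted by the existing rows' partitions: p=1, p=1, p=2 ...
    writer.update("p=1", "data-1.parquet", 0L, "p=2", "row-a'");
    writer.update("p=1", "data-1.parquet", 3L, "p=1", "row-b'");
    writer.update("p=2", "data-2.parquet", 1L, "p=2", "row-c'");
    // ... but the new rows' partitions are p=2, p=1, p=2, which is not clustered.
  }
}

Swapping ClusteredWriter in for updateWriter in this sketch would throw on the third
update() call in main(), which is the out-of-order case the comment above guards against.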
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]