rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798756710
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
}
}
+ private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+ private final PositionDeltaWriter<InternalRow> delegate;
+ private final FileIO io;
+ private final PartitionSpec dataSpec;
+ private final Map<Integer, PartitionSpec> specs;
+ private final InternalRowWrapper deletePartitionRowWrapper;
+ private final Map<Integer, StructProjection> deletePartitionProjections;
+ private final int specIdOrdinal;
+ private final int partitionOrdinal;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+
+ private boolean closed = false;
+
+ UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+ OutputFileFactory dataFileFactory,
OutputFileFactory deleteFileFactory,
+ Context context) {
+
+ Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec
must be unpartitioned");
+
+ ClusteredDataWriter<InternalRow> insertWriter = new
ClusteredDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+
+ ClusteredDataWriter<InternalRow> updateWriter = new
ClusteredDataWriter<>(
+ writerFactory, dataFileFactory, table.io(),
+ context.dataFileFormat(), context.targetDataFileSize());
+
+ ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new
ClusteredPositionDeleteWriter<>(
+ writerFactory, deleteFileFactory, table.io(),
+ context.deleteFileFormat(), context.targetDeleteFileSize());
+
+ this.delegate = new BasePositionDeltaWriter<>(insertWriter,
updateWriter, deleteWriter);
+ this.io = table.io();
+ this.dataSpec = table.spec();
+ this.specs = table.specs();
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+ this.deletePartitionProjections =
buildPartitionProjections(partitionType, specs);
+
+ this.specIdOrdinal =
context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+ this.partitionOrdinal =
context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+ this.fileOrdinal =
context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+ this.positionOrdinal =
context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+ }
+
+ @Override
+ public void delete(InternalRow meta, InternalRow id) throws IOException {
+ int specId = meta.getInt(specIdOrdinal);
+ PartitionSpec spec = specs.get(specId);
+
+ InternalRow partition = meta.getStruct(partitionOrdinal,
deletePartitionRowWrapper.size());
+ StructProjection partitionProjection =
deletePartitionProjections.get(specId);
+ partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+ String file = id.getString(fileOrdinal);
+ long position = id.getLong(positionOrdinal);
+ delegate.delete(file, position, spec, partitionProjection);
+ }
+
+ @Override
+ public void update(InternalRow meta, InternalRow id, InternalRow row)
throws IOException {
+ delete(meta, id);
+ delegate.update(row, dataSpec, null);
Review comment:
Is `dataSpec` required here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]