aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799865337



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {

Review comment:
       I combined these two classes into one and exposed a protected field in the parent instead.
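
       For reference, here is roughly the shape I mean. This is a minimal, self-contained sketch with illustrative stand-in names (`BaseDeltaWriterSketch`, `DelegateSketch`, `PartitionKeySketch`), not the actual PR code: the parent exposes the delegate as a protected field, and a single concrete writer covers both layouts by treating the data-side partition key as nullable.

```java
import java.io.IOException;

// Parent holds the shared delegate as a protected field instead of each
// subclass declaring its own private copy.
abstract class BaseDeltaWriterSketch<T> {
  protected DelegateSketch<T> delegate;

  public void close() throws IOException {
    delegate.close();
  }
}

// Stand-in for the position-delta delegate (hypothetical interface).
interface DelegateSketch<T> {
  void insert(T row, String spec, Object partitionOrNull) throws IOException;
  void close() throws IOException;
}

// Stand-in for a reusable partition key (hypothetical interface).
interface PartitionKeySketch<T> {
  void partition(T row);
}

// One concrete writer replaces UnpartitionedDeltaWriter / PartitionedDeltaWriter:
// a null data partition key means the table is unpartitioned.
final class DeltaWriterSketch<T> extends BaseDeltaWriterSketch<T> {
  private final String dataSpec;
  private final PartitionKeySketch<T> dataPartitionKey; // null => unpartitioned

  DeltaWriterSketch(DelegateSketch<T> delegate, String dataSpec,
                    PartitionKeySketch<T> dataPartitionKeyOrNull) {
    this.delegate = delegate; // inherited protected field
    this.dataSpec = dataSpec;
    this.dataPartitionKey = dataPartitionKeyOrNull;
  }

  void insert(T row) throws IOException {
    if (dataPartitionKey == null) {
      delegate.insert(row, dataSpec, null); // unpartitioned path
    } else {
      dataPartitionKey.partition(row); // refresh the reused key for this row
      delegate.insert(row, dataSpec, dataPartitionKey);
    }
  }
}
```

       Keeping the key nullable means the delete path, which already handles arbitrary specs via the partition projections, is written once, and only the data-side insert/update logic branches on whether the table is partitioned.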




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


