amogh-jahagirdar commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1134278293
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -761,4 +818,270 @@ public void close() throws IOException {
delegate.close();
}
}
+
+ class PositionDeleteBatchWrite implements BatchWrite {
+
+ private String fileSetID;
+
+ private PositionDeleteBatchWrite(String fileSetID) {
+ this.fileSetID = fileSetID;
+ }
+
+ @Override
+ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+ // broadcast the table metadata as the writer factory will be sent to
executors
+ Broadcast<Table> tableBroadcast =
+ sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ return new PositionDeltaWriteFactory(
+ tableBroadcast, queryId, format, targetFileSize, writeSchema,
dsSchema);
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ PositionDeletesRewriteCoordinator coordinator =
PositionDeletesRewriteCoordinator.get();
+ coordinator.stageRewrite(table, fileSetID,
ImmutableSet.copyOf(files(messages)));
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ if (cleanupOnAbort) {
+ SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+ } else {
+ LOG.warn("Skipping cleanup of written files");
+ }
+ }
+
+ private List<DeleteFile> files(WriterCommitMessage[] messages) {
+ List<DeleteFile> files = Lists.newArrayList();
+
+ for (WriterCommitMessage message : messages) {
+ if (message != null) {
+ DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+ files.addAll(Arrays.asList(taskCommit.files()));
+ }
+ }
+
+ return files;
+ }
+ }
+
+ static class PositionDeltaWriteFactory implements DataWriterFactory {
+ private final Broadcast<Table> tableBroadcast;
+ private final String queryId;
+ private final FileFormat format;
+ private final Long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+
+ PositionDeltaWriteFactory(
+ Broadcast<Table> tableBroadcast,
+ String queryId,
+ FileFormat format,
+ long targetFileSize,
+ Schema writeSchema,
+ StructType dsSchema) {
+ this.tableBroadcast = tableBroadcast;
+ this.queryId = queryId;
+ this.format = format;
+ this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory deleteFileFactory =
+ OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(format)
+ .operationId(queryId)
+ .suffix("deletes")
+ .build();
+
+ Schema positionDeleteRowSchema =
+ new Schema(
+ writeSchema
+ .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ .type()
+ .asStructType()
+ .fields());
+ StructType deleteFileType =
+ new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ });
+
+ SparkFileWriterFactory writerFactoryWithRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
+ .positionDeleteSparkType(deleteFileType)
+ .build();
+
+ SparkFileWriterFactory writerFactoryWithoutRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteSparkType(deleteFileType)
+ .build();
+
+ return new DeleteWriter(
+ table,
+ writerFactoryWithRow,
+ writerFactoryWithoutRow,
+ deleteFileFactory,
+ targetFileSize,
+ dsSchema);
+ }
+ }
+
+ private static class DeleteWriter implements DataWriter<InternalRow> {
+ private final ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+ private final ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+ private final PositionDelete<InternalRow> positionDelete;
+ private final FileIO io;
+ private final Map<Integer, PartitionSpec> specs;
+ private final InternalRowWrapper partitionRowWrapper;
+ private final Map<Integer, StructProjection> partitionProjections;
+ private final int specIdOrdinal;
+ private final Option<Integer> partitionOrdinalOption;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+ private final int rowOrdinal;
+ private final int rowSize;
+
+ private boolean closed = false;
+
+ /**
+ * Writer for position deletes metadata table.
+ *
+ * <p>Delete files need to either have 'row' as required field, or omit
'row' altogether, for
+ * delete file stats accuracy Hence, this is a fanout writer, redirecting
rows with null 'row'
+ * to one delegate, and non-null 'row' to another
Review Comment:
Sorry, I completely missed this thread, which seems to discuss this point:
https://github.com/apache/iceberg/pull/7029#discussion_r1127012441
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]