szehon-ho commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1139546119
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -767,4 +824,270 @@ public void close() throws IOException {
delegate.close();
}
}
+
+ class PositionDeleteBatchWrite implements BatchWrite {
+
+ private String fileSetID;
+
+ private PositionDeleteBatchWrite(String fileSetID) {
+ this.fileSetID = fileSetID;
+ }
+
+ @Override
+ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+ // broadcast the table metadata as the writer factory will be sent to executors
+ Broadcast<Table> tableBroadcast =
+ sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ return new PositionDeltaWriteFactory(
+ tableBroadcast, queryId, format, targetFileSize, writeSchema, dsSchema);
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+ coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ if (cleanupOnAbort) {
+ SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+ } else {
+ LOG.warn("Skipping cleanup of written files");
+ }
+ }
+
+ private List<DeleteFile> files(WriterCommitMessage[] messages) {
+ List<DeleteFile> files = Lists.newArrayList();
+
+ for (WriterCommitMessage message : messages) {
+ if (message != null) {
+ DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+ files.addAll(Arrays.asList(taskCommit.files()));
+ }
+ }
+
+ return files;
+ }
+ }
+
+ static class PositionDeltaWriteFactory implements DataWriterFactory {
+ private final Broadcast<Table> tableBroadcast;
+ private final String queryId;
+ private final FileFormat format;
+ private final Long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+
+ PositionDeltaWriteFactory(
+ Broadcast<Table> tableBroadcast,
+ String queryId,
+ FileFormat format,
+ long targetFileSize,
+ Schema writeSchema,
+ StructType dsSchema) {
+ this.tableBroadcast = tableBroadcast;
+ this.queryId = queryId;
+ this.format = format;
+ this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory deleteFileFactory =
+ OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(format)
+ .operationId(queryId)
+ .suffix("deletes")
+ .build();
+
+ Schema positionDeleteRowSchema =
+ new Schema(
+ writeSchema
+ .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ .type()
+ .asStructType()
+ .fields());
+ StructType deleteFileType =
+ new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ });
+
+ SparkFileWriterFactory writerFactoryWithRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
+ .positionDeleteSparkType(deleteFileType)
+ .build();
+
+ SparkFileWriterFactory writerFactoryWithoutRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteSparkType(deleteFileType)
+ .build();
+
+ return new DeleteWriter(
+ table,
+ writerFactoryWithRow,
+ writerFactoryWithoutRow,
+ deleteFileFactory,
+ targetFileSize,
+ dsSchema);
+ }
+ }
+
+ private static class DeleteWriter implements DataWriter<InternalRow> {
+ private final ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+ private final ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+ private final PositionDelete<InternalRow> positionDelete;
+ private final FileIO io;
+ private final Map<Integer, PartitionSpec> specs;
+ private final InternalRowWrapper partitionRowWrapper;
+ private final Map<Integer, StructProjection> partitionProjections;
+ private final int specIdOrdinal;
+ private final Option<Integer> partitionOrdinalOption;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+ private final int rowOrdinal;
+ private final int rowSize;
+
+ private boolean closed = false;
+
+ /**
+ * Writer for position deletes metadata table.
+ *
+ * <p>Delete files need to either have 'row' as a required field or omit 'row' altogether,
+ * for delete file stats accuracy. Hence, this is a fanout writer, redirecting rows with a
+ * null 'row' to one delegate and rows with a non-null 'row' to another.
+ *
+ * @param table position deletes metadata table
+ * @param writerFactoryWithRow writer factory for deletes with a non-null 'row'
+ * @param writerFactoryWithoutRow writer factory for deletes with a null 'row'
+ * @param deleteFileFactory delete file factory
+ * @param targetFileSize target file size
+ * @param dsSchema schema of incoming dataset
+ */
+ DeleteWriter(
+ Table table,
+ SparkFileWriterFactory writerFactoryWithRow,
+ SparkFileWriterFactory writerFactoryWithoutRow,
+ OutputFileFactory deleteFileFactory,
+ long targetFileSize,
+ StructType dsSchema) {
+ this.writerWithRow =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactoryWithRow, deleteFileFactory, table.io(), targetFileSize);
+ this.writerWithoutRow =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactoryWithoutRow, deleteFileFactory, table.io(), targetFileSize);
+ this.positionDelete = PositionDelete.create();
+ this.io = table.io();
+ this.specs = table.specs();
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+
+ this.specIdOrdinal = dsSchema.fieldIndex(PositionDeletesTable.SPEC_ID);
Review Comment:
I ended up propagating via the ScanTaskSetManager.
I could go either way, but doing it this way does seem a little hacky, whereas before the code was useful in more cases.
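
As an aside on the DeleteWriter javadoc above: the fanout simply routes each incoming delete record to one of the two delegates based on whether its nested 'row' struct is null. A minimal sketch of that dispatch (a standalone helper for illustration, not the PR's actual write() body; the ordinal parameters mirror the fields declared on DeleteWriter, and the spec/partition arguments are assumed to be resolved by the caller):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.spark.sql.catalyst.InternalRow;

class RowFanoutSketch {
  // Route a position delete record to the delegate matching its 'row' nullability, so each
  // produced delete file either always carries 'row' or never does.
  static void writeDelete(
      InternalRow record,
      int fileOrdinal,
      int positionOrdinal,
      int rowOrdinal,
      int rowSize,
      PositionDelete<InternalRow> positionDelete,
      ClusteredPositionDeleteWriter<InternalRow> writerWithRow,
      ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow,
      PartitionSpec spec,
      StructLike partition) {
    String path = record.getString(fileOrdinal); // referenced data file path
    long position = record.getLong(positionOrdinal); // deleted row position

    if (record.isNullAt(rowOrdinal)) {
      // no copied row content: use the delegate built without positionDeleteRowSchema
      writerWithoutRow.write(positionDelete.set(path, position, null), spec, partition);
    } else {
      // row content present: use the delegate that writes the 'row' struct
      InternalRow row = record.getStruct(rowOrdinal, rowSize);
      writerWithRow.write(positionDelete.set(path, position, row), spec, partition);
    }
  }
}
```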
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -601,6 +622,42 @@ DataFile[] files() {
}
}
+ public static class DeleteTaskCommit implements WriterCommitMessage {
+ private final DeleteFile[] taskFiles;
+ private final CharSequence[] referencedDataFiles;
+
+ DeleteTaskCommit(List<DeleteFile> deleteFiles, List<CharSequence> referencedDataFiles) {
+ this.taskFiles = deleteFiles.toArray(new DeleteFile[0]);
+ this.referencedDataFiles = referencedDataFiles.toArray(new CharSequence[0]);
+ }
+
+ // Reports bytesWritten and recordsWritten to the Spark output metrics.
+ // Can only be called on an executor.
+ void reportOutputMetrics() {
+ long bytesWritten = 0L;
+ long recordsWritten = 0L;
+ for (DeleteFile deleteFile : taskFiles) {
+ bytesWritten += deleteFile.fileSizeInBytes();
+ recordsWritten += deleteFile.recordCount();
+ }
+
+ TaskContext taskContext = TaskContext$.MODULE$.get();
+ if (taskContext != null) {
+ OutputMetrics outputMetrics = taskContext.taskMetrics().outputMetrics();
+ outputMetrics.setBytesWritten(bytesWritten);
+ outputMetrics.setRecordsWritten(recordsWritten);
+ }
+ }
+
+ DeleteFile[] files() {
+ return taskFiles;
+ }
+
+ CharSequence[] referencedDataFiles() {
Review Comment:
Removed
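
As a rough sketch of how the pieces above might be tied together on the executor (an illustration, not the PR's DeleteWriter#commit(); it assumes the helper lives in the same package as SparkWrite so the package-private DeleteTaskCommit constructor is visible, and result() is the ClusteredPositionDeleteWriter accessor, only valid after the writers are closed):

```java
package org.apache.iceberg.spark.source;

import java.util.List;

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

class DeleteCommitSketch {
  // Collect the delete files produced by both fanout delegates, wrap them in a
  // DeleteTaskCommit, and report executor-side output metrics before returning.
  static WriterCommitMessage toTaskCommit(
      ClusteredPositionDeleteWriter<InternalRow> writerWithRow,
      ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow) {
    List<DeleteFile> deleteFiles = Lists.newArrayList();
    List<CharSequence> referencedDataFiles = Lists.newArrayList();

    // result() may only be called once the delegate writers have been closed
    deleteFiles.addAll(writerWithRow.result().deleteFiles());
    referencedDataFiles.addAll(writerWithRow.result().referencedDataFiles());
    deleteFiles.addAll(writerWithoutRow.result().deleteFiles());
    referencedDataFiles.addAll(writerWithoutRow.result().referencedDataFiles());

    SparkWrite.DeleteTaskCommit taskCommit =
        new SparkWrite.DeleteTaskCommit(deleteFiles, referencedDataFiles);
    taskCommit.reportOutputMetrics(); // TaskContext is only available on executors
    return taskCommit;
  }
}
```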
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -160,6 +177,10 @@ BatchWrite asRewrite(String fileSetID) {
return new RewriteFiles(fileSetID);
}
+ BatchWrite asPositionDeletesRewrite(String fileSetId) {
Review Comment:
Refactored out
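
To round out the picture of the fileSetID handoff discussed above: the rewrite action tags a file group with an ID, the Spark job's PositionDeleteBatchWrite#commit stages the resulting delete files under that ID via stageRewrite(...), and the action later fetches them to build the table commit. A sketch of the fetching side (the import path and the fetchNewFiles accessor name are assumptions; only get() and stageRewrite(...) appear in this diff):

```java
import java.util.Set;

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;

class RewriteHandoffSketch {
  // Pick up the delete files that PositionDeleteBatchWrite#commit staged for this file set ID.
  static Set<DeleteFile> fetchStagedDeletes(Table table, String fileSetID) {
    PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
    // fetchNewFiles(...) is an assumed accessor; the diff only shows stageRewrite(...)
    return coordinator.fetchNewFiles(table, fileSetID);
  }
}
```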