szehon-ho commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1147886663
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes the
+ * assumption that all incoming deletes belong to the same partition, and that the incoming
+ * dataset is from {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkPositionDeletesRewrite.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final String queryId;
+  private final FileFormat format;
+  private final long targetFileSize;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+  private final String fileSetId;
+  private final int specId;
+
+  /**
+   * Constructs a SparkPositionDeletesRewrite.
+   *
+   * @param spark spark session
+   * @param table instance of {@link PositionDeletesTable}
+   * @param writeConf spark write config
+   * @param writeInfo spark write info
+   * @param writeSchema Iceberg output schema
+   * @param dsSchema schema of original incoming position deletes dataset
+   */
+  SparkPositionDeletesRewrite(
+      SparkSession spark,
+      Table table,
+      SparkWriteConf writeConf,
+      LogicalWriteInfo writeInfo,
+      Schema writeSchema,
+      StructType dsSchema) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.queryId = writeInfo.queryId();
+    this.format = writeConf.dataFileFormat();

Review Comment:
   Good catch, done

##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes the
+ * assumption that all incoming deletes belong to the same partition, and that the incoming
+ * dataset is from {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkPositionDeletesRewrite.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final String queryId;
+  private final FileFormat format;
+  private final long targetFileSize;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+  private final String fileSetId;
+  private final int specId;
+
+  /**
+   * Constructs a SparkPositionDeletesRewrite.
+   *
+   * @param spark spark session
+   * @param table instance of {@link PositionDeletesTable}
+   * @param writeConf spark write config
+   * @param writeInfo spark write info
+   * @param writeSchema Iceberg output schema
+   * @param dsSchema schema of original incoming position deletes dataset
+   */
+  SparkPositionDeletesRewrite(
+      SparkSession spark,
+      Table table,
+      SparkWriteConf writeConf,
+      LogicalWriteInfo writeInfo,
+      Schema writeSchema,
+      StructType dsSchema) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.queryId = writeInfo.queryId();
+    this.format = writeConf.dataFileFormat();
+    this.targetFileSize = writeConf.targetDataFileSize();
+    this.writeSchema = writeSchema;
+    this.dsSchema = dsSchema;
+    this.fileSetId = writeConf.rewrittenFileSetId();
+
+    // all files of the rewrite group have the same spec id
+    ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get();
+    List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId);
+    this.specId = scanTasks.get(0).spec().specId();
+  }
+
+  @Override
+  public BatchWrite toBatch() {
+    return new PositionDeleteBatchWrite();
+  }
+
+  /** {@link BatchWrite} class for rewriting position delete files from Spark */
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeltaWriteFactory(
+          tableBroadcast, queryId, format, targetFileSize, writeSchema, dsSchema, specId);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  /**
+   * Write factory for the position deletes metadata table. Responsible for creating {@link
+   * DeleteWriter}.
+   *
+   * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes the
+   * assumption that all incoming deletes belong to the same partition, and that the incoming
+   * dataset is from {@link ScanTaskSetManager}.
+   */
+  static class PositionDeltaWriteFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final Long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+    private final int specId;
+
+    PositionDeltaWriteFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema,
+        int specId) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+      this.specId = specId;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema =
+          new Schema(
+              writeSchema
+                  .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+                  .type()
+                  .asStructType()
+                  .fields());
+      StructType deleteFileType =
+          new StructType(
+              new StructField[] {
+                dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+              });
+
+      SparkFileWriterFactory writerFactoryWithRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteRowSchema(positionDeleteRowSchema)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      SparkFileWriterFactory writerFactoryWithoutRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      return new DeleteWriter(
+          table,
+          writerFactoryWithRow,
+          writerFactoryWithoutRow,
+          deleteFileFactory,
+          targetFileSize,
+          dsSchema,
+          specId);
+    }
+  }
+
+  /**
+   * Writer for the position deletes metadata table.
+   *
+   * <p>Iceberg specifies that a delete file schema either has 'row' as a required field or omits
+   * 'row' altogether. This is to ensure the accuracy of delete file statistics on the 'row'
+   * column. Hence, if this writer receives source position deletes with both null and non-null
+   * rows, it redirects deletes with a null 'row' to one file writer and deletes with a non-null
+   * 'row' to another file writer.
+   *
+   * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes the
+   * assumption that all incoming deletes belong to the same partition.
+   */
+  private static class DeleteWriter implements DataWriter<InternalRow> {
+    private final SparkFileWriterFactory writerFactoryWithRow;
+    private final SparkFileWriterFactory writerFactoryWithoutRow;
+    private final OutputFileFactory deleteFileFactory;
+    private final long targetFileSize;
+    private final PositionDelete<InternalRow> positionDelete;
+    private final FileIO io;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper partitionRowWrapper;
+    private final StructProjection partitionProjection;
+    private final int specIdOrdinal;
+    private final Option<Integer> partitionOrdinalOption;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+    private final int rowOrdinal;
+    private final int rowSize;
+
+    private ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+    private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+    private boolean closed = false;
+
+    /**
+     * Constructs a DeleteWriter.
+     *
+     * @param table position deletes metadata table
+     * @param writerFactoryWithRow writer factory for deletes with a non-null 'row'
+     * @param writerFactoryWithoutRow writer factory for deletes with a null 'row'
+     * @param deleteFileFactory delete file factory
+     * @param targetFileSize target file size
+     * @param dsSchema schema of the incoming dataset of position deletes
+     * @param specId partition spec id of the incoming position deletes. All files of this rewrite
+     *     group are required to have the same spec id.
+     */
+    DeleteWriter(
+        Table table,
+        SparkFileWriterFactory writerFactoryWithRow,
+        SparkFileWriterFactory writerFactoryWithoutRow,
+        OutputFileFactory deleteFileFactory,
+        long targetFileSize,
+        StructType dsSchema,
+        int specId) {
+      this.deleteFileFactory = deleteFileFactory;
+      this.targetFileSize = targetFileSize;
+      this.writerFactoryWithRow = writerFactoryWithRow;
+      this.writerFactoryWithoutRow = writerFactoryWithoutRow;
+      this.positionDelete = PositionDelete.create();
+      this.io = table.io();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+
+      this.specIdOrdinal = dsSchema.fieldIndex(PositionDeletesTable.SPEC_ID);
+      this.partitionOrdinalOption =
+          dsSchema.getFieldIndex(PositionDeletesTable.PARTITION).map(a -> (Integer) a);
+      this.partitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.partitionProjection =
+          StructProjection.create(partitionType, table.specs().get(specId).partitionType());
+
+      this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name());
+      this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name());
+
+      this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME);
+      DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType();
+      Preconditions.checkArgument(
+          type instanceof StructType, "Expected row as struct type but was %s", type);
+      this.rowSize = ((StructType) type).size();
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+      int specId = record.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = null;
+      if (partitionOrdinalOption.isDefined()) {
+        int partitionOrdinal = partitionOrdinalOption.get();
+        partition = record.getStruct(partitionOrdinal, partitionRowWrapper.size());
+      }
+      partitionProjection.wrap(partitionRowWrapper.wrap(partition));
+
+      String file = record.getString(fileOrdinal);
+      long position = record.getLong(positionOrdinal);
+      InternalRow row = record.getStruct(rowOrdinal, rowSize);
+      if (row != null) {
+        positionDelete.set(file, position, row);
+        lazyWriterWithRow().write(positionDelete, spec, partitionProjection);
+      } else {
+        positionDelete.set(file, position, null);
+        lazyWriterWithoutRow().write(positionDelete, spec, partitionProjection);
+      }
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      List<DeleteFile> allDeleteFiles = Lists.newArrayList();
+      if (writerWithRow != null) {
+        allDeleteFiles.addAll(writerWithRow.result().deleteFiles());
+      }
+      if (writerWithoutRow != null) {
+        allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
+      }
+      return new DeleteTaskCommit(allDeleteFiles);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DeleteWriteResult resultWithRow = writerWithRow.result();

Review Comment:
   Thanks, fixed
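For readers skimming the thread, the routing that the `DeleteWriter` javadoc above describes (deletes whose 'row' struct is null go to one file writer, deletes with a populated 'row' go to another, so each output delete file's schema either requires 'row' or omits it and its 'row' statistics stay accurate) can be summarized in a small standalone sketch. The `Delete` class and the two lists below are hypothetical stand-ins for `PositionDelete` and the two `ClusteredPositionDeleteWriter` instances; only the split-by-null-'row' idea comes from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the null-'row' routing described in the DeleteWriter javadoc.
// Delete is a hypothetical stand-in for PositionDelete; not an Iceberg or Spark API.
public class NullRowRoutingSketch {

  static class Delete {
    final String path;
    final long pos;
    final Object row; // null when the source position delete carried no row payload

    Delete(String path, long pos, Object row) {
      this.path = path;
      this.pos = pos;
      this.row = row;
    }
  }

  public static void main(String[] args) {
    List<Delete> withRow = new ArrayList<>();    // would go to the writer whose schema requires 'row'
    List<Delete> withoutRow = new ArrayList<>(); // would go to the writer whose schema omits 'row'

    List<Delete> incoming =
        Arrays.asList(
            new Delete("data/file-a.parquet", 3L, null),
            new Delete("data/file-a.parquet", 7L, "row payload"),
            new Delete("data/file-b.parquet", 1L, null));

    // Route on the nullability of 'row' so no output delete file mixes null and non-null rows,
    // keeping per-file statistics on the 'row' column accurate.
    for (Delete delete : incoming) {
      if (delete.row != null) {
        withRow.add(delete);
      } else {
        withoutRow.add(delete);
      }
    }

    System.out.println("deletes with row: " + withRow.size());       // prints 1
    System.out.println("deletes without row: " + withoutRow.size()); // prints 2
  }
}
```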
