This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch 348-deletion-vectors-iceberg-target
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit eb6a8c7b363d46d33bb82aa932aea18d12fe8f89
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Jan 26 11:57:38 2025 -0800

    Handle deletion vectors in Iceberg target
---
 pom.xml                                            |   1 +
 xtable-core/pom.xml                                |   4 +
 .../apache/xtable/hudi/HudiFileStatsExtractor.java |   9 +-
 .../xtable/iceberg/IcebergConversionTarget.java    |  27 +-
 .../xtable/iceberg/IcebergDataFileUpdatesSync.java |  53 +++-
 .../iceberg/IcebergDeleteVectorConverter.java      |  94 ++++++
 .../iceberg/ITIcebergDeleteVectorConvert.java      | 350 +++++++++++++++++++++
 .../iceberg/TestIcebergDeleteVectorConverter.java  |  95 ++++++
 8 files changed, 612 insertions(+), 21 deletions(-)

diff --git a/pom.xml b/pom.xml
index 46641dee..8b762634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -270,6 +270,7 @@
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-parquet</artifactId>
       <version>${iceberg.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.iceberg</groupId>
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e..97397d3b 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -92,6 +92,10 @@
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-parquet</artifactId>
+    </dependency>
 
     <!-- Delta dependencies -->
     <dependency>
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index e47ef72e..0a11bd16 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -36,7 +36,6 @@ import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.io.api.Binary;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -258,10 +257,10 @@ public class HudiFileStatsExtractor {
         && (minValue instanceof ByteBuffer || maxValue instanceof ByteBuffer)) {
       minValue = minValue == null ? null : new String(((ByteBuffer) minValue).array());
       maxValue = maxValue == null ? null : new String(((ByteBuffer) maxValue).array());
-    } else if (field.getSchema().getDataType() == InternalType.FIXED
-        && (minValue instanceof Binary || maxValue instanceof Binary)) {
-      minValue = minValue == null ? null : ByteBuffer.wrap(((Binary) minValue).getBytes());
-      maxValue = maxValue == null ? null : ByteBuffer.wrap(((Binary) maxValue).getBytes());
+      // } else if (field.getSchema().getDataType() == InternalType.FIXED
+      //     && (minValue instanceof Binary || maxValue instanceof Binary)) {
+      //   minValue = minValue == null ? null : ByteBuffer.wrap(((Binary) minValue).getBytes());
+      //   maxValue = maxValue == null ? null : ByteBuffer.wrap(((Binary) maxValue).getBytes());
     }
     boolean isScalar = minValue == null || minValue.compareTo(maxValue) == 0;
     Range range = isScalar ? Range.scalar(minValue) : Range.vector(minValue, maxValue);
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index ecdbfa26..2050a2dc 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -18,6 +18,7 @@
 
 package org.apache.xtable.iceberg;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -194,19 +195,27 @@ public class IcebergConversionTarget implements ConversionTarget {
 
   @Override
   public void syncFilesForSnapshot(List<PartitionFileGroup> partitionedDataFiles) {
-    dataFileUpdatesExtractor.applySnapshot(
-        table,
-        internalTableState,
-        transaction,
-        partitionedDataFiles,
-        transaction.table().schema(),
-        transaction.table().spec());
+    try {
+      dataFileUpdatesExtractor.applySnapshot(
+          table,
+          internalTableState,
+          transaction,
+          partitionedDataFiles,
+          transaction.table().schema(),
+          transaction.table().spec());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
-    dataFileUpdatesExtractor.applyDiff(
-        transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec());
+    try {
+      dataFileUpdatesExtractor.applyDiff(
+          transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 80e1559f..35348adf 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -18,6 +18,7 @@
 
 package org.apache.xtable.iceberg;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -26,6 +27,7 @@ import lombok.AllArgsConstructor;
 
 import org.apache.iceberg.*;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.exception.ReadException;
@@ -33,6 +35,7 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 @AllArgsConstructor(staticName = "of")
@@ -46,7 +49,8 @@ public class IcebergDataFileUpdatesSync {
       Transaction transaction,
       List<PartitionFileGroup> partitionedDataFiles,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec)
+      throws IOException {
     Map<String, DataFile> previousFiles = new HashMap<>();
     try (CloseableIterable<FileScanTask> iterator = table.newScan().planFiles()) {
@@ -67,7 +71,8 @@ public class IcebergDataFileUpdatesSync {
       Transaction transaction,
       DataFilesDiff dataFilesDiff,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec)
+      throws IOException {
 
     Collection<DataFile> filesRemoved =
         dataFilesDiff.getFilesRemoved().stream()
@@ -82,11 +87,45 @@ public class IcebergDataFileUpdatesSync {
       Collection<InternalDataFile> filesAdded,
       Collection<DataFile> filesRemoved,
       Schema schema,
-      PartitionSpec partitionSpec) {
-    OverwriteFiles overwriteFiles = transaction.newOverwrite();
-    filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f)));
-    filesRemoved.forEach(overwriteFiles::deleteFile);
-    overwriteFiles.commit();
+      PartitionSpec partitionSpec)
+      throws IOException {
+
+    List<InternalDeletionVector> deletionVectors =
+        filesAdded.stream()
+            .filter(dataFile -> dataFile instanceof InternalDeletionVector)
+            .map(dataFile -> (InternalDeletionVector) dataFile)
+            .collect(Collectors.toList());
+
+    if (!filesRemoved.isEmpty() || filesAdded.size() > deletionVectors.size()) {
+      OverwriteFiles overwriteFiles = transaction.newOverwrite();
+      filesAdded.stream()
+          .filter(dataFile -> !(dataFile instanceof InternalDeletionVector))
+          .forEach(
+              dataFile -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, dataFile)));
+      filesRemoved.forEach(overwriteFiles::deleteFile);
+      overwriteFiles.commit();
+    }
+
+    if (deletionVectors.isEmpty()) {
+      return;
+    }
+    RowDelta rowDeletes = transaction.newRowDelta();
+    String basePath = transaction.table().location();
+    IcebergDeleteVectorConverter converter =
+        IcebergDeleteVectorConverter.builder().directoryPath(basePath).build();
+    for (InternalDeletionVector dataFile : deletionVectors) {
+      rowDeletes.addDeletes(getDeleteFile(transaction, dataFile, converter));
+    }
+    rowDeletes.commit();
+  }
+
+  private DeleteFile getDeleteFile(
+      Transaction transaction,
+      InternalDeletionVector deletionVector,
+      IcebergDeleteVectorConverter converter)
+      throws IOException {
+    FileIO io = transaction.table().io();
+    return converter.toIceberg(io, deletionVector);
   }
 
   private DataFile getDataFile(
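For context on the model type filtered out above: InternalDeletionVector is the InternalDataFile subtype that carries the deleted row positions of a single data file. A minimal sketch of how one might be assembled, mirroring the builder calls in the unit test added later in this patch; the data file path and ordinals are illustrative, not taken from the patch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.xtable.model.storage.InternalDeletionVector;

    class InternalDeletionVectorSketch {
      public static void main(String[] args) {
        // Deleted row positions (ordinals) within one data file; values are made up.
        List<Long> ordinals = Arrays.asList(1L, 10L, 20L);
        InternalDeletionVector vector =
            InternalDeletionVector.builder()
                .physicalPath("") // on-disk location of the source deletion vector, if any
                .dataFilePath("/tmp/table/part=1/data_1.parquet") // file the deletes apply to
                .ordinalsSupplier(ordinals::iterator)
                .build();
        // The RowDelta path above iterates these ordinals to build Iceberg position deletes.
        vector.ordinalsIterator().forEachRemaining(ordinal -> System.out.println(ordinal));
      }
    }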
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java
new file mode 100644
index 00000000..4fafa684
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.iceberg;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import lombok.Builder;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
+@Log4j2
+@Builder(builderClassName = "Builder")
+public class IcebergDeleteVectorConverter {
+  // Path to the directory where the delete files generated by this converter are stored
+  private String directoryPath;
+
+  public static class Builder {
+    public Builder directoryPath(Path directoryPath) {
+      return directoryPath(directoryPath.toString());
+    }
+
+    public Builder directoryPath(String directoryPath) {
+      this.directoryPath = directoryPath;
+      return this;
+    }
+  }
+
+  /**
+   * Converts the {@link InternalDeletionVector internal representation of positional deletes} to
+   * the Iceberg positional delete representation and writes it to a position delete file. The
+   * method generates a new Parquet position delete file, named using the delete-UUID.parquet
+   * pattern, in the given directory.
+   *
+   * @return metadata, {@link DeleteFile}, of the generated file
+   */
+  public DeleteFile toIceberg(FileIO fileIO, InternalDeletionVector vector) throws IOException {
+    // generate a name for a new positional delete file
+    String posDeleteFileName = "delete-" + UUID.randomUUID() + ".parquet";
+    String posDeleteFilePath = Paths.get(directoryPath, posDeleteFileName).toString();
+    String dataFilePath = vector.dataFilePath();
+    log.info("Creating a new positional delete file: {} for {}", posDeleteFilePath, dataFilePath);
+    OutputFile out = fileIO.newOutputFile(posDeleteFilePath);
+
+    PositionDeleteWriter<Record> deleteWriter =
+        Parquet.writeDeletes(out)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            // .rowSchema(tableSchema)
+            // TODO add support for partitioned tables
+            .withSpec(PartitionSpec.unpartitioned())
+            .buildPositionWriter();
+    PositionDelete<Record> positionDelete = PositionDelete.create();
+    try (PositionDeleteWriter<Record> writer = deleteWriter) {
+      vector
+          .ordinalsIterator()
+          .forEachRemaining(
+              ordinal -> writer.write(positionDelete.set(dataFilePath, ordinal, null)));
+    }
+
+    // TODO optimize files by partitions
+    DeleteFile posDeleteFile = deleteWriter.toDeleteFile();
+    log.info("Created a new positional delete file: {}", posDeleteFilePath);
+    return posDeleteFile;
+  }
+}
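A usage sketch for the converter above, following the pattern of TestIcebergDeleteVectorConverter further down in this patch. The directory, data file path, and ordinals are illustrative, and a local HadoopFileIO with a default Hadoop configuration is assumed:

    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.hadoop.HadoopFileIO;

    import org.apache.xtable.iceberg.IcebergDeleteVectorConverter;
    import org.apache.xtable.model.storage.InternalDeletionVector;

    class DeleteVectorConverterSketch {
      public static void main(String[] args) throws Exception {
        List<Long> ordinals = Arrays.asList(3L, 7L, 42L); // deleted row positions, made up
        InternalDeletionVector vector =
            InternalDeletionVector.builder()
                .physicalPath("")
                .dataFilePath("/tmp/table/data_1.parquet") // illustrative data file
                .ordinalsSupplier(ordinals::iterator)
                .build();

        // Writes a delete-<uuid>.parquet position delete file under the given directory.
        IcebergDeleteVectorConverter converter =
            IcebergDeleteVectorConverter.builder().directoryPath(Paths.get("/tmp/table")).build();
        HadoopFileIO fileIO = new HadoopFileIO(new Configuration());
        DeleteFile deleteFile = converter.toIceberg(fileIO, vector);
        System.out.println("Position delete file: " + deleteFile.path());
      }
    }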
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java
new file mode 100644
index 00000000..49357dec
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.conversion.ConversionTargetFactory;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.delta.DeltaConversionSource;
+import org.apache.xtable.delta.DeltaConversionSourceProvider;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.IncrementalTableChanges;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
+import org.apache.xtable.model.storage.InternalDeletionVector;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.spi.sync.ConversionTarget;
+import org.apache.xtable.spi.sync.TableFormatSync;
+
+/**
+ * Integration test for the conversion of {@link InternalDeletionVector} to and from Iceberg
+ * deletion vectors. Currently, the test uses the Delta table format either as the source of
+ * deletion vectors, which are then converted to Iceberg deletion vectors, or as a consumer to
+ * validate the conversion of deletion vectors from an Iceberg source table.
+ */
+public class ITIcebergDeleteVectorConvert {
+  @TempDir private static Path tempDir;
+  private static SparkSession sparkSession;
+
+  private Configuration hadoopConf;
+  private DeltaConversionSourceProvider conversionSourceProvider;
+  private TestSparkDeltaTable testSparkDeltaTable;
+
+  @BeforeAll
+  public static void setupOnce() {
+    sparkSession =
+        SparkSession.builder()
+            .appName("TestDeltaTable")
+            .master("local[4]")
+            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+            .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
+            .config("spark.sql.shuffle.partitions", "1")
+            .config("spark.default.parallelism", "1")
+            .config("spark.serializer", KryoSerializer.class.getName())
+            .getOrCreate();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (sparkSession != null) {
+      sparkSession.close();
+    }
+  }
+
+  @BeforeEach
+  void setUp() {
+    hadoopConf = new Configuration();
+    hadoopConf.set("fs.defaultFS", "file:///");
+
+    conversionSourceProvider = new DeltaConversionSourceProvider();
+    conversionSourceProvider.init(hadoopConf);
+  }
+
+  private static class TableState {
+    Map<String, AddFile> activeFiles;
+    List<Row> rowsToDelete;
+
+    TableState(Map<String, AddFile> activeFiles) {
+      this(activeFiles, Collections.emptyList());
+    }
+
+    TableState(Map<String, AddFile> activeFiles, List<Row> rowsToDelete) {
+      this.activeFiles = activeFiles;
+      this.rowsToDelete = rowsToDelete;
+    }
+  }
+
+  /**
+   * Validates the conversion of deletion vectors from a Delta table to Iceberg delete vectors.
+   * The test uses Spark SQL to insert and delete rows in a Delta table and then converts the
+   * resulting deletion vectors to Iceberg delete vectors. The test uses incremental sync, which
+   * results in an equal number of commits in Delta and Iceberg. For validation, the test compares
+   * the records returned by the Iceberg table and validates the presence of Iceberg position
+   * delete files.
+   */
+  @Test
+  public void testInsertsUpsertsAndDeletes() {
+    String tableName = GenericTable.getTableName();
+    testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+    String tableBasePath = testSparkDeltaTable.getBasePath();
+
+    // enable deletion vectors for the test table
+    testSparkDeltaTable
+        .getSparkSession()
+        .sql(
+            "ALTER TABLE "
+                + tableName
+                + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+
+    List<TableChange> allTableChanges = new ArrayList<>();
+    List<Row> rows = testSparkDeltaTable.insertRows(50);
+    assertEquals(50L, testSparkDeltaTable.getNumRows());
+
+    List<Row> rows1 = testSparkDeltaTable.insertRows(50);
+    assertEquals(100L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(150L, testSparkDeltaTable.getNumRows());
+
+    // delete a few rows with gaps in ids
+    List<Row> rowsToDelete =
+        rows1.subList(0, 10).stream()
+            .filter(row -> (row.get(0).hashCode() % 2) == 0)
+            .collect(Collectors.toList());
+    rowsToDelete.addAll(rows.subList(35, 45));
+    testSparkDeltaTable.deleteRows(rowsToDelete);
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
+    assertEquals(135L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(185L, testSparkDeltaTable.getNumRows());
+
+    // delete a few rows from a file which already has a deletion vector, this should generate a
+    // merged deletion vector file. Some rows were already deleted in the previous delete step.
+    // This deletion step intentionally deletes the same rows again to test the merge.
+    rowsToDelete = rows1.subList(5, 15);
+    testSparkDeltaTable.deleteRows(rowsToDelete);
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
+    assertEquals(178L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(228L, testSparkDeltaTable.getNumRows());
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(tableBasePath)
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Get changes in incremental format.
+    InstantsForIncrementalSync instantsForIncrementalSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.now().minus(Duration.ofHours(1)))
+            .build();
+    CommitsBacklog<Long> commitsBacklog =
+        conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+    for (Long version : commitsBacklog.getCommitsToProcess()) {
+      TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+      allTableChanges.add(tableChange);
+    }
+
+    sparkSession.close();
+
+    TargetTable icebergTarget =
+        TargetTable.builder()
+            .formatName(TableFormat.ICEBERG)
+            .name(tableName)
+            .basePath(tableBasePath)
+            .build();
+    ConversionTargetFactory targetFactory = ConversionTargetFactory.getInstance();
+    ConversionTarget conversionTarget = targetFactory.createForFormat(icebergTarget, hadoopConf);
+
+    IncrementalTableChanges incrementalTableChanges =
+        IncrementalTableChanges.builder().tableChanges(allTableChanges.iterator()).build();
+
+    Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = new HashMap<>();
+    conversionTargetWithMetadata.put(
+        conversionTarget,
+        TableSyncMetadata.of(Instant.now().minus(Duration.ofHours(1)), Collections.emptyList()));
+
+    Map<String, List<SyncResult>> result =
+        TableFormatSync.getInstance()
+            .syncChanges(conversionTargetWithMetadata, incrementalTableChanges);
+  }
+
+  // collects active files in the current snapshot as a map and adds it to the list
+  private Map<String, AddFile> collectActiveFilesAfterCommit(
+      TestSparkDeltaTable testSparkDeltaTable) {
+    Map<String, AddFile> allFiles =
+        testSparkDeltaTable.getAllActiveFilesInfo().stream()
+            .collect(
+                Collectors.toMap(
+                    file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()),
+                    file -> file));
+    return allFiles;
+  }
+
+  private void validateDeletionInfo(
+      List<TableState> testTableStates, List<TableChange> allTableChanges) {
+    if (allTableChanges.isEmpty() && testTableStates.size() <= 1) {
+      return;
+    }
+
+    assertEquals(
+        allTableChanges.size(),
+        testTableStates.size() - 1,
+        "Number of table changes should be equal to number of commits - 1");
+
+    for (int i = 0; i < allTableChanges.size() - 1; i++) {
+      Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 1).activeFiles;
+      Map<String, AddFile> activeFileBeforeCommit = testTableStates.get(i).activeFiles;
+
+      Map<String, AddFile> activeFilesWithUpdatedDeleteInfo =
+          activeFileAfterCommit.entrySet().stream()
+              .filter(e -> e.getValue().deletionVector() != null)
+              .filter(
+                  entry -> {
+                    if (activeFileBeforeCommit.get(entry.getKey()) == null) {
+                      return true;
+                    }
+                    if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) {
+                      return true;
+                    }
+                    DeletionVectorDescriptor deletionVectorDescriptor =
+                        activeFileBeforeCommit.get(entry.getKey()).deletionVector();
+                    return !deletionVectorDescriptor.equals(entry.getValue().deletionVector());
+                  })
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      if (activeFilesWithUpdatedDeleteInfo.isEmpty()) {
+        continue;
+      }
+
+      // validate all new delete vectors are correctly detected
+      validateDeletionInfoForCommit(
+          testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i));
+    }
+  }
+
+  private void validateDeletionInfoForCommit(
+      TableState tableState,
+      Map<String, AddFile> activeFilesAfterCommit,
+      TableChange changeDetectedForCommit) {
+    Map<String, InternalDeletionVector> detectedDeleteInfos =
+        changeDetectedForCommit.getFilesDiff().getFilesAdded().stream()
+            .filter(file -> file instanceof InternalDeletionVector)
+            .map(file -> (InternalDeletionVector) file)
+            .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file));
+
+    Map<String, AddFile> filesWithDeleteVectors =
+        activeFilesAfterCommit.entrySet().stream()
+            .filter(file -> file.getValue().deletionVector() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size());
+
+    for (Map.Entry<String, AddFile> fileWithDeleteVector : filesWithDeleteVectors.entrySet()) {
+      InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey());
+      assertNotNull(deleteInfo);
+      DeletionVectorDescriptor deletionVectorDescriptor =
+          fileWithDeleteVector.getValue().deletionVector();
+      assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.getRecordCount());
+      assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.getFileSizeBytes());
+      assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset());
+
+      String deletionFilePath =
+          deletionVectorDescriptor
+              .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath()))
+              .toString();
+      assertEquals(deletionFilePath, deleteInfo.getPhysicalPath());
+
+      Iterator<Long> iterator = deleteInfo.ordinalsIterator();
+      List<Long> deletes = new ArrayList<>();
+      iterator.forEachRemaining(deletes::add);
+      assertEquals(deletes.size(), deleteInfo.getRecordCount());
+    }
+  }
+
+  private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) {
+    String filePath = file.path();
+    if (filePath.startsWith(tableBasePath)) {
+      return filePath;
+    }
+    return Paths.get(tableBasePath, file.path()).toString();
+  }
+
+  private void validateDeletedRecordCount(
+      DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) {
+    List<AddFile> allFiles =
+        deltaLog
+            .getSnapshotAt(deltaLog.snapshot().version(), Option.empty())
+            .allFiles()
+            .collectAsList();
+    List<AddFile> filesWithDeletionVectors =
+        allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList());
+
+    assertEquals(deleteVectorFileCount, filesWithDeletionVectors.size());
+    assertEquals(
+        deletionRecordCount,
+        filesWithDeletionVectors.stream()
+            .collect(Collectors.summarizingLong(AddFile::numDeletedRecords))
+            .getSum());
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java
new file mode 100644
index 00000000..bf532d05
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
+public class TestIcebergDeleteVectorConverter {
+  @TempDir private Path tempDirPath;
+
+  @Test
+  public void x() throws Exception {
+    Path tablePath = Paths.get(tempDirPath.toString(), "table");
+    Path dataFilePath = Paths.get(tablePath.toString(), "part=1", "data_1.parquet");
+
+    List<Long> ordinals = Arrays.asList(1L, 10L, 12L, 13L, 20L);
+
+    InternalDeletionVector vector =
+        InternalDeletionVector.builder()
+            .physicalPath("")
+            .dataFilePath(dataFilePath.toString())
+            .ordinalsSupplier(ordinals::iterator)
+            .build();
+
+    IcebergDeleteVectorConverter converter =
+        IcebergDeleteVectorConverter.builder().directoryPath(tempDirPath).build();
+
+    HadoopFileIO fileIO = new HadoopFileIO(new Configuration());
+    DeleteFile deleteFile = converter.toIceberg(fileIO, vector);
+    assertNotNull(deleteFile);
+    assertNotNull(deleteFile.path());
+    assertTrue(Files.exists(Paths.get(deleteFile.path().toString()), LinkOption.NOFOLLOW_LINKS));
+
+    // read back the position delete file and verify the deleted records
+    CharSequence deleteFilePath = deleteFile.path();
+    Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(null);
+
+    InputFile deleteInputFile = org.apache.iceberg.Files.localInput(deleteFilePath.toString());
+    try (CloseableIterable<Record> reader =
+        Parquet.read(deleteInputFile)
+            .project(deleteSchema)
+            .createReaderFunc(schema -> GenericParquetReaders.buildReader(deleteSchema, schema))
+            .build()) {
+      ArrayList<Record> deletedRecords = Lists.newArrayList(reader);
+      for (int i = 0; i < ordinals.size(); i++) {
+        Record record = deletedRecords.get(i);
+        assertEquals(2, record.size());
+        assertEquals(record.get(0), dataFilePath.toString());
+        assertEquals(record.get(1), ordinals.get(i));
+      }
+    }
+  }
+}
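Beyond the assertions in these tests, one way to spot-check the Iceberg side after a sync is to scan the target table and list the delete files attached to each scan task. A rough sketch, assuming the synced Iceberg table can be loaded by location through HadoopTables; the table path is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.io.CloseableIterable;

    class InspectPositionDeletes {
      public static void main(String[] args) throws Exception {
        // Location of the synced table; pass the Delta table's base path (illustrative default).
        String tableLocation = args.length > 0 ? args[0] : "/tmp/synced-table";
        Table table = new HadoopTables(new Configuration()).load(tableLocation);
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            for (DeleteFile delete : task.deletes()) {
              System.out.printf(
                  "data file %s has delete file %s with %d positions%n",
                  task.file().path(), delete.path(), delete.recordCount());
            }
          }
        }
      }
    }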
