This is an automated email from the ASF dual-hosted git repository. ashvin pushed a commit to branch 345-read-and-translate-the-deletion-vectors-in-delta-source-table-to-xtables-internal-representation in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 5f4f851549fa0ac191595bbf99e297a116ddc0df Author: Ashvin Agrawal <[email protected]> AuthorDate: Tue Jan 21 21:00:09 2025 -0800 Extract Delta Lake deletion vectors This change extracts deletion vectors represented as roaring bitmaps in Delta Lake files and converts them into the XTable intermediate representation. Previously, XTable only detected table changes that included adding or removing of data files. Now the detected table change also includes any deletion vector files added in the commit. Note that, in Delta Lake, deletion vectors are represented in a compressed binary format. However, once extracted by XTable, the offsets are currently expanded into a list of long offsets. This representation is not the most efficient for large datasets. Optimization is deferred in order to prioritize end-to-end conversion completion. --- .../java/org/apache/xtable/model/TableChange.java | 19 ++- .../model/storage/InternalDeletionVector.java | 63 +++++++ .../apache/xtable/delta/DeltaActionsConverter.java | 39 ++++- .../apache/xtable/delta/DeltaConversionSource.java | 23 +-- .../org/apache/xtable/TestSparkDeltaTable.java | 7 +- .../xtable/delta/ITDeltaDeleteVectorConvert.java | 187 ++++++++++++++++++--- .../xtable/delta/TestDeltaActionsConverter.java | 23 ++- 7 files changed, 318 insertions(+), 43 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index 51f0ee0b..e750916f 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -18,10 +18,15 @@ package org.apache.xtable.model; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import lombok.Builder; import lombok.Value; import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalDeletionVector; /** * Captures the changes in a single 
commit/instant from the source table. @@ -29,11 +34,23 @@ import org.apache.xtable.model.storage.DataFilesDiff; * @since 0.1 */ @Value -@Builder(toBuilder = true) +@Builder(toBuilder = true, builderClassName = "Builder") public class TableChange { // Change in files at the specified instant DataFilesDiff filesDiff; + // A commit can add deletion vectors when some records are deleted. New deletion vectors can be + // added even if no new data files are added. However, as deletion vectors are always associated + // with a data file, they are implicitly removed when a corresponding data file is removed. + List<InternalDeletionVector> deletionVectorsAdded; + /** The {@link InternalTable} at the commit time to which this table change belongs. */ InternalTable tableAsOfChange; + + public static class Builder { + public Builder deletionVectorsAdded(Collection<InternalDeletionVector> deletionVectorsAdded) { + this.deletionVectorsAdded = new ArrayList<>(deletionVectorsAdded); + return this; + } + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java new file mode 100644 index 00000000..c7ae037e --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.storage; + +import java.util.Iterator; +import java.util.function.Supplier; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.Accessors; + +@Builder(toBuilder = true, builderClassName = "Builder") +@Accessors(fluent = true) +@Value +public class InternalDeletionVector { + // path (absolute with scheme) of data file to which this deletion vector belongs + @NonNull String dataFilePath; + + // physical path of the deletion vector file (absolute with scheme) + String deletionVectorFilePath; + + // offset of deletion vector start in the deletion vector file + int offset; + + // length of the deletion vector in the deletion vector file + int length; + + // count of records deleted by this deletion vector + long countRecordsDeleted; + + @Getter(AccessLevel.NONE) + Supplier<Iterator<Long>> deleteRecordSupplier; + + public Iterator<Long> deleteRecordIterator() { + return deleteRecordSupplier.get(); + } + + public static class Builder { + public Builder deleteRecordSupplier(Supplier<Iterator<Long>> recordsSupplier) { + this.deleteRecordSupplier = recordsSupplier; + return this; + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java index 40b822df..38eb65c6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java +++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java @@ -18,7 +18,9 @@ package org.apache.xtable.delta; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import lombok.AccessLevel; @@ -30,6 +32,9 @@ import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.actions.AddFile; import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import org.apache.spark.sql.delta.actions.RemoveFile; +import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray; +import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore; +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.schema.InternalField; @@ -37,6 +42,7 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaActionsConverter { @@ -115,16 +121,41 @@ public class DeltaActionsConverter { * * @param snapshot the commit snapshot * @param addFile the add file action - * @return the deletion vector representation (path of data file), or null if no deletion vector - * is present + * @return the deletion vector representation, or null if no deletion vector is present */ - public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) { + public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) { DeletionVectorDescriptor deletionVector = addFile.deletionVector(); if (deletionVector == null) { return null; } String dataFilePath = addFile.path(); - return getFullPathToFile(snapshot, dataFilePath); + dataFilePath = getFullPathToFile(snapshot, dataFilePath); + Path 
deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath()); + + // TODO assumes deletion vector file. Need to handle inlined deletion vectors + InternalDeletionVector deleteVector = + InternalDeletionVector.builder() + .dataFilePath(dataFilePath) + .deletionVectorFilePath(deletionVectorFilePath.toString()) + .countRecordsDeleted(deletionVector.cardinality()) + .offset((Integer) deletionVector.offset().get()) + .length(deletionVector.sizeInBytes()) + .deleteRecordSupplier(() -> deletedRecordsIterator(snapshot, deletionVector)) + .build(); + + return deleteVector; + } + + private Iterator<Long> deletedRecordsIterator( + Snapshot snapshot, DeletionVectorDescriptor deleteVector) { + DeletionVectorStore dvStore = + new HadoopFileSystemDVStore(snapshot.deltaLog().newDeltaHadoopConf()); + + Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath()); + int size = deleteVector.sizeInBytes(); + int offset = deleteVector.offset().isDefined() ? 
(int) deleteVector.offset().get() : 1; + RoaringBitmapArray rbm = dvStore.read(deletionVectorFilePath, offset, size); + return Arrays.stream(rbm.values()).iterator(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 140eb8ad..9d44401f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -22,11 +22,9 @@ import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import lombok.Builder; import lombok.extern.log4j.Log4j2; @@ -54,6 +52,7 @@ import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.DataFileIterator; @@ -112,8 +111,8 @@ public class DeltaConversionSource implements ConversionSource<Long> { // All 3 of the following data structures use data file's absolute path as the key Map<String, InternalDataFile> addedFiles = new HashMap<>(); Map<String, InternalDataFile> removedFiles = new HashMap<>(); - // Set of data file paths for which deletion vectors exists. - Set<String> deletionVectors = new HashSet<>(); + // Map of data file paths for which deletion vectors exists. 
+ Map<String, InternalDeletionVector> deletionVectors = new HashMap<>(); for (Action action : actionsForVersion) { if (action instanceof AddFile) { @@ -128,10 +127,10 @@ public class DeltaConversionSource implements ConversionSource<Long> { DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); addedFiles.put(dataFile.getPhysicalPath(), dataFile); - String deleteVectorPath = - actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action); - if (deleteVectorPath != null) { - deletionVectors.add(deleteVectorPath); + InternalDeletionVector deletionVector = + actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action); + if (deletionVector != null) { + deletionVectors.put(deletionVector.dataFilePath(), deletionVector); } } else if (action instanceof RemoveFile) { InternalDataFile dataFile = @@ -150,7 +149,7 @@ public class DeltaConversionSource implements ConversionSource<Long> { // entry which is replaced by a new entry, AddFile with delete vector information. Since the // same data file is removed and added, we need to remove it from the added and removed file // maps which are used to track actual added and removed data files. 
- for (String deletionVector : deletionVectors) { + for (String deletionVector : deletionVectors.keySet()) { // validate that a Remove action is also added for the data file if (removedFiles.containsKey(deletionVector)) { addedFiles.remove(deletionVector); @@ -167,7 +166,11 @@ public class DeltaConversionSource implements ConversionSource<Long> { .filesAdded(addedFiles.values()) .filesRemoved(removedFiles.values()) .build(); - return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build(); + return TableChange.builder() + .tableAsOfChange(tableAtVersion) + .deletionVectorsAdded(deletionVectors.values()) + .filesDiff(dataFilesDiff) + .build(); } @Override diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index ee5b1ccd..909b1b79 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -39,6 +39,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; import org.apache.spark.sql.delta.DeltaLog; +import org.apache.spark.sql.delta.actions.AddFile; import com.google.common.base.Preconditions; @@ -212,11 +213,15 @@ public class TestSparkDeltaTable implements GenericTable<Row, Object>, Closeable } public List<String> getAllActiveFiles() { - return deltaLog.snapshot().allFiles().collectAsList().stream() + return getAllActiveFilesInfo().stream() .map(addFile -> addSlashToBasePath(basePath) + addFile.path()) .collect(Collectors.toList()); } + public List<AddFile> getAllActiveFilesInfo() { + return deltaLog.snapshot().allFiles().collectAsList(); + } + private String addSlashToBasePath(String basePath) { if (basePath.endsWith("/")) { return basePath; diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java index ed02893e..c58088d5 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java @@ -19,11 +19,16 @@ package org.apache.xtable.delta; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -38,6 +43,7 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.spark.sql.delta.DeltaLog; import org.apache.spark.sql.delta.actions.AddFile; +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import scala.Option; @@ -49,6 +55,7 @@ import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.storage.InternalDeletionVector; import org.apache.xtable.model.storage.TableFormat; public class ITDeltaDeleteVectorConvert { @@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert { private static SparkSession sparkSession; private DeltaConversionSourceProvider conversionSourceProvider; + private TestSparkDeltaTable testSparkDeltaTable; @BeforeAll public static void setupOnce() { @@ -91,11 +99,24 @@ public class ITDeltaDeleteVectorConvert { conversionSourceProvider.init(hadoopConf); } + private static class TableState { + Map<String, AddFile> activeFiles; + List<Row> rowsToDelete; + + TableState(Map<String, AddFile> activeFiles) { + this(activeFiles, Collections.emptyList()); + } + + TableState(Map<String, AddFile> activeFiles, 
List<Row> rowsToDelete) { + this.activeFiles = activeFiles; + this.rowsToDelete = rowsToDelete; + } + } + @Test public void testInsertsUpsertsAndDeletes() { String tableName = GenericTable.getTableName(); - TestSparkDeltaTable testSparkDeltaTable = - new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); + testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); // enable deletion vectors for the test table testSparkDeltaTable @@ -105,25 +126,30 @@ public class ITDeltaDeleteVectorConvert { + tableName + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); - List<List<String>> allActiveFiles = new ArrayList<>(); + List<TableState> testTableStates = new ArrayList<>(); List<TableChange> allTableChanges = new ArrayList<>(); List<Row> rows = testSparkDeltaTable.insertRows(50); Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + Map<String, AddFile> tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, Collections.emptyList())); List<Row> rows1 = testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); // upsert does not create delete vectors testSparkDeltaTable.upsertRows(rows.subList(0, 20)); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - 
validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(150L, testSparkDeltaTable.getNumRows()); // delete a few rows with gaps in ids @@ -133,12 +159,15 @@ public class ITDeltaDeleteVectorConvert { .collect(Collectors.toList()); rowsToDelete.addAll(rows.subList(35, 45)); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(135L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(185L, testSparkDeltaTable.getNumRows()); // delete a few rows from a file which already has a deletion vector, this should generate a @@ -146,18 +175,22 @@ public class ITDeltaDeleteVectorConvert { // This deletion step intentionally deletes the same rows again to test the merge. 
rowsToDelete = rows1.subList(5, 15); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(178L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(228L, testSparkDeltaTable.getNumRows()); + String tableBasePath = testSparkDeltaTable.getBasePath(); SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .basePath(testSparkDeltaTable.getBasePath()) + .basePath(tableBasePath) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = @@ -165,8 +198,9 @@ public class ITDeltaDeleteVectorConvert { InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); // validateDeltaPartitioning(internalSnapshot); - ValidationTestHelper.validateSnapshot( - internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1)); + List<String> activeDataFilePaths = + new ArrayList<>(testTableStates.get(testTableStates.size() - 1).activeFiles.keySet()); + ValidationTestHelper.validateSnapshot(internalSnapshot, activeDataFilePaths); // Get changes in incremental format. 
InstantsForIncrementalSync instantsForIncrementalSync = @@ -179,13 +213,124 @@ public class ITDeltaDeleteVectorConvert { TableChange tableChange = conversionSource.getTableChangeForCommit(version); allTableChanges.add(tableChange); } - ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges); + + List<List<String>> allActiveDataFilePaths = + testTableStates.stream() + .map(s -> s.activeFiles) + .map(Map::keySet) + .map(ArrayList::new) + .collect(Collectors.toList()); + ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, allTableChanges); + + validateDeletionInfo(testTableStates, allTableChanges); + } + + // collects active files in the current snapshot as a map and adds it to the list + private Map<String, AddFile> collectActiveFilesAfterCommit( + TestSparkDeltaTable testSparkDeltaTable) { + Map<String, AddFile> allFiles = + testSparkDeltaTable.getAllActiveFilesInfo().stream() + .collect( + Collectors.toMap( + file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()), + file -> file)); + return allFiles; + } + + private void validateDeletionInfo( + List<TableState> testTableStates, List<TableChange> allTableChanges) { + if (allTableChanges.isEmpty() && testTableStates.size() <= 1) { + return; + } + + assertEquals( + allTableChanges.size(), + testTableStates.size() - 1, + "Number of table changes should be equal to number of commits - 1"); + + for (int i = 0; i < allTableChanges.size() - 1; i++) { + Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 1).activeFiles; + Map<String, AddFile> activeFileBeforeCommit = testTableStates.get(i).activeFiles; + + Map<String, AddFile> activeFilesWithUpdatedDeleteInfo = + activeFileAfterCommit.entrySet().stream() + .filter(e -> e.getValue().deletionVector() != null) + .filter( + entry -> { + if (activeFileBeforeCommit.get(entry.getKey()) == null) { + return true; + } + if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) { + return true; + } + 
DeletionVectorDescriptor deletionVectorDescriptor = + activeFileBeforeCommit.get(entry.getKey()).deletionVector(); + return !deletionVectorDescriptor.equals(entry.getValue().deletionVector()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (activeFilesWithUpdatedDeleteInfo.isEmpty()) { + continue; + } + + // validate all new delete vectors are correctly detected + validateDeletionInfoForCommit( + testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i)); + } + } + + private void validateDeletionInfoForCommit( + TableState tableState, + Map<String, AddFile> activeFilesAfterCommit, + TableChange changeDetectedForCommit) { + Map<String, InternalDeletionVector> detectedDeleteInfos = + changeDetectedForCommit.getDeletionVectorsAdded().stream() + .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file)); + + Map<String, AddFile> filesWithDeleteVectors = + activeFilesAfterCommit.entrySet().stream() + .filter(file -> file.getValue().deletionVector() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size()); + + for (Map.Entry<String, AddFile> fileWithDeleteVector : filesWithDeleteVectors.entrySet()) { + InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey()); + assertNotNull(deleteInfo); + DeletionVectorDescriptor deletionVectorDescriptor = + fileWithDeleteVector.getValue().deletionVector(); + assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.countRecordsDeleted()); + assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.length()); + assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset()); + + String deletionFilePath = + deletionVectorDescriptor + .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath())) + .toString(); + assertEquals(deletionFilePath, deleteInfo.deletionVectorFilePath()); + + 
Iterator<Long> iterator = deleteInfo.deleteRecordIterator(); + List<Long> deletes = new ArrayList<>(); + iterator.forEachRemaining(deletes::add); + assertEquals(deletes.size(), deleteInfo.countRecordsDeleted()); + } + } + + private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) { + String filePath = file.path(); + if (filePath.startsWith(tableBasePath)) { + return filePath; + } + return Paths.get(tableBasePath, file.path()).toString(); } private void validateDeletedRecordCount( - DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) { + DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) { List<AddFile> allFiles = - deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList(); + deltaLog + .getSnapshotAt(deltaLog.snapshot().version(), Option.empty()) + .allFiles() + .collectAsList(); List<AddFile> filesWithDeletionVectors = allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java index e62e9341..a2fa12de 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java @@ -18,10 +18,14 @@ package org.apache.xtable.delta; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.when; + import java.net.URISyntaxException; import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -32,6 +36,8 @@ import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import scala.Option; +import 
org.apache.xtable.model.storage.InternalDeletionVector; + class TestDeltaActionsConverter { @Test @@ -49,7 +55,9 @@ class TestDeltaActionsConverter { DeletionVectorDescriptor deletionVector = null; AddFile addFileAction = new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); - Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + InternalDeletionVector internaldeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNull(internaldeletionVector); deletionVector = DeletionVectorDescriptor.onDiskWithAbsolutePath( @@ -58,10 +66,13 @@ class TestDeltaActionsConverter { addFileAction = new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); - Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog); - Mockito.when(deltaLog.dataPath()) + when(snapshot.deltaLog()).thenReturn(deltaLog); + when(deltaLog.dataPath()) .thenReturn(new Path("https://container.blob.core.windows.net/tablepath")); - Assertions.assertEquals( - filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + internaldeletionVector = actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNotNull(internaldeletionVector); + assertEquals(filePath, internaldeletionVector.dataFilePath()); + assertEquals(42, internaldeletionVector.countRecordsDeleted()); + assertEquals(size, internaldeletionVector.length()); } }
