This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch 595-detect-delete-vectors-files
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/595-detect-delete-vectors-files by this push:
     new b190e666 Fix commit log parsing of Delta tables with delete vector
b190e666 is described below

commit b190e666a2b2e0cf165fb1a248baad6002e5b906
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Dec 8 21:00:09 2024 -0800

    Fix commit log parsing of Delta tables with delete vector
    
    - Add support for `tightBounds` property in Delta stats representation
    - Correct handling of delete vectors to avoid adding data file paths to both new and removed file sets in `FileDiff`
---
 .../apache/xtable/delta/DeltaActionsConverter.java | 21 +++++++
 .../apache/xtable/delta/DeltaConversionSource.java | 49 +++++++++++++---
 .../apache/xtable/delta/DeltaStatsExtractor.java   |  1 +
 .../xtable/delta/ITDeltaDeleteVectorConvert.java   | 45 +++++++++++----
 .../xtable/delta/TestDeltaActionsConverter.java    | 66 ++++++++++++++++++++++
 5 files changed, 163 insertions(+), 19 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index fbee89f4..40b822df 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import org.apache.xtable.exception.NotSupportedException;
@@ -106,4 +107,24 @@ public class DeltaActionsConverter {
     }
     return tableBasePath + Path.SEPARATOR + dataFilePath;
   }
+
+  /**
+   * Extracts the representation of the deletion vector information corresponding to an AddFile
+   * action. Currently, this method extracts and returns the path to the data file for which
+   * deletion vector data is present.
+   *
+   * @param snapshot the commit snapshot
+   * @param addFile the add file action
+   * @return the deletion vector representation (path of data file), or null if no deletion vector
+   *     is present
+   */
+  public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+    DeletionVectorDescriptor deletionVector = addFile.deletionVector();
+    if (deletionVector == null) {
+      return null;
+    }
+
+    String dataFilePath = addFile.path();
+    return getFullPathToFile(snapshot, dataFilePath);
+  }
 }
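
For context, here is a minimal sketch of how the new helper could be driven from a single commit's AddFile actions. It is illustrative only: the class and method names below (other than DeltaActionsConverter.extractDeletionVectorFile) are not part of this change, and it assumes an already-loaded Delta Snapshot.

    // Illustrative only: collect the data file paths in one commit that carry deletion vectors.
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.spark.sql.delta.Snapshot;
    import org.apache.spark.sql.delta.actions.AddFile;

    import org.apache.xtable.delta.DeltaActionsConverter;

    class DeletionVectorPaths {
      static Set<String> collect(Snapshot snapshot, List<AddFile> addFiles) {
        DeltaActionsConverter converter = DeltaActionsConverter.getInstance();
        Set<String> paths = new HashSet<>();
        for (AddFile addFile : addFiles) {
          // Returns null when the AddFile has no deletion vector descriptor attached.
          String path = converter.extractDeletionVectorFile(snapshot, addFile);
          if (path != null) {
            paths.add(path);
          }
        }
        return paths;
      }
    }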
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 19ecc02c..ef9c47d0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,8 +21,10 @@ package org.apache.xtable.delta;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -99,11 +101,16 @@ public class DeltaConversionSource implements ConversionSource<Long> {
     Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
     FileFormat fileFormat =
         actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
-    Set<InternalDataFile> addedFiles = new HashSet<>();
-    Set<InternalDataFile> removedFiles = new HashSet<>();
+
+    // All three of the following collections are keyed by the data file path
+    Map<String, InternalDataFile> addedFiles = new HashMap<>();
+    Map<String, InternalDataFile> removedFiles = new HashMap<>();
+    // Set of data file paths for which deletion vectors exist.
+    Set<String> deletionVectors = new HashSet<>();
+
     for (Action action : actionsForVersion) {
       if (action instanceof AddFile) {
-        addedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertAddActionToInternalDataFile(
                 (AddFile) action,
                 snapshotAtVersion,
@@ -112,19 +119,47 @@ public class DeltaConversionSource implements ConversionSource<Long> {
                 tableAtVersion.getReadSchema().getFields(),
                 true,
                 DeltaPartitionExtractor.getInstance(),
-                DeltaStatsExtractor.getInstance()));
+                DeltaStatsExtractor.getInstance());
+        addedFiles.put(dataFile.getPhysicalPath(), dataFile);
+        String deleteVectorPath =
+            actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
+        if (deleteVectorPath != null) {
+          deletionVectors.add(deleteVectorPath);
+        }
       } else if (action instanceof RemoveFile) {
-        removedFiles.add(
+        InternalDataFile dataFile =
             actionsConverter.convertRemoveActionToInternalDataFile(
                 (RemoveFile) action,
                 snapshotAtVersion,
                 fileFormat,
                 tableAtVersion.getPartitioningFields(),
-                DeltaPartitionExtractor.getInstance()));
+                DeltaPartitionExtractor.getInstance());
+        removedFiles.put(dataFile.getPhysicalPath(), dataFile);
+      }
+    }
+
+    // In Delta Lake, when delete vector information is added for an existing data file as a
+    // result of a delete operation, a RemoveFile action is added to the commit log to remove the
+    // old entry, which is replaced by a new AddFile entry carrying the delete vector information.
+    // Since the same data file is both removed and added, we need to remove it from the added and
+    // removed file maps, which are used to track the data files actually added and removed.
+    for (String deletionVector : deletionVectors) {
+      // validate that a Remove action is also added for the data file
+      if (removedFiles.containsKey(deletionVector)) {
+        addedFiles.remove(deletionVector);
+        removedFiles.remove(deletionVector);
+      } else {
+        log.warn(
+            "No Remove action found for the data file for which a deletion vector was added: {}. This is unexpected.",
+            deletionVector);
       }
     }
+
     DataFilesDiff dataFilesDiff =
-        DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+        DataFilesDiff.builder()
+            .filesAdded(addedFiles.values())
+            .filesRemoved(removedFiles.values())
+            .build();
     return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
   }
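
To make the reconciliation step above concrete, here is a small, self-contained sketch that uses plain string maps in place of InternalDataFile and the converter types; the file names and values are fabricated for illustration.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class DeleteVectorReconcileExample {
      public static void main(String[] args) {
        Map<String, String> added = new HashMap<>();
        Map<String, String> removed = new HashMap<>();
        Set<String> deletionVectors = new HashSet<>();

        // A delete with deletion vectors rewrites file-1 in place: the commit holds both a
        // RemoveFile and an AddFile for the same path, plus a genuinely new file-2.
        added.put("file-1.parquet", "AddFile with deletion vector");
        removed.put("file-1.parquet", "RemoveFile for the pre-delete entry");
        added.put("file-2.parquet", "AddFile for new data");
        deletionVectors.add("file-1.parquet");

        // Drop the paths that were only rewritten, keeping the real adds and removes.
        for (String path : deletionVectors) {
          if (removed.containsKey(path)) {
            added.remove(path);
            removed.remove(path);
          }
        }

        System.out.println("added=" + added.keySet());     // [file-2.parquet]
        System.out.println("removed=" + removed.keySet()); // []
      }
    }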
 
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index a6f74cee..d5d919b6 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -246,6 +246,7 @@ public class DeltaStatsExtractor {
     Map<String, Object> minValues;
     Map<String, Object> maxValues;
     Map<String, Object> nullCount;
+    boolean tightBounds;
   }
 
   @Value
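
For reference, a hedged sketch of deserializing per-file Delta stats that carry the new tightBounds flag with Jackson; the DeltaStats POJO below is a stand-in for the private stats class touched above, and the sample JSON values are fabricated.

    import java.util.Map;

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TightBoundsStatsExample {
      public static class DeltaStats {
        public long numRecords;
        public Map<String, Object> minValues;
        public Map<String, Object> maxValues;
        public Map<String, Object> nullCount;
        public boolean tightBounds;
      }

      public static void main(String[] args) throws Exception {
        String statsJson =
            "{\"numRecords\":228,"
                + "\"minValues\":{\"id\":1},"
                + "\"maxValues\":{\"id\":250},"
                + "\"nullCount\":{\"id\":0},"
                + "\"tightBounds\":false}";
        ObjectMapper mapper =
            new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        DeltaStats stats = mapper.readValue(statsJson, DeltaStats.class);
        // tightBounds=false indicates the stats were not recomputed after rows were soft-deleted,
        // so min/max may still cover rows masked by a deletion vector.
        System.out.println(stats.numRecords + " tightBounds=" + stats.tightBounds);
      }
    }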
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index d1d33bf8..ed02893e 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -21,6 +21,7 @@ package org.apache.xtable.delta;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -42,7 +43,13 @@ import scala.Option;
 
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
 
 public class ITDeltaDeleteVectorConvert {
   @TempDir private static Path tempDir;
@@ -147,18 +154,32 @@ public class ITDeltaDeleteVectorConvert {
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
     assertEquals(228L, testSparkDeltaTable.getNumRows());
 
-    // TODO conversion fails if delete vectors are enabled, this is because of missing handlers for
-    // deletion files.
-    // TODO pending for another PR
-    //    SourceTable tableConfig =
-    //        SourceTable.builder()
-    //            .name(testSparkDeltaTable.getTableName())
-    //            .basePath(testSparkDeltaTable.getBasePath())
-    //            .formatName(TableFormat.DELTA)
-    //            .build();
-    //    DeltaConversionSource conversionSource =
-    //        conversionSourceProvider.getConversionSourceInstance(tableConfig);
-    //    InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
+
+    //    validateDeltaPartitioning(internalSnapshot);
+    ValidationTestHelper.validateSnapshot(
+        internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+
+    // Get changes in incremental format.
+    InstantsForIncrementalSync instantsForIncrementalSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+            .build();
+    CommitsBacklog<Long> commitsBacklog =
+        conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+    for (Long version : commitsBacklog.getCommitsToProcess()) {
+      TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+      allTableChanges.add(tableChange);
+    }
+    ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
   }
 
   private void validateDeletedRecordCount(
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
new file mode 100644
index 00000000..c8e34b77
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.delta;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+class TestDeltaActionsConverter {
+
+  @Test
+  void extractDeletionVector() throws URISyntaxException {
+    DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();
+
+    int size = 123;
+    long time = 234L;
+    boolean dataChange = true;
+    String stats = "";
+    String filePath = "file:///file_path";
+    Snapshot snapshot = Mockito.mock(Snapshot.class);
+    DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+
+    DeletionVectorDescriptor deletionVector = null;
+    AddFile addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+    Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+
+    deletionVector =
+        DeletionVectorDescriptor.onDiskWithAbsolutePath(
+            filePath, size, 42, Option.empty(), Option.empty());
+
+    addFileAction =
+        new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+
+    Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
+    Mockito.when(deltaLog.dataPath()).thenReturn(new Path("file:///"));
+    Assertions.assertEquals(
+        filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+  }
+}
