This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch 348-deletion-vectors-iceberg-target
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit eb6a8c7b363d46d33bb82aa932aea18d12fe8f89
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Jan 26 11:57:38 2025 -0800

    Handle deletion vectors in Iceberg target
---
 pom.xml                                            |   1 +
 xtable-core/pom.xml                                |   4 +
 .../apache/xtable/hudi/HudiFileStatsExtractor.java |   9 +-
 .../xtable/iceberg/IcebergConversionTarget.java    |  27 +-
 .../xtable/iceberg/IcebergDataFileUpdatesSync.java |  53 +++-
 .../iceberg/IcebergDeleteVectorConverter.java      |  94 ++++++
 .../iceberg/ITIcebergDeleteVectorConvert.java      | 350 +++++++++++++++++++++
 .../iceberg/TestIcebergDeleteVectorConverter.java  |  95 ++++++
 8 files changed, 612 insertions(+), 21 deletions(-)
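
For orientation, the flow this commit introduces is: the source (e.g. Delta) reports deleted
row ordinals as an InternalDeletionVector, the new IcebergDeleteVectorConverter writes those
ordinals into an Iceberg position delete file, and IcebergDataFileUpdatesSync registers that
file through a RowDelta commit. Below is a minimal sketch of that flow, not part of the patch;
the builder fields mirror the usage in TestIcebergDeleteVectorConverter further down, and the
wrapper class, method, path, and ordinals are illustrative only.

    // Hedged sketch only; simplified from the diff below, error handling and partitioning omitted.
    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Transaction;

    import org.apache.xtable.iceberg.IcebergDeleteVectorConverter;
    import org.apache.xtable.model.storage.InternalDeletionVector;

    class DeletionVectorFlowSketch {
      DeleteFile syncOneVector(Transaction transaction) throws IOException {
        // 1. The source reports deleted row ordinals for one data file.
        InternalDeletionVector vector =
            InternalDeletionVector.builder()
                .physicalPath("") // location of the source deletion vector; not read here
                .dataFilePath("/table/part=1/data_1.parquet")
                .ordinalsSupplier(() -> Arrays.asList(1L, 10L, 12L).iterator())
                .build();

        // 2. The converter writes those ordinals as an Iceberg position delete file.
        IcebergDeleteVectorConverter converter =
            IcebergDeleteVectorConverter.builder()
                .directoryPath(transaction.table().location())
                .build();
        DeleteFile deleteFile = converter.toIceberg(transaction.table().io(), vector);

        // 3. IcebergDataFileUpdatesSync registers the delete file via a RowDelta commit.
        RowDelta rowDelta = transaction.newRowDelta();
        rowDelta.addDeletes(deleteFile);
        rowDelta.commit();
        return deleteFile;
      }
    }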

diff --git a/pom.xml b/pom.xml
index 46641dee..8b762634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -270,6 +270,7 @@
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-parquet</artifactId>
                 <version>${iceberg.version}</version>
+                <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e..97397d3b 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -92,6 +92,10 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+        </dependency>
 
         <!-- Delta dependencies -->
         <dependency>
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index e47ef72e..0a11bd16 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -36,7 +36,6 @@ import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.io.api.Binary;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -258,10 +257,10 @@ public class HudiFileStatsExtractor {
         && (minValue instanceof ByteBuffer || maxValue instanceof ByteBuffer)) {
       minValue = minValue == null ? null : new String(((ByteBuffer) minValue).array());
       maxValue = maxValue == null ? null : new String(((ByteBuffer) maxValue).array());
-    } else if (field.getSchema().getDataType() == InternalType.FIXED
-        && (minValue instanceof Binary || maxValue instanceof Binary)) {
-      minValue = minValue == null ? null : ByteBuffer.wrap(((Binary) minValue).getBytes());
-      maxValue = maxValue == null ? null : ByteBuffer.wrap(((Binary) maxValue).getBytes());
+      //    } else if (field.getSchema().getDataType() == InternalType.FIXED
+      //        && (minValue instanceof Binary || maxValue instanceof Binary)) {
+      //      minValue = minValue == null ? null : ByteBuffer.wrap(((Binary) minValue).getBytes());
+      //      maxValue = maxValue == null ? null : ByteBuffer.wrap(((Binary) maxValue).getBytes());
     }
     boolean isScalar = minValue == null || minValue.compareTo(maxValue) == 0;
     Range range = isScalar ? Range.scalar(minValue) : Range.vector(minValue, maxValue);
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index ecdbfa26..2050a2dc 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.iceberg;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -194,19 +195,27 @@ public class IcebergConversionTarget implements ConversionTarget {
 
   @Override
   public void syncFilesForSnapshot(List<PartitionFileGroup> partitionedDataFiles) {
-    dataFileUpdatesExtractor.applySnapshot(
-        table,
-        internalTableState,
-        transaction,
-        partitionedDataFiles,
-        transaction.table().schema(),
-        transaction.table().spec());
+    try {
+      dataFileUpdatesExtractor.applySnapshot(
+          table,
+          internalTableState,
+          transaction,
+          partitionedDataFiles,
+          transaction.table().schema(),
+          transaction.table().spec());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
-    dataFileUpdatesExtractor.applyDiff(
-        transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec());
+    try {
+      dataFileUpdatesExtractor.applyDiff(
+          transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 80e1559f..35348adf 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.iceberg;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -26,6 +27,7 @@ import lombok.AllArgsConstructor;
 
 import org.apache.iceberg.*;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.exception.ReadException;
@@ -33,6 +35,7 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 @AllArgsConstructor(staticName = "of")
@@ -46,7 +49,8 @@ public class IcebergDataFileUpdatesSync {
       Transaction transaction,
       List<PartitionFileGroup> partitionedDataFiles,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec)
+      throws IOException {
 
     Map<String, DataFile> previousFiles = new HashMap<>();
     try (CloseableIterable<FileScanTask> iterator = table.newScan().planFiles()) {
@@ -67,7 +71,8 @@ public class IcebergDataFileUpdatesSync {
       Transaction transaction,
       DataFilesDiff dataFilesDiff,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec)
+      throws IOException {
 
     Collection<DataFile> filesRemoved =
         dataFilesDiff.getFilesRemoved().stream()
@@ -82,11 +87,45 @@ public class IcebergDataFileUpdatesSync {
       Collection<InternalDataFile> filesAdded,
       Collection<DataFile> filesRemoved,
       Schema schema,
-      PartitionSpec partitionSpec) {
-    OverwriteFiles overwriteFiles = transaction.newOverwrite();
-    filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f)));
-    filesRemoved.forEach(overwriteFiles::deleteFile);
-    overwriteFiles.commit();
+      PartitionSpec partitionSpec)
+      throws IOException {
+
+    List<InternalDeletionVector> deletionVectors =
+        filesAdded.stream()
+            .filter(dataFile -> dataFile instanceof InternalDeletionVector)
+            .map(dataFile -> (InternalDeletionVector) dataFile)
+            .collect(Collectors.toList());
+
+    if (!filesRemoved.isEmpty() || filesAdded.size() > deletionVectors.size()) {
+      OverwriteFiles overwriteFiles = transaction.newOverwrite();
+      filesAdded.stream()
+          .filter(dataFile -> !(dataFile instanceof InternalDeletionVector))
+          .forEach(
+              dataFile -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, dataFile)));
+      filesRemoved.forEach(overwriteFiles::deleteFile);
+      overwriteFiles.commit();
+    }
+
+    if (deletionVectors.isEmpty()) {
+      return;
+    }
+    RowDelta rowDeletes = transaction.newRowDelta();
+    String basePath = transaction.table().location();
+    IcebergDeleteVectorConverter converter =
+        IcebergDeleteVectorConverter.builder().directoryPath(basePath).build();
+    for (InternalDeletionVector dataFile : deletionVectors) {
+      rowDeletes.addDeletes(getDeleteFile(transaction, dataFile, converter));
+    }
+    rowDeletes.commit();
+  }
+
+  private DeleteFile getDeleteFile(
+      Transaction transaction,
+      InternalDeletionVector deletionVector,
+      IcebergDeleteVectorConverter converter)
+      throws IOException {
+    FileIO io = transaction.table().io();
+    return converter.toIceberg(io, deletionVector);
   }
 
   private DataFile getDataFile(
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java
new file mode 100644
index 00000000..4fafa684
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDeleteVectorConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.iceberg;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import lombok.Builder;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
+@Log4j2
+@Builder(builderClassName = "Builder")
+public class IcebergDeleteVectorConverter {
+  // Path to the directory where the delete files generated by this converter are stored
+  private String directoryPath;
+
+  public static class Builder {
+    public Builder directoryPath(Path directoryPath) {
+      return directoryPath(directoryPath.toString());
+    }
+
+    public Builder directoryPath(String directoryPath) {
+      this.directoryPath = directoryPath;
+      return this;
+    }
+  }
+
+  /**
+   * Converts the {@link InternalDeletionVector internal representation of positional deletes} to
+   * the Iceberg positional delete representation and writes it to a position delete file. The
+   * method generates a new position delete file in Parquet format, named with the
+   * delete-UUID.parquet pattern, in the given directory.
+   *
+   * @return metadata, {@link DeleteFile}, of the generated file
+   */
+  public DeleteFile toIceberg(FileIO fileIO, InternalDeletionVector vector) throws IOException {
+    // generate a name for a new positional delete file
+    String posDeleteFileName = "delete-" + UUID.randomUUID() + ".parquet";
+    String posDeleteFilePath = Paths.get(directoryPath, posDeleteFileName).toString();
+    String dataFilePath = vector.dataFilePath();
+    log.info("Creating a new positional delete file: {} for {}", posDeleteFilePath, dataFilePath);
+    OutputFile out = fileIO.newOutputFile(posDeleteFilePath);
+
+    PositionDeleteWriter<Record> deleteWriter =
+        Parquet.writeDeletes(out)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            // .rowSchema(tableSchema)
+            // TODO add support for partitioned tables
+            .withSpec(PartitionSpec.unpartitioned())
+            .buildPositionWriter();
+    PositionDelete<Record> positionDelete = PositionDelete.create();
+    try (PositionDeleteWriter<Record> writer = deleteWriter) {
+      vector
+          .ordinalsIterator()
+          .forEachRemaining(
+              ordinal -> writer.write(positionDelete.set(dataFilePath, ordinal, null)));
+    }
+
+    // TODO optimize files by partitions
+    DeleteFile posDeleteFile = deleteWriter.toDeleteFile();
+    log.info("Created a new positional delete file: {}", posDeleteFilePath);
+    return posDeleteFile;
+  }
+}
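
The file written by toIceberg above follows Iceberg's position delete layout: one row per deleted
record, with a file_path column and a pos column. A hedged sketch of inspecting such a
delete-UUID.parquet file, using the same reader APIs as TestIcebergDeleteVectorConverter at the
end of this patch (the class and method names here are illustrative, not part of the patch):

    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetReaders;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.DeleteSchemaUtil;
    import org.apache.iceberg.parquet.Parquet;

    class PositionDeleteFileDump {
      static void dump(String deleteFilePath) throws Exception {
        // Position delete schema without a row payload: (file_path, pos)
        Schema posDeleteSchema = DeleteSchemaUtil.posDeleteSchema(null);
        try (CloseableIterable<Record> records =
            Parquet.read(Files.localInput(deleteFilePath))
                .project(posDeleteSchema)
                .createReaderFunc(schema -> GenericParquetReaders.buildReader(posDeleteSchema, schema))
                .build()) {
          for (Record record : records) {
            // record.get(0) is the data file path, record.get(1) is the deleted row ordinal
            System.out.println(record.get(0) + " -> " + record.get(1));
          }
        }
      }
    }
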
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java
new file mode 100644
index 00000000..49357dec
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergDeleteVectorConvert.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+
+import scala.Option;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.conversion.ConversionTargetFactory;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.delta.DeltaConversionSource;
+import org.apache.xtable.delta.DeltaConversionSourceProvider;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.IncrementalTableChanges;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
+import org.apache.xtable.model.storage.InternalDeletionVector;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.spi.sync.ConversionTarget;
+import org.apache.xtable.spi.sync.TableFormatSync;
+
+/**
+ * Integration test for converting {@link InternalDeletionVector} to and from Iceberg deletion
+ * vectors. Currently, the tests use the Delta table format either as the source of deletion
+ * vectors, which are then converted to Iceberg deletion vectors, or as a consumer to validate
+ * conversion of deletion vectors from an Iceberg source table.
+ */
+public class ITIcebergDeleteVectorConvert {
+  @TempDir private static Path tempDir;
+  private static SparkSession sparkSession;
+
+  private Configuration hadoopConf;
+  private DeltaConversionSourceProvider conversionSourceProvider;
+  private TestSparkDeltaTable testSparkDeltaTable;
+
+  @BeforeAll
+  public static void setupOnce() {
+    sparkSession =
+        SparkSession.builder()
+            .appName("TestDeltaTable")
+            .master("local[4]")
+            .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+            .config("spark.databricks.delta.retentionDurationCheck.enabled", 
"false")
+            .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
+            .config("spark.sql.shuffle.partitions", "1")
+            .config("spark.default.parallelism", "1")
+            .config("spark.serializer", KryoSerializer.class.getName())
+            .getOrCreate();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (sparkSession != null) {
+      sparkSession.close();
+    }
+  }
+
+  @BeforeEach
+  void setUp() {
+    hadoopConf = new Configuration();
+    hadoopConf.set("fs.defaultFS", "file:///");
+
+    conversionSourceProvider = new DeltaConversionSourceProvider();
+    conversionSourceProvider.init(hadoopConf);
+  }
+
+  private static class TableState {
+    Map<String, AddFile> activeFiles;
+    List<Row> rowsToDelete;
+
+    TableState(Map<String, AddFile> activeFiles) {
+      this(activeFiles, Collections.emptyList());
+    }
+
+    TableState(Map<String, AddFile> activeFiles, List<Row> rowsToDelete) {
+      this.activeFiles = activeFiles;
+      this.rowsToDelete = rowsToDelete;
+    }
+  }
+
+  /**
+   * Test to validate conversion of deletion vectors from a Delta table to Iceberg delete vectors.
+   * The test uses Spark SQL to insert and delete rows in a Delta table and then converts the
+   * deletion vectors to Iceberg delete vectors. The test uses incremental sync, which results in
+   * an equal number of commits in Delta and Iceberg. For validation, the test compares the records
+   * returned by the Iceberg table and validates the presence of Iceberg position delete files.
+   */
+  @Test
+  public void testInsertsUpsertsAndDeletes() {
+    String tableName = GenericTable.getTableName();
+    testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+    String tableBasePath = testSparkDeltaTable.getBasePath();
+
+    // enable deletion vectors for the test table
+    testSparkDeltaTable
+        .getSparkSession()
+        .sql(
+            "ALTER TABLE "
+                + tableName
+                + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+
+    List<TableChange> allTableChanges = new ArrayList<>();
+    List<Row> rows = testSparkDeltaTable.insertRows(50);
+    assertEquals(50L, testSparkDeltaTable.getNumRows());
+
+    List<Row> rows1 = testSparkDeltaTable.insertRows(50);
+    assertEquals(100L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(150L, testSparkDeltaTable.getNumRows());
+
+    // delete a few rows with gaps in ids
+    List<Row> rowsToDelete =
+        rows1.subList(0, 10).stream()
+            .filter(row -> (row.get(0).hashCode() % 2) == 0)
+            .collect(Collectors.toList());
+    rowsToDelete.addAll(rows.subList(35, 45));
+    testSparkDeltaTable.deleteRows(rowsToDelete);
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
+    assertEquals(135L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(185L, testSparkDeltaTable.getNumRows());
+
+    // delete a few rows from a file which already has a deletion vector; this should generate a
+    // merged deletion vector file. Some rows were already deleted in the previous delete step.
+    // This deletion step intentionally deletes the same rows again to test the merge.
+    rowsToDelete = rows1.subList(5, 15);
+    testSparkDeltaTable.deleteRows(rowsToDelete);
+    validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
+    assertEquals(178L, testSparkDeltaTable.getNumRows());
+
+    testSparkDeltaTable.insertRows(50);
+    assertEquals(228L, testSparkDeltaTable.getNumRows());
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(tableBasePath)
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Get changes in incremental format.
+    InstantsForIncrementalSync instantsForIncrementalSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.now().minus(Duration.ofHours(1)))
+            .build();
+    CommitsBacklog<Long> commitsBacklog =
+        conversionSource.getCommitsBacklog(instantsForIncrementalSync);
+    for (Long version : commitsBacklog.getCommitsToProcess()) {
+      TableChange tableChange = conversionSource.getTableChangeForCommit(version);
+      allTableChanges.add(tableChange);
+    }
+
+    sparkSession.close();
+
+    TargetTable icebergTarget =
+        TargetTable.builder()
+            .formatName(TableFormat.ICEBERG)
+            .name(tableName)
+            .basePath(tableBasePath)
+            .build();
+    ConversionTargetFactory targetFactory = ConversionTargetFactory.getInstance();
+    ConversionTarget conversionTarget = targetFactory.createForFormat(icebergTarget, hadoopConf);
+
+    IncrementalTableChanges incrementalTableChanges =
+        IncrementalTableChanges.builder().tableChanges(allTableChanges.iterator()).build();
+
+    Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = new HashMap<>();
+    conversionTargetWithMetadata.put(
+        conversionTarget,
+        TableSyncMetadata.of(Instant.now().minus(Duration.ofHours(1)), Collections.emptyList()));
+
+    Map<String, List<SyncResult>> result =
+        TableFormatSync.getInstance()
+            .syncChanges(conversionTargetWithMetadata, incrementalTableChanges);
+  }
+
+  // collects active files in the current snapshot as a map, keyed by absolute file path
+  private Map<String, AddFile> collectActiveFilesAfterCommit(
+      TestSparkDeltaTable testSparkDeltaTable) {
+    Map<String, AddFile> allFiles =
+        testSparkDeltaTable.getAllActiveFilesInfo().stream()
+            .collect(
+                Collectors.toMap(
+                    file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()),
+                    file -> file));
+    return allFiles;
+  }
+
+  private void validateDeletionInfo(
+      List<TableState> testTableStates, List<TableChange> allTableChanges) {
+    if (allTableChanges.isEmpty() && testTableStates.size() <= 1) {
+      return;
+    }
+
+    assertEquals(
+        allTableChanges.size(),
+        testTableStates.size() - 1,
+        "Number of table changes should be equal to number of commits - 1");
+
+    for (int i = 0; i < allTableChanges.size() - 1; i++) {
+      Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 1).activeFiles;
+      Map<String, AddFile> activeFileBeforeCommit = testTableStates.get(i).activeFiles;
+
+      Map<String, AddFile> activeFilesWithUpdatedDeleteInfo =
+          activeFileAfterCommit.entrySet().stream()
+              .filter(e -> e.getValue().deletionVector() != null)
+              .filter(
+                  entry -> {
+                    if (activeFileBeforeCommit.get(entry.getKey()) == null) {
+                      return true;
+                    }
+                    if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) {
+                      return true;
+                    }
+                    DeletionVectorDescriptor deletionVectorDescriptor =
+                        activeFileBeforeCommit.get(entry.getKey()).deletionVector();
+                    return !deletionVectorDescriptor.equals(entry.getValue().deletionVector());
+                  })
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      if (activeFilesWithUpdatedDeleteInfo.isEmpty()) {
+        continue;
+      }
+
+      // validate all new delete vectors are correctly detected
+      validateDeletionInfoForCommit(
+          testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i));
+    }
+  }
+
+  private void validateDeletionInfoForCommit(
+      TableState tableState,
+      Map<String, AddFile> activeFilesAfterCommit,
+      TableChange changeDetectedForCommit) {
+    Map<String, InternalDeletionVector> detectedDeleteInfos =
+        changeDetectedForCommit.getFilesDiff().getFilesAdded().stream()
+            .filter(file -> file instanceof InternalDeletionVector)
+            .map(file -> (InternalDeletionVector) file)
+            .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file));
+
+    Map<String, AddFile> filesWithDeleteVectors =
+        activeFilesAfterCommit.entrySet().stream()
+            .filter(file -> file.getValue().deletionVector() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size());
+
+    for (Map.Entry<String, AddFile> fileWithDeleteVector : filesWithDeleteVectors.entrySet()) {
+      InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey());
+      assertNotNull(deleteInfo);
+      DeletionVectorDescriptor deletionVectorDescriptor =
+          fileWithDeleteVector.getValue().deletionVector();
+      assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.getRecordCount());
+      assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.getFileSizeBytes());
+      assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset());
+
+      String deletionFilePath =
+          deletionVectorDescriptor
+              .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath()))
+              .toString();
+      assertEquals(deletionFilePath, deleteInfo.getPhysicalPath());
+
+      Iterator<Long> iterator = deleteInfo.ordinalsIterator();
+      List<Long> deletes = new ArrayList<>();
+      iterator.forEachRemaining(deletes::add);
+      assertEquals(deletes.size(), deleteInfo.getRecordCount());
+    }
+  }
+
+  private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) {
+    String filePath = file.path();
+    if (filePath.startsWith(tableBasePath)) {
+      return filePath;
+    }
+    return Paths.get(tableBasePath, file.path()).toString();
+  }
+
+  private void validateDeletedRecordCount(
+      DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) {
+    List<AddFile> allFiles =
+        deltaLog
+            .getSnapshotAt(deltaLog.snapshot().version(), Option.empty())
+            .allFiles()
+            .collectAsList();
+    List<AddFile> filesWithDeletionVectors =
+        allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList());
+
+    assertEquals(deleteVectorFileCount, filesWithDeletionVectors.size());
+    assertEquals(
+        deletionRecordCount,
+        filesWithDeletionVectors.stream()
+            .collect(Collectors.summarizingLong(AddFile::numDeletedRecords))
+            .getSum());
+  }
+}
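
The class Javadoc above says validation compares the records returned by the Iceberg table. One
way to run that check outside Spark is Iceberg's generic reader, which applies position delete
files during the scan, so the live row count should match the Delta table. A hedged sketch using
standard Iceberg APIs (HadoopTables, IcebergGenerics); the helper class is hypothetical and not
part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.io.CloseableIterable;

    class IcebergRowCountCheck {
      static long liveRowCount(String tableBasePath) throws Exception {
        // Load the Iceberg table written by the conversion target from its base path.
        Table table = new HadoopTables(new Configuration()).load(tableBasePath);
        long count = 0;
        try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
          for (Record ignored : rows) {
            count++;
          }
        }
        return count;
      }
    }
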
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java
new file mode 100644
index 00000000..bf532d05
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDeleteVectorConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
+public class TestIcebergDeleteVectorConverter {
+  @TempDir private Path tempDirPath;
+
+  @Test
+  public void testToIcebergWritesPositionDeleteFile() throws Exception {
+    Path tablePath = Paths.get(tempDirPath.toString(), "table");
+    Path dataFilePath = Paths.get(tablePath.toString(), "part=1", "data_1.parquet");
+
+    List<Long> ordinals = Arrays.asList(1L, 10L, 12L, 13L, 20L);
+
+    InternalDeletionVector vector =
+        InternalDeletionVector.builder()
+            .physicalPath("")
+            .dataFilePath(dataFilePath.toString())
+            .ordinalsSupplier(ordinals::iterator)
+            .build();
+
+    IcebergDeleteVectorConverter converter =
+        IcebergDeleteVectorConverter.builder().directoryPath(tempDirPath).build();
+
+    HadoopFileIO fileIO = new HadoopFileIO(new Configuration());
+    DeleteFile deleteFile = converter.toIceberg(fileIO, vector);
+    assertNotNull(deleteFile);
+    assertNotNull(deleteFile.path());
+    assertTrue(Files.exists(Paths.get(deleteFile.path().toString()), LinkOption.NOFOLLOW_LINKS));
+
+    // assert delete file exists
+    CharSequence deleteFilePath = deleteFile.path();
+    Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(null);
+
+    InputFile deleteInputFile = org.apache.iceberg.Files.localInput(deleteFilePath.toString());
+    try (CloseableIterable<Record> reader =
+        Parquet.read(deleteInputFile)
+            .project(deleteSchema)
+            .createReaderFunc(schema -> GenericParquetReaders.buildReader(deleteSchema, schema))
+            .build()) {
+      ArrayList<Record> deletedRecords = Lists.newArrayList(reader);
+      for (int i = 0; i < ordinals.size(); i++) {
+        Record record = deletedRecords.get(i);
+        assertEquals(2, record.size());
+        assertEquals(record.get(0), dataFilePath.toString());
+        assertEquals(record.get(1), ordinals.get(i));
+      }
+    }
+  }
+}
