This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cfa2f733c [GOBBLIN-2203] Add Support for Delete Manifests in Iceberg Full Table Replication (#4112)
4cfa2f733c is described below

commit 4cfa2f733c913a71338286e477af027dec9e7889
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Apr 23 15:54:56 2025 +0530

    [GOBBLIN-2203] Add Support for Delete Manifests in Iceberg Full Table Replication (#4112)
    
    * added support for delete manifests in iceberg full table replication
    
    * remove unused imports
---
 .../data/management/copy/iceberg/IcebergTable.java |  12 ++
 .../management/copy/iceberg/IcebergTableTest.java  | 206 ++++++++++++++++-----
 2 files changed, 175 insertions(+), 43 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e3ec46aa1e..fab570120b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -30,6 +30,8 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
@@ -200,6 +202,9 @@ public class IcebergTable {
   }
 
   protected static IcebergSnapshotInfo.ManifestFileInfo 
calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+    if (manifest.content() == ManifestContent.DELETES) {
+      return new ManifestFileInfo(manifest.path(), 
discoverDeleteFilePaths(manifest, io));
+    }
     return new ManifestFileInfo(manifest.path(), 
discoverDataFilePaths(manifest, io));
   }
 
@@ -209,6 +214,13 @@ public class IcebergTable {
     }
   }
 
+  protected static List<String> discoverDeleteFilePaths(ManifestFile manifest, 
FileIO io) throws IOException {
+    try (ManifestReader<DeleteFile> deleteFileManifestReader = 
ManifestFiles.readDeleteManifest(manifest, io, null);
+        CloseableIterator<DeleteFile> deleteFiles = 
deleteFileManifestReader.iterator()) {
+      return Lists.newArrayList(Iterators.transform(deleteFiles, (deleteFile) 
-> deleteFile.path().toString()));
+    }
+  }
+
   public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
     DatasetDescriptor descriptor = new DatasetDescriptor(
         datasetDescriptorPlatform,
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 63aa27221b..90ba02b848 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -24,20 +24,24 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -45,14 +49,25 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -73,6 +88,18 @@ public class IcebergTableTest extends HiveMetastoreTest {
   protected static final PartitionSpec icebergPartitionSpec = 
PartitionSpec.builderFor(icebergSchema)
       .identity("id")
       .build();
+  protected static final List<List<String>> perSnapshotDataFilesets = 
Lists.newArrayList(
+      Lists.newArrayList("path/to/data-a0.orc"),
+      Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc"),
+      Lists.newArrayList("path/to/data-c0.orc", "path/to/data-c1.orc", 
"path/to/data-c2.orc"),
+      Lists.newArrayList("path/to/data-d0.orc")
+  );
+  protected static final List<List<String>> perSnapshotDeleteFilesets = 
Lists.newArrayList(
+      Lists.newArrayList("path/to/delete-a0.orc"),
+      Lists.newArrayList("path/to/delete-b0.orc", "path/to/delete-b1.orc"),
+      Lists.newArrayList("path/to/delete-c0.orc", "path/to/delete-c1.orc", 
"path/to/delete-c2.orc"),
+      Lists.newArrayList("path/to/delete-d0.orc")
+  );
 
   private final String dbName = "myicebergdb";
   private final String tableName = "justtesting";
@@ -91,7 +118,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
   @BeforeMethod
   public void setUpEachTest() {
     tableId = TableIdentifier.of(dbName, tableName);
-    table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
+    table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec, 
Collections.singletonMap("format-version", "2"));
     catalogUri = catalog.getConf().get(CatalogProperties.URI);
     metadataBasePath = calcMetadataBasePath(tableId);
   }
@@ -101,17 +128,44 @@ public class IcebergTableTest extends HiveMetastoreTest {
     catalog.dropTable(tableId);
   }
 
-  /** Verify info about the current snapshot only */
+  /** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator, getIncrementalSnapshotInfosIterator for iceberg table containing only data files.*/
   @Test
-  public void testGetCurrentSnapshotInfo() throws IOException {
-    List<List<String>> perSnapshotFilesets = Lists.newArrayList(
-        Lists.newArrayList("/path/to/data-a0.orc"),
-        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
-        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", 
"/path/to/data-c2.orc"),
-        Lists.newArrayList("/path/to/data-d0.orc")
-    );
+  public void testGetSnapshotInfosForDataFilesOnlyTable() throws IOException {
+    initializeSnapshots(table, perSnapshotDataFilesets);
 
-    initializeSnapshots(table, perSnapshotFilesets);
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, 
catalog.newTableOps(tableId), catalogUri,
+        catalog.loadTable(tableId)).getCurrentSnapshotInfo();
+    verifySnapshotInfo(snapshotInfo, perSnapshotDataFilesets, 
perSnapshotDataFilesets.size());
+
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
+    Assert.assertEquals(snapshotInfos.size(), perSnapshotDataFilesets.size(), 
"num snapshots");
+    for (int i = 0; i < perSnapshotDataFilesets.size(); ++i) {
+      System.err.println("verifying snapshotInfo[" + i + "]");
+      verifySnapshotInfo(snapshotInfos.get(i), 
perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
+    }
+
+    List<IcebergSnapshotInfo> incrementalSnapshotInfos = 
Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, 
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
+    Assert.assertEquals(incrementalSnapshotInfos.size(), 
perSnapshotDataFilesets.size(), "num snapshots");
+    for (int i = 0; i < incrementalSnapshotInfos.size(); ++i) {
+      System.err.println("verifying snapshotInfo[" + i + "]");
+      verifySnapshotInfo(incrementalSnapshotInfos.get(i), 
perSnapshotDataFilesets.subList(i, i + 1), incrementalSnapshotInfos.size());
+    }
+  }
+
+  @DataProvider(name = "isPosDeleteProvider")
+  public Object[][] isPosDeleteProvider() {
+    return new Object[][] {{true}, {false}};
+  }
+
+  /** Verify info about the current snapshot only */
+  @Test(dataProvider = "isPosDeleteProvider")
+  public void testGetCurrentSnapshotInfo(boolean isPosDelete) throws 
IOException {
+    initializeSnapshots(table, perSnapshotDataFilesets);
+    initializeSnapshotsWithDeleteFiles(table, 
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
+    List<List<String>> perSnapshotFilesets = 
Stream.concat(perSnapshotDeleteFilesets.stream(), 
perSnapshotDataFilesets.stream())
+        .collect(Collectors.toList());
     IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, 
catalog.newTableOps(tableId), catalogUri,
         catalog.loadTable(tableId)).getCurrentSnapshotInfo();
     verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, 
perSnapshotFilesets.size());
@@ -127,37 +181,37 @@ public class IcebergTableTest extends HiveMetastoreTest {
   }
 
   /** Verify info about all (full) snapshots */
-  @Test
-  public void testGetAllSnapshotInfosIterator() throws IOException {
-    List<List<String>> perSnapshotFilesets = Lists.newArrayList(
-        Lists.newArrayList("/path/to/data-a0.orc"),
-        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
-        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", 
"/path/to/data-c2.orc"),
-        Lists.newArrayList("/path/to/data-d0.orc")
-    );
+  @Test(dataProvider = "isPosDeleteProvider")
+  public void testGetAllSnapshotInfosIterator(boolean isPosDelete) throws 
IOException {
+    int numDataSnapshots = perSnapshotDataFilesets.size();
+    int numDeleteSnapshots = perSnapshotDeleteFilesets.size();
 
-    initializeSnapshots(table, perSnapshotFilesets);
+    initializeSnapshots(table, perSnapshotDataFilesets);
+    initializeSnapshotsWithDeleteFiles(table, 
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
     List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
         catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
-    Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
+    Assert.assertEquals(snapshotInfos.size(), numDataSnapshots + 
numDeleteSnapshots, "num snapshots");
 
-    for (int i = 0; i < snapshotInfos.size(); ++i) {
+    for (int i = 0; i < numDataSnapshots; ++i) {
       System.err.println("verifying snapshotInfo[" + i + "]");
-      verifySnapshotInfo(snapshotInfos.get(i), perSnapshotFilesets.subList(0, 
i + 1), snapshotInfos.size());
+      verifySnapshotInfo(snapshotInfos.get(i), 
perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
+    }
+
+    for (int i = 0 ; i < numDeleteSnapshots ; i++) {
+      System.err.println("verifying snapshotInfo[" + i + "]");
+      List<List<String>> curSnapshotFileSets = 
Stream.concat(perSnapshotDeleteFilesets.subList(0, i + 1).stream(), 
perSnapshotDataFilesets.stream())
+          .collect(Collectors.toList());
+      verifySnapshotInfo(snapshotInfos.get(i + numDataSnapshots), 
curSnapshotFileSets, snapshotInfos.size());
     }
   }
 
   /** Verify info about all snapshots (incremental deltas) */
-  @Test
-  public void testGetIncrementalSnapshotInfosIterator() throws IOException {
-    List<List<String>> perSnapshotFilesets = Lists.newArrayList(
-        Lists.newArrayList("/path/to/data-a0.orc"),
-        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
-        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", 
"/path/to/data-c2.orc"),
-        Lists.newArrayList("/path/to/data-d0.orc")
-    );
-
-    initializeSnapshots(table, perSnapshotFilesets);
+  @Test(dataProvider = "isPosDeleteProvider")
+  public void testGetIncrementalSnapshotInfosIterator(boolean isPosDelete) 
throws IOException {
+    initializeSnapshots(table, perSnapshotDataFilesets);
+    initializeSnapshotsWithDeleteFiles(table, 
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
+    List<List<String>> perSnapshotFilesets = 
Stream.concat(perSnapshotDataFilesets.stream(), 
perSnapshotDeleteFilesets.stream())
+        .collect(Collectors.toList());
     List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
         catalogUri, 
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
@@ -169,26 +223,38 @@ public class IcebergTableTest extends HiveMetastoreTest {
   }
 
   /** Verify info about all snapshots (incremental deltas) correctly eliminates repeated data files */
-  @Test
-  public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws 
IOException {
-    List<List<String>> perSnapshotFilesets = Lists.newArrayList(
-        Lists.newArrayList("/path/to/data-a0.orc"),
-        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc", 
"/path/to/data-a0.orc"),
-        Lists.newArrayList("/path/to/data-a0.orc","/path/to/data-c0.orc", 
"/path/to/data-b1.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
-        Lists.newArrayList("/path/to/data-d0.orc")
+  @Test(dataProvider = "isPosDeleteProvider")
+  public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles(boolean 
isPosDelete) throws IOException {
+    List<List<String>> perSnapshotFilesets1 = Lists.newArrayList(
+        Lists.newArrayList("path/to/data-a0.orc"),
+        Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc", 
"path/to/data-a0.orc"),
+        Lists.newArrayList("path/to/data-a0.orc","path/to/data-c0.orc", 
"path/to/data-b1.orc", "path/to/data-c1.orc", "path/to/data-c2.orc"),
+        Lists.newArrayList("path/to/data-d0.orc")
     );
 
-    initializeSnapshots(table, perSnapshotFilesets);
+    // Note : Keeping the name as data- only to test the functionality without changing below validation code
+    List<List<String>> perSnapshotFilesets2 = Lists.newArrayList(
+        Lists.newArrayList("path/to/data-e0.orc"),
+        Lists.newArrayList("path/to/data-f0.orc", "path/to/data-f1.orc", 
"path/to/data-e0.orc"),
+        Lists.newArrayList("path/to/data-e0.orc","path/to/data-g0.orc", 
"path/to/data-f1.orc", "path/to/data-g1.orc", "path/to/data-g2.orc"),
+        Lists.newArrayList("path/to/data-h0.orc")
+    );
+
+    List<List<String>> perSnapshotFileSets = new 
ArrayList<>(perSnapshotFilesets1);
+    perSnapshotFileSets.addAll(perSnapshotFilesets2);
+
+    initializeSnapshots(table, perSnapshotFilesets1);
+    initializeSnapshotsWithDeleteFiles(table, 
catalog.newTableOps(tableId).io(), perSnapshotFilesets2, isPosDelete);
     List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
         catalogUri, 
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
-    Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
+    Assert.assertEquals(snapshotInfos.size(), perSnapshotFileSets.size(), "num 
snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
       System.err.println("verifying snapshotInfo[" + i + "] - " + 
snapshotInfos.get(i));
       char initialChar = (char) ((int) 'a' + i);
       // adjust expectations to eliminate duplicate entries (i.e. those bearing letter not aligned with ordinal fileset)
-      List<String> fileset = perSnapshotFilesets.get(i).stream().filter(name 
-> {
-        String uniquePortion = name.substring("/path/to/data-".length());
+      List<String> fileset = perSnapshotFileSets.get(i).stream().filter(name 
-> {
+        String uniquePortion = name.substring("path/to/data-".length());
         return uniquePortion.startsWith(Character.toString(initialChar));
       }).collect(Collectors.toList());
       verifySnapshotInfo(snapshotInfos.get(i), Arrays.asList(fileset), 
snapshotInfos.size());
@@ -353,6 +419,36 @@ public class IcebergTableTest extends HiveMetastoreTest {
     }
   }
 
+  protected static void initializeSnapshotsWithDeleteFiles(Table table, FileIO 
fileIO,
+      List<List<String>> perSnapshotFilesets, boolean isPosDelete) {
+    Schema deleteSchema = icebergSchema.select("id");
+    int[] equalityFieldIds = {0};
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(
+            icebergSchema,
+            icebergPartitionSpec,
+            equalityFieldIds,
+            deleteSchema,
+            deleteSchema);
+    EncryptionManager encryptionManager = table.encryption();
+    Record deleteRecord = 
GenericRecord.create(deleteSchema).copy(ImmutableMap.of("id", "testVal"));
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "testVal");
+
+    for (List<String> snapshotFileset : perSnapshotFilesets) {
+      RowDelta rowDelta = table.newRowDelta();
+      for (String filePath : snapshotFileset) {
+        EncryptedOutputFile encryptedOutputFile = 
encryptionManager.encrypt(fileIO.newOutputFile(filePath));
+        if (isPosDelete) {
+          rowDelta.addDeletes(createPosDeleteFile(appenderFactory, 
encryptedOutputFile, partitionData, deleteRecord));
+        } else {
+          rowDelta.addDeletes(createEqDeleteFile(appenderFactory, 
encryptedOutputFile, partitionData, deleteRecord));
+        }
+      }
+      rowDelta.commit();
+    }
+  }
+
   /** Extract whatever kind of iceberg metadata file, iff recognized by `doesResemble` */
   protected static Optional<File> extractSomeMetadataFilepath(String 
candidatePath, String basePath, Predicate<String> doesResemble) {
     try {
@@ -416,6 +512,30 @@ public class IcebergTableTest extends HiveMetastoreTest {
         .build();
   }
 
+  protected static DeleteFile createEqDeleteFile(GenericAppenderFactory 
appenderFactory,
+      EncryptedOutputFile encryptedOutputFile, StructLike partitionData, 
Record record) {
+    EqualityDeleteWriter<Record> eqDeleteWriter = 
appenderFactory.newEqDeleteWriter(encryptedOutputFile, FileFormat.ORC, 
partitionData);
+    try (EqualityDeleteWriter<Record> clsEqDeleteWriter = eqDeleteWriter) {
+      clsEqDeleteWriter.write(record);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return eqDeleteWriter.toDeleteFile();
+  }
+
+  protected static DeleteFile createPosDeleteFile(GenericAppenderFactory 
appenderFactory,
+      EncryptedOutputFile encryptedOutputFile, StructLike partitionData, 
Record record) {
+    PositionDelete<Record> posDelRecord = PositionDelete.create();
+    posDelRecord.set("dummyFilePath", 0, record);
+    PositionDeleteWriter<Record> posDeleteWriter = 
appenderFactory.newPosDeleteWriter(encryptedOutputFile, FileFormat.ORC, 
partitionData);
+    try (PositionDeleteWriter<Record> clsPosDeleteWriter = posDeleteWriter) {
+      clsPosDeleteWriter.write(posDelRecord);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return posDeleteWriter.toDeleteFile();
+  }
+
   /** general utility: order-independent/set equality between collections */
   protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> 
expected, String message) {
     Assert.assertEquals(Sets.newHashSet(actual), Sets.newHashSet(expected), 
message);

Reply via email to