This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4cfa2f733c [GOBBLIN-2203] Add Support for Delete Manifests in Iceberg
Full Table Replication (#4112)
4cfa2f733c is described below
commit 4cfa2f733c913a71338286e477af027dec9e7889
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Apr 23 15:54:56 2025 +0530
[GOBBLIN-2203] Add Support for Delete Manifests in Iceberg Full Table
Replication (#4112)
* added support for delete manifests in iceberg full table replication
* remove unused imports
---
.../data/management/copy/iceberg/IcebergTable.java | 12 ++
.../management/copy/iceberg/IcebergTableTest.java | 206 ++++++++++++++++-----
2 files changed, 175 insertions(+), 43 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e3ec46aa1e..fab570120b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -30,6 +30,8 @@ import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
@@ -200,6 +202,9 @@ public class IcebergTable {
}
protected static IcebergSnapshotInfo.ManifestFileInfo
calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+ if (manifest.content() == ManifestContent.DELETES) {
+ return new ManifestFileInfo(manifest.path(),
discoverDeleteFilePaths(manifest, io));
+ }
return new ManifestFileInfo(manifest.path(),
discoverDataFilePaths(manifest, io));
}
@@ -209,6 +214,13 @@ public class IcebergTable {
}
}
+ protected static List<String> discoverDeleteFilePaths(ManifestFile manifest,
FileIO io) throws IOException {
+ try (ManifestReader<DeleteFile> deleteFileManifestReader =
ManifestFiles.readDeleteManifest(manifest, io, null);
+ CloseableIterator<DeleteFile> deleteFiles =
deleteFileManifestReader.iterator()) {
+ return Lists.newArrayList(Iterators.transform(deleteFiles, (deleteFile)
-> deleteFile.path().toString()));
+ }
+ }
+
public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
DatasetDescriptor descriptor = new DatasetDescriptor(
datasetDescriptorPlatform,
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 63aa27221b..90ba02b848 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -24,20 +24,24 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -45,14 +49,25 @@ import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -73,6 +88,18 @@ public class IcebergTableTest extends HiveMetastoreTest {
protected static final PartitionSpec icebergPartitionSpec =
PartitionSpec.builderFor(icebergSchema)
.identity("id")
.build();
+ protected static final List<List<String>> perSnapshotDataFilesets =
Lists.newArrayList(
+ Lists.newArrayList("path/to/data-a0.orc"),
+ Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc"),
+ Lists.newArrayList("path/to/data-c0.orc", "path/to/data-c1.orc",
"path/to/data-c2.orc"),
+ Lists.newArrayList("path/to/data-d0.orc")
+ );
+ protected static final List<List<String>> perSnapshotDeleteFilesets =
Lists.newArrayList(
+ Lists.newArrayList("path/to/delete-a0.orc"),
+ Lists.newArrayList("path/to/delete-b0.orc", "path/to/delete-b1.orc"),
+ Lists.newArrayList("path/to/delete-c0.orc", "path/to/delete-c1.orc",
"path/to/delete-c2.orc"),
+ Lists.newArrayList("path/to/delete-d0.orc")
+ );
private final String dbName = "myicebergdb";
private final String tableName = "justtesting";
@@ -91,7 +118,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
@BeforeMethod
public void setUpEachTest() {
tableId = TableIdentifier.of(dbName, tableName);
- table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
+ table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec,
Collections.singletonMap("format-version", "2"));
catalogUri = catalog.getConf().get(CatalogProperties.URI);
metadataBasePath = calcMetadataBasePath(tableId);
}
@@ -101,17 +128,44 @@ public class IcebergTableTest extends HiveMetastoreTest {
catalog.dropTable(tableId);
}
- /** Verify info about the current snapshot only */
+ /** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator,
getIncrementalSnapshotInfosIterator for iceberg table containing only data
files.*/
@Test
- public void testGetCurrentSnapshotInfo() throws IOException {
- List<List<String>> perSnapshotFilesets = Lists.newArrayList(
- Lists.newArrayList("/path/to/data-a0.orc"),
- Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
- Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc",
"/path/to/data-c2.orc"),
- Lists.newArrayList("/path/to/data-d0.orc")
- );
+ public void testGetSnapshotInfosForDataFilesOnlyTable() throws IOException {
+ initializeSnapshots(table, perSnapshotDataFilesets);
- initializeSnapshots(table, perSnapshotFilesets);
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId,
catalog.newTableOps(tableId), catalogUri,
+ catalog.loadTable(tableId)).getCurrentSnapshotInfo();
+ verifySnapshotInfo(snapshotInfo, perSnapshotDataFilesets,
perSnapshotDataFilesets.size());
+
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
+ catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
+ Assert.assertEquals(snapshotInfos.size(), perSnapshotDataFilesets.size(),
"num snapshots");
+ for (int i = 0; i < perSnapshotDataFilesets.size(); ++i) {
+ System.err.println("verifying snapshotInfo[" + i + "]");
+ verifySnapshotInfo(snapshotInfos.get(i),
perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
+ }
+
+ List<IcebergSnapshotInfo> incrementalSnapshotInfos =
Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
+ catalogUri,
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
+ Assert.assertEquals(incrementalSnapshotInfos.size(),
perSnapshotDataFilesets.size(), "num snapshots");
+ for (int i = 0; i < incrementalSnapshotInfos.size(); ++i) {
+ System.err.println("verifying snapshotInfo[" + i + "]");
+ verifySnapshotInfo(incrementalSnapshotInfos.get(i),
perSnapshotDataFilesets.subList(i, i + 1), incrementalSnapshotInfos.size());
+ }
+ }
+
+ @DataProvider(name = "isPosDeleteProvider")
+ public Object[][] isPosDeleteProvider() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ /** Verify info about the current snapshot only */
+ @Test(dataProvider = "isPosDeleteProvider")
+ public void testGetCurrentSnapshotInfo(boolean isPosDelete) throws
IOException {
+ initializeSnapshots(table, perSnapshotDataFilesets);
+ initializeSnapshotsWithDeleteFiles(table,
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
+ List<List<String>> perSnapshotFilesets =
Stream.concat(perSnapshotDeleteFilesets.stream(),
perSnapshotDataFilesets.stream())
+ .collect(Collectors.toList());
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId,
catalog.newTableOps(tableId), catalogUri,
catalog.loadTable(tableId)).getCurrentSnapshotInfo();
verifySnapshotInfo(snapshotInfo, perSnapshotFilesets,
perSnapshotFilesets.size());
@@ -127,37 +181,37 @@ public class IcebergTableTest extends HiveMetastoreTest {
}
/** Verify info about all (full) snapshots */
- @Test
- public void testGetAllSnapshotInfosIterator() throws IOException {
- List<List<String>> perSnapshotFilesets = Lists.newArrayList(
- Lists.newArrayList("/path/to/data-a0.orc"),
- Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
- Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc",
"/path/to/data-c2.orc"),
- Lists.newArrayList("/path/to/data-d0.orc")
- );
+ @Test(dataProvider = "isPosDeleteProvider")
+ public void testGetAllSnapshotInfosIterator(boolean isPosDelete) throws
IOException {
+ int numDataSnapshots = perSnapshotDataFilesets.size();
+ int numDeleteSnapshots = perSnapshotDeleteFilesets.size();
- initializeSnapshots(table, perSnapshotFilesets);
+ initializeSnapshots(table, perSnapshotDataFilesets);
+ initializeSnapshotsWithDeleteFiles(table,
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
- Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
+ Assert.assertEquals(snapshotInfos.size(), numDataSnapshots +
numDeleteSnapshots, "num snapshots");
- for (int i = 0; i < snapshotInfos.size(); ++i) {
+ for (int i = 0; i < numDataSnapshots; ++i) {
System.err.println("verifying snapshotInfo[" + i + "]");
- verifySnapshotInfo(snapshotInfos.get(i), perSnapshotFilesets.subList(0,
i + 1), snapshotInfos.size());
+ verifySnapshotInfo(snapshotInfos.get(i),
perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
+ }
+
+ for (int i = 0 ; i < numDeleteSnapshots ; i++) {
+ System.err.println("verifying snapshotInfo[" + i + "]");
+ List<List<String>> curSnapshotFileSets =
Stream.concat(perSnapshotDeleteFilesets.subList(0, i + 1).stream(),
perSnapshotDataFilesets.stream())
+ .collect(Collectors.toList());
+ verifySnapshotInfo(snapshotInfos.get(i + numDataSnapshots),
curSnapshotFileSets, snapshotInfos.size());
}
}
/** Verify info about all snapshots (incremental deltas) */
- @Test
- public void testGetIncrementalSnapshotInfosIterator() throws IOException {
- List<List<String>> perSnapshotFilesets = Lists.newArrayList(
- Lists.newArrayList("/path/to/data-a0.orc"),
- Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
- Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc",
"/path/to/data-c2.orc"),
- Lists.newArrayList("/path/to/data-d0.orc")
- );
-
- initializeSnapshots(table, perSnapshotFilesets);
+ @Test(dataProvider = "isPosDeleteProvider")
+ public void testGetIncrementalSnapshotInfosIterator(boolean isPosDelete)
throws IOException {
+ initializeSnapshots(table, perSnapshotDataFilesets);
+ initializeSnapshotsWithDeleteFiles(table,
catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
+ List<List<String>> perSnapshotFilesets =
Stream.concat(perSnapshotDataFilesets.stream(),
perSnapshotDeleteFilesets.stream())
+ .collect(Collectors.toList());
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri,
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
@@ -169,26 +223,38 @@ public class IcebergTableTest extends HiveMetastoreTest {
}
/** Verify info about all snapshots (incremental deltas) correctly
eliminates repeated data files */
- @Test
- public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws
IOException {
- List<List<String>> perSnapshotFilesets = Lists.newArrayList(
- Lists.newArrayList("/path/to/data-a0.orc"),
- Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc",
"/path/to/data-a0.orc"),
- Lists.newArrayList("/path/to/data-a0.orc","/path/to/data-c0.orc",
"/path/to/data-b1.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
- Lists.newArrayList("/path/to/data-d0.orc")
+ @Test(dataProvider = "isPosDeleteProvider")
+ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles(boolean
isPosDelete) throws IOException {
+ List<List<String>> perSnapshotFilesets1 = Lists.newArrayList(
+ Lists.newArrayList("path/to/data-a0.orc"),
+ Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc",
"path/to/data-a0.orc"),
+ Lists.newArrayList("path/to/data-a0.orc","path/to/data-c0.orc",
"path/to/data-b1.orc", "path/to/data-c1.orc", "path/to/data-c2.orc"),
+ Lists.newArrayList("path/to/data-d0.orc")
);
- initializeSnapshots(table, perSnapshotFilesets);
+ // Note : Keeping the name as data- only to test the functionality without
changing below validation code
+ List<List<String>> perSnapshotFilesets2 = Lists.newArrayList(
+ Lists.newArrayList("path/to/data-e0.orc"),
+ Lists.newArrayList("path/to/data-f0.orc", "path/to/data-f1.orc",
"path/to/data-e0.orc"),
+ Lists.newArrayList("path/to/data-e0.orc","path/to/data-g0.orc",
"path/to/data-f1.orc", "path/to/data-g1.orc", "path/to/data-g2.orc"),
+ Lists.newArrayList("path/to/data-h0.orc")
+ );
+
+ List<List<String>> perSnapshotFileSets = new
ArrayList<>(perSnapshotFilesets1);
+ perSnapshotFileSets.addAll(perSnapshotFilesets2);
+
+ initializeSnapshots(table, perSnapshotFilesets1);
+ initializeSnapshotsWithDeleteFiles(table,
catalog.newTableOps(tableId).io(), perSnapshotFilesets2, isPosDelete);
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri,
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
- Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
+ Assert.assertEquals(snapshotInfos.size(), perSnapshotFileSets.size(), "num
snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
System.err.println("verifying snapshotInfo[" + i + "] - " +
snapshotInfos.get(i));
char initialChar = (char) ((int) 'a' + i);
// adjust expectations to eliminate duplicate entries (i.e. those
bearing letter not aligned with ordinal fileset)
- List<String> fileset = perSnapshotFilesets.get(i).stream().filter(name
-> {
- String uniquePortion = name.substring("/path/to/data-".length());
+ List<String> fileset = perSnapshotFileSets.get(i).stream().filter(name
-> {
+ String uniquePortion = name.substring("path/to/data-".length());
return uniquePortion.startsWith(Character.toString(initialChar));
}).collect(Collectors.toList());
verifySnapshotInfo(snapshotInfos.get(i), Arrays.asList(fileset),
snapshotInfos.size());
@@ -353,6 +419,36 @@ public class IcebergTableTest extends HiveMetastoreTest {
}
}
+ protected static void initializeSnapshotsWithDeleteFiles(Table table, FileIO
fileIO,
+ List<List<String>> perSnapshotFilesets, boolean isPosDelete) {
+ Schema deleteSchema = icebergSchema.select("id");
+ int[] equalityFieldIds = {0};
+ GenericAppenderFactory appenderFactory =
+ new GenericAppenderFactory(
+ icebergSchema,
+ icebergPartitionSpec,
+ equalityFieldIds,
+ deleteSchema,
+ deleteSchema);
+ EncryptionManager encryptionManager = table.encryption();
+ Record deleteRecord =
GenericRecord.create(deleteSchema).copy(ImmutableMap.of("id", "testVal"));
+ PartitionData partitionData = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData.set(0, "testVal");
+
+ for (List<String> snapshotFileset : perSnapshotFilesets) {
+ RowDelta rowDelta = table.newRowDelta();
+ for (String filePath : snapshotFileset) {
+ EncryptedOutputFile encryptedOutputFile =
encryptionManager.encrypt(fileIO.newOutputFile(filePath));
+ if (isPosDelete) {
+ rowDelta.addDeletes(createPosDeleteFile(appenderFactory,
encryptedOutputFile, partitionData, deleteRecord));
+ } else {
+ rowDelta.addDeletes(createEqDeleteFile(appenderFactory,
encryptedOutputFile, partitionData, deleteRecord));
+ }
+ }
+ rowDelta.commit();
+ }
+ }
+
/** Extract whatever kind of iceberg metadata file, iff recognized by
`doesResemble` */
protected static Optional<File> extractSomeMetadataFilepath(String
candidatePath, String basePath, Predicate<String> doesResemble) {
try {
@@ -416,6 +512,30 @@ public class IcebergTableTest extends HiveMetastoreTest {
.build();
}
+ protected static DeleteFile createEqDeleteFile(GenericAppenderFactory
appenderFactory,
+ EncryptedOutputFile encryptedOutputFile, StructLike partitionData,
Record record) {
+ EqualityDeleteWriter<Record> eqDeleteWriter =
appenderFactory.newEqDeleteWriter(encryptedOutputFile, FileFormat.ORC,
partitionData);
+ try (EqualityDeleteWriter<Record> clsEqDeleteWriter = eqDeleteWriter) {
+ clsEqDeleteWriter.write(record);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return eqDeleteWriter.toDeleteFile();
+ }
+
+ protected static DeleteFile createPosDeleteFile(GenericAppenderFactory
appenderFactory,
+ EncryptedOutputFile encryptedOutputFile, StructLike partitionData,
Record record) {
+ PositionDelete<Record> posDelRecord = PositionDelete.create();
+ posDelRecord.set("dummyFilePath", 0, record);
+ PositionDeleteWriter<Record> posDeleteWriter =
appenderFactory.newPosDeleteWriter(encryptedOutputFile, FileFormat.ORC,
partitionData);
+ try (PositionDeleteWriter<Record> clsPosDeleteWriter = posDeleteWriter) {
+ clsPosDeleteWriter.write(posDelRecord);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return posDeleteWriter.toDeleteFile();
+ }
+
/** general utility: order-independent/set equality between collections */
protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T>
expected, String message) {
Assert.assertEquals(Sets.newHashSet(actual), Sets.newHashSet(expected),
message);