[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939320 ]
ASF GitHub Bot logged work on GOBBLIN-2159: ------------------------------------------- Author: ASF GitHub Bot Created on: 21/Oct/24 21:55 Start Date: 21/Oct/24 21:55 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809524465 ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +348,119 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTableForPartitionSpecificDataFiles"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + List<PartitionData> partitionDataList = Collections.nCopies(paths.size(), partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + 
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testOverwritePartition() throws IOException { + TableIdentifier overwritePartitionTestTableId = TableIdentifier.of(dbName, "testTableForOverwritePartition"); + Table testTable = catalog.createTable(overwritePartitionTestTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, "1"); + List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(overwritePartitionTestTableId, + catalog.newTableOps(overwritePartitionTestTableId), + catalogUri, + catalog.loadTable(overwritePartitionTestTableId)); + + verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/id=2/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData3.set(0, "2"); + PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData4.set(0, "2"); + List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4); + + List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2); 
+ // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table + icebergTable.overwritePartition(dataFiles2, "id", "2"); + List<String> expectedPaths2 = new ArrayList<>(paths); + expectedPaths2.addAll(paths2); + verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths3 = Arrays.asList( + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file6.orc" + ); + // Reusing same partition data to create data files with different paths + List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList); + // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path + icebergTable.overwritePartition(dataFiles3, "id", "1"); + List<String> expectedPaths3 = new ArrayList<>(paths2); + expectedPaths3.addAll(paths3); + verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + catalog.dropTable(overwritePartitionTestTableId); + } + + private static void addPartitionDataFiles(Table table, List<String> paths, List<PartitionData> partitionDataList) { + Assert.assertEquals(paths.size(), partitionDataList.size()); + getDataFiles(paths, partitionDataList).forEach(dataFile -> table.newAppend().appendFile(dataFile).commit()); + } Review Comment: debatable whether there's a need for this abstraction, as it's just a `.forEach` call. The confusing params might initially trick us into believing it's doing more than that. 
the impl should be: ``` void addPartitionDataFiles(Table table, List<DataFile> dataFiles) { dataFiles.forEach(df -> table.newAppend().appendFile(df).commit()); } ``` then call it as: ``` addPartitionDataFiles(table, createDataFiles(partitionDataByPath)) ``` Issue Time Tracking ------------------- Worklog Id: (was: 939320) Time Spent: 7h 10m (was: 7h) > Support Partition Based Copy in Iceberg Distcp > ---------------------------------------------- > > Key: GOBBLIN-2159 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2159 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vivek Rai > Priority: Major > Time Spent: 7h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)