phet commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809524465
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -333,4 +348,119 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) { return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); } + + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + TableIdentifier testTableId = TableIdentifier.of(dbName, "testTableForPartitionSpecificDataFiles"); + Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file3.orc", + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file4.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + List<PartitionData> partitionDataList = Collections.nCopies(paths.size(), partitionData); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + + catalog.dropTable(testTableId); + } + + @Test + public void testOverwritePartition() throws IOException { + TableIdentifier overwritePartitionTestTableId = TableIdentifier.of(dbName, 
"testTableForOverwritePartition"); + Table testTable = catalog.createTable(overwritePartitionTestTableId, icebergSchema, icebergPartitionSpec); + + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, "1"); + List<PartitionData> partitionDataList = Arrays.asList(partitionData, partitionData2); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(overwritePartitionTestTableId, + catalog.newTableOps(overwritePartitionTestTableId), + catalogUri, + catalog.loadTable(overwritePartitionTestTableId)); + + verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/id=2/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData3.set(0, "2"); + PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData4.set(0, "2"); + List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, partitionData4); + + List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2); + // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table + icebergTable.overwritePartition(dataFiles2, "id", "2"); + List<String> expectedPaths2 = new ArrayList<>(paths); + expectedPaths2.addAll(paths2); + verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + 
    List<String> paths3 = Arrays.asList( + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file6.orc" + ); + // Reusing the same partition data to create data files with different paths + List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList); + // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path + icebergTable.overwritePartition(dataFiles3, "id", "1"); + List<String> expectedPaths3 = new ArrayList<>(paths2); + expectedPaths3.addAll(paths3); + verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + catalog.dropTable(overwritePartitionTestTableId); + } + + private static void addPartitionDataFiles(Table table, List<String> paths, List<PartitionData> partitionDataList) { + Assert.assertEquals(paths.size(), partitionDataList.size()); + getDataFiles(paths, partitionDataList).forEach(dataFile -> table.newAppend().appendFile(dataFile).commit()); + } Review Comment: debatable whether a need for this abstraction, as it's just a `.forEach` call. the confusing params might initially trick us to believe it's doing more than that. the impl should be: ``` void addPartitionDataFiles(Table table, List<DataFile> dataFiles) { dataFiles.forEach(df -> table.newAppend().appendFile(df).commit()); } ``` then call it as: ``` addPartitionDataFiles(table, createDataFiles(partitionDataByPath)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org