phet commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1811978448
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java: ########## @@ -190,8 +165,8 @@ public void testMultipleCopyEntitiesGenerated() throws IOException { srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc"); srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc"); - List<DataFile> srcDataFiles = getDataFiles(); - Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + List<DataFile> mockSrcDataFiles = createDataFileMocks(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles); Review Comment: this mocking setup repeats several times. could it live in a `@BeforeMethod`? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception { catalog.dropTable(destTableId); } + /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */ + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file3.orc", + "/path/tableName/data/id=2/file5.orc", + "/path/tableName/data/file4.orc", + "/path/tableName/data/id=3/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap(); + paths.forEach(path -> pathsWithPartitionData.put(path, partitionData)); + + addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData)); + + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + // Using AlwaysTrue & AlwaysFalse 
Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + } + + /** Verify that overwritePartition replace data files belonging to given partition col and value */ + @Test + public void testOverwritePartition() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); Review Comment: nit: `partition1Data` (esp. since you use it again below w/ `paths3`) ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java: ########## @@ -183,16 +187,20 @@ private Path addUUIDToPath(String filePathStr) { return new Path(fileDir, newFileName); } - private Map<Path, FileStatus> getDestFilePathWithSrcFileStatus(List<DataFile> srcDataFiles, - List<DataFile> destDataFiles, FileSystem fs) throws IOException { - Map<Path, FileStatus> results = Maps.newHashMap(); - for (int i = 0; i < srcDataFiles.size(); i++) { - Path srcPath = new Path(srcDataFiles.get(i).path().toString()); - Path destPath = new Path(destDataFiles.get(i).path().toString()); - FileStatus srcFileStatus = fs.getFileStatus(srcPath); - results.put(destPath, srcFileStatus); - } - return results; + private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath) { + Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap(); + destDataFileBySrcPath.forEach((srcPath, destDataFile) -> { + FileStatus srcFileStatus; + try { + srcFileStatus = this.sourceFs.getFileStatus(srcPath); 
+ } catch (IOException e) { + String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath); + log.error(errMsg); + throw new RuntimeException(errMsg, e); + } Review Comment: I **really** wish `java.util.function.*` played along better w/ checked exceptions... but that's clearly not the case... \*sigh\* throwing `IOException` is actually a key part of [the `FileSet` "contract"](https://github.com/apache/gobblin/blob/5b495aff5d5bbe840add9f2db76ca944dffc69aa/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/partition/FileSet.java#L142), so substituting an unchecked `RuntimeException` (that no caller expects and would NOT be looking out for) is not something we ought to do at this late stage. Instead, either write this iteratively (using a `for`-each loop) or follow [`IcebergDataset`'s use](https://github.com/apache/gobblin/blob/af4b822a1e2f79721cdff617f1581e77ff261580/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L205) of [`CheckedExceptionFunction.wrapToTunneled`](https://github.com/apache/gobblin/blob/585298fb5ebc074f69c1b9db87de6186c4855b26/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L69) ``` try { ... } catch (CheckedExceptionFunction.WrappedIOException wrapper) { wrapper.rethrowWrapped(); } ``` The code there actually uses: ``` copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent() ``` for caching, which shouldn't be necessary here, given `IcebergTable::getPartitionSpecificDataFiles` examines only a single snapshot. 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java: ########## @@ -75,9 +73,10 @@ public class IcebergPartitionDatasetTest { private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data"; private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition"; private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue"; + private static final String OVERWRITE_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep"; Review Comment: nit: `IcebergOverwritePartitionsStep.class.getName()` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception { catalog.dropTable(destTableId); } + /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */ + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file3.orc", + "/path/tableName/data/id=2/file5.orc", + "/path/tableName/data/file4.orc", + "/path/tableName/data/id=3/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap(); + paths.forEach(path -> pathsWithPartitionData.put(path, partitionData)); + + addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData)); + + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> 
alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + } + + /** Verify that overwritePartition replace data files belonging to given partition col and value */ + @Test + public void testOverwritePartition() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap(); + paths.forEach(path -> pathsWithPartitionData.put(path, partitionData)); + + addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData)); + + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + + verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, "2"); + Map<String, PartitionData> paths2WithPartitionData2 = Maps.newHashMap(); + paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2)); + + List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2); + // here, since partition data with value 2 doesn't exist yet, + // we expect it to get added to the table, w/o changing or deleting any other partitions + icebergTable.overwritePartition(partition2DataFiles, "id", 
"2"); + List<String> expectedPaths2 = new ArrayList<>(paths); + expectedPaths2.addAll(paths2); + verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths3 = Arrays.asList( + "/path/tableName/data/id=2/file5.orc", Review Comment: suggest a note that this naming convention is irrelevant to partition membership, as evidenced by `"/path/tableName/data/id=2/file4.orc"` also in `paths2` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -119,11 +117,13 @@ public void testGetCurrentSnapshotInfo() throws IOException { } /** Verify failure when attempting to get current snapshot info for non-existent table */ - @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class, NoSuchTableException.class}) + @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class}) public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException { TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS"); + // Passing null for Table as catalog.loadTable(bogusTableId) will throw NoSuchTableException so Review Comment: typo on the exception class name ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java: ########## @@ -225,39 +199,57 @@ public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException { List<CopyEntity> copyEntities = (List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); - Assert.assertEquals(copyEntities.size(), 2); - verifyCopyEntities(copyEntities, false); + verifyCopyEntities(copyEntities, 2, false); } - private List<DataFile> getDataFiles() throws IOException { + private static void setupSrcFileSystem() throws IOException { + sourceFs = Mockito.mock(FileSystem.class); + Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); + 
Mockito.when(sourceFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/"))); + Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> { + Path path = invocation.getArgument(0, Path.class); + Path qualifiedPath = sourceFs.makeQualified(path); + return IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString()); + }); + } + + private static void setupDestFileSystem() throws IOException { + targetFs = Mockito.mock(FileSystem.class); + Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI); + Mockito.when(targetFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/"))); + // Since we are adding UUID to the file name for every file while creating destination path, + // so return file not found exception if trying to find file status on destination file system + Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException()); + } + + private static List<DataFile> createDataFileMocks() throws IOException { List<DataFile> dataFiles = new ArrayList<>(); for (String srcFilePath : srcFilePaths) { DataFile dataFile = Mockito.mock(DataFile.class); Path dataFilePath = new Path(srcFilePath); Path qualifiedPath = sourceFs.makeQualified(dataFilePath); Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString()); - Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(getFileStatus(qualifiedPath)); + Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn( + IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString())); dataFiles.add(dataFile); } return dataFiles; } - private static FileStatus getFileStatus(Path path) { - FileStatus fileStatus = new FileStatus(); - fileStatus.setPath(path); - return fileStatus; - } - - private static void 
verifyCopyEntities(Collection<CopyEntity> copyEntities, boolean sameSrcAndDestWriteLocation) { + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int expectedCopyEntitiesSize, Review Comment: taking in the expected size is a definite improvement, but let's go further to also take in the `expectedSrcFilePaths` - please [see my reply](https://github.com/apache/gobblin/pull/4058#discussion_r1811938323) ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -93,6 +93,7 @@ public class IcebergDatasetTest { private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a"; private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a"; private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b"; + private static final String REGISTER_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep"; Review Comment: `IcebergRegisterStep.class.getName()` will engage the type checker, whereas a string literal would not ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java: ########## @@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception { catalog.dropTable(destTableId); } + /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */ + @Test + public void testGetPartitionSpecificDataFiles() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file3.orc", + "/path/tableName/data/id=2/file5.orc", + "/path/tableName/data/file4.orc", + "/path/tableName/data/id=3/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + Map<String, PartitionData> 
pathsWithPartitionData = Maps.newHashMap(); + paths.forEach(path -> pathsWithPartitionData.put(path, partitionData)); + + addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData)); + + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class + Predicate<StructLike> alwaysTruePredicate = partition -> true; + Predicate<StructLike> alwaysFalsePredicate = partition -> false; + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5); + Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0); + } + + /** Verify that overwritePartition replace data files belonging to given partition col and value */ + @Test + public void testOverwritePartition() throws IOException { + List<String> paths = Arrays.asList( + "/path/tableName/data/id=1/file1.orc", + "/path/tableName/data/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap(); + paths.forEach(path -> pathsWithPartitionData.put(path, partitionData)); + + addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData)); + + IcebergTable icebergTable = new IcebergTable(tableId, + catalog.newTableOps(tableId), + catalogUri, + catalog.loadTable(tableId)); + + verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths2 = Arrays.asList( + "/path/tableName/data/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, "2"); + Map<String, 
PartitionData> paths2WithPartitionData2 = Maps.newHashMap(); + paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2)); + + List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2); + // here, since partition data with value 2 doesn't exist yet, + // we expect it to get added to the table, w/o changing or deleting any other partitions + icebergTable.overwritePartition(partition2DataFiles, "id", "2"); + List<String> expectedPaths2 = new ArrayList<>(paths); + expectedPaths2.addAll(paths2); + verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List<String> paths3 = Arrays.asList( + "/path/tableName/data/id=2/file5.orc", + "/path/tableName/data/file6.orc" + ); + // Reusing same partition data to create data file with different paths + Map<String, PartitionData> paths3WithPartitionData = Maps.newHashMap(); + paths3.forEach(path -> paths3WithPartitionData.put(path, partitionData)); + List<DataFile> partition1NewDataFiles = createDataFiles(paths3WithPartitionData); Review Comment: NBD, but for a one-liner: ``` List<DataFile> partition1NewDataFiles = createDataFiles(paths3.stream().collect(Collectors.toMap(x -> x, x -> partition1Data))); ``` (alternative to `x -> x` is `Function.identity()` - your choice) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org