[ https://issues.apache.org/jira/browse/GOBBLIN-1720?focusedWorklogId=821639&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-821639 ]
ASF GitHub Bot logged work on GOBBLIN-1720: ------------------------------------------- Author: ASF GitHub Bot Created on: 29/Oct/22 00:09 Start Date: 29/Oct/22 00:09 Worklog Time Spent: 10m Work Description: phet commented on code in PR #3577: URL: https://github.com/apache/gobblin/pull/3577#discussion_r1008561380 ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java: ########## @@ -144,13 +146,17 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati FileStatus srcFileStatus = entry.getValue(); // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration); + Path toPath = PathUtils.getRootPathChild(srcPath); Review Comment: `toPath` sounds like a copy destination. how about `greatestAncestorPath` or something like that? ########## gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java: ########## @@ -80,6 +80,18 @@ public static Path getRootPath(Path path) { return getRootPath(path.getParent()); } + /** + * Returns the root path child for the specified path. + * + * @see Path + */ + public static Path getRootPathChild(Path path) { + if (path.getParent() != null && path.getParent().isRoot()) { Review Comment: in what case is `.getParent() == null`... is it for `/`? anyway, it looks like that would recurse to an NPE; maybe return `null` instead, or `/` (itself)? 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -460,8 +559,52 @@ public static class IndexingStreams { /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in scala */ public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, BiFunction<T, Integer, R> f) { // given sketchy import, sequester for now within enclosing test class, rather than adding to `gobblin-utility` - return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip( - inputs, IntStream.iterate(0, i -> i + 1).boxed(), f); + return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(inputs, + IntStream.iterate(0, i -> i + 1).boxed(), f); + } + } + + private static class CopyEntityDeserializer { + + public String getFilePathAsStringFromJson(String json) { + String filepath = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data").getAsJsonObject("origin") + .getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data") + .getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString(); + return filepath; + } + + public JsonArray getAncestorOwnerAndPermissions(String json) { + JsonArray ancestorsOwnerAndPermissions = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data") + .getAsJsonArray("ancestorsOwnerAndPermission"); + return ancestorsOwnerAndPermissions; + } + + public JsonObject getDestinationOwnerAndPermissions(String json) { + JsonObject destinationOwnerAndPermissions = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data") + .getAsJsonObject("destinationOwnerAndPermission"); + return destinationOwnerAndPermissions; + } + + public Map<String, String> getDestinationOwnerAndPermissionsFsPermissionsMap(JsonObject jsonObject) { Review Comment: seems unclear abstraction to have every other method take `String json` except this one `JsonObject` can't you call it 
internally (i.e. as a `private` method), and produce a richer representation on a method like `getDestinationOwnerAndPermission`, so there's no need for callers to return and ask for further parsing of something another method of this same class already gave them? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -460,8 +559,52 @@ public static class IndexingStreams { /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in scala */ public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, BiFunction<T, Integer, R> f) { // given sketchy import, sequester for now within enclosing test class, rather than adding to `gobblin-utility` - return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip( - inputs, IntStream.iterate(0, i -> i + 1).boxed(), f); + return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(inputs, + IntStream.iterate(0, i -> i + 1).boxed(), f); + } + } + + private static class CopyEntityDeserializer { + + public String getFilePathAsStringFromJson(String json) { + String filepath = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data").getAsJsonObject("origin") + .getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data") + .getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString(); + return filepath; + } + + public JsonArray getAncestorOwnerAndPermissions(String json) { + JsonArray ancestorsOwnerAndPermissions = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data") + .getAsJsonArray("ancestorsOwnerAndPermission"); + return ancestorsOwnerAndPermissions; + } + + public JsonObject getDestinationOwnerAndPermissions(String json) { + JsonObject destinationOwnerAndPermissions = + new Gson().fromJson(json, JsonObject.class).getAsJsonObject("object-data") + .getAsJsonObject("destinationOwnerAndPermission"); + return 
destinationOwnerAndPermissions; + } + + public Map<String, String> getDestinationOwnerAndPermissionsFsPermissionsMap(JsonObject jsonObject) { + Map<String, String> fsPermissionsMap = Maps.newHashMap(); + JsonObject fsPermission = jsonObject.getAsJsonObject("object-data").getAsJsonObject("fsPermission"); + JsonObject objectData = fsPermission.getAsJsonObject("object-data"); + + String userActionPermission = + objectData.getAsJsonObject("useraction").getAsJsonPrimitive("object-data").getAsString(); + String groupActionPermission = + objectData.getAsJsonObject("groupaction").getAsJsonPrimitive("object-data").getAsString(); + String otherActionPermission = + objectData.getAsJsonObject("otheraction").getAsJsonPrimitive("object-data").getAsString(); + + fsPermissionsMap.put("useraction", userActionPermission); + fsPermissionsMap.put("groupaction", groupActionPermission); + fsPermissionsMap.put("otheraction", otherActionPermission); Review Comment: if you think there's a particular advantage to using a map, when a struct (e.g. `lombok.Data`) would be canonical, please justify w/ a comment for maintainers. 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -80,48 +84,50 @@ public class IcebergDatasetTest { private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a"; private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a"; private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b"; - private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockIcebergTable.SnapshotPaths( - Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0, - Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo( - MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B))) - ); + private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = + new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0, Arrays.asList( + new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0, + Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B)))); private static final String MANIFEST_LIST_PATH_1 = MANIFEST_LIST_PATH_0.replaceAll("\\.x$", ".y"); private static final String MANIFEST_PATH_1 = MANIFEST_PATH_0.replaceAll("\\.a$", ".b"); private static final String MANIFEST_DATA_PATH_1A = MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/"); private static final String MANIFEST_DATA_PATH_1B = MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/"); - private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockIcebergTable.SnapshotPaths( - Optional.empty(), MANIFEST_LIST_PATH_1, - Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo( - MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B))) - ); + private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = + new MockIcebergTable.SnapshotPaths(Optional.empty(), MANIFEST_LIST_PATH_1, Arrays.asList( + new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1, + Arrays.asList(MANIFEST_DATA_PATH_1A, 
MANIFEST_DATA_PATH_1B)))); private final String testDbName = "test_db_name"; private final String testTblName = "test_tbl_name"; private final Properties copyConfigProperties = new Properties(); @BeforeClass - public void setUp() throws Exception { + public void setUp() + throws Exception { Review Comment: overall there's a ton of seemingly non-essential code reformatting which makes it considerably more tedious to review. e.g. is this line break really required? I see a larger number of counter examples in this same repo. ########## gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java: ########## @@ -80,6 +80,18 @@ public static Path getRootPath(Path path) { return getRootPath(path.getParent()); } + /** + * Returns the root path child for the specified path. + * + * @see Path Review Comment: what should we see there? ########## gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java: ########## @@ -80,6 +80,18 @@ public static Path getRootPath(Path path) { return getRootPath(path.getParent()); } + /** + * Returns the root path child for the specified path. Review Comment: this really could benefit from an example. e.g. if `/a/b`, do you return `/a`? 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); Review Comment: seems no value in requiring everyone to provide `null` value. why not just take the file paths and internally fallback to `null`? 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + 
CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); Review Comment: is this roughly equivalent: ``` List<String> 
expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); Map<String, FileStatus> expectedPathsAndFileStatuses = expectedPaths.stream().collect(Collectors.toMap(Function.identity(), p -> createFileStatus(p, null, null, new FsPermission(...)))); ``` since it clarifies the intent of setting these up all w/ the same perms, why not use that? also doesn't the permissions logic work on the ancestors? if so, shall we set path info and perms for them? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset 
= new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + 
expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, 
createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) Review Comment: comment please to remind what "ugp" means. even better might be to encode the semantics into the name of the test method BTW, since I believe "ug" is username and groupname, why are you setting those both `null`? 
########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + 
CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new 
MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - 
Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); Review Comment: as suggested above, if you make a method on the builder that takes only a list of paths (and falls back to `null` for `FileStatus`), this unnecessary complexity would go away ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + 
expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> 
expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void 
testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + 
verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : 
pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... 
snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; Review Comment: my $0.02... and also, not sure if specified by our coding convention, but I find the original easier to read because it highlights the functions passed to the methods, not the higher-order methods themselves. ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -460,8 +559,52 @@ public static class IndexingStreams { /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in scala */ public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, BiFunction<T, Integer, R> f) { // given sketchy import, sequester for now within enclosing test class, rather than adding to `gobblin-utility` - return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip( - inputs, IntStream.iterate(0, i -> i + 1).boxed(), f); + return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(inputs, + IntStream.iterate(0, i -> i + 1).boxed(), f); + } + } + + private static class CopyEntityDeserializer { + + public String getFilePathAsStringFromJson(String json) { Review Comment: I don't understand... why instance methods? if this object would carry no state, why not `static`? 
that said, a better design would be to have a `String json` ctor to initialize a `JsonObject` member that all of the *instance* methods utilize ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - 
.preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, 
createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return 
{@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = 
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; } private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { - return CopyConfiguration.builder(fs, copyConfigProperties) - .copyContext(new CopyContext()) - .build(); + return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new CopyContext()).build(); } - private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) { - List<String> actual = new ArrayList<>(); + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, Set<String> expected) { + Set<String> actual = new HashSet<>(); + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = new Gson().fromJson(json, JsonObject.class) - .getAsJsonObject("object-data").getAsJsonObject("origin") - .getAsJsonObject("object-data").getAsJsonObject("path") - .getAsJsonObject("object-data").getAsJsonObject("uri") - 
.getAsJsonPrimitive("object-data").getAsString(); + String filepath = copyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); } - Assert.assertEquals(actual.size(), expected.size(), - "Set" + actual.toString() + " vs Set" + expected.toString()); + Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, + Map<String, FileStatus> expectedMap) { + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); + + for (CopyEntity copyEntity : copyEntities) { + String copyEntityJson = copyEntity.toString(); + + JsonArray ancestorOwnerAndPermissions = copyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + String filePath = copyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson); + JsonObject destinationOwnerAndPermissions = + copyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + + FileStatus expected = expectedMap.get(filePath); + Map<String, String> actual = + copyEntityDeserializer.getDestinationOwnerAndPermissionsFsPermissionsMap(destinationOwnerAndPermissions); + + Assert.assertEquals(ancestorOwnerAndPermissions.size(), new Path(filePath).getParent().depth() - 1); + Assert.assertEquals(expected.getPermission().getUserAction().toString(), actual.get("useraction")); + Assert.assertEquals(expected.getPermission().getGroupAction().toString(), actual.get("groupaction")); + Assert.assertEquals(expected.getPermission().getOtherAction().toString(), actual.get("otheraction")); + } + } + + private FileStatus createFileStatus(String pathString, String owner, String group, FsPermission fsPermission) { + Path path = new Path(pathString); + return new FileStatus(0, false, 0, 0, 0, 0, fsPermission, owner, group, path); + } /** * Sadly, this is needed to avoid losing 
`FileSystem` mock to replacement from the `FileSystem.get` `static` * Without this, so to lose the mock, we'd be unable to set up any source paths as existing. */ protected static class TrickIcebergDataset extends IcebergDataset { - public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, + FileSystem sourceFs) { super(db, table, icebergTbl, properties, sourceFs); } @Override // as the `static` is not mock-able - protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) throws IOException { + protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) + throws IOException { return this.sourceFs; } - }; + } + ; /** Builds a {@link FileSystem} mock */ protected static class MockFileSystemBuilder { private final URI fsURI; /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`, none exist; else only those indicated do */ - private final Optional<Set<Path>> optPaths; + //private final Optional<Set<Path>> optPaths; // convert it to a map<path, filestatus> + private final Optional<Map<Path, FileStatus>> optPathsWithFileStatuses; public MockFileSystemBuilder(URI fsURI) { this(fsURI, false); } public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) { this.fsURI = fsURI; - this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet()); - } - - public Optional<Set<Path>> getPaths() { - return this.optPaths.map(Sets::newHashSet); // copy before returning + //this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet()); + this.optPathsWithFileStatuses = shouldRepresentEveryPath ? 
Optional.empty() : Optional.of(Maps.newHashMap()); } - public void addPaths(List<String> pathStrings) { - for (String pathString : pathStrings) { - addPath(pathString); + public void addPathsAndFileStatuses(Map<String, FileStatus> pathStringToFsPermissionsMap) { + for (Map.Entry<String, FileStatus> entry : pathStringToFsPermissionsMap.entrySet()) { + String path = entry.getKey(); + FileStatus fileStatus = entry.getValue(); + addPathAndFileStatus(path, fileStatus); } } - public void addPath(String pathString) { - addPath(new Path(pathString)); + public void addPathAndFileStatus(String pathString, FileStatus fileStatus) { + addPathAndFileStatus(new Path(pathString), fileStatus); } - public void addPath(Path path) { - if (!this.optPaths.isPresent()) { + public void addPathAndFileStatus(Path path, FileStatus fileStatus) { + if (!this.optPathsWithFileStatuses.isPresent()) { throw new IllegalStateException("unable to add paths when constructed with `shouldRepresentEveryPath == true`"); } - if (this.optPaths.get().add(path) && !path.isRoot()) { // recursively add ancestors of a previously unknown path - addPath(path.getParent()); + optPathsWithFileStatuses.get().putIfAbsent(path, fileStatus); + if (!path.isRoot()) { // recursively add ancestors of a previously unknown path + addPathAndFileStatus(path.getParent(), fileStatus); } } - public FileSystem build() throws IOException { + public FileSystem build() + throws IOException { FileSystem fs = Mockito.mock(FileSystem.class); Mockito.when(fs.getUri()).thenReturn(fsURI); Mockito.when(fs.makeQualified(any(Path.class))) .thenAnswer(invocation -> invocation.getArgumentAt(0, Path.class).makeQualified(fsURI, new Path("/"))); - if (!this.optPaths.isPresent()) { - Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation -> - createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString())); + if (!this.optPathsWithFileStatuses.isPresent()) { + Mockito.when(fs.getFileStatus(any(Path.class))) + 
.thenAnswer(invocation -> createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString())); } else { // WARNING: order is critical--specific paths *after* `any(Path)`; in addition, since mocking further // an already-mocked instance, `.doReturn/.when` is needed (vs. `.when/.thenReturn`) Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException()); - for (Path p : this.optPaths.get()) { - Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p); + for (Path p : this.optPathsWithFileStatuses.get().keySet()) { + FileStatus fileStatus = this.optPathsWithFileStatuses.get().get(p); + if (fileStatus != null) { + Mockito.doReturn(fileStatus).when(fs).getFileStatus(p); + } else { + Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p); Review Comment: looks like ternary operator to me ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - 
sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + 
expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new 
FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - 
List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), 
sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... 
snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; } private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { - return CopyConfiguration.builder(fs, copyConfigProperties) - .copyContext(new CopyContext()) - .build(); + return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new CopyContext()).build(); } - private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) { - List<String> actual = new ArrayList<>(); + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, Set<String> expected) { + Set<String> actual = new HashSet<>(); + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = new Gson().fromJson(json, JsonObject.class) - .getAsJsonObject("object-data").getAsJsonObject("origin") - .getAsJsonObject("object-data").getAsJsonObject("path") - .getAsJsonObject("object-data").getAsJsonObject("uri") - .getAsJsonPrimitive("object-data").getAsString(); + String filepath = copyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); } - Assert.assertEquals(actual.size(), expected.size(), - "Set" + actual.toString() + " vs Set" + expected.toString()); + Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, + Map<String, FileStatus> expectedMap) { + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); + + for (CopyEntity 
copyEntity : copyEntities) { + String copyEntityJson = copyEntity.toString(); + + JsonArray ancestorOwnerAndPermissions = copyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + String filePath = copyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson); + JsonObject destinationOwnerAndPermissions = + copyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + + FileStatus expected = expectedMap.get(filePath); + Map<String, String> actual = + copyEntityDeserializer.getDestinationOwnerAndPermissionsFsPermissionsMap(destinationOwnerAndPermissions); + + Assert.assertEquals(ancestorOwnerAndPermissions.size(), new Path(filePath).getParent().depth() - 1); + Assert.assertEquals(expected.getPermission().getUserAction().toString(), actual.get("useraction")); + Assert.assertEquals(expected.getPermission().getGroupAction().toString(), actual.get("groupaction")); + Assert.assertEquals(expected.getPermission().getOtherAction().toString(), actual.get("otheraction")); + } + } + + private FileStatus createFileStatus(String pathString, String owner, String group, FsPermission fsPermission) { + Path path = new Path(pathString); + return new FileStatus(0, false, 0, 0, 0, 0, fsPermission, owner, group, path); + } /** * Sadly, this is needed to avoid losing `FileSystem` mock to replacement from the `FileSystem.get` `static` * Without this, so to lose the mock, we'd be unable to set up any source paths as existing. 
*/ protected static class TrickIcebergDataset extends IcebergDataset { - public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, + FileSystem sourceFs) { super(db, table, icebergTbl, properties, sourceFs); } @Override // as the `static` is not mock-able - protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) throws IOException { + protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) + throws IOException { return this.sourceFs; } - }; + } + ; /** Builds a {@link FileSystem} mock */ protected static class MockFileSystemBuilder { private final URI fsURI; /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`, none exist; else only those indicated do */ - private final Optional<Set<Path>> optPaths; + //private final Optional<Set<Path>> optPaths; // convert it to a map<path, filestatus> + private final Optional<Map<Path, FileStatus>> optPathsWithFileStatuses; public MockFileSystemBuilder(URI fsURI) { this(fsURI, false); } public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) { this.fsURI = fsURI; - this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet()); - } - - public Optional<Set<Path>> getPaths() { - return this.optPaths.map(Sets::newHashSet); // copy before returning + //this.optPaths = shouldRepresentEveryPath ? 
Optional.empty() : Optional.of(Sets.newHashSet()); Review Comment: also here ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - 
.build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new 
TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + 
MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - 
List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the 
path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; } private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { - return CopyConfiguration.builder(fs, copyConfigProperties) - .copyContext(new CopyContext()) - .build(); + return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new CopyContext()).build(); } - private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) { - List<String> actual = new ArrayList<>(); + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, Set<String> expected) { + Set<String> actual = new HashSet<>(); + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = new Gson().fromJson(json, JsonObject.class) - .getAsJsonObject("object-data").getAsJsonObject("origin") - .getAsJsonObject("object-data").getAsJsonObject("path") - .getAsJsonObject("object-data").getAsJsonObject("uri") - .getAsJsonPrimitive("object-data").getAsString(); + String filepath = copyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); } - 
Assert.assertEquals(actual.size(), expected.size(), - "Set" + actual.toString() + " vs Set" + expected.toString()); + Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, + Map<String, FileStatus> expectedMap) { + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); + + for (CopyEntity copyEntity : copyEntities) { + String copyEntityJson = copyEntity.toString(); + + JsonArray ancestorOwnerAndPermissions = copyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + String filePath = copyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson); + JsonObject destinationOwnerAndPermissions = + copyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + + FileStatus expected = expectedMap.get(filePath); + Map<String, String> actual = + copyEntityDeserializer.getDestinationOwnerAndPermissionsFsPermissionsMap(destinationOwnerAndPermissions); + + Assert.assertEquals(ancestorOwnerAndPermissions.size(), new Path(filePath).getParent().depth() - 1); + Assert.assertEquals(expected.getPermission().getUserAction().toString(), actual.get("useraction")); + Assert.assertEquals(expected.getPermission().getGroupAction().toString(), actual.get("groupaction")); + Assert.assertEquals(expected.getPermission().getOtherAction().toString(), actual.get("otheraction")); + } + } + + private FileStatus createFileStatus(String pathString, String owner, String group, FsPermission fsPermission) { + Path path = new Path(pathString); + return new FileStatus(0, false, 0, 0, 0, 0, fsPermission, owner, group, path); + } /** * Sadly, this is needed to avoid losing `FileSystem` mock to replacement from the `FileSystem.get` `static` * Without this, so to lose the mock, we'd be unable to set up any source paths as existing. 
*/ protected static class TrickIcebergDataset extends IcebergDataset { - public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, + FileSystem sourceFs) { super(db, table, icebergTbl, properties, sourceFs); } @Override // as the `static` is not mock-able - protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) throws IOException { + protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) + throws IOException { return this.sourceFs; } - }; + } + ; /** Builds a {@link FileSystem} mock */ protected static class MockFileSystemBuilder { private final URI fsURI; /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`, none exist; else only those indicated do */ - private final Optional<Set<Path>> optPaths; + //private final Optional<Set<Path>> optPaths; // convert it to a map<path, filestatus> Review Comment: can remove comment ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + 
expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> 
expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void 
testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + 
verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : 
pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... 
snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; } private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { - return CopyConfiguration.builder(fs, copyConfigProperties) - .copyContext(new CopyContext()) - .build(); + return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new CopyContext()).build(); } - private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) { - List<String> actual = new ArrayList<>(); + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, Set<String> expected) { + Set<String> actual = new HashSet<>(); + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = new Gson().fromJson(json, JsonObject.class) - .getAsJsonObject("object-data").getAsJsonObject("origin") - .getAsJsonObject("object-data").getAsJsonObject("path") - .getAsJsonObject("object-data").getAsJsonObject("uri") - .getAsJsonPrimitive("object-data").getAsString(); + String filepath = copyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); } - Assert.assertEquals(actual.size(), expected.size(), - "Set" + actual.toString() + " vs Set" + expected.toString()); + Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, + Map<String, FileStatus> expectedMap) { + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); + + for (CopyEntity 
copyEntity : copyEntities) { + String copyEntityJson = copyEntity.toString(); + + JsonArray ancestorOwnerAndPermissions = copyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + String filePath = copyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson); + JsonObject destinationOwnerAndPermissions = + copyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + + FileStatus expected = expectedMap.get(filePath); + Map<String, String> actual = + copyEntityDeserializer.getDestinationOwnerAndPermissionsFsPermissionsMap(destinationOwnerAndPermissions); + + Assert.assertEquals(ancestorOwnerAndPermissions.size(), new Path(filePath).getParent().depth() - 1); + Assert.assertEquals(expected.getPermission().getUserAction().toString(), actual.get("useraction")); + Assert.assertEquals(expected.getPermission().getGroupAction().toString(), actual.get("groupaction")); + Assert.assertEquals(expected.getPermission().getOtherAction().toString(), actual.get("otheraction")); + } + } + + private FileStatus createFileStatus(String pathString, String owner, String group, FsPermission fsPermission) { + Path path = new Path(pathString); + return new FileStatus(0, false, 0, 0, 0, 0, fsPermission, owner, group, path); + } /** * Sadly, this is needed to avoid losing `FileSystem` mock to replacement from the `FileSystem.get` `static` * Without this, so to lose the mock, we'd be unable to set up any source paths as existing. 
*/ protected static class TrickIcebergDataset extends IcebergDataset { - public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, + FileSystem sourceFs) { super(db, table, icebergTbl, properties, sourceFs); } @Override // as the `static` is not mock-able - protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) throws IOException { + protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) + throws IOException { return this.sourceFs; } - }; + } + ; /** Builds a {@link FileSystem} mock */ protected static class MockFileSystemBuilder { private final URI fsURI; /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`, none exist; else only those indicated do */ - private final Optional<Set<Path>> optPaths; + //private final Optional<Set<Path>> optPaths; // convert it to a map<path, filestatus> + private final Optional<Map<Path, FileStatus>> optPathsWithFileStatuses; public MockFileSystemBuilder(URI fsURI) { this(fsURI, false); } public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) { this.fsURI = fsURI; - this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet()); - } - - public Optional<Set<Path>> getPaths() { - return this.optPaths.map(Sets::newHashSet); // copy before returning + //this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet()); + this.optPathsWithFileStatuses = shouldRepresentEveryPath ? 
Optional.empty() : Optional.of(Maps.newHashMap()); } - public void addPaths(List<String> pathStrings) { - for (String pathString : pathStrings) { - addPath(pathString); + public void addPathsAndFileStatuses(Map<String, FileStatus> pathStringToFsPermissionsMap) { + for (Map.Entry<String, FileStatus> entry : pathStringToFsPermissionsMap.entrySet()) { + String path = entry.getKey(); + FileStatus fileStatus = entry.getValue(); + addPathAndFileStatus(path, fileStatus); } } - public void addPath(String pathString) { - addPath(new Path(pathString)); + public void addPathAndFileStatus(String pathString, FileStatus fileStatus) { + addPathAndFileStatus(new Path(pathString), fileStatus); } - public void addPath(Path path) { - if (!this.optPaths.isPresent()) { + public void addPathAndFileStatus(Path path, FileStatus fileStatus) { + if (!this.optPathsWithFileStatuses.isPresent()) { throw new IllegalStateException("unable to add paths when constructed with `shouldRepresentEveryPath == true`"); } - if (this.optPaths.get().add(path) && !path.isRoot()) { // recursively add ancestors of a previously unknown path - addPath(path.getParent()); + optPathsWithFileStatuses.get().putIfAbsent(path, fileStatus); + if (!path.isRoot()) { // recursively add ancestors of a previously unknown path + addPathAndFileStatus(path.getParent(), fileStatus); } } - public FileSystem build() throws IOException { + public FileSystem build() + throws IOException { FileSystem fs = Mockito.mock(FileSystem.class); Mockito.when(fs.getUri()).thenReturn(fsURI); Mockito.when(fs.makeQualified(any(Path.class))) .thenAnswer(invocation -> invocation.getArgumentAt(0, Path.class).makeQualified(fsURI, new Path("/"))); - if (!this.optPaths.isPresent()) { - Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation -> - createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString())); + if (!this.optPathsWithFileStatuses.isPresent()) { + Mockito.when(fs.getFileStatus(any(Path.class))) + 
.thenAnswer(invocation -> createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString())); } else { // WARNING: order is critical--specific paths *after* `any(Path)`; in addition, since mocking further // an already-mocked instance, `.doReturn/.when` is needed (vs. `.when/.thenReturn`) Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException()); - for (Path p : this.optPaths.get()) { - Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p); + for (Path p : this.optPathsWithFileStatuses.get().keySet()) { + FileStatus fileStatus = this.optPathsWithFileStatuses.get().get(p); Review Comment: if you wanted the value, why not `entrySet()`? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java: ########## @@ -216,210 +228,295 @@ public void testPathErrorConsolidator() { * without calculating any difference between the source and destination */ @Test - public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0, - MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B); + public void testGenerateCopyEntitiesWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + Set<String> expectedPaths = expectedPathsAndFileStatuses.keySet(); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable 
= MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */ @Test - public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException { - List<String> expectedPaths = Arrays.asList(METADATA_PATH, - MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B, - MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B); + public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, null); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_1, null); + 
expectedPathsAndFileStatuses.put(MANIFEST_PATH_1, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1A, null); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_1B, null); + Set<String> expectedPaths = (expectedPathsAndFileStatuses.keySet()); MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); - sourceBuilder.addPaths(expectedPaths); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); - CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties) - .preserve(PreserveAttributes.fromMnemonicString("")) - .copyContext(new CopyContext()) - .build(); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } + @Test + public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() + throws IOException { + Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap(); + expectedPathsAndFileStatuses.put(METADATA_PATH, createFileStatus(METADATA_PATH, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, createFileStatus(MANIFEST_LIST_PATH_0, null, null, + new 
FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, createFileStatus(MANIFEST_PATH_0, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, createFileStatus(MANIFEST_DATA_PATH_0A, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, createFileStatus(MANIFEST_DATA_PATH_0B, null, null, + new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE))); + + MockFileSystemBuilder sourceBuilder = new MockFileSystemBuilder(SRC_FS_URI); + sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); + FileSystem sourceFs = sourceBuilder.build(); + + IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergDataset icebergDataset = + new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + + MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); + FileSystem destFs = destBuilder.build(); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("ugp")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); + verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { - return 
validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths); + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths); } /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification */ - protected IcebergTable validateGetFilePathsGivenDestState( - List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, - Optional<List<String>> optExistingSourcePaths, - List<String> existingDestPaths, - Set<Path> expectedResultPaths) throws IOException { + protected IcebergTable validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets, + Optional<List<String>> optExistingSourcePaths, List<String> existingDestPaths, Set<Path> expectedResultPaths) + throws IOException { IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets); + Map<String, FileStatus> sourcePathAndFileStatusMap = Maps.newHashMap(); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); - optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); + optExistingSourcePaths.ifPresent((pathList) -> { + for (String path : pathList) { + sourcePathAndFileStatusMap.putIfAbsent(path, null); + } + sourceFsBuilder.addPathsAndFileStatuses(sourcePathAndFileStatusMap); + }); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = + new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); MockFileSystemBuilder 
destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); - destFsBuilder.addPaths(existingDestPaths); + Map<String, FileStatus> destPathAndFileStatusMap = Maps.newHashMap(); + for (String destPath : existingDestPaths) { + destPathAndFileStatusMap.putIfAbsent(destPath, null); + } + destFsBuilder.addPathsAndFileStatuses(destPathAndFileStatusMap); FileSystem destFs = destFsBuilder.build(); CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths); // verify solely the path portion of the `FileStatus`, since that's all mock sets up - Assert.assertEquals( - filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), + Assert.assertEquals(filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()), expectedResultPaths); return icebergTable; } /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... 
snapshotDefs) { - Arrays.stream(snapshotDefs).flatMap(snapshotDef -> - snapshotDef.asSnapshotInfo().getAllPaths().stream() - ).forEach(p -> - paths.add(new Path(p)) - ); + Arrays.stream(snapshotDefs).flatMap(snapshotDef -> snapshotDef.asSnapshotInfo().getAllPaths().stream()) + .forEach(p -> paths.add(new Path(p))); return paths; } private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { - return CopyConfiguration.builder(fs, copyConfigProperties) - .copyContext(new CopyContext()) - .build(); + return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new CopyContext()).build(); } - private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) { - List<String> actual = new ArrayList<>(); + private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, Set<String> expected) { + Set<String> actual = new HashSet<>(); + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = new Gson().fromJson(json, JsonObject.class) - .getAsJsonObject("object-data").getAsJsonObject("origin") - .getAsJsonObject("object-data").getAsJsonObject("path") - .getAsJsonObject("object-data").getAsJsonObject("uri") - .getAsJsonPrimitive("object-data").getAsString(); + String filepath = copyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); } - Assert.assertEquals(actual.size(), expected.size(), - "Set" + actual.toString() + " vs Set" + expected.toString()); + Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, + Map<String, FileStatus> expectedMap) { + CopyEntityDeserializer copyEntityDeserializer = new CopyEntityDeserializer(); + + for (CopyEntity 
copyEntity : copyEntities) { + String copyEntityJson = copyEntity.toString(); + + JsonArray ancestorOwnerAndPermissions = copyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + String filePath = copyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson); + JsonObject destinationOwnerAndPermissions = + copyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + + FileStatus expected = expectedMap.get(filePath); + Map<String, String> actual = + copyEntityDeserializer.getDestinationOwnerAndPermissionsFsPermissionsMap(destinationOwnerAndPermissions); + + Assert.assertEquals(ancestorOwnerAndPermissions.size(), new Path(filePath).getParent().depth() - 1); Review Comment: shan't we verify more than the length? BTW, where are the user and group names verified? Issue Time Tracking ------------------- Worklog Id: (was: 821639) Time Spent: 50m (was: 40m) > Preserve Ancestor Owner and Permissions for Fs between Src and Dest for > Iceberg Distcp > -------------------------------------------------------------------------------------- > > Key: GOBBLIN-1720 > URL: https://issues.apache.org/jira/browse/GOBBLIN-1720 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Meeth Gala > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > We want to preserve the Fs ownership and permissions between src and dest > while performing an Iceberg based distcp. Currently, we are preserving all > the permissions up to root dir for Iceberg tables. -- This message was sent by Atlassian Jira (v8.20.10#820010)