This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8be7fb637 [GOBBLIN-1720]Add ancestors owner permissions preservations 
for iceberg distcp (#3577)
8be7fb637 is described below

commit 8be7fb637e9390ed76b4de30377d64908e229ea5
Author: meethngala <[email protected]>
AuthorDate: Tue Nov 15 11:47:24 2022 -0800

    [GOBBLIN-1720]Add ancestors owner permissions preservations for iceberg 
distcp (#3577)
    
    * rebasing from master
    
    * Rebasing from master
    
    * fix checkstyle
    
    * minor javadoc update
    
    * updated path utils and iceberg dataset unit test for Fs permissions
    
    * addressed comments on PR
    
    * added counter test for not preserving Fs attributes
    
    * minor: changed set to list
    
    * fixed checkstyle
    
    * adding verification for ancestor owner and permissions of Fs for iceberg 
dataset test
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../management/copy/iceberg/IcebergDataset.java    |  10 +-
 .../copy/iceberg/IcebergDatasetTest.java           | 317 +++++++++++++++------
 .../java/org/apache/gobblin/util/PathUtils.java    |  16 ++
 3 files changed, 261 insertions(+), 82 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 006e9c1c6..63b23260f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -51,10 +51,12 @@ import 
org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
 import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.request_allocation.PushDownRequestor;
 
 /**
@@ -144,13 +146,17 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
       FileStatus srcFileStatus = entry.getValue();
       // TODO: should be the same FS each time; try creating once, reusing 
thereafter, to not recreate wastefully
       FileSystem actualSourceFs = 
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+      Path greatestAncestorPath = PathUtils.getRootPathChild(srcPath);
 
-      // TODO: Add preservation of ancestor ownership and permissions!
-
+      // preserving ancestor permissions till root path's child between src 
and dest
+      List<OwnerAndPermission> ancestorOwnerAndPermissionList =
+          
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs,
+              srcPath.getParent(), greatestAncestorPath, copyConfig);
       CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
           actualSourceFs, srcFileStatus, targetFs.makeQualified(srcPath), 
copyConfig)
           .fileSet(fileSet)
           .datasetOutputPath(targetFs.getUri().getPath())
+          .ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
           .build();
       fileEntity.setSourceData(getSourceDataset(this.sourceFs));
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 159eca34c..d8683d250 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -32,30 +32,35 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
-
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
-import lombok.Data;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import org.testng.collections.Sets;
 
+import com.google.api.client.util.Maps;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
+import joptsimple.internal.Strings;
+import lombok.Data;
+
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyContext;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.testng.collections.Sets;
 
 import static org.mockito.Matchers.any;
 
@@ -65,6 +70,7 @@ public class IcebergDatasetTest {
 
   private static final URI SRC_FS_URI;
   private static final URI DEST_FS_URI;
+
   static {
     try {
       SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
@@ -80,20 +86,18 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + 
"metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
-  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new 
MockIcebergTable.SnapshotPaths(
-      Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
-      Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
-          MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, 
MANIFEST_DATA_PATH_0B)))
-  );
+  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
+      new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH), 
MANIFEST_LIST_PATH_0, Arrays.asList(
+          new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
+              Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B))));
   private static final String MANIFEST_LIST_PATH_1 = 
MANIFEST_LIST_PATH_0.replaceAll("\\.x$", ".y");
   private static final String MANIFEST_PATH_1 = 
MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
   private static final String MANIFEST_DATA_PATH_1A = 
MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
   private static final String MANIFEST_DATA_PATH_1B = 
MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
-  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new 
MockIcebergTable.SnapshotPaths(
-      Optional.empty(), MANIFEST_LIST_PATH_1,
-      Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
-          MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, 
MANIFEST_DATA_PATH_1B)))
-  );
+  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 =
+      new MockIcebergTable.SnapshotPaths(Optional.empty(), 
MANIFEST_LIST_PATH_1, Arrays.asList(
+          new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1,
+              Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B))));
 
   private final String testDbName = "test_db_name";
   private final String testTblName = "test_tbl_name";
@@ -150,10 +154,8 @@ public class IcebergDatasetTest {
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
     Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_0);
     expectedResultPaths.remove(missingPath);
-    validateGetFilePathsGivenDestState(
-        icebergSnapshots,
-        
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
-        existingDestPaths,
+    validateGetFilePathsGivenDestState(icebergSnapshots,
+        
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
 existingDestPaths,
         expectedResultPaths);
   }
 
@@ -171,7 +173,8 @@ public class IcebergDatasetTest {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH, 
MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
     Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any 
delta
-    IcebergTable mockTable = 
validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    IcebergTable mockTable =
+        validateGetFilePathsGivenDestState(icebergSnapshots, 
existingDestPaths, expectedResultPaths);
     // ensure short-circuiting was able to avert iceberg manifests scan
     Mockito.verify(mockTable, 
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
     Mockito.verifyNoMoreInteractions(mockTable);
@@ -219,21 +222,20 @@ public class IcebergDatasetTest {
   public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException {
     List<String> expectedPaths = Arrays.asList(METADATA_PATH, 
MANIFEST_LIST_PATH_0,
         MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
-
     MockFileSystemBuilder sourceBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
     IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergDataset icebergDataset =
+        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
 
-    CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, 
copyConfigProperties)
-        .preserve(PreserveAttributes.fromMnemonicString(""))
-        .copyContext(new CopyContext())
-        .build();
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(destFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
     Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyCopyEntities(copyEntities, expectedPaths);
   }
@@ -250,45 +252,102 @@ public class IcebergDatasetTest {
     FileSystem sourceFs = sourceBuilder.build();
 
     IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergDataset icebergDataset =
+        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
 
-    CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, 
copyConfigProperties)
-        .preserve(PreserveAttributes.fromMnemonicString(""))
-        .copyContext(new CopyContext())
-        .build();
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(destFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
     Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyCopyEntities(copyEntities, expectedPaths);
   }
 
+  @Test
+  public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws 
IOException {
+    FileStatus metadataFileStatus = new FileStatus(0, false, 0, 0, 0, 0, new 
FsPermission(FsAction.WRITE, FsAction.READ, FsAction.NONE), "metadata_owner", 
"metadata_group", null);
+    FileStatus manifestFileStatus = new FileStatus(0, false, 0, 0, 0, 0, new 
FsPermission(FsAction.WRITE, FsAction.READ, FsAction.NONE), 
"manifest_list_owner", "manifest_list_group", null);
+    FileStatus manifestDataFileStatus = new FileStatus(0, false, 0, 0, 0, 0, 
new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE), 
"manifest_data_owner", "manifest_data_group", null);
+    Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap();
+    expectedPathsAndFileStatuses.put(METADATA_PATH, metadataFileStatus);
+    expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, manifestFileStatus);
+    expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, manifestFileStatus);
+    expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A, 
manifestDataFileStatus);
+    expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B, 
manifestDataFileStatus);
+
+    MockFileSystemBuilder sourceBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
+    sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
+    FileSystem sourceFs = sourceBuilder.build();
+
+    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+
+    MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+    FileSystem destFs = destBuilder.build();
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(destFs, copyConfigProperties)
+            // preserving attributes for owner, group and permissions 
respectively
+            .preserve(PreserveAttributes.fromMnemonicString("ugp"))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
+    verifyFsOwnershipAndPermissionPreservation(copyEntities, 
sourceBuilder.getPathsAndFileStatuses());
+  }
+
+  @Test
+  public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() 
throws IOException {
+    List<String> expectedPaths = Arrays.asList(METADATA_PATH, 
MANIFEST_LIST_PATH_0,
+        MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
+    Map<Path, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap();
+    for (String expectedPath : expectedPaths) {
+      expectedPathsAndFileStatuses.putIfAbsent(new Path(expectedPath), new 
FileStatus());
+    }
+    MockFileSystemBuilder sourceBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
+    sourceBuilder.addPaths(expectedPaths);
+    FileSystem sourceFs = sourceBuilder.build();
+
+    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+
+    MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+    FileSystem destFs = destBuilder.build();
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(destFs, copyConfigProperties)
+            // without preserving attributes for owner, group and permissions
+            .preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
+    verifyFsOwnershipAndPermissionPreservation(copyEntities, 
expectedPathsAndFileStatuses);
+  }
+
   /**
    *  exercise {@link IcebergDataset::getFilePaths} and validate the result
    *  @return {@link IcebergTable} (mock!), for behavioral verification
    */
-  protected IcebergTable validateGetFilePathsGivenDestState(
-      List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
-      List<String> existingDestPaths,
-      Set<Path> expectedResultPaths) throws IOException {
-    return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, 
Optional.empty(),existingDestPaths, expectedResultPaths);
+  protected IcebergTable 
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> 
sourceSnapshotPathSets,
+      List<String> existingDestPaths, Set<Path> expectedResultPaths) throws 
IOException {
+    return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, 
Optional.empty(), existingDestPaths,
+        expectedResultPaths);
   }
 
   /**
    *  exercise {@link IcebergDataset::getFilePaths} and validate the result
    *  @return {@link IcebergTable} (mock!), for behavioral verification
    */
-  protected IcebergTable validateGetFilePathsGivenDestState(
-      List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
-      Optional<List<String>> optExistingSourcePaths,
-      List<String> existingDestPaths,
-      Set<Path> expectedResultPaths) throws IOException {
+  protected IcebergTable 
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> 
sourceSnapshotPathSets,
+      Optional<List<String>> optExistingSourcePaths, List<String> 
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
     IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
     optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset = new IcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergDataset icebergDataset =
+        new IcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     destFsBuilder.addPaths(existingDestPaths);
@@ -307,42 +366,65 @@ public class IcebergDatasetTest {
   /** @return `paths` after adding to it all paths of every one of 
`snapshotDefs` */
   protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, 
MockIcebergTable.SnapshotPaths... snapshotDefs) {
     Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
-        snapshotDef.asSnapshotInfo().getAllPaths().stream()
-    ).forEach(p ->
-        paths.add(new Path(p))
-    );
+            snapshotDef.asSnapshotInfo().getAllPaths().stream())
+        .forEach(p ->
+            paths.add(new Path(p))
+        );
     return paths;
   }
 
   private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) {
-    return CopyConfiguration.builder(fs, copyConfigProperties)
-        .copyContext(new CopyContext())
-        .build();
+    return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new 
CopyContext()).build();
   }
 
   private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, 
List<String> expected) {
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
-      String filepath = new Gson().fromJson(json, JsonObject.class)
-          .getAsJsonObject("object-data").getAsJsonObject("origin")
-          .getAsJsonObject("object-data").getAsJsonObject("path")
-          .getAsJsonObject("object-data").getAsJsonObject("uri")
-          .getAsJsonPrimitive("object-data").getAsString();
+      String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
       actual.add(filepath);
     }
-    Assert.assertEquals(actual.size(), expected.size(),
-        "Set" + actual.toString() + " vs Set" + expected.toString());
+    Assert.assertEquals(actual.size(), expected.size(), "Set" + 
actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
+  private static void 
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, 
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
+    for (CopyEntity copyEntity : copyEntities) {
+      String copyEntityJson = copyEntity.toString();
+      List<CopyEntityDeserializer.FileOwnerAndPermissions> 
ancestorFileOwnerAndPermissionsList = 
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
+      CopyEntityDeserializer.FileOwnerAndPermissions 
destinationFileOwnerAndPermissions = 
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
+      Path filePath = new 
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+      FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
+      verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
+      // providing path's parent to verify ancestor owner and permissions
+      verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, 
filePath.getParent(), expectedPathsAndFileStatuses);
+    }
+  }
+
+  private static void 
verifyFileStatus(CopyEntityDeserializer.FileOwnerAndPermissions actual, 
FileStatus expected) {
+    Assert.assertEquals(actual.owner, expected.getOwner());
+    Assert.assertEquals(actual.group, expected.getGroup());
+    Assert.assertEquals(actual.userActionPermission, 
expected.getPermission().getUserAction().toString());
+    Assert.assertEquals(actual.groupActionPermission, 
expected.getPermission().getGroupAction().toString());
+    Assert.assertEquals(actual.otherActionPermission, 
expected.getPermission().getOtherAction().toString());
+  }
+
+  private static void 
verifyAncestorPermissions(List<CopyEntityDeserializer.FileOwnerAndPermissions> 
actualList, Path path, Map<Path, FileStatus> pathFileStatusMap) {
+
+    for (CopyEntityDeserializer.FileOwnerAndPermissions actual : actualList) {
+      FileStatus expected = pathFileStatusMap.getOrDefault(path, new 
FileStatus());
+      verifyFileStatus(actual, expected);
+      path = path.getParent();
+    }
+  }
 
   /**
    *  Sadly, this is needed to avoid losing `FileSystem` mock to replacement 
from the `FileSystem.get` `static`
    *  Without this, so to lose the mock, we'd be unable to set up any source 
paths as existing.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
-    public TrickIcebergDataset(String db, String table, IcebergTable 
icebergTbl, Properties properties, FileSystem sourceFs) {
+    public TrickIcebergDataset(String db, String table, IcebergTable 
icebergTbl, Properties properties,
+        FileSystem sourceFs) {
       super(db, table, icebergTbl, properties, sourceFs);
     }
 
@@ -350,14 +432,15 @@ public class IcebergDatasetTest {
     protected FileSystem getSourceFileSystemFromFileStatus(FileStatus 
fileStatus, Configuration hadoopConfig) throws IOException {
       return this.sourceFs;
     }
-  };
+  }
 
+  ;
 
   /** Builds a {@link FileSystem} mock */
   protected static class MockFileSystemBuilder {
     private final URI fsURI;
     /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`, 
none exist; else only those indicated do */
-    private final Optional<Set<Path>> optPaths;
+    private final Optional<Map<Path, FileStatus>> optPathsWithFileStatuses;
 
     public MockFileSystemBuilder(URI fsURI) {
       this(fsURI, false);
@@ -365,47 +448,63 @@ public class IcebergDatasetTest {
 
     public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) {
       this.fsURI = fsURI;
-      this.optPaths = shouldRepresentEveryPath ? Optional.empty() : 
Optional.of(Sets.newHashSet());
-    }
-
-    public Optional<Set<Path>> getPaths() {
-      return this.optPaths.map(Sets::newHashSet); // copy before returning
+      this.optPathsWithFileStatuses = shouldRepresentEveryPath ? 
Optional.empty() : Optional.of(Maps.newHashMap());
     }
 
     public void addPaths(List<String> pathStrings) {
+      Map<String, FileStatus> map = Maps.newHashMap();
       for (String pathString : pathStrings) {
-        addPath(pathString);
+        map.putIfAbsent(pathString, null);
       }
+      addPathsAndFileStatuses(map);
     }
 
-    public void addPath(String pathString) {
-      addPath(new Path(pathString));
+    public void addPathsAndFileStatuses(Map<String, FileStatus> 
pathAndFileStatuses) {
+      for (Map.Entry<String, FileStatus> entry : 
pathAndFileStatuses.entrySet()) {
+        String pathString = entry.getKey();
+        FileStatus fileStatus = entry.getValue();
+        addPathsAndFileStatuses(pathString, fileStatus);
+      }
     }
 
-    public void addPath(Path path) {
-      if (!this.optPaths.isPresent()) {
-        throw new IllegalStateException("unable to add paths when constructed 
with `shouldRepresentEveryPath == true`");
+    public void addPathsAndFileStatuses(String pathString, FileStatus 
fileStatus) {
+      Path path = new Path(pathString);
+      if(fileStatus != null) { fileStatus.setPath(path);}
+      addPathAndFileStatus(path, fileStatus);
+    }
+
+    public void addPathAndFileStatus(Path path, FileStatus fileStatus) {
+      if (!this.optPathsWithFileStatuses.isPresent()) {
+        throw new IllegalStateException("unable to add paths and file statuses 
when constructed");
       }
-      if (this.optPaths.get().add(path) && !path.isRoot()) { // recursively 
add ancestors of a previously unknown path
-        addPath(path.getParent());
+      optPathsWithFileStatuses.get().putIfAbsent(path, fileStatus);
+      if (!path.isRoot()) { // recursively add ancestors of a previously 
unknown path
+        addPathAndFileStatus(path.getParent(), fileStatus);
       }
     }
 
-    public FileSystem build() throws IOException {
+    public Map<Path, FileStatus> getPathsAndFileStatuses() {
+      return optPathsWithFileStatuses.get();
+    }
+
+    public FileSystem build()
+        throws IOException {
       FileSystem fs = Mockito.mock(FileSystem.class);
       Mockito.when(fs.getUri()).thenReturn(fsURI);
       Mockito.when(fs.makeQualified(any(Path.class)))
           .thenAnswer(invocation -> invocation.getArgumentAt(0, 
Path.class).makeQualified(fsURI, new Path("/")));
 
-      if (!this.optPaths.isPresent()) {
-        Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation 
->
-            createEmptyFileStatus(invocation.getArgumentAt(0, 
Path.class).toString()));
+      if (!this.optPathsWithFileStatuses.isPresent()) {
+        Mockito.when(fs.getFileStatus(any(Path.class)))
+            .thenAnswer(invocation -> 
createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString()));
       } else {
         // WARNING: order is critical--specific paths *after* `any(Path)`; in 
addition, since mocking further
         // an already-mocked instance, `.doReturn/.when` is needed (vs. 
`.when/.thenReturn`)
         Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new 
FileNotFoundException());
-        for (Path p : this.optPaths.get()) {
-          
Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
+        for (Map.Entry<Path, FileStatus> entry : 
this.optPathsWithFileStatuses.get().entrySet()) {
+          Path p = entry.getKey();
+          FileStatus fileStatus = entry.getValue();
+          Mockito.doReturn(fileStatus != null ? fileStatus : 
createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
         }
       }
       return fs;
@@ -419,7 +518,6 @@ public class IcebergDatasetTest {
     }
   }
 
-
   private static class MockIcebergTable {
 
     @Data
@@ -445,8 +543,8 @@ public class IcebergDatasetTest {
     public static IcebergTable withSnapshots(List<SnapshotPaths> 
snapshotPathSets) throws IOException {
       IcebergTable table = Mockito.mock(IcebergTable.class);
       int lastIndex = snapshotPathSets.size() - 1;
-      Mockito.when(table.getCurrentSnapshotInfoOverviewOnly()).thenReturn(
-          snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
+      Mockito.when(table.getCurrentSnapshotInfoOverviewOnly())
+          
.thenReturn(snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
       // ADMISSION: this is strictly more analogous to 
`IcebergTable.getAllSnapshotInfosIterator()`, as it doesn't
       // filter only the delta... nonetheless, it should work fine for the 
tests herein
       Mockito.when(table.getIncrementalSnapshotInfosIterator()).thenReturn(
@@ -464,5 +562,64 @@ public class IcebergDatasetTest {
           inputs, IntStream.iterate(0, i -> i + 1).boxed(), f);
     }
   }
+
+  private static class CopyEntityDeserializer {
+
+    @Data
+    public static class FileOwnerAndPermissions {
+      String owner;
+      String group;
+      // assigning default values
+      String userActionPermission = FsAction.valueOf("READ_WRITE").toString();
+      String groupActionPermission = FsAction.valueOf("READ_WRITE").toString();
+      String otherActionPermission = FsAction.valueOf("READ_WRITE").toString();
+    }
+
+    public static String getFilePathAsStringFromJson(String json) {
+      String filepath = new Gson().fromJson(json, JsonObject.class)
+              .getAsJsonObject("object-data").getAsJsonObject("origin")
+              
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
+              
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+      return filepath;
+    }
+
+    public static List<FileOwnerAndPermissions> 
getAncestorOwnerAndPermissions(String json) {
+      JsonArray ancestorsOwnerAndPermissions = new Gson().fromJson(json, 
JsonObject.class)
+              .getAsJsonObject("object-data")
+              .getAsJsonArray("ancestorsOwnerAndPermission");
+      List<FileOwnerAndPermissions> fileOwnerAndPermissionsList = 
Lists.newArrayList();
+      for (JsonElement jsonElement : ancestorsOwnerAndPermissions) {
+        
fileOwnerAndPermissionsList.add(getFileOwnerAndPermissions(jsonElement.getAsJsonObject()));
+      }
+      return fileOwnerAndPermissionsList;
+    }
+
+    public static FileOwnerAndPermissions 
getDestinationOwnerAndPermissions(String json) {
+      JsonObject destinationOwnerAndPermissionsJsonObject = new 
Gson().fromJson(json, JsonObject.class)
+          .getAsJsonObject("object-data")
+          .getAsJsonObject("destinationOwnerAndPermission");
+      FileOwnerAndPermissions fileOwnerAndPermissions = 
getFileOwnerAndPermissions(destinationOwnerAndPermissionsJsonObject);
+      return fileOwnerAndPermissions;
+    }
+
+    private static FileOwnerAndPermissions 
getFileOwnerAndPermissions(JsonObject jsonObject) {
+      FileOwnerAndPermissions fileOwnerAndPermissions = new 
FileOwnerAndPermissions();
+      JsonObject objData = jsonObject.getAsJsonObject("object-data");
+      fileOwnerAndPermissions.owner = objData.has("owner") ? 
objData.getAsJsonPrimitive("owner").getAsString() : Strings.EMPTY;
+      fileOwnerAndPermissions.group = objData.has("group") ? 
objData.getAsJsonPrimitive("group").getAsString() : Strings.EMPTY;
+
+      JsonObject fsPermission = objData.has("fsPermission") ? 
objData.getAsJsonObject("fsPermission") : null;
+      if (fsPermission != null) {
+        JsonObject objectData = fsPermission.getAsJsonObject("object-data");
+        fileOwnerAndPermissions.userActionPermission =
+            
objectData.getAsJsonObject("useraction").getAsJsonPrimitive("object-data").getAsString();
+        fileOwnerAndPermissions.groupActionPermission =
+            
objectData.getAsJsonObject("groupaction").getAsJsonPrimitive("object-data").getAsString();
+        fileOwnerAndPermissions.otherActionPermission =
+            
objectData.getAsJsonObject("otheraction").getAsJsonPrimitive("object-data").getAsString();
+      }
+      return fileOwnerAndPermissions;
+    }
+  }
 }
 
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 64aa0a39e..28a9114cc 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -80,6 +80,22 @@ public class PathUtils {
     return getRootPath(path.getParent());
   }
 
+  /**
+   * Returns the root path child for the specified path.
+   * Example: input: /a/b/c then it will return /a
+   *
+   */
+  public static Path getRootPathChild(Path path) {
+    if (path.getParent() == null) {
+      return null;
+    }
+
+    if (path.getParent().isRoot()) {
+      return path;
+    }
+    return getRootPathChild(path.getParent());
+  }
+
   /**
    * Removes the leading slash if present.
    *

Reply via email to