This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eaed5825a3 [GOBBLIN-2128] Fix bugs in setpermission step where it does not properly add records… (#4020)
eaed5825a3 is described below

commit eaed5825a3e7a5d94361fa56d0fcf9c69c85d911
Author: William Lo <[email protected]>
AuthorDate: Sun Aug 11 15:06:18 2024 -0400

    [GOBBLIN-2128] Fix bugs in setpermission step where it does not properly add records… (#4020)
    
    * Fix bugs in setpermission step where it does not properly add records with equal file depths
---
 .../data/management/copy/ManifestBasedDataset.java |  40 ++++--
 .../management/copy/RecursiveCopyableDataset.java  |   3 +-
 .../dataset/ManifestBasedDatasetFinderTest.java    | 153 +++++++++++++++++++--
 .../longNestedDirectoryTreeManifest.json           |  26 ++++
 4 files changed, 198 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 834010ff4c..6d7df69043 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -19,13 +19,12 @@ package org.apache.gobblin.data.management.copy;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +64,9 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
   private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
   private static final String COMMON_FILES_PARENT = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
   private static final String PERMISSION_CACHE_TTL_SECONDS = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+
+  // Enable setting permission post publish to reset permission bits, default 
is true
+  private static final String ENABLE_SET_PERMISSION_POST_PUBLISH = 
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".enableSetPermissionPostPublish";
   private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
   private static final String DEFAULT_COMMON_FILES_PARENT = "/";
   private final FileSystem srcFs;
@@ -75,6 +77,8 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
   private final String commonFilesParent;
   private final int permissionCacheTTLSeconds;
 
+  private final boolean enableSetPermissionPostPublish;
+
   public ManifestBasedDataset(final FileSystem srcFs, final FileSystem 
manifestReadFs, final Path manifestPath, final Properties properties) {
     this.srcFs = srcFs;
     this.manifestReadFs = manifestReadFs;
@@ -83,6 +87,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
     this.deleteFileThatNotExistOnSource = 
Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, 
"false"));
     this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, 
DEFAULT_COMMON_FILES_PARENT);
     this.permissionCacheTTLSeconds = 
Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, 
DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
+    this.enableSetPermissionPostPublish = 
Boolean.parseBoolean(properties.getProperty(ENABLE_SET_PERMISSION_POST_PUBLISH, 
"true"));
   }
 
   @Override
@@ -109,7 +114,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
     // map of paths and permissions sorted by depth of path, so that 
permissions can be set in order
     Map<String, List<OwnerAndPermission>> ancestorOwnerAndPermissions = new 
HashMap<>();
     TreeMap<String, OwnerAndPermission> flattenedAncestorPermissions = new 
TreeMap<>(
-        (o1, o2) -> Long.compare(o1.chars().filter(ch -> ch == '/').count(), 
o2.chars().filter(ch -> ch == '/').count()));
+        Comparator.comparingInt((String o) -> 
o.split("/").length).thenComparing(o -> o));
     try {
       long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.manifestReadFs, 
this.manifestPath);
@@ -144,7 +149,6 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
             // Avoid duplicate calculation for the same ancestor
             if (fromPath != null && 
!ancestorOwnerAndPermissions.containsKey(PathUtils.getPathWithoutSchemeAndAuthority(fromPath).toString())
 && !targetFs.exists(fromPath)) {
               ancestorOwnerAndPermissions.put(fromPath.toString(), 
copyableFile.getAncestorsOwnerAndPermission());
-              
flattenedAncestorPermissions.putAll(CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs,
 fromPath, new Path(commonFilesParent), configuration));
             }
 
             if (existOnTarget && srcFile.isFile()) {
@@ -157,20 +161,26 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
           toDelete.add(targetFs.getFileStatus(fileToCopy));
         }
       }
-      // Only set permission for newly created folders on target
-      // To change permissions for existing dirs, expectation is to add the 
folder to the manifest
-      Set<String> parentFolders = new 
HashSet<>(flattenedAncestorPermissions.keySet());
-      for (String folder : parentFolders) {
-        if (targetFs.exists(new Path(folder))) {
-          flattenedAncestorPermissions.remove(folder);
-        }
-      }
-      // We need both precommit step to create the directories copying to, and 
a postcommit step to ensure that the execute bit needed for recursive rename is 
reset
+
+      // Precreate the directories to avoid an edge case where recursive 
rename can create extra directories in the target
       CommitStep createDirectoryWithPermissionsCommitStep = new 
CreateDirectoryWithPermissionsCommitStep(targetFs, ancestorOwnerAndPermissions, 
this.properties);
-      CommitStep setPermissionCommitStep = new 
SetPermissionCommitStep(targetFs, flattenedAncestorPermissions, 
this.properties);
       copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
createDirectoryWithPermissionsCommitStep, 1));
-      copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), 
setPermissionCommitStep, 1));
 
+      if (this.enableSetPermissionPostPublish) {
+        for (Map.Entry<String, List<OwnerAndPermission>> 
recursiveParentPermissions : ancestorOwnerAndPermissions.entrySet()) {
+          Path currentPath = new Path(recursiveParentPermissions.getKey());
+          for (OwnerAndPermission ownerAndPermission : 
recursiveParentPermissions.getValue()) {
+            // Ignore folders that already exist in destination, we assume 
that the publisher will re-sync those permissions if needed and
+            // those folders should be added in the manifest.
+            if 
(!flattenedAncestorPermissions.containsKey(currentPath.toString()) && 
!targetFs.exists(currentPath)) {
+              flattenedAncestorPermissions.put(currentPath.toString(), 
ownerAndPermission);
+            }
+            currentPath = currentPath.getParent();
+          }
+        }
+        CommitStep setPermissionCommitStep = new 
SetPermissionCommitStep(targetFs, flattenedAncestorPermissions, 
this.properties);
+        copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), 
setPermissionCommitStep, 1));
+      }
       if (!toDelete.isEmpty()) {
         //todo: add support sync for empty dir
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, 
this.properties, Optional.<Path>absent());
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 2228cad396..0c38f414e0 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.data.management.copy;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -144,7 +145,7 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
 
     // map of paths and permissions sorted by depth of path, so that 
permissions can be set in order
     TreeMap<String, OwnerAndPermission> ancestorOwnerAndPermissions = new 
TreeMap<>(
-        (o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(), 
o1.chars().filter(ch -> ch == '/').count()));
+        Comparator.comparingInt((String o) -> 
o.split("/").length).thenComparing(o -> o));
 
     for (Path path : toCopy) {
       FileStatus file = filesInSource.get(path);
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 33100d7b8d..d81de494d0 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -21,16 +21,19 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -90,7 +93,7 @@ public class ManifestBasedDatasetFinderTest {
     try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
         FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
         FileSystem destFs = Mockito.mock(FileSystem.class);) {
-      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
true);
 
       Iterator<FileSet<CopyEntity>> fileSets =
           new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs,
@@ -127,7 +130,7 @@ public class ManifestBasedDatasetFinderTest {
         FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
         FileSystem destFs = Mockito.mock(FileSystem.class)
     ) {
-      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
true);
       Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
       Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test2.txt"))).thenReturn(false);
       Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(true);
@@ -158,8 +161,6 @@ public class ManifestBasedDatasetFinderTest {
       // Ignore /tmp as it already exists on destination
       Assert.assertEquals(ownerAndPermissionMap.size(), 1);
       Assert.assertTrue(ownerAndPermissionMap.containsKey("/tmp/dataset"));
-
-
     }
   }
 
@@ -176,7 +177,7 @@ public class ManifestBasedDatasetFinderTest {
         FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
         FileSystem destFs = Mockito.mock(FileSystem.class)
     ) {
-      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
true);
       Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
       Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test2.txt"))).thenReturn(true);
       
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
@@ -212,7 +213,7 @@ public class ManifestBasedDatasetFinderTest {
     FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
     FileSystem destFs = Mockito.mock(FileSystem.class);
     Path manifestPath = new Path(manifestLocation);
-    setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+    setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
true);
     Iterator<FileSet<CopyEntity>> fileSets = new 
ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs,
         CopyConfiguration.builder(destFs, props).build());
     Assert.assertTrue(fileSets.hasNext());
@@ -223,7 +224,126 @@ public class ManifestBasedDatasetFinderTest {
 
   }
 
-  private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, 
Path manifestPath, FileSystem manifestReadFs) throws IOException, 
URISyntaxException {
+  @Test
+  public void testSetPermissionNestedTreePermissions() throws IOException, 
URISyntaxException {
+
+    //Get manifest Path
+    Path manifestPath = new 
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json").getPath());
+    // Test manifestDatasetFinder
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+    props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+    try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
+        FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+        FileSystem destFs = Mockito.mock(FileSystem.class)) {
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
false);
+      // Mock that these files exist but still recopy due to different 
permissions
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/hourly/metadata/test1.txt"))).thenReturn(true);
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/hourly/metadata/test2.txt"))).thenReturn(true);
+      Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(false);
+
+      
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+
+      setFsMockPathWithPermissions(sourceFs, 
"/tmp/dataset/hourly/metadata/test1.txt", "-rwxrwxrwx", "owner1", "group1", 
false);
+      setFsMockPathWithPermissions(sourceFs, 
"/tmp/dataset/hourly/metadata/test2.txt", "-rwxrwxrwx", "owner1", "group1", 
false);
+      setFsMockPathWithPermissions(sourceFs, 
"/tmp/dataset2/hourly/metadata/test1.txt", "-rwxrwxrwx", "owner2", "group2", 
false);
+      setFsMockPathWithPermissions(sourceFs, 
"/tmp/dataset2/hourly/metadata/test2.txt", "-rwxrwxrwx", "owner2", "group2", 
false);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset/hourly/metadata", 
"drwxrw-rw-", "owner1", "group1", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2/hourly/metadata", 
"dr-xr-xr-x", "owner2", "group2", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset/hourly", 
"drwxrw-rw-", "owner1", "group1", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2/hourly", 
"dr-xr-xr-x", "owner2", "group2", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset", "drwxr-x---", 
"owner1", "group1", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2", "dr-xr-xr-x", 
"owner2", "group2", true);
+      setFsMockPathWithPermissions(sourceFs, "/tmp", "dr--r--r--", "owner3", 
"group3", true);
+
+      // Specify a different acl for the destination file so that it is 
recopied even though the modification time is the same
+      AclStatus aclStatusDest = 
buildAclStatusWithPermissions("user::r--,group::---,other::---", "group3", 
"owner3");
+      
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+      Iterator<FileSet<CopyEntity>> fileSets =
+          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs,
+              CopyConfiguration.builder(destFs, props).build());
+      Assert.assertTrue(fileSets.hasNext());
+      FileSet<CopyEntity> fileSet = fileSets.next();
+      System.out.println(fileSet.getFiles().get(6).toString());
+      // 4 files to copy + 1 pre publish step + 1 post publish step + 1 
deleteFileCommitStep for a temporary directory
+      Assert.assertEquals(fileSet.getFiles().size(), 7);
+      CommitStep createDirectoryStep = ((PrePublishStep) 
fileSet.getFiles().get(4)).getStep();
+      Assert.assertTrue(createDirectoryStep instanceof 
CreateDirectoryWithPermissionsCommitStep);
+      Map<String, List<OwnerAndPermission>> pathAndPermissions = 
((CreateDirectoryWithPermissionsCommitStep) 
createDirectoryStep).getPathAndPermissions();
+      Assert.assertEquals(pathAndPermissions.size(), 2);
+      
Assert.assertTrue(pathAndPermissions.containsKey("/tmp/dataset/hourly/metadata"));
+      
Assert.assertTrue(pathAndPermissions.containsKey("/tmp/dataset2/hourly/metadata"));
+
+      CommitStep setPermissionStep = ((PostPublishStep) 
fileSet.getFiles().get(5)).getStep();
+      Assert.assertTrue(setPermissionStep instanceof SetPermissionCommitStep);
+      Map<String, OwnerAndPermission> ownerAndPermissionMap = 
((SetPermissionCommitStep) setPermissionStep).getPathAndPermissions();
+      // Ignore /tmp as it already exists on destination
+      Assert.assertEquals(ownerAndPermissionMap.size(), 7);
+      System.out.println(ownerAndPermissionMap);
+      List<String> sortedMapKeys = new 
ArrayList<>(ownerAndPermissionMap.keySet());
+      Assert.assertEquals(sortedMapKeys.get(0), "/tmp");
+      Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getFsPermission(), 
FsPermission.valueOf("dr--r--r--"));
+      Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getOwner(), 
"owner3");
+      Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getGroup(), 
"group3");
+
+      Assert.assertEquals(sortedMapKeys.get(1), "/tmp/dataset");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getFsPermission(),
 FsPermission.valueOf("drwxr-x---"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getOwner(), 
"owner1");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getGroup(), 
"group1");
+
+      Assert.assertEquals(sortedMapKeys.get(2), "/tmp/dataset2");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getFsPermission(),
 FsPermission.valueOf("dr-xr-xr-x"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getOwner(), 
"owner2");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getGroup(), 
"group2");
+
+      Assert.assertEquals(sortedMapKeys.get(3), "/tmp/dataset/hourly");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getFsPermission(),
 FsPermission.valueOf("drwxrw-rw-"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getOwner(),
 "owner1");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getGroup(),
 "group1");
+
+      Assert.assertEquals(sortedMapKeys.get(4), "/tmp/dataset2/hourly");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getFsPermission(),
 FsPermission.valueOf("dr-xr-xr-x"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getOwner(),
 "owner2");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getGroup(),
 "group2");
+
+      Assert.assertEquals(sortedMapKeys.get(5), 
"/tmp/dataset/hourly/metadata");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getFsPermission(),
 FsPermission.valueOf("drwxrw-rw-"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getOwner(),
 "owner1");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getGroup(),
 "group1");
+
+      Assert.assertEquals(sortedMapKeys.get(6), 
"/tmp/dataset2/hourly/metadata");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getFsPermission(),
 FsPermission.valueOf("dr-xr-xr-x"));
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getOwner(),
 "owner2");
+      
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getGroup(),
 "group2");
+    }
+  }
+
+  @Test
+  public void testDisableSetPermissionStep() throws Exception {
+    //Get manifest Path
+    String manifestLocation = 
getClass().getClassLoader().getResource("manifestBasedDistcpTest/manifestRootDirEmpty.json").getPath();
+    // Test manifestDatasetFinder
+    Properties props = new Properties();
+    props.setProperty("gobblin.copy.manifestBased.manifest.location", 
manifestLocation);
+    
props.setProperty("gobblin.copy.manifestBased.enableSetPermissionPostPublish", 
"false");
+    props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+    ManifestBasedDatasetFinder finder = new 
ManifestBasedDatasetFinder(localFs, props);
+    List<ManifestBasedDataset> datasets = finder.findDatasets();
+    Assert.assertEquals(datasets.size(), 1);
+    FileSystem sourceFs = Mockito.mock(FileSystem.class);
+    FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+    FileSystem destFs = Mockito.mock(FileSystem.class);
+    Path manifestPath = new Path(manifestLocation);
+    setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs, 
true);
+    Iterator<FileSet<CopyEntity>> fileSets = new 
ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs,
+        CopyConfiguration.builder(destFs, props).build());
+    Assert.assertTrue(fileSets.hasNext());
+    FileSet<CopyEntity> fileSet = fileSets.next();
+    Assert.assertEquals(fileSet.getFiles().size(), 2);  // 1 files to copy + 1 
pre publish step
+  }
+
+  private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, 
Path manifestPath, FileSystem manifestReadFs, boolean setFileStatusMock) throws 
IOException, URISyntaxException {
     URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
     URI MANIFEST_READ_FS_URI = new URI("manifest-read", 
"the.manifest-source.org", "/", null);
     URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
@@ -231,7 +351,9 @@ public class ManifestBasedDatasetFinderTest {
     Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
     Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
     Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(true);
-    
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+    if (setFileStatusMock) {
+      
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+    }
     Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
     Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
     
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
@@ -243,4 +365,19 @@ public class ManifestBasedDatasetFinderTest {
       return localFs.makeQualified(path);
     }).when(sourceFs).makeQualified(any(Path.class));
   }
+
+  private AclStatus buildAclStatusWithPermissions(String aclSpec, String 
group, String owner) {
+    List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclSpec, true);
+    return new 
AclStatus.Builder().group(group).owner(owner).addEntries(aclEntries).build();
+  }
+
+  private FileStatus createFileStatus(String path, boolean isDir, String 
owner, String group, FsPermission permission) throws IOException {
+    return new FileStatus(1028, isDir, 0, 0, 0, 0, permission, owner, group, 
null, new Path(path));
+  }
+
+  private void setFsMockPathWithPermissions(FileSystem fs, String path, String 
permissionStr, String owner, String group, boolean isDir) throws IOException {
+    AclStatus aclStatus = new 
AclStatus.Builder().owner(owner).group(group).build();
+    Mockito.when(fs.getFileStatus(new 
Path(path))).thenReturn(createFileStatus(path, isDir, owner, group, 
FsPermission.valueOf(permissionStr)));
+    Mockito.when(fs.getAclStatus(new Path(path))).thenReturn(aclStatus);
+  }
 }
diff --git 
a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
 
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
new file mode 100644
index 0000000000..5df299399d
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
@@ -0,0 +1,26 @@
+[
+  {
+    "id":"1",
+    "fileName":"/tmp/dataset/hourly/metadata/test1.txt",
+    "fileGroup":"/tmp/dataset",
+    "fileSizeInBytes":"1024"
+  },
+  {
+    "id":"2",
+    "fileName":"/tmp/dataset/hourly/metadata/test2.txt",
+    "fileGroup":"/tmp/dataset",
+    "fileSizeInBytes":"1028"
+  },
+  {
+    "id":"3",
+    "fileName":"/tmp/dataset2/hourly/metadata/test1.txt",
+    "fileGroup":"/tmp/dataset2",
+    "fileSizeInBytes":"1028"
+  },
+  {
+    "id":"4",
+    "fileName":"/tmp/dataset2/hourly/metadata/test2.txt",
+    "fileGroup":"/tmp/dataset2",
+    "fileSizeInBytes":"1028"
+  }
+]
\ No newline at end of file

Reply via email to