This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eaed5825a3 [GOBBLIN-2128] Fix bugs in setpermission step where it does
not properly add records… (#4020)
eaed5825a3 is described below
commit eaed5825a3e7a5d94361fa56d0fcf9c69c85d911
Author: William Lo <[email protected]>
AuthorDate: Sun Aug 11 15:06:18 2024 -0400
[GOBBLIN-2128] Fix bugs in setpermission step where it does not properly
add records… (#4020)
* Fix bugs in setpermission step where it does not properly add records
with equal file depths
---
.../data/management/copy/ManifestBasedDataset.java | 40 ++++--
.../management/copy/RecursiveCopyableDataset.java | 3 +-
.../dataset/ManifestBasedDatasetFinderTest.java | 153 +++++++++++++++++++--
.../longNestedDirectoryTreeManifest.json | 26 ++++
4 files changed, 198 insertions(+), 24 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 834010ff4c..6d7df69043 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -19,13 +19,12 @@ package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -65,6 +64,9 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
private static final String COMMON_FILES_PARENT =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
private static final String PERMISSION_CACHE_TTL_SECONDS =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+
+ // Enables the post-publish step that resets permission bits on ancestor
directories; defaults to true
+ private static final String ENABLE_SET_PERMISSION_POST_PUBLISH =
ManifestBasedDatasetFinder.CONFIG_PREFIX + ".enableSetPermissionPostPublish";
private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
private static final String DEFAULT_COMMON_FILES_PARENT = "/";
private final FileSystem srcFs;
@@ -75,6 +77,8 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
private final String commonFilesParent;
private final int permissionCacheTTLSeconds;
+ private final boolean enableSetPermissionPostPublish;
+
public ManifestBasedDataset(final FileSystem srcFs, final FileSystem
manifestReadFs, final Path manifestPath, final Properties properties) {
this.srcFs = srcFs;
this.manifestReadFs = manifestReadFs;
@@ -83,6 +87,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
this.deleteFileThatNotExistOnSource =
Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE,
"false"));
this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT,
DEFAULT_COMMON_FILES_PARENT);
this.permissionCacheTTLSeconds =
Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS,
DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
+ this.enableSetPermissionPostPublish =
Boolean.parseBoolean(properties.getProperty(ENABLE_SET_PERMISSION_POST_PUBLISH,
"true"));
}
@Override
@@ -109,7 +114,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
// map of paths and permissions sorted by depth of path, so that
permissions can be set in order
Map<String, List<OwnerAndPermission>> ancestorOwnerAndPermissions = new
HashMap<>();
TreeMap<String, OwnerAndPermission> flattenedAncestorPermissions = new
TreeMap<>(
- (o1, o2) -> Long.compare(o1.chars().filter(ch -> ch == '/').count(),
o2.chars().filter(ch -> ch == '/').count()));
+ Comparator.comparingInt((String o) ->
o.split("/").length).thenComparing(o -> o));
try {
long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.manifestReadFs,
this.manifestPath);
@@ -144,7 +149,6 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
// Avoid duplicate calculation for the same ancestor
if (fromPath != null &&
!ancestorOwnerAndPermissions.containsKey(PathUtils.getPathWithoutSchemeAndAuthority(fromPath).toString())
&& !targetFs.exists(fromPath)) {
ancestorOwnerAndPermissions.put(fromPath.toString(),
copyableFile.getAncestorsOwnerAndPermission());
-
flattenedAncestorPermissions.putAll(CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs,
fromPath, new Path(commonFilesParent), configuration));
}
if (existOnTarget && srcFile.isFile()) {
@@ -157,20 +161,26 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
- // Only set permission for newly created folders on target
- // To change permissions for existing dirs, expectation is to add the
folder to the manifest
- Set<String> parentFolders = new
HashSet<>(flattenedAncestorPermissions.keySet());
- for (String folder : parentFolders) {
- if (targetFs.exists(new Path(folder))) {
- flattenedAncestorPermissions.remove(folder);
- }
- }
- // We need both precommit step to create the directories copying to, and
a postcommit step to ensure that the execute bit needed for recursive rename is
reset
+
+ // Precreate the directories to avoid an edge case where recursive
rename can create extra directories in the target
CommitStep createDirectoryWithPermissionsCommitStep = new
CreateDirectoryWithPermissionsCommitStep(targetFs, ancestorOwnerAndPermissions,
this.properties);
- CommitStep setPermissionCommitStep = new
SetPermissionCommitStep(targetFs, flattenedAncestorPermissions,
this.properties);
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(),
createDirectoryWithPermissionsCommitStep, 1));
- copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(),
setPermissionCommitStep, 1));
+ if (this.enableSetPermissionPostPublish) {
+ for (Map.Entry<String, List<OwnerAndPermission>>
recursiveParentPermissions : ancestorOwnerAndPermissions.entrySet()) {
+ Path currentPath = new Path(recursiveParentPermissions.getKey());
+ for (OwnerAndPermission ownerAndPermission :
recursiveParentPermissions.getValue()) {
+ // Ignore folders that already exist in destination; we assume
that the publisher will re-sync those permissions if needed, and
+ // to change their permissions those folders should be added to the manifest.
+ if
(!flattenedAncestorPermissions.containsKey(currentPath.toString()) &&
!targetFs.exists(currentPath)) {
+ flattenedAncestorPermissions.put(currentPath.toString(),
ownerAndPermission);
+ }
+ currentPath = currentPath.getParent();
+ }
+ }
+ CommitStep setPermissionCommitStep = new
SetPermissionCommitStep(targetFs, flattenedAncestorPermissions,
this.properties);
+ copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(),
setPermissionCommitStep, 1));
+ }
if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete,
this.properties, Optional.<Path>absent());
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 2228cad396..0c38f414e0 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.data.management.copy;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -144,7 +145,7 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
// map of paths and permissions sorted by depth of path, so that
permissions can be set in order
TreeMap<String, OwnerAndPermission> ancestorOwnerAndPermissions = new
TreeMap<>(
- (o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(),
o1.chars().filter(ch -> ch == '/').count()));
+ Comparator.comparingInt((String o) ->
o.split("/").length).thenComparing(o -> o));
for (Path path : toCopy) {
FileStatus file = filesInSource.get(path);
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 33100d7b8d..d81de494d0 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -21,16 +21,19 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -90,7 +93,7 @@ public class ManifestBasedDatasetFinderTest {
try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class);) {
- setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
true);
Iterator<FileSet<CopyEntity>> fileSets =
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs,
@@ -127,7 +130,7 @@ public class ManifestBasedDatasetFinderTest {
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class)
) {
- setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
true);
Mockito.when(destFs.exists(new
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
Mockito.when(destFs.exists(new
Path("/tmp/dataset/test2.txt"))).thenReturn(false);
Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(true);
@@ -158,8 +161,6 @@ public class ManifestBasedDatasetFinderTest {
// Ignore /tmp as it already exists on destination
Assert.assertEquals(ownerAndPermissionMap.size(), 1);
Assert.assertTrue(ownerAndPermissionMap.containsKey("/tmp/dataset"));
-
-
}
}
@@ -176,7 +177,7 @@ public class ManifestBasedDatasetFinderTest {
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class)
) {
- setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
true);
Mockito.when(destFs.exists(new
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
Mockito.when(destFs.exists(new
Path("/tmp/dataset/test2.txt"))).thenReturn(true);
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
@@ -212,7 +213,7 @@ public class ManifestBasedDatasetFinderTest {
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class);
Path manifestPath = new Path(manifestLocation);
- setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
true);
Iterator<FileSet<CopyEntity>> fileSets = new
ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs,
CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
@@ -223,7 +224,126 @@ public class ManifestBasedDatasetFinderTest {
}
- private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs,
Path manifestPath, FileSystem manifestReadFs) throws IOException,
URISyntaxException {
+ @Test
+ public void testSetPermissionNestedTreePermissions() throws IOException,
URISyntaxException {
+
+ //Get manifest Path
+ Path manifestPath = new
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json").getPath());
+ // Test manifestDatasetFinder
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+ props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+ try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+ FileSystem destFs = Mockito.mock(FileSystem.class)) {
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
false);
+ // Mock that these files exist but still recopy due to different
permissions
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/hourly/metadata/test1.txt"))).thenReturn(true);
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/hourly/metadata/test2.txt"))).thenReturn(true);
+ Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(false);
+
+
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+
+ setFsMockPathWithPermissions(sourceFs,
"/tmp/dataset/hourly/metadata/test1.txt", "-rwxrwxrwx", "owner1", "group1",
false);
+ setFsMockPathWithPermissions(sourceFs,
"/tmp/dataset/hourly/metadata/test2.txt", "-rwxrwxrwx", "owner1", "group1",
false);
+ setFsMockPathWithPermissions(sourceFs,
"/tmp/dataset2/hourly/metadata/test1.txt", "-rwxrwxrwx", "owner2", "group2",
false);
+ setFsMockPathWithPermissions(sourceFs,
"/tmp/dataset2/hourly/metadata/test2.txt", "-rwxrwxrwx", "owner2", "group2",
false);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset/hourly/metadata",
"drwxrw-rw-", "owner1", "group1", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2/hourly/metadata",
"dr-xr-xr-x", "owner2", "group2", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset/hourly",
"drwxrw-rw-", "owner1", "group1", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2/hourly",
"dr-xr-xr-x", "owner2", "group2", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset", "drwxr-x---",
"owner1", "group1", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp/dataset2", "dr-xr-xr-x",
"owner2", "group2", true);
+ setFsMockPathWithPermissions(sourceFs, "/tmp", "dr--r--r--", "owner3",
"group3", true);
+
+ // Specify a different acl for the destination file so that it is
recopied even though the modification time is the same
+ AclStatus aclStatusDest =
buildAclStatusWithPermissions("user::r--,group::---,other::---", "group3",
"owner3");
+
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+ Iterator<FileSet<CopyEntity>> fileSets =
+ new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs,
+ CopyConfiguration.builder(destFs, props).build());
+ Assert.assertTrue(fileSets.hasNext());
+ FileSet<CopyEntity> fileSet = fileSets.next();
+ System.out.println(fileSet.getFiles().get(6).toString());
+ // 4 files to copy + 1 pre publish step + 1 post publish step + 1
deleteFileCommitStep for a temporary directory
+ Assert.assertEquals(fileSet.getFiles().size(), 7);
+ CommitStep createDirectoryStep = ((PrePublishStep)
fileSet.getFiles().get(4)).getStep();
+ Assert.assertTrue(createDirectoryStep instanceof
CreateDirectoryWithPermissionsCommitStep);
+ Map<String, List<OwnerAndPermission>> pathAndPermissions =
((CreateDirectoryWithPermissionsCommitStep)
createDirectoryStep).getPathAndPermissions();
+ Assert.assertEquals(pathAndPermissions.size(), 2);
+
Assert.assertTrue(pathAndPermissions.containsKey("/tmp/dataset/hourly/metadata"));
+
Assert.assertTrue(pathAndPermissions.containsKey("/tmp/dataset2/hourly/metadata"));
+
+ CommitStep setPermissionStep = ((PostPublishStep)
fileSet.getFiles().get(5)).getStep();
+ Assert.assertTrue(setPermissionStep instanceof SetPermissionCommitStep);
+ Map<String, OwnerAndPermission> ownerAndPermissionMap =
((SetPermissionCommitStep) setPermissionStep).getPathAndPermissions();
+ // /tmp is mocked as missing on destination, so it is included along with the 6 nested directories
+ Assert.assertEquals(ownerAndPermissionMap.size(), 7);
+ System.out.println(ownerAndPermissionMap);
+ List<String> sortedMapKeys = new
ArrayList<>(ownerAndPermissionMap.keySet());
+ Assert.assertEquals(sortedMapKeys.get(0), "/tmp");
+ Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getFsPermission(),
FsPermission.valueOf("dr--r--r--"));
+ Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getOwner(),
"owner3");
+ Assert.assertEquals(ownerAndPermissionMap.get("/tmp").getGroup(),
"group3");
+
+ Assert.assertEquals(sortedMapKeys.get(1), "/tmp/dataset");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getFsPermission(),
FsPermission.valueOf("drwxr-x---"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getOwner(),
"owner1");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset").getGroup(),
"group1");
+
+ Assert.assertEquals(sortedMapKeys.get(2), "/tmp/dataset2");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getFsPermission(),
FsPermission.valueOf("dr-xr-xr-x"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getOwner(),
"owner2");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2").getGroup(),
"group2");
+
+ Assert.assertEquals(sortedMapKeys.get(3), "/tmp/dataset/hourly");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getFsPermission(),
FsPermission.valueOf("drwxrw-rw-"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getOwner(),
"owner1");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly").getGroup(),
"group1");
+
+ Assert.assertEquals(sortedMapKeys.get(4), "/tmp/dataset2/hourly");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getFsPermission(),
FsPermission.valueOf("dr-xr-xr-x"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getOwner(),
"owner2");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly").getGroup(),
"group2");
+
+ Assert.assertEquals(sortedMapKeys.get(5),
"/tmp/dataset/hourly/metadata");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getFsPermission(),
FsPermission.valueOf("drwxrw-rw-"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getOwner(),
"owner1");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset/hourly/metadata").getGroup(),
"group1");
+
+ Assert.assertEquals(sortedMapKeys.get(6),
"/tmp/dataset2/hourly/metadata");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getFsPermission(),
FsPermission.valueOf("dr-xr-xr-x"));
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getOwner(),
"owner2");
+
Assert.assertEquals(ownerAndPermissionMap.get("/tmp/dataset2/hourly/metadata").getGroup(),
"group2");
+ }
+ }
+
+ @Test
+ public void testDisableSetPermissionStep() throws Exception {
+ //Get manifest Path
+ String manifestLocation =
getClass().getClassLoader().getResource("manifestBasedDistcpTest/manifestRootDirEmpty.json").getPath();
+ // Test manifestDatasetFinder
+ Properties props = new Properties();
+ props.setProperty("gobblin.copy.manifestBased.manifest.location",
manifestLocation);
+
props.setProperty("gobblin.copy.manifestBased.enableSetPermissionPostPublish",
"false");
+ props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+ ManifestBasedDatasetFinder finder = new
ManifestBasedDatasetFinder(localFs, props);
+ List<ManifestBasedDataset> datasets = finder.findDatasets();
+ Assert.assertEquals(datasets.size(), 1);
+ FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+ FileSystem destFs = Mockito.mock(FileSystem.class);
+ Path manifestPath = new Path(manifestLocation);
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs,
true);
+ Iterator<FileSet<CopyEntity>> fileSets = new
ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs,
+ CopyConfiguration.builder(destFs, props).build());
+ Assert.assertTrue(fileSets.hasNext());
+ FileSet<CopyEntity> fileSet = fileSets.next();
+ Assert.assertEquals(fileSet.getFiles().size(), 2); // 1 file to copy + 1
pre publish step; no post publish step since it is disabled
+ }
+
+ private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs,
Path manifestPath, FileSystem manifestReadFs, boolean setFileStatusMock) throws
IOException, URISyntaxException {
URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
URI MANIFEST_READ_FS_URI = new URI("manifest-read",
"the.manifest-source.org", "/", null);
URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
@@ -231,7 +351,9 @@ public class ManifestBasedDatasetFinderTest {
Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
Mockito.when(destFs.exists(new Path("/tmp"))).thenReturn(true);
-
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+ if (setFileStatusMock) {
+
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+ }
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
@@ -243,4 +365,19 @@ public class ManifestBasedDatasetFinderTest {
return localFs.makeQualified(path);
}).when(sourceFs).makeQualified(any(Path.class));
}
+
+ private AclStatus buildAclStatusWithPermissions(String aclSpec, String
group, String owner) {
+ List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclSpec, true);
+ return new
AclStatus.Builder().group(group).owner(owner).addEntries(aclEntries).build();
+ }
+
+ private FileStatus createFileStatus(String path, boolean isDir, String
owner, String group, FsPermission permission) throws IOException {
+ return new FileStatus(1028, isDir, 0, 0, 0, 0, permission, owner, group,
null, new Path(path));
+ }
+
+ private void setFsMockPathWithPermissions(FileSystem fs, String path, String
permissionStr, String owner, String group, boolean isDir) throws IOException {
+ AclStatus aclStatus = new
AclStatus.Builder().owner(owner).group(group).build();
+ Mockito.when(fs.getFileStatus(new
Path(path))).thenReturn(createFileStatus(path, isDir, owner, group,
FsPermission.valueOf(permissionStr)));
+ Mockito.when(fs.getAclStatus(new Path(path))).thenReturn(aclStatus);
+ }
}
diff --git
a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
new file mode 100644
index 0000000000..5df299399d
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/longNestedDirectoryTreeManifest.json
@@ -0,0 +1,26 @@
+[
+ {
+ "id":"1",
+ "fileName":"/tmp/dataset/hourly/metadata/test1.txt",
+ "fileGroup":"/tmp/dataset",
+ "fileSizeInBytes":"1024"
+ },
+ {
+ "id":"2",
+ "fileName":"/tmp/dataset/hourly/metadata/test2.txt",
+ "fileGroup":"/tmp/dataset",
+ "fileSizeInBytes":"1028"
+ },
+ {
+ "id":"3",
+ "fileName":"/tmp/dataset2/hourly/metadata/test1.txt",
+ "fileGroup":"/tmp/dataset2",
+ "fileSizeInBytes":"1028"
+ },
+ {
+ "id":"4",
+ "fileName":"/tmp/dataset2/hourly/metadata/test2.txt",
+ "fileGroup":"/tmp/dataset2",
+ "fileSizeInBytes":"1028"
+ }
+]
\ No newline at end of file