This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6371f06ab [GOBBLIN-1973] Change Manifest distcp logic to compare
permissions of source and dest files even when source is older (#3845)
6371f06ab is described below
commit 6371f06ab0508d6396e0b7878316dc9b21314c44
Author: William Lo <[email protected]>
AuthorDate: Thu Dec 14 16:35:45 2023 -0500
[GOBBLIN-1973] Change Manifest distcp logic to compare permissions of
source and dest files even when source is older (#3845)
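* Change `shouldCopy` logic: copy when the source is newer than the target, or when owner/permissions differ (even if the source is older)
* Add tests; address review comments
* Fix checkstyle
* Remove unused imports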
---
.../data/management/copy/ManifestBasedDataset.java | 7 +-
.../dataset/ManifestBasedDatasetFinderTest.java | 122 +++++++++++++++++----
2 files changed, 101 insertions(+), 28 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 6969123e5..b81d59663 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -176,10 +176,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
private static boolean shouldCopy(FileSystem targetFs, FileStatus
fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
throws IOException {
- if (fileInSource.isDirectory() || fileInSource.getModificationTime() ==
fileInTarget.getModificationTime()) {
- // if source is dir or source and dst has same version, we compare the
permission to determine whether it needs another sync
- return !replicatedPermission.hasSameOwnerAndPermission(targetFs,
fileInTarget);
- }
- return fileInSource.getModificationTime() >
fileInTarget.getModificationTime();
+ // Copy only if source is newer than target or if the owner or permission
is different
+ return fileInSource.getModificationTime() >
fileInTarget.getModificationTime() ||
!replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 4fb7db592..a0dbcacad 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -36,6 +36,8 @@ import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -79,34 +81,18 @@ public class ManifestBasedDatasetFinderTest {
Properties props = new Properties();
props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
- try (
- FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
- FileSystem destFs = Mockito.mock(FileSystem.class);
- ) {
- URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
- URI MANIFEST_READ_FS_URI = new URI("manifest-read",
"the.manifest-source.org", "/", null);
- URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
- Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
- Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
- Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
-
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
- Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
- Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
-
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
-
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
- Mockito.when(destFs.exists(any(Path.class))).thenReturn(false);
- Mockito.doAnswer(invocation -> {
- Object[] args = invocation.getArguments();
- Path path = (Path)args[0];
- return localFs.makeQualified(path);
- }).when(sourceFs).makeQualified(any(Path.class));
+ FileSystem destFs = Mockito.mock(FileSystem.class);) {
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+
Iterator<FileSet<CopyEntity>> fileSets =
- new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs,
props).build());
+ new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs,
+ CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy +
1 post publish step
- Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep()
instanceof SetPermissionCommitStep);
+ Assert.assertTrue(((PostPublishStep)
fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
Mockito.verify(manifestReadFs,
Mockito.times(1)).getFileStatus(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath);
@@ -114,4 +100,94 @@ public class ManifestBasedDatasetFinderTest {
Mockito.verify(sourceFs, Mockito.times(2)).exists(any(Path.class));
}
}
+
+ @Test
+ public void testFindFilesWithDifferentPermissions() throws IOException,
URISyntaxException {
+
+ //Get manifest Path
+ Path manifestPath = new
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
+ // Test manifestDatasetFinder
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+ props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+ try (
+ FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+ FileSystem destFs = Mockito.mock(FileSystem.class)
+ ) {
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/test2.txt"))).thenReturn(false);
+
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+
+ List<AclEntry> aclEntrySource =
AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
+ AclStatus aclStatusSource = new
AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
+
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
+ // Specify a different acl for the destination file so that it is
recopied even though the modification time is the same
+ List<AclEntry> aclEntryDest =
AclEntry.parseAclSpec("user::rwx,group::rw-,other::r--", true);
+ AclStatus aclStatusDest = new
AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntryDest).build();
+
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+ Iterator<FileSet<CopyEntity>> fileSets =
+ new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs,
props).build());
+ Assert.assertTrue(fileSets.hasNext());
+ FileSet<CopyEntity> fileSet = fileSets.next();
+ Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy +
1 post publish step
+ Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep()
instanceof SetPermissionCommitStep);
+
+ }
+ }
+
+ @Test
+ public void testIgnoreFilesWithSamePermissions() throws IOException,
URISyntaxException {
+ //Get manifest Path
+ Path manifestPath = new
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
+ // Test manifestDatasetFinder
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+ props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+ try (
+ FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+ FileSystem destFs = Mockito.mock(FileSystem.class)
+ ) {
+ setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
+ Mockito.when(destFs.exists(new
Path("/tmp/dataset/test2.txt"))).thenReturn(true);
+
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+
+ List<AclEntry> aclEntrySource =
AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
+ AclStatus aclStatusSource = new
AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
+
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
+ // Same as source acls, files should not be copied
+ AclStatus aclStatusDest = new
AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntrySource).build();
+
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+ Iterator<FileSet<CopyEntity>> fileSets =
+ new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath,
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs,
props).build());
+ Assert.assertTrue(fileSets.hasNext());
+ FileSet<CopyEntity> fileSet = fileSets.next();
+ Assert.assertEquals(fileSet.getFiles().size(), 1); // Post publish step
+ }
+ }
+
+ private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs,
Path manifestPath, FileSystem manifestReadFs) throws IOException,
URISyntaxException {
+ URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
+ URI MANIFEST_READ_FS_URI = new URI("manifest-read",
"the.manifest-source.org", "/", null);
+ URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
+ Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+ Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
+ Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
+
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
Path(tmpDir.toString())));
+ Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
+ Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
+
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
+
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
+
+ Mockito.doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Path path = (Path)args[0];
+ return localFs.makeQualified(path);
+ }).when(sourceFs).makeQualified(any(Path.class));
+ }
}