This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6371f06ab [GOBBLIN-1973] Change Manifest distcp logic to compare permissions of source and dest files even when source is older (#3845)
6371f06ab is described below

commit 6371f06ab0508d6396e0b7878316dc9b21314c44
Author: William Lo <[email protected]>
AuthorDate: Thu Dec 14 16:35:45 2023 -0500

    [GOBBLIN-1973] Change Manifest distcp logic to compare permissions of source and dest files even when source is older (#3845)
    
    * change should copy logic
    
    * Add tests, address review
    
    * Fix checkstyle
    
    * Remove unused imports
---
 .../data/management/copy/ManifestBasedDataset.java |   7 +-
 .../dataset/ManifestBasedDatasetFinderTest.java    | 122 +++++++++++++++++----
 2 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 6969123e5..b81d59663 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -176,10 +176,7 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
 
   private static boolean shouldCopy(FileSystem targetFs, FileStatus 
fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
       throws IOException {
-    if (fileInSource.isDirectory() || fileInSource.getModificationTime() == 
fileInTarget.getModificationTime()) {
-      // if source is dir or source and dst has same version, we compare the 
permission to determine whether it needs another sync
-      return !replicatedPermission.hasSameOwnerAndPermission(targetFs, 
fileInTarget);
-    }
-    return fileInSource.getModificationTime() > 
fileInTarget.getModificationTime();
+    // Copy only if source is newer than target or if the owner or permission 
is different
+    return fileInSource.getModificationTime() > 
fileInTarget.getModificationTime() || 
!replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
   }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 4fb7db592..a0dbcacad 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -36,6 +36,8 @@ import org.apache.gobblin.util.commit.SetPermissionCommitStep;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -79,34 +81,18 @@ public class ManifestBasedDatasetFinderTest {
     Properties props = new Properties();
     props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
 
-    try (
-        FileSystem sourceFs = Mockito.mock(FileSystem.class);
+    try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
         FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
-        FileSystem destFs = Mockito.mock(FileSystem.class);
-    ) {
-      URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
-      URI MANIFEST_READ_FS_URI = new URI("manifest-read", 
"the.manifest-source.org", "/", null);
-      URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
-      Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
-      Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
-      Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
-      
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
-      Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
-      Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
-      
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
-      
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
-      Mockito.when(destFs.exists(any(Path.class))).thenReturn(false);
-      Mockito.doAnswer(invocation -> {
-        Object[] args = invocation.getArguments();
-        Path path = (Path)args[0];
-        return localFs.makeQualified(path);
-      }).when(sourceFs).makeQualified(any(Path.class));
+        FileSystem destFs = Mockito.mock(FileSystem.class);) {
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+
       Iterator<FileSet<CopyEntity>> fileSets =
-          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, 
props).build());
+          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs,
+              CopyConfiguration.builder(destFs, props).build());
       Assert.assertTrue(fileSets.hasNext());
       FileSet<CopyEntity> fileSet = fileSets.next();
       Assert.assertEquals(fileSet.getFiles().size(), 3);  // 2 files to copy + 
1 post publish step
-      Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() 
instanceof SetPermissionCommitStep);
+      Assert.assertTrue(((PostPublishStep) 
fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
       Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
       Mockito.verify(manifestReadFs, 
Mockito.times(1)).getFileStatus(manifestPath);
       Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath);
@@ -114,4 +100,94 @@ public class ManifestBasedDatasetFinderTest {
       Mockito.verify(sourceFs, Mockito.times(2)).exists(any(Path.class));
     }
   }
+
+  @Test
+  public void testFindFilesWithDifferentPermissions() throws IOException, 
URISyntaxException {
+
+    //Get manifest Path
+    Path manifestPath = new 
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
+    // Test manifestDatasetFinder
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+    props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+    try (
+        FileSystem sourceFs = Mockito.mock(FileSystem.class);
+        FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+        FileSystem destFs = Mockito.mock(FileSystem.class)
+    ) {
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test2.txt"))).thenReturn(false);
+      
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+
+      List<AclEntry> aclEntrySource = 
AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
+      AclStatus aclStatusSource = new 
AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
+      
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
+      // Specify a different acl for the destination file so that it is 
recopied even though the modification time is the same
+      List<AclEntry> aclEntryDest = 
AclEntry.parseAclSpec("user::rwx,group::rw-,other::r--", true);
+      AclStatus aclStatusDest = new 
AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntryDest).build();
+      
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+      Iterator<FileSet<CopyEntity>> fileSets =
+          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, 
props).build());
+      Assert.assertTrue(fileSets.hasNext());
+      FileSet<CopyEntity> fileSet = fileSets.next();
+      Assert.assertEquals(fileSet.getFiles().size(), 3);  // 2 files to copy + 
1 post publish step
+      Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() 
instanceof SetPermissionCommitStep);
+
+    }
+  }
+
+  @Test
+  public void testIgnoreFilesWithSamePermissions() throws IOException, 
URISyntaxException {
+    //Get manifest Path
+    Path manifestPath = new 
Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
+    // Test manifestDatasetFinder
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+    props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
+    try (
+        FileSystem sourceFs = Mockito.mock(FileSystem.class);
+        FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
+        FileSystem destFs = Mockito.mock(FileSystem.class)
+    ) {
+      setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test1.txt"))).thenReturn(true);
+      Mockito.when(destFs.exists(new 
Path("/tmp/dataset/test2.txt"))).thenReturn(true);
+      
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+
+      List<AclEntry> aclEntrySource = 
AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
+      AclStatus aclStatusSource = new 
AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
+      
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
+      // Same as source acls, files should not be copied
+      AclStatus aclStatusDest = new 
AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntrySource).build();
+      
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);
+
+      Iterator<FileSet<CopyEntity>> fileSets =
+          new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, 
props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, 
props).build());
+      Assert.assertTrue(fileSets.hasNext());
+      FileSet<CopyEntity> fileSet = fileSets.next();
+      Assert.assertEquals(fileSet.getFiles().size(), 1); // Post publish step
+    }
+  }
+
+  private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, 
Path manifestPath, FileSystem manifestReadFs) throws IOException, 
URISyntaxException {
+    URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
+    URI MANIFEST_READ_FS_URI = new URI("manifest-read", 
"the.manifest-source.org", "/", null);
+    URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
+    Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
+    
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new
 Path(tmpDir.toString())));
+    Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
+    Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
+    
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
+    
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
+
+    Mockito.doAnswer(invocation -> {
+      Object[] args = invocation.getArguments();
+      Path path = (Path)args[0];
+      return localFs.makeQualified(path);
+    }).when(sourceFs).makeQualified(any(Path.class));
+  }
 }

Reply via email to