This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 858f706fa0 [GOBBLIN-2234] Fix removal of existing ACLs issue for path 
existing in destination (#4149)
858f706fa0 is described below

commit 858f706fa01e5446fc5e1c261183d8827dce1063
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Nov 3 14:38:12 2025 +0530

    [GOBBLIN-2234] Fix removal of existing ACLs issue for path existing in 
destination (#4149)
    
    * Fix removal of existing ACL issue for path existing in destination
---
 .../copy/publisher/CopyDataPublisher.java          |   3 +-
 .../writer/FileAwareInputStreamDataWriter.java     |  22 ++
 .../writer/FileAwareInputStreamDataWriterTest.java | 322 ++++++++++++++++++++-
 3 files changed, 343 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 0cc77a9aec..5c32dcf440 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -203,7 +203,8 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
       FileStatus dstFile = 
this.fs.getFileStatus(copyableFile.getDestination());
       // User specifically try to copy dir metadata, so we change the group 
and permissions on destination even when the dir already existed
       log.info("Setting destination directory {} owner and permission to {}", 
dstFile.getPath(), 
copyableFile.getDestinationOwnerAndPermission().getFsPermission());
-      FileAwareInputStreamDataWriter.setPathPermission(this.fs, dstFile, 
copyableFile.getDestinationOwnerAndPermission(), 
this.shouldFailWhenPermissionsFail);
+      // Remove existing ACLs to ensure target matches source exactly (avoid 
ACL accumulation)
+      FileAwareInputStreamDataWriter.setPathPermission(this.fs, dstFile, 
copyableFile.getDestinationOwnerAndPermission(), 
this.shouldFailWhenPermissionsFail, true);
     }
     if (preserveDirModTime || copyableFile.getFileStatus().isFile()) {
       // Preserving File ModTime, and set the access time to an initializing 
value when ModTime is declared to be preserved.
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 0eb50a4607..9978906505 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -381,10 +381,32 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
    */
   public static void setPathPermission(FileSystem fs, FileStatus file, 
OwnerAndPermission ownerAndPermission,
       boolean requirePermissionSetForSuccess) throws IOException {
+    setPathPermission(fs, file, ownerAndPermission, 
requirePermissionSetForSuccess, false);
+  }
+
+  /**
+   * Sets the {@link FsPermission}, owner, group for the path passed. It uses 
`requirePermissionSetForSuccess` param
+   * to determine whether an exception will be thrown or only error log 
printed in the case of failure.
+   * @param requirePermissionSetForSuccess if true then throw exception, 
otherwise log error message and continue when
+   *                                       operations cannot be executed.
+   * @param removeExistingAcls if true, removes existing ACLs before setting 
new ones to avoid accumulation.
+   *                           This should be true when setting permissions on 
existing target files/directories
+   *                           to ensure ACLs match the source exactly.
+   */
+  public static void setPathPermission(FileSystem fs, FileStatus file, 
OwnerAndPermission ownerAndPermission,
+      boolean requirePermissionSetForSuccess, boolean removeExistingAcls) 
throws IOException {
 
     Path path = file.getPath();
     OwnerAndPermission targetOwnerAndPermission = 
setOwnerExecuteBitIfDirectory(file, ownerAndPermission);
     try {
+      // Handle ACLs
+      // Remove existing ACLs first if requested, even if source has no ACLs
+      // This ensures destination matches source exactly (no ACLs if source 
has none)
+      if (removeExistingAcls) {
+        fs.removeAcl(path);
+      }
+
+      // Set new ACLs if source has any
       if (!targetOwnerAndPermission.getAclEntries().isEmpty()) {
         // use modify acls instead of setAcl since latter requires all three 
acl entry types: user, group and others
         // while overwriting the acls for a given path. If anyone is absent it 
fails acl transformation validation.
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index e53d1eae2f..b9565206fb 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -684,15 +684,331 @@ public class FileAwareInputStreamDataWriterTest {
   }
 
   /**
-   * Created this class to support `setAcl` method for {@link LocalFileSystem} 
for unit testing since LocalFileSystem
-   * doesn't provide any implementation for `setAcl` method
+   * Test that setPathPermission with removeExistingAcls=true removes existing 
ACLs before adding new ones.
+   */
+  @Test
+  public void testSetPathPermissionRemovesExistingAcls() throws Exception {
+    Path testPath = new Path(testTempPath, "testAclReplacement");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Simulate existing ACLs on target directory
+    List<AclEntry> existingAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:olduser:rwx", true),
+        AclEntry.parseAclEntry("user:anotheruser:r-x", true)
+    );
+    fs.modifyAclEntries(testPath, existingAcls);
+    Assert.assertEquals(existingAcls, fs.getAclEntries(testPath));
+
+    // Set new ACLs with removeExistingAcls=true
+    List<AclEntry> newAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:newuser:rw-", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), newAcls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify old ACLs were removed and only new ACLs exist
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertEquals(1, resultAcls.size());
+    Assert.assertEquals(newAcls, resultAcls);
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+  }
+
+  /**
+   * Test that setPathPermission with removeExistingAcls=false does NOT remove 
existing ACLs.
+   * This tests the default behavior where ACLs are added without removal.
+   */
+  @Test
+  public void testSetPathPermissionDoesNotRemoveExistingAcls() throws 
Exception {
+    Path testPath = new Path(testTempPath, "testAclAddition");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Set initial ACLs
+    List<AclEntry> initialAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:existinguser:rwx", true)
+    );
+    fs.modifyAclEntries(testPath, initialAcls);
+
+    // Add new ACLs with removeExistingAcls=false
+    List<AclEntry> newAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:newuser:r-x", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), newAcls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, false);
+
+    // Verify removeAcl was NOT called
+    Assert.assertFalse(fs.wasRemoveAclCalled(testPath));
+    // Verify new ACLs were added
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNotNull(resultAcls);
+    // Should have old as well as new acls
+    initialAcls.addAll(newAcls);
+    Assert.assertEquals(initialAcls, resultAcls);
+  }
+
+  /**
+   * Test that setPathPermission with default parameter (no 
removeExistingAcls) defaults to false.
+   */
+  @Test
+  public void testSetPathPermissionDefaultBehaviorDoesNotRemoveAcls() throws 
Exception {
+    Path testPath = new Path(testTempPath, "testDefaultBehavior");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    List<AclEntry> acls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:testuser:rwx", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), acls);
+
+    // Call without removeExistingAcls parameter (should default to false)
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false);
+
+    // Verify removeAcl was NOT called (default is false)
+    Assert.assertFalse(fs.wasRemoveAclCalled(testPath));
+    // Verify ACLs were set (if filesystem supports it)
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    if (resultAcls != null) {
+      Assert.assertEquals(acls, resultAcls);
+    }
+  }
+
+  /**
+   * Test that setPathPermission works correctly for files (not just 
directories).
+   */
+  @Test
+  public void testSetPathPermissionForFile() throws Exception {
+    Path testPath = new Path(testTempPath, "testFile.txt");
+    fs.create(testPath).close();
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Set initial ACLs on file
+    List<AclEntry> initialAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:olduser:rwx", true)
+    );
+    fs.modifyAclEntries(testPath, initialAcls);
+
+    // Replace with new ACLs
+    List<AclEntry> newAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:newuser:r--", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("-rw-r--r--"), newAcls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify ACLs were replaced
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNotNull(resultAcls);
+    Assert.assertEquals(newAcls, resultAcls);
+  }
+
+  /**
+   * Test that setPathPermission handles multiple ACL entries correctly.
+   */
+  @Test
+  public void testSetPathPermissionWithMultipleAclEntries() throws Exception {
+    Path testPath = new Path(testTempPath, "testMultipleAcls");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Create multiple ACL entries
+    List<AclEntry> multipleAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:alice:rwx", true),
+        AclEntry.parseAclEntry("user:bob:r-x", true),
+        AclEntry.parseAclEntry("user:charlie:rw-", true),
+        AclEntry.parseAclEntry("group:developers:rwx", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), multipleAcls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify all ACLs were set
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNotNull(resultAcls);
+    Assert.assertEquals(4, resultAcls.size());
+    Assert.assertEquals(multipleAcls, resultAcls);
+  }
+
+  /**
+   * Test that setPathPermission works with directory execute bit handling.
+   * Directories without owner execute bit get it added automatically.
+   */
+  @Test
+  public void testSetPathPermissionWithDirectoryExecuteBit() throws Exception {
+    Path testPath = new Path(testTempPath, "testDirExecuteBit");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    List<AclEntry> acls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:testuser:rw-", true)
+    );
+    // Directory permission without owner execute bit
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drw-rw-rw-"), acls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify ACLs were set even with execute bit handling
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNotNull(resultAcls);
+    Assert.assertEquals(acls, resultAcls);
+  }
+
+  /**
+   * Test that setPathPermission correctly handles the scenario where a 
directory
+   * has existing ACLs that need to be completely replaced.
+   */
+  @Test
+  public void testSetPathPermissionReplacesAllExistingAcls() throws Exception {
+    Path testPath = new Path(testTempPath, "testCompleteReplacement");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Set multiple existing ACLs
+    List<AclEntry> existingAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:user1:rwx", true),
+        AclEntry.parseAclEntry("user:user2:r-x", true),
+        AclEntry.parseAclEntry("group:group1:rwx", true)
+    );
+    fs.modifyAclEntries(testPath, existingAcls);
+
+    // Replace with completely different ACLs
+    List<AclEntry> newAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:differentuser:r--", true)
+    );
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), newAcls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify complete replacement
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNotNull(resultAcls);
+    Assert.assertEquals(1, resultAcls.size());
+    Assert.assertEquals(newAcls, resultAcls);
+    // Verify none of the old ACLs exist
+    for (AclEntry oldAcl : existingAcls) {
+      Assert.assertFalse(resultAcls.contains(oldAcl));
+    }
+  }
+
+  /**
+   * Test that permissions are still set correctly when ACL operations are 
performed.
+   */
+  @Test
+  public void testSetPathPermissionSetsPermissionsCorrectly() throws Exception 
{
+    Path testPath = new Path(testTempPath, "testPermissions");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    List<AclEntry> acls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:testuser:rwx", true)
+    );
+    FsPermission permission = FsPermission.valueOf("drwxr-x---");
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null, 
permission, acls);
+
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify ACLs were set
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    Assert.assertNotNull(fs.getAclEntries(testPath));
+
+    // Verify permissions were also set (LocalFileSystem adds execute bit for 
directories)
+    FileStatus updatedStatus = fs.getFileStatus(testPath);
+    
Assert.assertTrue(updatedStatus.getPermission().getUserAction().implies(FsAction.EXECUTE));
+  }
+
+  /**
+   * Test that when destination has ACLs but source has no ACLs (empty list),
+   * all ACLs are removed from destination when removeExistingAcls=true.
+   * This ensures destination matches source exactly (no ACLs).
+   */
+  @Test
+  public void testSetPathPermissionRemovesDestinationAclsWhenSourceHasNoAcls() 
throws Exception {
+    Path testPath = new Path(testTempPath, "testEmptySourceAcls");
+    fs.mkdirs(testPath);
+    FileStatus fileStatus = fs.getFileStatus(testPath);
+    testPath = fileStatus.getPath();
+
+    // Destination has existing ACLs
+    List<AclEntry> existingAcls = Lists.newArrayList(
+        AclEntry.parseAclEntry("user:user1:rwx", true),
+        AclEntry.parseAclEntry("user:user2:r-x", true),
+        AclEntry.parseAclEntry("group:group1:rw-", true)
+    );
+    fs.modifyAclEntries(testPath, existingAcls);
+    Assert.assertEquals(existingAcls, fs.getAclEntries(testPath));
+
+    // Source has no ACLs (empty list)
+    List<AclEntry> emptyAcls = Lists.newArrayList();
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(null, null,
+        FsPermission.valueOf("drwxr-xr-x"), emptyAcls);
+
+    // Call with removeExistingAcls=true
+    FileAwareInputStreamDataWriter.setPathPermission(fs, fileStatus, 
ownerAndPermission, false, true);
+
+    // Verify removeAcl was called to clear all ACLs
+    Assert.assertTrue(fs.wasRemoveAclCalled(testPath));
+    // Verify no ACLs exist on destination (matches source which has no ACLs)
+    List<AclEntry> resultAcls = fs.getAclEntries(testPath);
+    Assert.assertNull(resultAcls,
+        "Destination should have no ACLs when source has no ACLs and 
removeExistingAcls=true");
+  }
+
+  /**
+   * Enhanced TestLocalFileSystem to support ACL testing with tracking of 
removeAcl calls.
    */
   protected class TestLocalFileSystem extends LocalFileSystem {
     private final ConcurrentHashMap<Path, List<AclEntry>> pathToAclEntries = 
new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Path, Boolean> removeAclCalls = new 
ConcurrentHashMap<>();
+
     @Override
     public void modifyAclEntries(Path path, List<AclEntry> aclEntries) {
-      pathToAclEntries.put(path, aclEntries);
+      // Use computeIfAbsent to reflect true behavior: add to existing ACLs, 
don't replace
+      pathToAclEntries.computeIfAbsent(path, k -> 
Lists.newArrayList()).addAll(aclEntries);
+    }
+
+    @Override
+    public void removeAcl(Path path) {
+      removeAclCalls.put(path, true);
+      pathToAclEntries.remove(path);
+    }
+
+    /**
+     * Get ACL entries for a given path.
+     * @return List of ACL entries, or null if no ACLs are set
+     */
+    public List<AclEntry> getAclEntries(Path path) {
+      return pathToAclEntries.get(path);
     }
+
+    /**
+     * Check if removeAcl was called for a given path.
+     * @return true if removeAcl was called, false otherwise
+     */
+    public boolean wasRemoveAclCalled(Path path) {
+      return removeAclCalls.getOrDefault(path, false);
+    }
+
     public ImmutableMap<Path, List<AclEntry>> getPathToAclEntries() {
       return ImmutableMap.copyOf(pathToAclEntries);
     }

Reply via email to