This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ad72742  [GOBBLIN-1601] implement ChangePermissionCommitStep (#3457)
ad72742 is described below

commit ad727427e3fb726d5066034e7335abac1457c60d
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jan 31 10:42:01 2022 -0800

    [GOBBLIN-1601] implement ChangePermissionCommitStep (#3457)
    
    * implement ChangePermissionCommitStep
    add a configuration to match the permissions of ancestor directories
    in source and destination
    
    * address review comments
---
 .../gobblin/data/management/copy/CopySource.java   |  9 ++-
 .../gobblin/data/management/copy/CopyableFile.java | 50 +++++++++++--
 .../management/copy/RecursiveCopyableDataset.java  | 29 +++++++-
 .../writer/FileAwareInputStreamDataWriter.java     |  4 +-
 .../util/commit/SetPermissionCommitStep.java       | 81 ++++++++++++++++++++++
 .../util/commit/SetPermissionCommitStepTest.java   | 76 ++++++++++++++++++++
 6 files changed, 234 insertions(+), 15 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 437d070..074b86a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -27,11 +27,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import javax.annotation.Nullable;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -46,6 +41,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index dbe9920..fd04fa3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,16 +17,27 @@
 
 package org.apache.gobblin.data.management.copy;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+
 import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
 import org.apache.gobblin.data.management.partition.File;
 import org.apache.gobblin.dataset.DatasetConstants;
@@ -35,11 +46,6 @@ import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.guid.Guid;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -354,6 +360,36 @@ public class CopyableFile extends CopyEntity implements 
File {
     return ownerAndPermissions;
   }
 
+  public static Map<String, OwnerAndPermission> 
resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, 
Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration) throws IOException {
+    
Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), 
"Source path must be a directory.");
+
+    Map<String, OwnerAndPermission> ownerAndPermissions = Maps.newHashMap();
+
+    // We only pass directories to this method anyways. Those directories 
themselves need permissions set.
+    Path currentOriginPath = fromPath;
+    Path currentTargetPath = toPath;
+
+    if (!PathUtils.isAncestor(currentTargetPath, currentOriginPath)) {
+      throw new IOException(String.format("currentTargetPath %s must be an 
ancestor of currentOriginPath %s.", currentTargetPath, currentOriginPath));
+    }
+
+    while (PathUtils.isAncestor(currentTargetPath, 
currentOriginPath.getParent())) {
+      
ownerAndPermissions.put(PathUtils.getPathWithoutSchemeAndAuthority(currentOriginPath).toString(),
 resolveReplicatedOwnerAndPermission(sourceFs, currentOriginPath, 
copyConfiguration));
+      currentOriginPath = currentOriginPath.getParent();
+    }
+
+    // Walk through the parents and preserve the permissions from Origin -> 
Target as we go in lockstep.
+    while (currentOriginPath != null && currentTargetPath != null
+        && currentOriginPath.getName().equals(currentTargetPath.getName())) {
+      
ownerAndPermissions.put(PathUtils.getPathWithoutSchemeAndAuthority(currentOriginPath).toString(),
 resolveReplicatedOwnerAndPermission(sourceFs, currentOriginPath, 
copyConfiguration));
+      currentOriginPath = currentOriginPath.getParent();
+      currentTargetPath = currentTargetPath.getParent();
+    }
+
+    return ownerAndPermissions;
+  }
+
   @Override
   public FileStatus getFileStatus() {
     return this.origin;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 03d9128..2c15b9e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,11 +37,13 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.commit.SetPermissionCommitStep;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 
 
@@ -58,6 +61,8 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
   public static final String DELETE_KEY = CONFIG_PREFIX + ".delete";
   /** If true, will delete newly empty directories up to the dataset root. */
   public static final String DELETE_EMPTY_DIRECTORIES_KEY = CONFIG_PREFIX + 
".deleteEmptyDirectories";
+  /** If true, will use our new logic to preserve permissions, owner, and 
group of ancestors. */
+  public static final String USE_NEW_PRESERVE_LOGIC_KEY = CONFIG_PREFIX + 
".useNewPreserveLogic";
 
   private final Path rootPath;
   private final FileSystem fs;
@@ -74,6 +79,8 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
   private final boolean deleteEmptyDirectories;
   //Apply filter to directories
   private final boolean applyFilterToDirectories;
+  // Use new preserve logic which recurses down and walks the parent links up 
for preservation of permissions, user, and group.
+  private final boolean useNewPreserveLogic;
 
   private final Properties properties;
 
@@ -93,6 +100,7 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
         
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
     this.applyFilterToDirectories =
         
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES,
 "false"));
+    this.useNewPreserveLogic = 
Boolean.parseBoolean(properties.getProperty(USE_NEW_PRESERVE_LOGIC_KEY));
     this.properties = properties;
   }
 
@@ -133,10 +141,21 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
     List<CopyEntity> copyEntities = Lists.newArrayList();
     List<CopyableFile> copyableFiles = Lists.newArrayList();
 
+    // map of paths and permissions sorted by depth of path, so that 
permissions can be set in order
+    Map<String, OwnerAndPermission> ancestorOwnerAndPermissions = new 
TreeMap<>(
+        (o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(), 
o1.chars().filter(ch -> ch == '/').count()));
+
     for (Path path : toCopy) {
       FileStatus file = filesInSource.get(path);
       Path filePathRelativeToSearchPath = 
PathUtils.relativizePath(file.getPath(), replacedPrefix);
       Path thisTargetPath = new Path(replacingPrefix, 
filePathRelativeToSearchPath);
+
+      if (this.useNewPreserveLogic) {
+        ancestorOwnerAndPermissions.putAll(CopyableFile
+            .resolveReplicatedAncestorOwnerAndPermissionsRecursively(this.fs, 
file.getPath().getParent(),
+                replacedPrefix, configuration));
+      }
+
       CopyableFile copyableFile =
               CopyableFile.fromOriginAndDestination(this.fs, file, 
thisTargetPath, configuration)
                       .fileSet(datasetURN())
@@ -153,8 +172,16 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
     if (!toDelete.isEmpty()) {
       CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(), 
this.properties,
               this.deleteEmptyDirectories ? 
Optional.of(deleteEmptyDirectoriesUpTo) : Optional.<Path>absent());
-      copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String, 
String>newHashMap(), step, 1));
+      copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
     }
+
+    if (this.useNewPreserveLogic) {
+      Properties props = new Properties();
+      props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
+      CommitStep step = new SetPermissionCommitStep(targetFs, 
ancestorOwnerAndPermissions, props);
+      copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
+    }
+
     return copyEntities;
   }
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index c42d51f..96e6333 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -486,14 +486,14 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
       }
 
       if (ownerAndPermission.getFsPermission() != null) {
-        log.debug("Applying permissions %s to path %s.", 
ownerAndPermission.getFsPermission(), path);
+        log.debug("Applying permissions {} to path {}.", 
ownerAndPermission.getFsPermission(), path);
         fs.setPermission(path, 
addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
       }
 
       String group = ownerAndPermission.getGroup();
       String owner = ownerAndPermission.getOwner();
       if (group != null || owner != null) {
-        log.debug("Applying owner %s and group %s to path %s.", owner, group, 
path);
+        log.debug("Applying owner {} and group {} to path {}.", owner, group, 
path);
         fs.setOwner(path, owner, group);
       }
     } else {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
new file mode 100644
index 0000000..dd1a68b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.commit;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+
+/**
+ * An implementation of {@link CommitStep} for setting any file permissions.
+ * Current implementation only sets permissions, but it is capable of setting 
owner and group as well.
+ */
+@Slf4j
+public class SetPermissionCommitStep implements CommitStep {
+  Map<String, OwnerAndPermission> pathAndPermissions;
+  private final URI fsUri;
+  public final boolean stopOnError;
+  public static final String STOP_ON_ERROR_KEY = "stop.on.error";
+  public static final String DEFAULT_STOP_ON_ERROR = "false";
+  private boolean isCompleted = false;
+
+  public SetPermissionCommitStep(FileSystem targetFs, Map<String, 
OwnerAndPermission> pathAndPermissions,
+      Properties props) {
+    this.pathAndPermissions = pathAndPermissions;
+    this.fsUri = targetFs.getUri();
+    this.stopOnError = 
Boolean.parseBoolean(props.getProperty(STOP_ON_ERROR_KEY, 
DEFAULT_STOP_ON_ERROR));
+  }
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return isCompleted;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    FileSystem fs = FileSystem.get(this.fsUri, new Configuration());
+
+    for (Map.Entry<String, OwnerAndPermission> entry : 
pathAndPermissions.entrySet()) {
+      Path path = new Path(entry.getKey());
+      try {
+        log.info("Setting permission {} on path {}", 
entry.getValue().getFsPermission(), path);
+        fs.setPermission(path, entry.getValue().getFsPermission());
+        // TODO : we can also set owner and group here.
+      } catch (AccessControlException e) {
+        log.warn("Error while setting permission on " + path, e);
+        if (this.stopOnError) {
+          log.info("Skip setting rest of the permissions because stopOnError 
is true.");
+          break;
+        }
+      }
+    }
+
+    isCompleted = true;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
new file mode 100644
index 0000000..5df1b8f
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.commit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+
+
+/**
+ * Test for {@link SetPermissionCommitStep}.
+ */
+@Test(groups = { "gobblin.commit" })
+public class SetPermissionCommitStepTest {
+  private static final String ROOT_DIR = "set-permission-commit-step-test";
+
+  private FileSystem fs;
+  private SetPermissionCommitStep step;
+  Path dir1;
+  FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, 
FsAction.ALL);
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.fs = FileSystem.getLocal(new Configuration());
+    this.fs.delete(new Path(ROOT_DIR), true);
+
+    dir1 = new Path(ROOT_DIR, "dir1");
+    this.fs.mkdirs(dir1);
+
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission("owner", 
"group", permission);
+    Map<String, OwnerAndPermission> pathAndPermissions = new HashMap<>();
+    pathAndPermissions.put(dir1.toString(), ownerAndPermission);
+
+    this.step = new SetPermissionCommitStep(this.fs, pathAndPermissions, new 
Properties());
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    this.fs.delete(new Path(ROOT_DIR), true);
+  }
+
+  @Test
+  public void testExecute() throws IOException {
+    Assert.assertNotEquals(this.fs.getFileStatus(dir1).getPermission(), 
permission);
+    this.step.execute();
+    Assert.assertEquals(this.fs.getFileStatus(dir1).getPermission(), 
permission);
+  }
+}

Reply via email to