This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ad72742 [GOBBLIN-1601] implement ChangePermissionCommitStep (#3457)
ad72742 is described below
commit ad727427e3fb726d5066034e7335abac1457c60d
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jan 31 10:42:01 2022 -0800
[GOBBLIN-1601] implement ChangePermissionCommitStep (#3457)
* implement ChangePermissionCommitStep (added in the code as SetPermissionCommitStep)
add a configuration to make the permissions of ancestor directories in the
destination match those of the corresponding directories in the source
* address review comments
---
.../gobblin/data/management/copy/CopySource.java | 9 ++-
.../gobblin/data/management/copy/CopyableFile.java | 50 +++++++++++--
.../management/copy/RecursiveCopyableDataset.java | 29 +++++++-
.../writer/FileAwareInputStreamDataWriter.java | 4 +-
.../util/commit/SetPermissionCommitStep.java | 81 ++++++++++++++++++++++
.../util/commit/SetPermissionCommitStepTest.java | 76 ++++++++++++++++++++
6 files changed, 234 insertions(+), 15 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 437d070..074b86a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -27,11 +27,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import javax.annotation.Nullable;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +41,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index dbe9920..fd04fa3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,16 +17,27 @@
package org.apache.gobblin.data.management.copy;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+
import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
import org.apache.gobblin.data.management.partition.File;
import org.apache.gobblin.dataset.DatasetConstants;
@@ -35,11 +46,6 @@ import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.guid.Guid;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
/**
@@ -354,6 +360,36 @@ public class CopyableFile extends CopyEntity implements
File {
return ownerAndPermissions;
}
+  /**
+   * Resolves the owner and permission of {@code fromPath} and of its ancestor directories on {@code sourceFs}, so that
+   * they can later be replicated onto the corresponding directories elsewhere.
+   *
+   * <p>The walk has two phases:
+   * <ol>
+   *   <li>Starting at {@code fromPath}, record the current directory and move to its parent for as long as
+   *   {@code toPath} is an ancestor of that parent. This stops once the walk reaches the level of {@code toPath}, or
+   *   the filesystem root.</li>
+   *   <li>From there, keep walking up the origin and {@code toPath} hierarchies in lockstep, recording each origin
+   *   directory while the final path components of both paths are equal.</li>
+   * </ol>
+   *
+   * <p>NOTE(review): after phase 1 the origin path sits at the same location as {@code toPath}, so the name comparison
+   * in phase 2 appears to hold all the way up to (and including) the filesystem root. Confirm that is the intended
+   * extent.
+   *
+   * @param sourceFs file system on which {@code fromPath} and its ancestors are resolved
+   * @param fromPath directory to start from; must be a directory on {@code sourceFs}
+   * @param toPath path that must be an ancestor of, or equal to, {@code fromPath}
+   * @param copyConfiguration passed through to {@code resolveReplicatedOwnerAndPermission}
+   * @return map from each recorded origin path (scheme and authority stripped) to its resolved owner and permission
+   * @throws IllegalArgumentException if {@code fromPath} is not a directory
+   * @throws IOException if {@code toPath} is not an ancestor of {@code fromPath}, or if looking up a file status or
+   *     resolving an owner/permission fails
+   */
+  public static Map<String, OwnerAndPermission> resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs,
+      Path fromPath, Path toPath, CopyConfiguration copyConfiguration) throws IOException {
+    Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), "Source path must be a directory.");
+
+    Map<String, OwnerAndPermission> ownerAndPermissions = Maps.newHashMap();
+
+    // Only directories are passed in, and the starting directory itself also needs its permissions replicated.
+    Path currentOriginPath = fromPath;
+    Path currentTargetPath = toPath;
+
+    if (!PathUtils.isAncestor(currentTargetPath, currentOriginPath)) {
+      throw new IOException(String.format("currentTargetPath %s must be an ancestor of currentOriginPath %s.",
+          currentTargetPath, currentOriginPath));
+    }
+
+    // Phase 1: climb from fromPath up to the level of toPath. The null check stops the walk at the filesystem root;
+    // without it, a toPath of "/" would hand a null parent to PathUtils.isAncestor.
+    while (currentOriginPath.getParent() != null
+        && PathUtils.isAncestor(currentTargetPath, currentOriginPath.getParent())) {
+      ownerAndPermissions.put(PathUtils.getPathWithoutSchemeAndAuthority(currentOriginPath).toString(),
+          resolveReplicatedOwnerAndPermission(sourceFs, currentOriginPath, copyConfiguration));
+      currentOriginPath = currentOriginPath.getParent();
+    }
+
+    // Phase 2: walk through the parents in lockstep, preserving origin permissions while the path names match.
+    while (currentOriginPath != null && currentTargetPath != null
+        && currentOriginPath.getName().equals(currentTargetPath.getName())) {
+      ownerAndPermissions.put(PathUtils.getPathWithoutSchemeAndAuthority(currentOriginPath).toString(),
+          resolveReplicatedOwnerAndPermission(sourceFs, currentOriginPath, copyConfiguration));
+      currentOriginPath = currentOriginPath.getParent();
+      currentTargetPath = currentTargetPath.getParent();
+    }
+
+    return ownerAndPermissions;
+  }
+
@Override
public FileStatus getFileStatus() {
return this.origin;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 03d9128..2c15b9e 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TreeMap;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -36,11 +37,13 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -58,6 +61,8 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
public static final String DELETE_KEY = CONFIG_PREFIX + ".delete";
/** If true, will delete newly empty directories up to the dataset root. */
public static final String DELETE_EMPTY_DIRECTORIES_KEY = CONFIG_PREFIX +
".deleteEmptyDirectories";
+ /** If true, will use our new logic to preserve permissions, owner, and
group of ancestors. */
+ public static final String USE_NEW_PRESERVE_LOGIC_KEY = CONFIG_PREFIX +
".useNewPreserveLogic";
private final Path rootPath;
private final FileSystem fs;
@@ -74,6 +79,8 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
private final boolean deleteEmptyDirectories;
//Apply filter to directories
private final boolean applyFilterToDirectories;
+ // Use new preserve logic which recurses down and walks the parent links up
for preservation of permissions, user, and group.
+ private final boolean useNewPreserveLogic;
private final Properties properties;
@@ -93,6 +100,7 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
this.applyFilterToDirectories =
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES,
"false"));
+ this.useNewPreserveLogic =
Boolean.parseBoolean(properties.getProperty(USE_NEW_PRESERVE_LOGIC_KEY));
this.properties = properties;
}
@@ -133,10 +141,21 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
List<CopyEntity> copyEntities = Lists.newArrayList();
List<CopyableFile> copyableFiles = Lists.newArrayList();
+ // map of paths and permissions sorted by depth of path, so that
permissions can be set in order
+ Map<String, OwnerAndPermission> ancestorOwnerAndPermissions = new
TreeMap<>(
+ (o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(),
o1.chars().filter(ch -> ch == '/').count()));
+
for (Path path : toCopy) {
FileStatus file = filesInSource.get(path);
Path filePathRelativeToSearchPath =
PathUtils.relativizePath(file.getPath(), replacedPrefix);
Path thisTargetPath = new Path(replacingPrefix,
filePathRelativeToSearchPath);
+
+ if (this.useNewPreserveLogic) {
+ ancestorOwnerAndPermissions.putAll(CopyableFile
+ .resolveReplicatedAncestorOwnerAndPermissionsRecursively(this.fs,
file.getPath().getParent(),
+ replacedPrefix, configuration));
+ }
+
CopyableFile copyableFile =
CopyableFile.fromOriginAndDestination(this.fs, file,
thisTargetPath, configuration)
.fileSet(datasetURN())
@@ -153,8 +172,16 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
if (!toDelete.isEmpty()) {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete.values(),
this.properties,
this.deleteEmptyDirectories ?
Optional.of(deleteEmptyDirectoriesUpTo) : Optional.<Path>absent());
- copyEntities.add(new PrePublishStep(datasetURN(), Maps.<String,
String>newHashMap(), step, 1));
+ copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(),
step, 1));
}
+
+ if (this.useNewPreserveLogic) {
+ Properties props = new Properties();
+ props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
+ CommitStep step = new SetPermissionCommitStep(targetFs,
ancestorOwnerAndPermissions, props);
+ copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(),
step, 1));
+ }
+
return copyEntities;
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index c42d51f..96e6333 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -486,14 +486,14 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
}
if (ownerAndPermission.getFsPermission() != null) {
- log.debug("Applying permissions %s to path %s.",
ownerAndPermission.getFsPermission(), path);
+ log.debug("Applying permissions {} to path {}.",
ownerAndPermission.getFsPermission(), path);
fs.setPermission(path,
addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
}
String group = ownerAndPermission.getGroup();
String owner = ownerAndPermission.getOwner();
if (group != null || owner != null) {
- log.debug("Applying owner %s and group %s to path %s.", owner, group,
path);
+ log.debug("Applying owner {} and group {} to path {}.", owner, group,
path);
fs.setOwner(path, owner, group);
}
} else {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
new file mode 100644
index 0000000..dd1a68b
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.commit;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+
+/**
+ * A {@link CommitStep} that sets file permissions on a collection of paths in a target {@link FileSystem}.
+ *
+ * <p>For each entry of the supplied map, the key is turned into a {@link Path} and the entry's {@code FsPermission}
+ * is applied to it. The owner and group carried by {@link OwnerAndPermission} are not applied yet (see the TODO in
+ * {@link #execute()}). Entries that carry no permission are skipped instead of passing {@code null} to
+ * {@link FileSystem#setPermission}. Paths are processed in the iteration order of the supplied map, so callers that
+ * need a specific order must supply an ordered map that keeps every distinct path as its own key.
+ *
+ * <p>An {@link AccessControlException} while setting a permission is logged. If {@link #STOP_ON_ERROR_KEY} is
+ * {@code "true"}, the remaining paths are skipped; otherwise processing continues with the next path. Any other
+ * {@link IOException} propagates out of {@link #execute()}. When {@link #execute()} returns normally, the step reports
+ * itself as completed, even if some permissions could not be set.
+ */
+@Slf4j
+public class SetPermissionCommitStep implements CommitStep {
+  /** Property key: when {@code "true"}, stop processing the remaining paths after the first access-control failure. */
+  public static final String STOP_ON_ERROR_KEY = "stop.on.error";
+  /** Value used for {@link #STOP_ON_ERROR_KEY} when the property is not set. */
+  public static final String DEFAULT_STOP_ON_ERROR = "false";
+
+  // Path string -> owner/permission to apply on the target file system.
+  final Map<String, OwnerAndPermission> pathAndPermissions;
+  // Only the URI is retained; the FileSystem instance is looked up again in execute().
+  private final URI fsUri;
+  public final boolean stopOnError;
+  private boolean isCompleted = false;
+
+  /**
+   * @param targetFs file system on which permissions will be set; only its URI is retained
+   * @param pathAndPermissions path strings mapped to the owner and permission to apply
+   * @param props step properties; {@link #STOP_ON_ERROR_KEY} is read from here
+   */
+  public SetPermissionCommitStep(FileSystem targetFs, Map<String, OwnerAndPermission> pathAndPermissions,
+      Properties props) {
+    this.pathAndPermissions = pathAndPermissions;
+    this.fsUri = targetFs.getUri();
+    this.stopOnError = Boolean.parseBoolean(props.getProperty(STOP_ON_ERROR_KEY, DEFAULT_STOP_ON_ERROR));
+  }
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return this.isCompleted;
+  }
+
+  /**
+   * Applies the configured permission to every path in the map.
+   *
+   * @throws IOException if the target file system cannot be obtained, or if setting a permission fails with an
+   *     {@link IOException} other than {@link AccessControlException}
+   */
+  @Override
+  public void execute() throws IOException {
+    FileSystem fs = FileSystem.get(this.fsUri, new Configuration());
+
+    for (Map.Entry<String, OwnerAndPermission> entry : this.pathAndPermissions.entrySet()) {
+      Path path = new Path(entry.getKey());
+      OwnerAndPermission ownerAndPermission = entry.getValue();
+      // Nothing to replicate for this path; never pass a null permission to setPermission.
+      if (ownerAndPermission == null || ownerAndPermission.getFsPermission() == null) {
+        log.debug("No permission to set on path {}, skipping.", path);
+        continue;
+      }
+      try {
+        log.info("Setting permission {} on path {}", ownerAndPermission.getFsPermission(), path);
+        fs.setPermission(path, ownerAndPermission.getFsPermission());
+        // TODO : we can also set owner and group here.
+      } catch (AccessControlException e) {
+        log.warn("Error while setting permission on {}", path, e);
+        if (this.stopOnError) {
+          log.info("Skip setting rest of the permissions because stopOnError is true.");
+          break;
+        }
+      }
+    }
+
+    this.isCompleted = true;
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
new file mode 100644
index 0000000..5df1b8f
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.commit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+
+
+/**
+ * Tests for {@link SetPermissionCommitStep}: running the step on a freshly created directory replaces the directory's
+ * permission with the configured one.
+ */
+@Test(groups = { "gobblin.commit" })
+public class SetPermissionCommitStepTest {
+  private static final String ROOT_DIR = "set-permission-commit-step-test";
+
+  private FileSystem fs;
+  private SetPermissionCommitStep step;
+  Path dir1;
+  FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.fs = FileSystem.getLocal(new Configuration());
+    Path root = new Path(ROOT_DIR);
+    this.fs.delete(root, true);
+
+    this.dir1 = new Path(ROOT_DIR, "dir1");
+    this.fs.mkdirs(this.dir1);
+
+    // Single entry: grant rwx to everyone on dir1.
+    Map<String, OwnerAndPermission> permissionsByPath = new HashMap<>();
+    permissionsByPath.put(this.dir1.toString(), new OwnerAndPermission("owner", "group", this.permission));
+    this.step = new SetPermissionCommitStep(this.fs, permissionsByPath, new Properties());
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    Path root = new Path(ROOT_DIR);
+    this.fs.delete(root, true);
+  }
+
+  @Test
+  public void testExecute() throws IOException {
+    FsPermission before = this.fs.getFileStatus(this.dir1).getPermission();
+    Assert.assertNotEquals(before, this.permission);
+
+    this.step.execute();
+
+    FsPermission after = this.fs.getFileStatus(this.dir1).getPermission();
+    Assert.assertEquals(after, this.permission);
+  }
+}