This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8c3f326c0 [GOBBLIN-2082] Change setpermissioncommitstep to pre-create
folders before commit (#3966)
8c3f326c0 is described below
commit 8c3f326c0eef5f3a7e50c7cfee2d419af78ae1c1
Author: William Lo <[email protected]>
AuthorDate: Mon Jun 10 18:21:40 2024 -0400
[GOBBLIN-2082] Change setpermissioncommitstep to pre-create folders before
commit (#3966)
Change SetPermissionCommitStep (renamed to CreateAndSetDirectoryPermissionCommitStep)
to pre-create directories before commit so that recursive renames won't run into race conditions.
---
.../data/management/copy/ManifestBasedDataset.java | 11 +++++------
.../data/management/copy/RecursiveCopyableDataset.java | 9 ++++-----
... => CreateAndSetDirectoryPermissionCommitStep.java} | 18 ++++++++++++------
.../dataset/ManifestBasedDatasetFinderTest.java | 13 +++++++------
...CreateAndSetDirectoryPermissionCommitStepTest.java} | 11 +++++------
5 files changed, 33 insertions(+), 29 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 013012041..ee43580ad 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -36,12 +36,11 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +50,7 @@ import org.apache.hadoop.fs.Path;
/**
* A dataset that based on Manifest. We expect the Manifest contains the list
of all the files for this dataset.
* At first phase, we only support copy across different clusters to the same
location. (We can add more feature to support rename in the future)
- * We will delete the file on target if it's listed in the manifest and not
exist on source when {@link
ManifestBasedDataset.DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
+ * We will delete the file on target if it's listed in the manifest and not
exist on source when {@link
ManifestBasedDataset#DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
*/
@Slf4j
public class ManifestBasedDataset implements IterableCopyableDataset {
@@ -162,9 +161,9 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
}
}
Properties props = new Properties();
- props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
- CommitStep setPermissionCommitStep = new
SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
- copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(),
setPermissionCommitStep, 1));
+
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY,
"true");
+ CommitStep setPermissionCommitStep = new
CreateAndSetDirectoryPermissionCommitStep(targetFs,
ancestorOwnerAndPermissions, props);
+ copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(),
setPermissionCommitStep, 1));
if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index b47604a5d..ce6236206 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -38,13 +38,12 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -177,9 +176,9 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
if (this.useNewPreserveLogic) {
Properties props = new Properties();
- props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
- CommitStep step = new SetPermissionCommitStep(targetFs,
ancestorOwnerAndPermissions, props);
- copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(),
step, 1));
+
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY,
"true");
+ CommitStep step = new
CreateAndSetDirectoryPermissionCommitStep(targetFs,
ancestorOwnerAndPermissions, props);
+ copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(),
step, 1));
}
return copyEntities;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
similarity index 72%
rename from
gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
rename to
gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
index 0887c05bf..e407c20f1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
@@ -34,11 +34,13 @@ import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
/**
- * An implementation of {@link CommitStep} for setting any file permissions.
+ * An implementation of {@link CommitStep} that creates directories with their
associated permissions (or sets permissions on directories that already exist) before commit.
+ * Necessary when creating large numbers of file paths, e.g. manifest-based distcp, where
multiple threads are creating directories at the same time,
+ * which can lead to some race conditions described in {@link
org.apache.gobblin.util.HadoopUtils#unsafeRenameIfNotExists(FileSystem, Path,
Path)}
* Current implementation only sets permissions, but it is capable of setting
owner and group as well.
*/
@Slf4j
-public class SetPermissionCommitStep implements CommitStep {
+public class CreateAndSetDirectoryPermissionCommitStep implements CommitStep {
@Getter
Map<String, OwnerAndPermission> pathAndPermissions;
private final URI fsUri;
@@ -47,7 +49,7 @@ public class SetPermissionCommitStep implements CommitStep {
public static final String DEFAULT_STOP_ON_ERROR = "false";
private boolean isCompleted = false;
- public SetPermissionCommitStep(FileSystem targetFs, Map<String,
OwnerAndPermission> pathAndPermissions,
+ public CreateAndSetDirectoryPermissionCommitStep(FileSystem targetFs,
Map<String, OwnerAndPermission> pathAndPermissions,
Properties props) {
this.pathAndPermissions = pathAndPermissions;
this.fsUri = targetFs.getUri();
@@ -66,9 +68,13 @@ public class SetPermissionCommitStep implements CommitStep {
for (Map.Entry<String, OwnerAndPermission> entry :
pathAndPermissions.entrySet()) {
Path path = new Path(entry.getKey());
try {
- log.info("Setting permission {} on path {}",
entry.getValue().getFsPermission(), path);
- fs.setPermission(path, entry.getValue().getFsPermission());
- // TODO : we can also set owner and group here.
+ if (!fs.exists(path)) {
+ log.info("Creating path {} with permission {}", path,
entry.getValue().getFsPermission());
+ fs.mkdirs(path, entry.getValue().getFsPermission());
+ } else {
+ log.info("Setting permission {} on existing path {}",
entry.getValue().getFsPermission(), path);
+ fs.setPermission(path, entry.getValue().getFsPermission());
+ }
} catch (AccessControlException e) {
log.warn("Error while setting permission on " + path, e);
if (this.stopOnError) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 16ded2b79..af556db48 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -29,9 +29,9 @@ import
org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -92,8 +92,9 @@ public class ManifestBasedDatasetFinderTest {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy +
1 post publish step
- Assert.assertTrue(((PostPublishStep)
fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
- SetPermissionCommitStep step = (SetPermissionCommitStep)
((PostPublishStep) fileSet.getFiles().get(2)).getStep();
+ Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(2)).getStep()
instanceof CreateAndSetDirectoryPermissionCommitStep);
+ CreateAndSetDirectoryPermissionCommitStep
+ step = (CreateAndSetDirectoryPermissionCommitStep) ((PrePublishStep)
fileSet.getFiles().get(2)).getStep();
Assert.assertEquals(step.getPathAndPermissions().keySet().size(), 1); //
SetPermissionCommitStep only applies to ancestors
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
@@ -136,7 +137,7 @@ public class ManifestBasedDatasetFinderTest {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy +
1 post publish step
- Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep()
instanceof SetPermissionCommitStep);
+ Assert.assertTrue(((PrePublishStep)fileSet.getFiles().get(2)).getStep()
instanceof CreateAndSetDirectoryPermissionCommitStep);
}
}
@@ -196,7 +197,7 @@ public class ManifestBasedDatasetFinderTest {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 2); // 1 files to copy + 1
post publish step
- Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(1)).getStep()
instanceof SetPermissionCommitStep);
+ Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(1)).getStep()
instanceof CreateAndSetDirectoryPermissionCommitStep);
}
private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs,
Path manifestPath, FileSystem manifestReadFs) throws IOException,
URISyntaxException {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
similarity index 87%
rename from
gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
rename to
gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
index 5df1b8f28..b230eb4f4 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
@@ -36,14 +36,14 @@ import
org.apache.gobblin.data.management.copy.OwnerAndPermission;
/**
- * Test for {@link SetPermissionCommitStep}.
+ * Test for {@link CreateAndSetDirectoryPermissionCommitStep}.
*/
@Test(groups = { "gobblin.commit" })
-public class SetPermissionCommitStepTest {
+public class CreateAndSetDirectoryPermissionCommitStepTest {
private static final String ROOT_DIR = "set-permission-commit-step-test";
private FileSystem fs;
- private SetPermissionCommitStep step;
+ private CreateAndSetDirectoryPermissionCommitStep step;
Path dir1;
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
FsAction.ALL);
@@ -53,13 +53,12 @@ public class SetPermissionCommitStepTest {
this.fs.delete(new Path(ROOT_DIR), true);
dir1 = new Path(ROOT_DIR, "dir1");
- this.fs.mkdirs(dir1);
OwnerAndPermission ownerAndPermission = new OwnerAndPermission("owner",
"group", permission);
Map<String, OwnerAndPermission> pathAndPermissions = new HashMap<>();
pathAndPermissions.put(dir1.toString(), ownerAndPermission);
- this.step = new SetPermissionCommitStep(this.fs, pathAndPermissions, new
Properties());
+ this.step = new CreateAndSetDirectoryPermissionCommitStep(this.fs,
pathAndPermissions, new Properties());
}
@AfterClass
@@ -69,8 +68,8 @@ public class SetPermissionCommitStepTest {
@Test
public void testExecute() throws IOException {
- Assert.assertNotEquals(this.fs.getFileStatus(dir1).getPermission(),
permission);
this.step.execute();
+ Assert.assertEquals(this.fs.exists(dir1), true);
Assert.assertEquals(this.fs.getFileStatus(dir1).getPermission(),
permission);
}
}