This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c3f326c0 [GOBBLIN-2082] Change SetPermissionCommitStep to pre-create folders before commit (#3966)
8c3f326c0 is described below

commit 8c3f326c0eef5f3a7e50c7cfee2d419af78ae1c1
Author: William Lo <[email protected]>
AuthorDate: Mon Jun 10 18:21:40 2024 -0400

    [GOBBLIN-2082] Change SetPermissionCommitStep to pre-create folders before commit (#3966)
    
    Change SetPermissionCommitStep to pre-create folders so that recursive renames won't run into race conditions.
---
 .../data/management/copy/ManifestBasedDataset.java     | 11 +++++------
 .../data/management/copy/RecursiveCopyableDataset.java |  9 ++++-----
 ... => CreateAndSetDirectoryPermissionCommitStep.java} | 18 ++++++++++++------
 .../dataset/ManifestBasedDatasetFinderTest.java        | 13 +++++++------
 ...CreateAndSetDirectoryPermissionCommitStepTest.java} | 11 +++++------
 5 files changed, 33 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 013012041..ee43580ad 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -36,12 +36,11 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.partition.FileSet;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import 
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +50,7 @@ import org.apache.hadoop.fs.Path;
 /**
  * A dataset that based on Manifest. We expect the Manifest contains the list 
of all the files for this dataset.
  * At first phase, we only support copy across different clusters to the same 
location. (We can add more feature to support rename in the future)
- * We will delete the file on target if it's listed in the manifest and not 
exist on source when {@link 
ManifestBasedDataset.DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
+ * We will delete the file on target if it's listed in the manifest and not 
exist on source when {@link 
ManifestBasedDataset#DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
  */
 @Slf4j
 public class ManifestBasedDataset implements IterableCopyableDataset {
@@ -162,9 +161,9 @@ public class ManifestBasedDataset implements 
IterableCopyableDataset {
         }
       }
       Properties props = new Properties();
-      props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
-      CommitStep setPermissionCommitStep = new 
SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
-      copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), 
setPermissionCommitStep, 1));
+      
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY, 
"true");
+      CommitStep setPermissionCommitStep = new 
CreateAndSetDirectoryPermissionCommitStep(targetFs, 
ancestorOwnerAndPermissions, props);
+      copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
setPermissionCommitStep, 1));
 
       if (!toDelete.isEmpty()) {
         //todo: add support sync for empty dir
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index b47604a5d..ce6236206 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -38,13 +38,12 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import 
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 
 
@@ -177,9 +176,9 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
 
     if (this.useNewPreserveLogic) {
       Properties props = new Properties();
-      props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
-      CommitStep step = new SetPermissionCommitStep(targetFs, 
ancestorOwnerAndPermissions, props);
-      copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
+      
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY, 
"true");
+      CommitStep step = new 
CreateAndSetDirectoryPermissionCommitStep(targetFs, 
ancestorOwnerAndPermissions, props);
+      copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), 
step, 1));
     }
 
     return copyEntities;
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
similarity index 72%
rename from 
gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
rename to 
gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
index 0887c05bf..e407c20f1 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/SetPermissionCommitStep.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStep.java
@@ -34,11 +34,13 @@ import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
 
 /**
- * An implementation of {@link CommitStep} for setting any file permissions.
+ * An implementation of {@link CommitStep} for creating directories and their 
associated permissions before commit
+ * Necessary when creating large file paths e.g. Manifest distcp where 
multiple threads are creating directories at the same time,
+ * which can lead to some race conditions described in {@link 
org.apache.gobblin.util.HadoopUtils#unsafeRenameIfNotExists(FileSystem, Path, 
Path)}
  * Current implementation only sets permissions, but it is capable of setting 
owner and group as well.
  */
 @Slf4j
-public class SetPermissionCommitStep implements CommitStep {
+public class CreateAndSetDirectoryPermissionCommitStep implements CommitStep {
   @Getter
   Map<String, OwnerAndPermission> pathAndPermissions;
   private final URI fsUri;
@@ -47,7 +49,7 @@ public class SetPermissionCommitStep implements CommitStep {
   public static final String DEFAULT_STOP_ON_ERROR = "false";
   private boolean isCompleted = false;
 
-  public SetPermissionCommitStep(FileSystem targetFs, Map<String, 
OwnerAndPermission> pathAndPermissions,
+  public CreateAndSetDirectoryPermissionCommitStep(FileSystem targetFs, 
Map<String, OwnerAndPermission> pathAndPermissions,
       Properties props) {
     this.pathAndPermissions = pathAndPermissions;
     this.fsUri = targetFs.getUri();
@@ -66,9 +68,13 @@ public class SetPermissionCommitStep implements CommitStep {
     for (Map.Entry<String, OwnerAndPermission> entry : 
pathAndPermissions.entrySet()) {
       Path path = new Path(entry.getKey());
       try {
-        log.info("Setting permission {} on path {}", 
entry.getValue().getFsPermission(), path);
-        fs.setPermission(path, entry.getValue().getFsPermission());
-        // TODO : we can also set owner and group here.
+        if (!fs.exists(path)) {
+          log.info("Creating path {} with permission {}", path, 
entry.getValue().getFsPermission());
+          fs.mkdirs(path, entry.getValue().getFsPermission());
+        } else {
+          log.info("Setting permission {} on existing path {}", 
entry.getValue().getFsPermission(), path);
+          fs.setPermission(path, entry.getValue().getFsPermission());
+        }
       } catch (AccessControlException e) {
         log.warn("Error while setting permission on " + path, e);
         if (this.stopOnError) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 16ded2b79..af556db48 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -29,9 +29,9 @@ import 
org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
 import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import 
org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -92,8 +92,9 @@ public class ManifestBasedDatasetFinderTest {
       Assert.assertTrue(fileSets.hasNext());
       FileSet<CopyEntity> fileSet = fileSets.next();
       Assert.assertEquals(fileSet.getFiles().size(), 3);  // 2 files to copy + 
1 post publish step
-      Assert.assertTrue(((PostPublishStep) 
fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
-      SetPermissionCommitStep step = (SetPermissionCommitStep) 
((PostPublishStep) fileSet.getFiles().get(2)).getStep();
+      Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(2)).getStep() 
instanceof CreateAndSetDirectoryPermissionCommitStep);
+      CreateAndSetDirectoryPermissionCommitStep
+          step = (CreateAndSetDirectoryPermissionCommitStep) ((PrePublishStep) 
fileSet.getFiles().get(2)).getStep();
 
       Assert.assertEquals(step.getPathAndPermissions().keySet().size(), 1); // 
SetPermissionCommitStep only applies to ancestors
       Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
@@ -136,7 +137,7 @@ public class ManifestBasedDatasetFinderTest {
       Assert.assertTrue(fileSets.hasNext());
       FileSet<CopyEntity> fileSet = fileSets.next();
       Assert.assertEquals(fileSet.getFiles().size(), 3);  // 2 files to copy + 
1 post publish step
-      Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() 
instanceof SetPermissionCommitStep);
+      Assert.assertTrue(((PrePublishStep)fileSet.getFiles().get(2)).getStep() 
instanceof CreateAndSetDirectoryPermissionCommitStep);
 
     }
   }
@@ -196,7 +197,7 @@ public class ManifestBasedDatasetFinderTest {
     Assert.assertTrue(fileSets.hasNext());
     FileSet<CopyEntity> fileSet = fileSets.next();
     Assert.assertEquals(fileSet.getFiles().size(), 2);  // 1 files to copy + 1 
post publish step
-    Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(1)).getStep() 
instanceof SetPermissionCommitStep);
+    Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(1)).getStep() 
instanceof CreateAndSetDirectoryPermissionCommitStep);
   }
 
   private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, 
Path manifestPath, FileSystem manifestReadFs) throws IOException, 
URISyntaxException {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
similarity index 87%
rename from 
gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
rename to 
gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
index 5df1b8f28..b230eb4f4 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/SetPermissionCommitStepTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/util/commit/CreateAndSetDirectoryPermissionCommitStepTest.java
@@ -36,14 +36,14 @@ import 
org.apache.gobblin.data.management.copy.OwnerAndPermission;
 
 
 /**
- * Test for {@link SetPermissionCommitStep}.
+ * Test for {@link CreateAndSetDirectoryPermissionCommitStep}.
  */
 @Test(groups = { "gobblin.commit" })
-public class SetPermissionCommitStepTest {
+public class CreateAndSetDirectoryPermissionCommitStepTest {
   private static final String ROOT_DIR = "set-permission-commit-step-test";
 
   private FileSystem fs;
-  private SetPermissionCommitStep step;
+  private CreateAndSetDirectoryPermissionCommitStep step;
   Path dir1;
   FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, 
FsAction.ALL);
 
@@ -53,13 +53,12 @@ public class SetPermissionCommitStepTest {
     this.fs.delete(new Path(ROOT_DIR), true);
 
     dir1 = new Path(ROOT_DIR, "dir1");
-    this.fs.mkdirs(dir1);
 
     OwnerAndPermission ownerAndPermission = new OwnerAndPermission("owner", 
"group", permission);
     Map<String, OwnerAndPermission> pathAndPermissions = new HashMap<>();
     pathAndPermissions.put(dir1.toString(), ownerAndPermission);
 
-    this.step = new SetPermissionCommitStep(this.fs, pathAndPermissions, new 
Properties());
+    this.step = new CreateAndSetDirectoryPermissionCommitStep(this.fs, 
pathAndPermissions, new Properties());
   }
 
   @AfterClass
@@ -69,8 +68,8 @@ public class SetPermissionCommitStepTest {
 
   @Test
   public void testExecute() throws IOException {
-    Assert.assertNotEquals(this.fs.getFileStatus(dir1).getPermission(), 
permission);
     this.step.execute();
+    Assert.assertEquals(this.fs.exists(dir1), true);
     Assert.assertEquals(this.fs.getFileStatus(dir1).getPermission(), 
permission);
   }
 }

Reply via email to