This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 62f645c58 [GOBBLIN-2049] Configure Gobblin Distcp Writer to fail if 
setPermission fails (#3929)
62f645c58 is described below

commit 62f645c586499808956ea9346753777628d52a92
Author: umustafi <[email protected]>
AuthorDate: Thu Apr 18 18:42:40 2024 -0700

    [GOBBLIN-2049] Configure Gobblin Distcp Writer to fail if setPermission 
fails (#3929)
    
    Configure Gobblin Distcp Writer to fail if setPermission fails
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../copy/publisher/CopyDataPublisher.java          |  4 ++-
 .../writer/FileAwareInputStreamDataWriter.java     | 32 +++++++++++++++++-----
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 855147eb9..bc51e7f23 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -95,6 +95,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
   protected final DataFileVersionStrategy dstDataFileVersionStrategy;
   protected final boolean preserveDirModTime;
   protected final boolean resyncDirOwnerAndPermission;
+  protected final boolean shouldFailWhenPermissionsFail;
 
   /**
    * Build a new {@link CopyDataPublisher} from {@link State}. The constructor 
expects the following to be set in the
@@ -135,6 +136,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     // Default to be true to preserve the original behavior
     this.preserveDirModTime = 
state.getPropAsBoolean(CopyConfiguration.PRESERVE_MODTIME_FOR_DIR, true);
     this.resyncDirOwnerAndPermission = 
state.getPropAsBoolean(CopyConfiguration.RESYNC_DIR_OWNER_AND_PERMISSION_FOR_MANIFEST_COPY,
 false);
+    this.shouldFailWhenPermissionsFail = 
state.getPropAsBoolean(FileAwareInputStreamDataWriter.GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL,
 FileAwareInputStreamDataWriter.DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
   }
 
   @Override
@@ -197,7 +199,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
       FileStatus dstFile = 
this.fs.getFileStatus(copyableFile.getDestination());
       // User specifically try to copy dir metadata, so we change the group 
and permissions on destination even when the dir already existed
       log.info("Setting destination directory {} owner and permission to {}", 
dstFile.getPath(), 
copyableFile.getDestinationOwnerAndPermission().getFsPermission());
-      FileAwareInputStreamDataWriter.safeSetPathPermission(this.fs, dstFile, 
copyableFile.getDestinationOwnerAndPermission());
+      FileAwareInputStreamDataWriter.setPathPermission(this.fs, dstFile, 
copyableFile.getDestinationOwnerAndPermission(), 
this.shouldFailWhenPermissionsFail);
     }
     if (preserveDirModTime || copyableFile.getFileStatus().isFile()) {
       // Preserving File ModTime, and set the access time to an initializing 
value when ModTime is declared to be preserved.
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 6f1626a37..05fe82a7e 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -90,6 +90,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
+  public static final String GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = 
"gobblin.copy.shouldFailWhenPermissionsFail";
+  public static final boolean DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = 
true;
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -108,6 +110,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   private final Configuration conf;
 
   protected final Meter copySpeedMeter;
+  protected final boolean shouldFailWhenPermissionsFail;
 
   protected final Optional<String> writerAttemptIdOptional;
   /**
@@ -181,6 +184,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     } else {
       this.renameOptions = Options.Rename.NONE;
     }
+    this.shouldFailWhenPermissionsFail = 
state.getPropAsBoolean(GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL,
+        DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
   }
 
   public FileAwareInputStreamDataWriter(State state, int numBranches, int 
branchId)
@@ -350,10 +355,13 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   }
 
   /**
-   * Sets the {@link FsPermission}, owner, group for the path passed. It will 
not throw exceptions, if operations
-   * cannot be executed, will warn and continue.
+   * Sets the {@link FsPermission}, owner, group for the path passed. It uses 
`requirePermissionSetForSuccess` param
+   * to determine whether an exception will be thrown or only error log 
printed in the case of failure.
+   * @param requirePermissionSetForSuccess if true then throw exception, 
otherwise log error message and continue when
+   *                                       operations cannot be executed.
    */
-  public static void safeSetPathPermission(FileSystem fs, FileStatus file, 
OwnerAndPermission ownerAndPermission) {
+  public static void setPathPermission(FileSystem fs, FileStatus file, 
OwnerAndPermission ownerAndPermission,
+      boolean requirePermissionSetForSuccess) throws IOException {
 
     Path path = file.getPath();
     OwnerAndPermission targetOwnerAndPermission = 
setOwnerExecuteBitIfDirectory(file, ownerAndPermission);
@@ -367,7 +375,12 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         fs.setPermission(path, targetOwnerAndPermission.getFsPermission());
       }
     } catch (IOException ioe) {
-      log.warn("Failed to set permission for directory " + path, ioe);
+      String permissionFailureMessage = "Failed to set permission for 
directory " + path;
+      if (requirePermissionSetForSuccess) {
+        throw new IOException(permissionFailureMessage, ioe);
+      } else {
+        log.error(permissionFailureMessage, ioe);
+      }
     }
 
     String owner = Strings.isNullOrEmpty(targetOwnerAndPermission.getOwner()) 
? null : targetOwnerAndPermission.getOwner();
@@ -378,7 +391,12 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         fs.setOwner(path, owner, group);
       }
     } catch (IOException ioe) {
-      log.warn("Failed to set owner and/or group for path " + path + " to " + 
owner + ":" + group, ioe);
+      String ownerGroupFailureMessage = "Failed to set owner and/or group for 
path " + path + " to " + owner + ":" + group;
+      if (requirePermissionSetForSuccess) {
+        throw new IOException(ownerGroupFailureMessage, ioe);
+      } else {
+        log.error(ownerGroupFailureMessage, ioe);
+      }
     }
   }
 
@@ -394,7 +412,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     Collections.reverse(files);
 
     for (FileStatus file : files) {
-      safeSetPathPermission(this.fs, file, ownerAndPermission);
+      setPathPermission(this.fs, file, ownerAndPermission, 
this.shouldFailWhenPermissionsFail);
     }
   }
 
@@ -480,7 +498,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
       try {
         this.fs.delete(this.stagingDir, true);
       } catch (IOException ioe) {
-        log.warn("Failed to delete staging path at " + this.stagingDir);
+        log.error("Failed to delete staging path at " + this.stagingDir);
       }
     }
   }

Reply via email to