This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 62f645c58 [GOBBLIN-2049] Configure Gobblin Distcp Writer to fail if
setPermission fails (#3929)
62f645c58 is described below
commit 62f645c586499808956ea9346753777628d52a92
Author: umustafi <[email protected]>
AuthorDate: Thu Apr 18 18:42:40 2024 -0700
[GOBBLIN-2049] Configure Gobblin Distcp Writer to fail if setPermission
fails (#3929)
Configure Gobblin Distcp Writer to fail if setPermission fails
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../copy/publisher/CopyDataPublisher.java | 4 ++-
.../writer/FileAwareInputStreamDataWriter.java | 32 +++++++++++++++++-----
2 files changed, 28 insertions(+), 8 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 855147eb9..bc51e7f23 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -95,6 +95,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
protected final DataFileVersionStrategy dstDataFileVersionStrategy;
protected final boolean preserveDirModTime;
protected final boolean resyncDirOwnerAndPermission;
+ protected final boolean shouldFailWhenPermissionsFail;
/**
* Build a new {@link CopyDataPublisher} from {@link State}. The constructor
expects the following to be set in the
@@ -135,6 +136,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
// Default to be true to preserve the original behavior
this.preserveDirModTime =
state.getPropAsBoolean(CopyConfiguration.PRESERVE_MODTIME_FOR_DIR, true);
this.resyncDirOwnerAndPermission =
state.getPropAsBoolean(CopyConfiguration.RESYNC_DIR_OWNER_AND_PERMISSION_FOR_MANIFEST_COPY,
false);
+ this.shouldFailWhenPermissionsFail =
state.getPropAsBoolean(FileAwareInputStreamDataWriter.GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL,
FileAwareInputStreamDataWriter.DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
}
@Override
@@ -197,7 +199,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
FileStatus dstFile =
this.fs.getFileStatus(copyableFile.getDestination());
// User specifically try to copy dir metadata, so we change the group
and permissions on destination even when the dir already existed
log.info("Setting destination directory {} owner and permission to {}",
dstFile.getPath(),
copyableFile.getDestinationOwnerAndPermission().getFsPermission());
- FileAwareInputStreamDataWriter.safeSetPathPermission(this.fs, dstFile,
copyableFile.getDestinationOwnerAndPermission());
+ FileAwareInputStreamDataWriter.setPathPermission(this.fs, dstFile,
copyableFile.getDestinationOwnerAndPermission(),
this.shouldFailWhenPermissionsFail);
}
if (preserveDirModTime || copyableFile.getFileStatus().isFile()) {
// Preserving File ModTime, and set the access time to an initializing
value when ModTime is declared to be preserved.
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 6f1626a37..05fe82a7e 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -90,6 +90,8 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
"gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
false;
+ public static final String GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL =
"gobblin.copy.shouldFailWhenPermissionsFail";
+ public static final boolean DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL =
true;
protected final AtomicLong bytesWritten = new AtomicLong();
protected final AtomicLong filesWritten = new AtomicLong();
@@ -108,6 +110,7 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
private final Configuration conf;
protected final Meter copySpeedMeter;
+ protected final boolean shouldFailWhenPermissionsFail;
protected final Optional<String> writerAttemptIdOptional;
/**
@@ -181,6 +184,8 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
} else {
this.renameOptions = Options.Rename.NONE;
}
+ this.shouldFailWhenPermissionsFail =
state.getPropAsBoolean(GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL,
+ DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
}
public FileAwareInputStreamDataWriter(State state, int numBranches, int
branchId)
@@ -350,10 +355,13 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
}
/**
- * Sets the {@link FsPermission}, owner, group for the path passed. It will
not throw exceptions, if operations
- * cannot be executed, will warn and continue.
+ * Sets the {@link FsPermission}, owner, group for the path passed. It uses
`requirePermissionSetForSuccess` param
+ * to determine whether an exception will be thrown or only error log
printed in the case of failure.
+ * @param requirePermissionSetForSuccess if true then throw exception,
otherwise log error message and continue when
+ * operations cannot be executed.
*/
- public static void safeSetPathPermission(FileSystem fs, FileStatus file,
OwnerAndPermission ownerAndPermission) {
+ public static void setPathPermission(FileSystem fs, FileStatus file,
OwnerAndPermission ownerAndPermission,
+ boolean requirePermissionSetForSuccess) throws IOException {
Path path = file.getPath();
OwnerAndPermission targetOwnerAndPermission =
setOwnerExecuteBitIfDirectory(file, ownerAndPermission);
@@ -367,7 +375,12 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
fs.setPermission(path, targetOwnerAndPermission.getFsPermission());
}
} catch (IOException ioe) {
- log.warn("Failed to set permission for directory " + path, ioe);
+ String permissionFailureMessage = "Failed to set permission for
directory " + path;
+ if (requirePermissionSetForSuccess) {
+ throw new IOException(permissionFailureMessage, ioe);
+ } else {
+ log.error(permissionFailureMessage, ioe);
+ }
}
String owner = Strings.isNullOrEmpty(targetOwnerAndPermission.getOwner())
? null : targetOwnerAndPermission.getOwner();
@@ -378,7 +391,12 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
fs.setOwner(path, owner, group);
}
} catch (IOException ioe) {
- log.warn("Failed to set owner and/or group for path " + path + " to " +
owner + ":" + group, ioe);
+ String ownerGroupFailureMessage = "Failed to set owner and/or group for
path " + path + " to " + owner + ":" + group;
+ if (requirePermissionSetForSuccess) {
+ throw new IOException(ownerGroupFailureMessage, ioe);
+ } else {
+ log.error(ownerGroupFailureMessage, ioe);
+ }
}
}
@@ -394,7 +412,7 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
Collections.reverse(files);
for (FileStatus file : files) {
- safeSetPathPermission(this.fs, file, ownerAndPermission);
+ setPathPermission(this.fs, file, ownerAndPermission,
this.shouldFailWhenPermissionsFail);
}
}
@@ -480,7 +498,7 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
try {
this.fs.delete(this.stagingDir, true);
} catch (IOException ioe) {
- log.warn("Failed to delete staging path at " + this.stagingDir);
+ log.error("Failed to delete staging path at " + this.stagingDir);
}
}
}