This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 07e76fc [GOBBLIN-1488] Added option to set perm group at table level
(#3334)
07e76fc is described below
commit 07e76fc8205beeceb935594f3816c832614a474d
Author: vbohra <[email protected]>
AuthorDate: Mon Jul 19 13:29:32 2021 -0700
[GOBBLIN-1488] Added option to set perm group at table level (#3334)
This option allows us to set the permission group (owner group) of the
publisher output directory at the table level.
The publisher output directory can be one of the following:
* data.publisher.final.dir (if data.publisher.appendExtractToFinalDir is
set to false)
* data.publisher.final.dir/db/table (if
data.publisher.appendExtractToFinalDir is set to true and writer.file.path.type
= namespace_table)
* data.publisher.final.dir/table (if
data.publisher.appendExtractToFinalDir is set to true and writer.file.path.type
= tablename)
* or a default output path (if writer.file.path.type is neither of the above)
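Deprecated data.publisher.final.dir.group, since it is applied at the leaf level (data.publisher.final.dir/EXTRACT/file.xxx) rather than at the output directory level; use data.publisher.output.dir.group instead.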
---
.../apache/gobblin/configuration/ConfigurationKeys.java | 6 +++++-
.../org/apache/gobblin/publisher/BaseDataPublisher.java | 14 +++++++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index dce740c..8e7109c 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -494,8 +494,12 @@ public class ConfigurationKeys {
public static final String DATA_PUBLISHER_REPLACE_FINAL_DIR =
DATA_PUBLISHER_PREFIX + ".replace.final.dir";
public static final String DATA_PUBLISHER_FINAL_NAME = DATA_PUBLISHER_PREFIX
+ ".final.name";
public static final String DATA_PUBLISHER_OVERWRITE_ENABLED =
DATA_PUBLISHER_PREFIX + ".overwrite.enabled";
- // This property is used to specify the owner group of the data publisher
final output directory
+ // @DATA_PUBLISHER_FINAL_DIR is the final publishing root directory
+ // @DATA_PUBLISHER_FINAL_DIR_GROUP is set at the leaf level
(DATA_PUBLISHER_FINAL_DIR/EXTRACT/file.xxx) which is incorrect
+ // Use @DATA_PUBLISHER_OUTPUT_DIR_GROUP to set group at output dir level
@DATA_PUBLISHER_FINAL_DIR/EXTRACT
+ @Deprecated
public static final String DATA_PUBLISHER_FINAL_DIR_GROUP =
DATA_PUBLISHER_PREFIX + ".final.dir.group";
+ public static final String DATA_PUBLISHER_OUTPUT_DIR_GROUP =
DATA_PUBLISHER_PREFIX + ".output.dir.group";
public static final String DATA_PUBLISHER_PERMISSIONS =
DATA_PUBLISHER_PREFIX + ".permissions";
public static final String PUBLISH_DATA_AT_JOB_LEVEL =
"publish.data.at.job.level";
public static final boolean DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL = true;
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 000ae19..56d7270 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -103,6 +103,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
protected final List<FileSystem> publisherFileSystemByBranches;
protected final List<FileSystem> metaDataWriterFileSystemByBranches;
protected final List<Optional<String>>
publisherFinalDirOwnerGroupsByBranches;
+ protected final List<Optional<String>>
publisherOutputDirOwnerGroupByBranches;
protected final List<FsPermission> permissions;
protected final Closer closer;
protected final Closer parallelRunnerCloser;
@@ -161,6 +162,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
this.publisherFileSystemByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
this.metaDataWriterFileSystemByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
this.publisherFinalDirOwnerGroupsByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
+ this.publisherOutputDirOwnerGroupByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
this.permissions = Lists.newArrayListWithCapacity(this.numBranches);
this.metadataMergers = new HashMap<>();
@@ -178,9 +180,11 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
this.metaDataWriterFileSystemByBranches.add(FileSystem.get(publisherUri,
conf));
// The group(s) will be applied to the final publisher output
directory(ies)
+ // (Deprecated) See ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP
this.publisherFinalDirOwnerGroupsByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP,
this.numBranches, i))));
-
+
this.publisherOutputDirOwnerGroupByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
+
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_OUTPUT_DIR_GROUP,
this.numBranches, i))));
// The permission(s) will be applied to all directories created by the
publisher,
// which do NOT include directories created by the writer and moved by
the publisher.
// The permissions of those directories are controlled by
writer.file.permissions and writer.dir.permissions.
@@ -396,6 +400,10 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
// Create final output directory
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
this.permissions.get(branchId), retrierConfig);
+
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
+ LOG.info(String.format("Setting path %s group to %s",
publisherOutputDir.toString(),
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
+ HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
+ }
addSingleTaskWriterOutputToExistingDir(writerOutputDir,
publisherOutputDir, state, branchId, parallelRunner);
} else {
if (writerOutputPathsMoved.contains(writerOutputDir)) {
@@ -424,6 +432,10 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
// Create the parent directory of the final output directory if it
does not exist
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir.getParent(), this.permissions.get(branchId),
retrierConfig);
+
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
+ LOG.info(String.format("Setting path %s group to %s",
publisherOutputDir.toString(),
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
+
HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
+ }
}
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir,
branchId);