This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 07e76fc  [GOBBLIN-1488] Added option to set perm group at table level 
(#3334)
07e76fc is described below

commit 07e76fc8205beeceb935594f3816c832614a474d
Author: vbohra <[email protected]>
AuthorDate: Mon Jul 19 13:29:32 2021 -0700

    [GOBBLIN-1488] Added option to set perm group at table level (#3334)
    
    This option allows the owner group of the publisher output directory to be set at the table level.
    
    The publisher output directory can be one of the following:
    * data.publisher.final.dir (if data.publisher.appendExtractToFinalDir is set to false)
    * data.publisher.final.dir/db/table (if data.publisher.appendExtractToFinalDir is set to true and writer.file.path.type = namespace_table)
    * data.publisher.final.dir/table (if data.publisher.appendExtractToFinalDir is set to true and writer.file.path.type = tablename)
    * and a default
    
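    Deprecated data.publisher.final.dir.group because it sets the group at the leaf level (data.publisher.final.dir/EXTRACT/file.xxx) instead of on the output directory; use data.publisher.output.dir.group instead.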
---
 .../apache/gobblin/configuration/ConfigurationKeys.java    |  6 +++++-
 .../org/apache/gobblin/publisher/BaseDataPublisher.java    | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index dce740c..8e7109c 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -494,8 +494,12 @@ public class ConfigurationKeys {
   public static final String DATA_PUBLISHER_REPLACE_FINAL_DIR = 
DATA_PUBLISHER_PREFIX + ".replace.final.dir";
   public static final String DATA_PUBLISHER_FINAL_NAME = DATA_PUBLISHER_PREFIX 
+ ".final.name";
   public static final String DATA_PUBLISHER_OVERWRITE_ENABLED = 
DATA_PUBLISHER_PREFIX + ".overwrite.enabled";
-  // This property is used to specify the owner group of the data publisher 
final output directory
+  // @DATA_PUBLISHER_FINAL_DIR is the final publishing root directory
+  // @DATA_PUBLISHER_FINAL_DIR_GROUP is set at the leaf level 
(DATA_PUBLISHER_FINAL_DIR/EXTRACT/file.xxx) which is incorrect
+  // Use @DATA_PUBLISHER_OUTPUT_DIR_GROUP to set group at output dir level 
@DATA_PUBLISHER_FINAL_DIR/EXTRACT
+  @Deprecated
   public static final String DATA_PUBLISHER_FINAL_DIR_GROUP = 
DATA_PUBLISHER_PREFIX + ".final.dir.group";
+  public static final String DATA_PUBLISHER_OUTPUT_DIR_GROUP = 
DATA_PUBLISHER_PREFIX + ".output.dir.group";
   public static final String DATA_PUBLISHER_PERMISSIONS = 
DATA_PUBLISHER_PREFIX + ".permissions";
   public static final String PUBLISH_DATA_AT_JOB_LEVEL = 
"publish.data.at.job.level";
   public static final boolean DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL = true;
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 000ae19..56d7270 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -103,6 +103,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
   protected final List<FileSystem> publisherFileSystemByBranches;
   protected final List<FileSystem> metaDataWriterFileSystemByBranches;
   protected final List<Optional<String>> 
publisherFinalDirOwnerGroupsByBranches;
+  protected final List<Optional<String>> 
publisherOutputDirOwnerGroupByBranches;
   protected final List<FsPermission> permissions;
   protected final Closer closer;
   protected final Closer parallelRunnerCloser;
@@ -161,6 +162,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
     this.publisherFileSystemByBranches = 
Lists.newArrayListWithCapacity(this.numBranches);
     this.metaDataWriterFileSystemByBranches = 
Lists.newArrayListWithCapacity(this.numBranches);
     this.publisherFinalDirOwnerGroupsByBranches = 
Lists.newArrayListWithCapacity(this.numBranches);
+    this.publisherOutputDirOwnerGroupByBranches = 
Lists.newArrayListWithCapacity(this.numBranches);
     this.permissions = Lists.newArrayListWithCapacity(this.numBranches);
     this.metadataMergers = new HashMap<>();
 
@@ -178,9 +180,11 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       this.metaDataWriterFileSystemByBranches.add(FileSystem.get(publisherUri, 
conf));
 
       // The group(s) will be applied to the final publisher output 
directory(ies)
+      // (Deprecated) See ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP
       
this.publisherFinalDirOwnerGroupsByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
           
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR_GROUP, 
this.numBranches, i))));
-
+      
this.publisherOutputDirOwnerGroupByBranches.add(Optional.fromNullable(this.getState().getProp(ForkOperatorUtils
+          
.getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_OUTPUT_DIR_GROUP, 
this.numBranches, i))));
       // The permission(s) will be applied to all directories created by the 
publisher,
       // which do NOT include directories created by the writer and moved by 
the publisher.
       // The permissions of those directories are controlled by 
writer.file.permissions and writer.dir.permissions.
@@ -396,6 +400,10 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       // Create final output directory
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
 publisherOutputDir,
           this.permissions.get(branchId), retrierConfig);
+      
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
+        LOG.info(String.format("Setting path %s group to %s", 
publisherOutputDir.toString(), 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
+        HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId), 
publisherOutputDir, 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
+      }
       addSingleTaskWriterOutputToExistingDir(writerOutputDir, 
publisherOutputDir, state, branchId, parallelRunner);
     } else {
       if (writerOutputPathsMoved.contains(writerOutputDir)) {
@@ -424,6 +432,10 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
         // Create the parent directory of the final output directory if it 
does not exist
         
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
             publisherOutputDir.getParent(), this.permissions.get(branchId), 
retrierConfig);
+        
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
+          LOG.info(String.format("Setting path %s group to %s", 
publisherOutputDir.toString(), 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
+          
HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId), 
publisherOutputDir, 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
+        }
       }
 
       movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, 
branchId);

Reply via email to