Repository: nifi Updated Branches: refs/heads/master 05700a200 -> cf5763939
NIFI-4311 Allowing umask to get set properly before initializing the FileSystem Signed-off-by: Pierre Villard <[email protected]> This closes #2106. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf576393 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf576393 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf576393 Branch: refs/heads/master Commit: cf5763939658826fcfe3318ba100c834e30b50c5 Parents: 05700a2 Author: Bryan Bende <[email protected]> Authored: Tue Aug 22 14:19:14 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Tue Aug 22 22:40:26 2017 +0200 ---------------------------------------------------------------------- .../hadoop/AbstractHadoopProcessor.java | 14 +++++++++++++ .../hadoop/AbstractPutHDFSRecord.java | 22 +++++++++++--------- .../apache/nifi/processors/hadoop/PutHDFS.java | 11 ++++------ 3 files changed, 30 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/cf576393/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 2cec866..378dd70 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -252,6 +252,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { getConfigurationFromResources(config, configResources); + // give sub-classes a chance to process configuration + preProcessConfiguration(config, context); + // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout checkHdfsUriForTimeout(config); @@ -288,6 +291,17 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } /** + * This method will be called after the Configuration has been created, but before the FileSystem is created, + * allowing sub-classes to take further action on the Configuration before creating the FileSystem. + * + * @param config the Configuration that will be used to create the FileSystem + * @param context the context that can be used to retrieve additional values + */ + protected void preProcessConfiguration(final Configuration config, final ProcessContext context) { + + } + + /** * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received * * @param config http://git-wip-us.apache.org/repos/asf/nifi/blob/cf576393/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java index 70a3697..e08b4fb 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java @@ -210,14 +210,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { return putHdfsRecordProperties; } - - @OnScheduled - public final void onScheduled(final ProcessContext context) throws IOException { - super.abstractOnScheduled(context); - - this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); - this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); - + @Override + protected void preProcessConfiguration(Configuration config, ProcessContext context) { // Set umask once, to avoid thread safety issues doing it in onTrigger final PropertyValue umaskProp = context.getProperty(UMASK); final short dfsUmask; @@ -226,8 +220,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { } else { dfsUmask = FsPermission.DEFAULT_UMASK; } - final Configuration conf = getConfiguration(); - FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + + FsPermission.setUMask(config, new FsPermission(dfsUmask)); + } + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + super.abstractOnScheduled(context); + + this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); + this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/cf576393/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 41ddf59..4a9b2c1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -32,7 +32,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -188,10 +187,8 @@ public class PutHDFS extends AbstractHadoopProcessor { return props; } - @OnScheduled - public void onScheduled(ProcessContext context) throws Exception { - super.abstractOnScheduled(context); - + @Override + protected void preProcessConfiguration(final Configuration config, final ProcessContext context) { // Set umask once, to avoid thread safety issues doing it in onTrigger final PropertyValue umaskProp = context.getProperty(UMASK); final short dfsUmask; @@ -200,8 +197,8 @@ public class PutHDFS extends AbstractHadoopProcessor { } else { dfsUmask = FsPermission.DEFAULT_UMASK; } - final Configuration conf = getConfiguration(); - FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + + FsPermission.setUMask(config, new FsPermission(dfsUmask)); } @Override
