This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new dccbde4  NIFI-5452: Allow ignore block locality in HDFS Fixed formatting. Fallback to autoboxing
dccbde4 is described below

commit dccbde40f3da5aecf8299513bcd4691702ffc02a
Author: David Mollitor <[email protected]>
AuthorDate: Thu Aug 15 16:35:14 2019 -0400

    NIFI-5452: Allow ignore block locality in HDFS
    Fixed formatting. Fallback to autoboxing
    
    This closes #3652.
---
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 27 +++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 51f6a82..116caba 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
@@ -61,6 +63,7 @@ import java.io.OutputStream;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -170,6 +173,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor IGNORE_LOCALITY = new 
PropertyDescriptor.Builder()
+            .name("Ignore Locality")
+            .displayName("Ignore Locality")
+            .description(
+                    "Directs the HDFS system to ignore locality rules so that 
data is distributed randomly throughout the cluster")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     private static final Set<Relationship> relationships;
 
     static {
@@ -199,6 +213,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         props.add(REMOTE_OWNER);
         props.add(REMOTE_GROUP);
         props.add(COMPRESSION_CODEC);
+        props.add(IGNORE_LOCALITY);
         return props;
     }
 
@@ -313,8 +328,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                 if 
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) 
{
                                     fos = hdfs.append(copyFile, bufferSize);
                                 } else {
-                                    fos = hdfs.create(tempCopyFile, true, 
bufferSize, replication, blockSize);
+                                  final EnumSet<CreateFlag> cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                  final Boolean ignoreLocality = 
context.getProperty(IGNORE_LOCALITY).asBoolean();
+                                  if (ignoreLocality) {
+                                    
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                  }
+
+                                  fos = hdfs.create(tempCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                      FsPermission.getUMask(hdfs.getConf())), 
cflags, bufferSize, replication, blockSize,
+                                      null, null);
                                 }
+
                                 if (codec != null) {
                                     fos = codec.createOutputStream(fos);
                                 }

Reply via email to