This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new dccbde4 NIFI-5452: Allow ignore block locality in HDFS Fixed
formatting. Fallback to autoboxing
dccbde4 is described below
commit dccbde40f3da5aecf8299513bcd4691702ffc02a
Author: David Mollitor <[email protected]>
AuthorDate: Thu Aug 15 16:35:14 2019 -0400
NIFI-5452: Allow ignore block locality in HDFS
Fixed formatting. Fallback to autoboxing
This closes #3652.
---
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 27 +++++++++++++++++++++-
1 file changed, 26 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 51f6a82..116caba 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -17,8 +17,10 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
@@ -61,6 +63,7 @@ import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -170,6 +173,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ public static final PropertyDescriptor IGNORE_LOCALITY = new
PropertyDescriptor.Builder()
+ .name("Ignore Locality")
+ .displayName("Ignore Locality")
+ .description(
+ "Directs the HDFS system to ignore locality rules so that
data is distributed randomly throughout the cluster")
+ .required(false)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
private static final Set<Relationship> relationships;
static {
@@ -199,6 +213,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
+ props.add(IGNORE_LOCALITY);
return props;
}
@@ -313,8 +328,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
if
(conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists)
{
fos = hdfs.append(copyFile, bufferSize);
} else {
- fos = hdfs.create(tempCopyFile, true,
bufferSize, replication, blockSize);
+ final EnumSet<CreateFlag> cflags =
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+ final Boolean ignoreLocality =
context.getProperty(IGNORE_LOCALITY).asBoolean();
+ if (ignoreLocality) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+ }
+
+ fos = hdfs.create(tempCopyFile,
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+ FsPermission.getUMask(hdfs.getConf())),
cflags, bufferSize, replication, blockSize,
+ null, null);
}
+
if (codec != null) {
fos = codec.createOutputStream(fos);
}