Repository: nifi
Updated Branches:
  refs/heads/master ca5a7fbf0 -> fd0dd51ff


NIFI-2553 Fixing handling of Paths in HDFS processors

Signed-off-by: Yolanda M. Davis <[email protected]>

This closes #843


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd0dd51f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd0dd51f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd0dd51f

Branch: refs/heads/master
Commit: fd0dd51ff555e919ecd771bec9361dfc0bc86eb6
Parents: ca5a7fb
Author: Bryan Bende <[email protected]>
Authored: Thu Aug 11 15:50:54 2016 -0400
Committer: Yolanda M. Davis <[email protected]>
Committed: Mon Aug 15 08:59:42 2016 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/StandardValidators.java |  17 +--
 .../hadoop/AbstractHadoopProcessor.java         |  53 ++++----
 .../nifi/processors/hadoop/FetchHDFS.java       |  18 ++-
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  18 +--
 .../apache/nifi/processors/hadoop/ListHDFS.java |  16 +--
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  44 ++++---
 .../processors/hadoop/AbstractHadoopTest.java   |   2 +-
 .../hadoop/GetHDFSSequenceFileTest.java         |   2 +-
 .../nifi/processors/hadoop/GetHDFSTest.java     |  41 +++++++
 .../nifi/processors/hadoop/PutHDFSTest.java     |  67 +++++++++++
 .../nifi/processors/hadoop/TestFetchHDFS.java   | 120 +++++++++++++++++++
 .../nifi/processors/hadoop/TestListHDFS.java    |  50 +++++++-
 12 files changed, 364 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 47d5d50..5ceb952 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -24,6 +24,7 @@ import java.nio.charset.UnsupportedCharsetException;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -279,15 +280,17 @@ public class StandardValidators {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
             if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
-                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                try {
+                    final String result = context.newExpressionLanguageCompiler().validateExpression(input, true);
+                    if (!StringUtils.isEmpty(result)) {
+                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(result).build();
+                    }
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
+                }
             }
 
-            try {
-                context.newExpressionLanguageCompiler().compile(input);
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            } catch (final Exception e) {
-                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
-            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
         }
 
     };
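
In practical terms: before this change, any property value containing Expression Language was blanket-approved with "Expression Language Present"; now the expression itself is checked. A rough Mockito-based sketch of the new behavior (the mocked compiler wiring and the error string are illustrative assumptions, not actual NiFi output):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.nifi.components.ValidationContext;
    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.expression.ExpressionLanguageCompiler;
    import org.apache.nifi.processor.util.StandardValidators;

    public class ValidatorSketch {
        public static void main(String[] args) {
            final String input = "${literal('x'):foo()}";

            // Mock just enough of the ValidationContext for the validator to run.
            final ValidationContext context = mock(ValidationContext.class);
            final ExpressionLanguageCompiler compiler = mock(ExpressionLanguageCompiler.class);
            when(context.isExpressionLanguageSupported("Directory")).thenReturn(true);
            when(context.isExpressionLanguagePresent(input)).thenReturn(true);
            when(context.newExpressionLanguageCompiler()).thenReturn(compiler);
            // validateExpression() returns a non-empty message for a bad expression.
            when(compiler.validateExpression(input, true)).thenReturn("'foo' is not a known function");

            final ValidationResult result = StandardValidators
                    .ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate("Directory", input, context);
            System.out.println(result.isValid()); // false under the new logic; previously true
        }
    }

An expression such as ${literal('x'):foo()} is therefore rejected at configuration time rather than failing later at runtime.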

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 540c406..93e0703 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -91,19 +91,35 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(createMultipleFilesExistValidator())
+            .build();
 
-    public static final String DIRECTORY_PROP_NAME = "Directory";
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+            .name("Directory")
+            .description("The HDFS directory from which files should be read")
+            .required(true)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
-            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression codec")
+            .required(true)
+            .allowableValues(CompressionType.values())
+            .defaultValue(CompressionType.NONE.toString())
+            .build();
 
-    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
-            .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
+            .name("Kerberos Relogin Period").required(false)
+            .description("Period of time which should pass before attempting a kerberos relogin")
+            .defaultValue("4 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
     private static final Object RESOURCES_LOCK = new Object();
@@ -191,19 +207,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             }
             HdfsResources resources = hdfsResources.get();
             if (resources.getConfiguration() == null) {
-                String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
-                final String dir;
-                final PropertyDescriptor directoryPropDescriptor = getPropertyDescriptor(DIRECTORY_PROP_NAME);
-                if (directoryPropDescriptor != null) {
-                    if (directoryPropDescriptor.isExpressionLanguageSupported()) {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).evaluateAttributeExpressions().getValue();
-                    } else {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
-                    }
-                } else {
-                    dir = null;
-                }
-                resources = resetHDFSResources(configResources, dir == null ? "/" : dir, context);
+                final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+                resources = resetHDFSResources(configResources, context);
                 hdfsResources.set(resources);
             }
         } catch (IOException ex) {
@@ -249,7 +254,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.
@@ -286,8 +291,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 }
             }
 
+            final Path workingDir = fs.getWorkingDirectory();
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
+                    new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
+
             return new HdfsResources(config, fs, ugi);
 
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index fdb2dcf..3b1cce2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -54,13 +54,14 @@ import java.util.concurrent.TimeUnit;
         + "not be fetched from HDFS")
 @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
 public class FetchHDFS extends AbstractHadoopProcessor {
+
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
         .name("HDFS Filename")
         .description("The name of the HDFS file to retrieve")
         .required(true)
         .expressionLanguageSupported(true)
         .defaultValue("${path}/${filename}")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
         .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -102,9 +103,20 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         }
 
         final FileSystem hdfs = getFileSystem();
-        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
-        final URI uri = path.toUri();
+        final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
 
+        Path path = null;
+        try {
+            path = new Path(filenameValue);
+        } catch (IllegalArgumentException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
         try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
             flowFile = session.importFrom(inStream, flowFile);
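
Context for the guard above: Hadoop's Path constructor throws IllegalArgumentException for strings it cannot parse as a URI, such as the unrecognized-EL value exercised by the new tests later in this commit (the colon makes Hadoop read the leading text as a URI scheme). A standalone sketch of the failure path, assuming only the Hadoop client library on the classpath:

    import org.apache.hadoop.fs.Path;

    public class PathGuardSketch {
        public static void main(String[] args) {
            // Same unrecognized-EL value the new tests use; the colon makes
            // Hadoop treat the leading text as a URI scheme, which contains
            // characters that are illegal in a scheme.
            final String filenameValue = "data_${literal('testing'):substring(0,4)%7D";
            try {
                new Path(filenameValue);
            } catch (final IllegalArgumentException e) {
                // FetchHDFS now takes this branch: it tags the FlowFile with
                // hdfs.failure.reason, penalizes it, and routes it to REL_FAILURE.
                System.out.println("routing to failure: " + e.getMessage());
            }
        }
    }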

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 2631840..7ab7ebe 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -86,14 +86,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
     .build();
 
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-    .name(DIRECTORY_PROP_NAME)
-    .description("The HDFS directory from which files should be read")
-    .required(true)
-    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-    .expressionLanguageSupported(true)
-    .build();
-
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
     .name("Recurse Subdirectories")
     .description("Indicates whether to pull files from subdirectories of the HDFS directory")
@@ -224,6 +216,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
         }
 
+        try {
+            new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+        } catch (Exception e) {
+            problems.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("Directory")
+                    .explanation(e.getMessage())
+                    .build());
+        }
+
         return problems;
     }
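
The customValidate() addition surfaces an unparseable Directory value at configuration time instead of at runtime. A minimal sketch of the same check in isolation (validateDirectory is a hypothetical helper, not part of this commit; the ValidationResult builder calls mirror the diff):

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.components.ValidationResult;

    public class DirectoryCheckSketch {
        // Attempt to build a Path from the evaluated value and report the
        // parse error as a validation problem rather than a runtime failure.
        static ValidationResult validateDirectory(final String evaluatedDir) {
            try {
                new Path(evaluatedDir);
                return new ValidationResult.Builder().subject("Directory").input(evaluatedDir).valid(true).build();
            } catch (final Exception e) {
                return new ValidationResult.Builder().subject("Directory").input(evaluatedDir).valid(false).explanation(e.getMessage()).build();
            }
        }
    }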
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 6d9f8f7..2ae65b2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -42,7 +42,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
@@ -89,6 +88,7 @@ import java.util.concurrent.TimeUnit;
     + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class ListHDFS extends AbstractHadoopProcessor {
+
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
         .name("Distributed Cache Service")
         .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
@@ -97,14 +97,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();
 
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-        .name(DIRECTORY_PROP_NAME)
-        .description("The HDFS directory from which files should be read")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
-
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
         .name("Recurse Subdirectories")
         .description("Indicates whether to list files from subdirectories of the HDFS directory")
@@ -287,14 +279,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        final Path rootPath = new Path(directory);
 
         final Set<FileStatus> statuses;
         try {
+            final Path rootPath = new Path(directory);
             statuses = getStatuses(rootPath, recursive, hdfs);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
+        } catch (final IOException | IllegalArgumentException e) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
             return;
         }
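
Note the reordering above: constructing the Path inside the try block is what lets the widened catch handle both I/O failures and directory values that cannot be parsed into a Path. A condensed sketch of the pattern (the listing body is illustrative, not the processor's actual code):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListingGuardSketch {
        static void list(final String directory) {
            try {
                final Path rootPath = new Path(directory);          // may throw IllegalArgumentException
                final FileSystem fs = FileSystem.get(new Configuration());
                System.out.println(fs.listStatus(rootPath).length); // may throw IOException
            } catch (final IOException | IllegalArgumentException e) {
                System.err.println("Failed to perform listing of HDFS due to " + e);
            }
        }
    }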
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index f05c2c7..3a0cb48 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -96,13 +96,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .build();
 
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The parent HDFS directory to which files should be written")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
 
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("Conflict Resolution Strategy")
@@ -168,7 +161,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(DIRECTORY)
+                .description("The parent HDFS directory to which files should be written")
+                .build());
         props.add(CONFLICT_RESOLUTION);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
@@ -212,27 +208,29 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
-        final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+        Path tempDotCopyFile = null;
+        try {
+            final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final Path configuredRootDirPath = new Path(dirValue);
+
+            final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
 
-        final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-        final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+            final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+            final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
 
-        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+            final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+            final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
 
-        final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
-        final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
-                .getDefaultReplication(configuredRootDirPath);
+            final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+            final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
+                    .getDefaultReplication(configuredRootDirPath);
 
-        final CompressionCodec codec = getCompressionCodec(context, configuration);
+            final CompressionCodec codec = getCompressionCodec(context, configuration);
 
-        final String filename = codec != null
-                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final String filename = codec != null
+                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
 
-        Path tempDotCopyFile = null;
-        try {
             final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
             final Path copyFile = new Path(configuredRootDirPath, filename);
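
Because DIRECTORY now lives in AbstractHadoopProcessor with a read-oriented description, PutHDFS keeps its writer-oriented help text by copying the shared descriptor and overriding only the description, as the getSupportedPropertyDescriptors() change above shows. The same pattern in isolation (a sketch, assuming the shared descriptor is visible from the caller's package):

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

    public class DescriptorReuseSketch {
        // fromPropertyDescriptor() copies the name, validator, EL support, and
        // required flag from the shared descriptor; only the description changes.
        static final PropertyDescriptor PUT_DIRECTORY = new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(AbstractHadoopProcessor.DIRECTORY)
                .description("The parent HDFS directory to which files should be written")
                .build();
    }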
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index 76fc15d..9e2193d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -112,7 +112,7 @@ public class AbstractHadoopTest {
         SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties);
         TestRunner runner = TestRunners.newTestRunner(processor);
         try {
-            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
+            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index 79f7f54..6916195 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -103,7 +103,7 @@ public class GetHDFSSequenceFileTest {
 
     public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
         @Override
-        HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+        HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
             return hdfsResources;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 64fe16f..582346a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -196,6 +196,47 @@ public class GetHDFSTest {
         flowFile.assertContentEquals(expected);
     }
 
+    @Test
+    public void testDirectoryUsesValidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/${literal('testdata'):substring(0,8)}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testDirectoryUsesUnrecognizedEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testDirectoryUsesInvalidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):foo()}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
     private static class TestableGetHDFS extends GetHDFS {
 
         private final KerberosProperties testKerberosProperties;

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 34efcb2..c8f8fb1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -295,6 +295,73 @@ public class PutHDFSTest {
         fs.delete(p, true);
     }
 
+    @Test
+    public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<String, String>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // this value somehow causes NiFi to not even recognize the EL, and thus it returns successfully from calling
+        // evaluateAttributeExpressions and then tries to create a Path with the exact value below and blows up
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<String, String>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        runner.assertAllFlowFilesTransferred(PutHDFS.REL_FAILURE);
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        // the validator should pick up the invalid EL
+        runner.setProperty(PutHDFS.DIRECTORY, 
"target/data_${literal('testing'):foo()}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.assertNotValid();
+    }
+
     private boolean isNotWindows() {
         return !System.getProperty("os.name").startsWith("Windows");
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
new file mode 100644
index 0000000..e49975b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFetchHDFS {
+
+    private TestRunner runner;
+    private TestableFetchHDFS proc;
+    private NiFiProperties mockNiFiProperties;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+
+        proc = new TestableFetchHDFS(kerberosProperties);
+        runner = TestRunners.newTestRunner(proc);
+    }
+
+    @Test
+    public void testFetchStaticFileThatExists() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFetchStaticFileThatDoesNotExist() throws IOException {
+        final String file = "src/test/resources/testdata/doesnotexist";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testFetchFileThatExistsFromIncomingFlowFile() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, "${my.file}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("my.file", file);
+
+        runner.enqueue(new String("trigger flow file"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithValidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithInvalidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1'):foo()}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testFilenameWithUnrecognizedEL() throws IOException {
+        final String file = "data_${literal('testing'):substring(0,4)%7D";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    private static class TestableFetchHDFS extends FetchHDFS {
+        private final KerberosProperties testKerberosProps;
+
+        public TestableFetchHDFS(KerberosProperties testKerberosProps) {
+            this.testKerberosProps = testKerberosProps;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return testKerberosProps;
+        }
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd0dd51f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6fcea95..d4204ea 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -83,6 +83,47 @@ public class TestListHDFS {
     }
 
     @Test
+    public void testListingWithValidELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+    @Test
+    public void testListingWithInvalidELFunction() throws InterruptedException {
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testListingWithUnrecognizedELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+    }
+
+    @Test
     public void testListingHasCorrectAttributes() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
 
@@ -287,13 +328,12 @@ public class TestListHDFS {
         }
     }
 
-
     private class MockFileSystem extends FileSystem {
         private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
 
         public void addFileStatus(final Path parent, final FileStatus child) {
             Set<FileStatus> children = fileStatuses.get(parent);
-            if ( children == null ) {
+            if (children == null) {
                 children = new HashSet<>();
                 fileStatuses.put(parent, children);
             }
@@ -301,7 +341,6 @@ public class TestListHDFS {
             children.add(child);
         }
 
-
         @Override
         public long getDefaultBlockSize() {
             return 1024L;
@@ -324,7 +363,7 @@ public class TestListHDFS {
 
         @Override
         public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                final long blockSize, final Progressable progress) throws IOException {
+                                         final long blockSize, final Progressable progress) throws IOException {
             return null;
         }
 
@@ -346,7 +385,7 @@ public class TestListHDFS {
         @Override
         public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
             final Set<FileStatus> statuses = fileStatuses.get(f);
-            if ( statuses == null ) {
+            if (statuses == null) {
                 return new FileStatus[0];
             }
 
@@ -375,7 +414,6 @@ public class TestListHDFS {
 
     }
 
-
     private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
         private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
         private boolean failOnCalls = false;
