Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 594262a84 -> 14e73bc24


NIFI-374: Route ProcessException's to failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ab6794b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ab6794b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ab6794b2

Branch: refs/heads/develop
Commit: ab6794b29ebd4e7205e9aa0a7e8c227374a4b6d3
Parents: 6e5469c
Author: Mark Payne <[email protected]>
Authored: Wed Apr 29 17:02:08 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Apr 29 17:02:08 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/EncryptContent.java     | 100 ++++++++++---------
 .../processors/standard/TestEncryptContent.java |  21 ++++
 2 files changed, 74 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ab6794b2/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
index c0f6301..7fe9fbc 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
@@ -74,35 +74,35 @@ public class EncryptContent extends AbstractProcessor {
     public static final int DEFAULT_SALT_SIZE = 8;
 
     public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
-            .name("Mode")
-            .description("Specifies whether the content should be encrypted or 
decrypted")
-            .required(true)
-            .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
-            .defaultValue(ENCRYPT_MODE)
-            .build();
+    .name("Mode")
+    .description("Specifies whether the content should be encrypted or 
decrypted")
+    .required(true)
+    .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
+    .defaultValue(ENCRYPT_MODE)
+    .build();
     public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new 
PropertyDescriptor.Builder()
-            .name("Encryption Algorithm")
-            .description("The Encryption Algorithm to use")
-            .required(true)
-            .allowableValues(EncryptionMethod.values())
-            .defaultValue(EncryptionMethod.MD5_256AES.name())
-            .build();
+    .name("Encryption Algorithm")
+    .description("The Encryption Algorithm to use")
+    .required(true)
+    .allowableValues(EncryptionMethod.values())
+    .defaultValue(EncryptionMethod.MD5_256AES.name())
+    .build();
     public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
-            .name("Password")
-            .description("The Password to use for encrypting or decrypting the 
data")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
+    .name("Password")
+    .description("The Password to use for encrypting or decrypting the data")
+    .required(true)
+    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    .sensitive(true)
+    .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully encrypted or 
decrypted will be routed to success")
-            .build();
+    .name("success")
+    .description("Any FlowFile that is successfully encrypted or decrypted 
will be routed to success")
+    .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be encrypted or decrypted 
will be routed to failure")
-            .build();
+    .name("failure")
+    .description("Any FlowFile that cannot be encrypted or decrypted will be 
routed to failure")
+    .build();
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
@@ -172,34 +172,40 @@ public class EncryptContent extends AbstractProcessor {
         final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : 
DEFAULT_SALT_SIZE;
 
         final StopWatch stopWatch = new StopWatch(true);
-        if 
(context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
-            final byte[] salt = new byte[saltSize];
-            secureRandom.nextBytes(salt);
+        final String mode = context.getProperty(MODE).getValue();
+        try {
+            if (mode.equalsIgnoreCase(ENCRYPT_MODE)) {
+                final byte[] salt = new byte[saltSize];
+                secureRandom.nextBytes(salt);
+
+                final PBEParameterSpec parameterSpec = new 
PBEParameterSpec(salt, 1000);
+                try {
+                    cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
+                } catch (final InvalidKeyException | 
InvalidAlgorithmParameterException e) {
+                    logger.error("unable to encrypt {} due to {}", new 
Object[]{flowFile, e});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
 
-            final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 
1000);
-            try {
-                cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
-            } catch (final InvalidKeyException | 
InvalidAlgorithmParameterException e) {
-                logger.error("unable to encrypt {} due to {}", new 
Object[]{flowFile, e});
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
+                flowFile = session.write(flowFile, new EncryptCallback(cipher, 
salt));
+                logger.info("Successfully encrypted {}", new 
Object[]{flowFile});
+            } else {
+                if (flowFile.getSize() <= saltSize) {
+                    logger.error("Cannot decrypt {} because its file size is 
not greater than the salt size", new Object[]{flowFile});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
 
-            flowFile = session.write(flowFile, new EncryptCallback(cipher, 
salt));
-            logger.info("Successfully encrypted {}", new Object[]{flowFile});
-        } else {
-            if (flowFile.getSize() <= saltSize) {
-                logger.error("Cannot decrypt {} because its file size is not 
greater than the salt size", new Object[]{flowFile});
-                session.transfer(flowFile, REL_FAILURE);
-                return;
+                flowFile = session.write(flowFile, new DecryptCallback(cipher, 
secretKey, saltSize));
+                logger.info("successfully decrypted {}", new 
Object[]{flowFile});
             }
 
-            flowFile = session.write(flowFile, new DecryptCallback(cipher, 
secretKey, saltSize));
-            logger.info("successfully decrypted {}", new Object[]{flowFile});
+            session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to {} {} due to {}; routing to failure", 
new Object[] {mode, flowFile, pe});
+            session.transfer(flowFile, REL_FAILURE);
         }
-
-        session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-        session.transfer(flowFile, REL_SUCCESS);
     }
 
     private static class DecryptCallback implements StreamCallback {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ab6794b2/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
index 7340e0f..59ac975 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncryptContent.java
@@ -61,4 +61,25 @@ public class TestEncryptContent {
         }
     }
 
+    @Test
+    public void testDecryptNonEncryptedFile() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new 
EncryptContent());
+        testRunner.setProperty(EncryptContent.PASSWORD, "Hello, World!");
+
+        for (final EncryptionMethod method : EncryptionMethod.values()) {
+            if (method.isUnlimitedStrength()) {
+                continue;   // cannot test unlimited strength in unit tests 
because it's not enabled by the JVM by default.
+            }
+
+            testRunner.setProperty(EncryptContent.ENCRYPTION_ALGORITHM, 
method.name());
+            testRunner.setProperty(EncryptContent.MODE, 
EncryptContent.DECRYPT_MODE);
+
+            testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
+            testRunner.clearTransferState();
+            testRunner.run();
+
+            
testRunner.assertAllFlowFilesTransferred(EncryptContent.REL_FAILURE, 1);
+        }
+    }
+
 }

Reply via email to