Repository: nifi
Updated Branches:
  refs/heads/master a7b97419e -> b188b0abd


NIFI-1420 Fixing bug where a FlowFile should route to failure when PutSplunk 
can't create a connection, defaulting PutSplunk to TCP

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b188b0ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b188b0ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b188b0ab

Branch: refs/heads/master
Commit: b188b0abd6901f702c58737b616fa7cd1874c1df
Parents: a7b9741
Author: Bryan Bende <[email protected]>
Authored: Wed Mar 16 16:17:23 2016 -0400
Committer: joewitt <[email protected]>
Committed: Wed Mar 16 17:43:28 2016 -0400

----------------------------------------------------------------------
 .../util/put/AbstractPutEventProcessor.java     |  2 +-
 .../nifi/processors/splunk/PutSplunk.java       |  1 +
 .../nifi/processors/splunk/TestPutSplunk.java   | 30 ++++++++++++++++++++
 3 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b188b0ab/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index c7313dc..961edf5 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -104,7 +104,7 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
             .description("The protocol for communication.")
             .required(true)
             .allowableValues(TCP_VALUE, UDP_VALUE)
-            .defaultValue(UDP_VALUE.getValue())
+            .defaultValue(TCP_VALUE.getValue())
             .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
             .name("Message Delimiter")

http://git-wip-us.apache.org/repos/asf/nifi/blob/b188b0ab/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 482b85d..39c6843 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -160,6 +160,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
                 getLogger().error("No available connections, and unable to 
create a new one, transferring {} to failure",
                         new Object[]{flowFile}, e);
                 session.transfer(flowFile, REL_FAILURE);
+                session.commit();
                 context.yield();
                 return;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b188b0ab/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
index bb55372..d6bf3a1 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
@@ -53,6 +53,7 @@ public class TestPutSplunk {
 
     @Test
     public void testUDPSendWholeFlowFile() {
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
         final String message = "This is one message, should send the whole 
FlowFile";
 
         runner.enqueue(message);
@@ -102,6 +103,8 @@ public class TestPutSplunk {
 
     @Test
     public void testUDPSendDelimitedMessages() {
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+
         final String delimiter = "DD";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
 
@@ -286,6 +289,8 @@ public class TestPutSplunk {
 
     @Test
     public void testCompletingPreviousBatchOnNextExecution() {
+        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+
         final String message = "This is one message, should send the whole 
FlowFile";
 
         runner.enqueue(message);
@@ -299,6 +304,30 @@ public class TestPutSplunk {
         Assert.assertEquals(message, sender.getMessages().get(0));
     }
 
+    @Test
+    public void testUnableToCreateConnectionShouldRouteToFailure() {
+        PutSplunk proc = new UnableToConnectPutSplunk();
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutSplunk.PORT, "12345");
+
+        final String message = "This is one message, should send the whole 
FlowFile";
+
+        runner.enqueue(message);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
+    }
+
+    /**
+     * Extend PutSplunk so that createSender always throws an IOException.
+     */
+    private static class UnableToConnectPutSplunk extends PutSplunk {
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int 
port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws 
IOException {
+            throw new IOException("Unable to create connection");
+        }
+    }
+
     /**
      * Extend PutSplunk to use a CapturingChannelSender.
      */
@@ -316,6 +345,7 @@ public class TestPutSplunk {
         }
     }
 
+
     /**
      * A ChannelSender that captures each message that was sent.
      */

Reply via email to