Repository: nifi
Updated Branches:
  refs/heads/master 1370eefdd -> 7bd2c64ad


NIFI-1865 Close StreamThrottler when processor stops.

- Also replaced the copyrighted sample text used in the tests.

Signed-off-by: Matt Burgess <mattyb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7bd2c64a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7bd2c64a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7bd2c64a

Branch: refs/heads/master
Commit: 7bd2c64adb6a6abb012861d40d84b30841043b23
Parents: 1370eef
Author: Koji Kawamura <ijokaruma...@gmail.com>
Authored: Wed May 11 08:08:58 2016 +0900
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Tue May 10 22:02:15 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    | 12 +++++
 .../nifi/processors/standard/PostHTTP.java      |  9 ++++
 .../nifi/processors/standard/TestPostHTTP.java  | 47 +++++++++++++++-----
 3 files changed, 58 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 88b6666..5ea116e7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -136,6 +137,7 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap 
= new ConcurrentHashMap<>();
     private final AtomicReference<ProcessSessionFactory> 
sessionFactoryReference = new AtomicReference<>();
+    private final AtomicReference<StreamThrottler> throttlerRef = new 
AtomicReference<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -166,6 +168,15 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
 
     @OnStopped
     public void shutdownHttpServer() {
+        final StreamThrottler throttler = throttlerRef.getAndSet(null);
+        if(throttler != null) {
+            try {
+                throttler.close();
+            } catch (IOException e) {
+                getLogger().error("Failed to close StreamThrottler", e);
+            }
+        }
+
         final Server toShutdown = this.server;
         if (toShutdown == null) {
             return;
@@ -185,6 +196,7 @@ public class ListenHTTP extends 
AbstractSessionFactoryProcessor {
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final Double maxBytesPerSecond = 
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? 
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
+        throttlerRef.set(streamThrottler);
 
         final boolean needClientAuth = sslContextService == null ? false : 
sslContextService.getTrustStoreFile() != null;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index 230790a..79d2815 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -351,6 +351,15 @@ public class PostHTTP extends AbstractProcessor {
         }
 
         configMap.clear();
+
+        final StreamThrottler throttler = throttlerRef.getAndSet(null);
+        if(throttler != null) {
+            try {
+                throttler.close();
+            } catch (IOException e) {
+                getLogger().error("Failed to close StreamThrottler", e);
+            }
+        }
     }
 
     @OnScheduled

http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
index 5262964..edff3b4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
@@ -261,7 +261,7 @@ public class TestPostHTTP {
 
         final String suppliedMimeType = "text/plain";
         attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
-        runner.enqueue("Camping is in tents.".getBytes(), attrs);
+        runner.enqueue("Camping is great!".getBytes(), attrs);
         runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
 
         runner.run(1);
@@ -269,7 +269,7 @@ public class TestPostHTTP {
 
         Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
         Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
-        Assert.assertEquals("20",lastPostHeaders.get("Content-Length"));
+        Assert.assertEquals("17",lastPostHeaders.get("Content-Length"));
     }
 
     @Test
@@ -280,7 +280,7 @@ public class TestPostHTTP {
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "");
-        runner.enqueue("The wilderness downtown.".getBytes(), attrs);
+        runner.enqueue("The wilderness.".getBytes(), attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@@ -300,7 +300,7 @@ public class TestPostHTTP {
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
-        runner.enqueue("Try this trick and spin it.".getBytes(), attrs);
+        runner.enqueue("Sending with content type property.".getBytes(), 
attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@@ -321,7 +321,7 @@ public class TestPostHTTP {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
-        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+        runner.enqueue(StringUtils.repeat("Lines of sample text.", 
100).getBytes(), attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@@ -346,7 +346,7 @@ public class TestPostHTTP {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
-        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+        runner.enqueue(StringUtils.repeat("Lines of sample text.", 
100).getBytes(), attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@@ -355,7 +355,7 @@ public class TestPostHTTP {
         Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
         // Ensure that the request was not sent with a 'Content-Encoding' 
header
         
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
-        Assert.assertEquals("6200",lastPostHeaders.get("Content-Length"));
+        Assert.assertEquals("2100",lastPostHeaders.get("Content-Length"));
     }
 
     @Test
@@ -371,7 +371,7 @@ public class TestPostHTTP {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
-        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+        runner.enqueue(StringUtils.repeat("Lines of sample text.", 
100).getBytes(), attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
@@ -394,17 +394,44 @@ public class TestPostHTTP {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
-        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+        runner.enqueue(StringUtils.repeat("Lines of sample text.", 
100).getBytes(), attrs);
 
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
 
         byte[] postValue = servlet.getLastPost();
-        Assert.assertArrayEquals(StringUtils.repeat("This is the song that 
never ends. It goes on and on my friend.", 100).getBytes(),postValue);
+        Assert.assertArrayEquals(StringUtils.repeat("Lines of sample text.", 
100).getBytes(),postValue);
 
         Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
         Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
         Assert.assertNull(lastPostHeaders.get("Content-Length"));
         
Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding"));
     }
+
+    @Test
+    public void testSendWithThrottler() throws Exception {
+        setup(null);
+
+        final String suppliedMimeType = "text/plain";
+        runner.setProperty(PostHTTP.URL, server.getUrl());
+        runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
+        runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
+        runner.setProperty(PostHTTP.MAX_DATA_RATE, "10kb");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        runner.enqueue(StringUtils.repeat("This is a line of sample text. Here 
is another.", 100).getBytes(), attrs);
+
+        boolean stopOnFinish = true;
+        runner.run(1, stopOnFinish);
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
+
+        byte[] postValue = servlet.getLastPost();
+        Assert.assertArrayEquals(StringUtils.repeat("This is a line of sample 
text. Here is another.", 100).getBytes(),postValue);
+
+        Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
+        Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
+        Assert.assertEquals("4700",lastPostHeaders.get("Content-Length"));
+    }
 }

Reply via email to