Repository: nifi Updated Branches: refs/heads/master 1370eefdd -> 7bd2c64ad
NIFI-1865 Close StreamThrottler when processor stops. - Also, replaced copyrighted sample texts for tests Signed-off-by: Matt Burgess <mattyb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7bd2c64a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7bd2c64a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7bd2c64a Branch: refs/heads/master Commit: 7bd2c64adb6a6abb012861d40d84b30841043b23 Parents: 1370eef Author: Koji Kawamura <ijokaruma...@gmail.com> Authored: Wed May 11 08:08:58 2016 +0900 Committer: Matt Burgess <mattyb...@apache.org> Committed: Tue May 10 22:02:15 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenHTTP.java | 12 +++++ .../nifi/processors/standard/PostHTTP.java | 9 ++++ .../nifi/processors/standard/TestPostHTTP.java | 47 +++++++++++++++----- 3 files changed, 58 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 88b6666..5ea116e7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -136,6 +137,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { private volatile Server server = null; private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>(); + private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -166,6 +168,15 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { @OnStopped public void shutdownHttpServer() { + final StreamThrottler throttler = throttlerRef.getAndSet(null); + if(throttler != null) { + try { + throttler.close(); + } catch (IOException e) { + getLogger().error("Failed to close StreamThrottler", e); + } + } + final Server toShutdown = this.server; if (toShutdown == null) { return; @@ -185,6 +196,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); + throttlerRef.set(streamThrottler); final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null; http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 230790a..79d2815 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -351,6 +351,15 @@ public class PostHTTP extends AbstractProcessor { } configMap.clear(); + + final StreamThrottler throttler = throttlerRef.getAndSet(null); + if(throttler != null) { + try { + throttler.close(); + } catch (IOException e) { + getLogger().error("Failed to close StreamThrottler", e); + } + } } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/7bd2c64a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index 5262964..edff3b4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -261,7 +261,7 @@ public class TestPostHTTP { final String suppliedMimeType = "text/plain"; attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); - runner.enqueue("Camping is in tents.".getBytes(), attrs); + runner.enqueue("Camping is great!".getBytes(), attrs); runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false"); runner.run(1); @@ -269,7 +269,7 @@ public class TestPostHTTP { Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); - Assert.assertEquals("20",lastPostHeaders.get("Content-Length")); + Assert.assertEquals("17",lastPostHeaders.get("Content-Length")); } @Test @@ -280,7 +280,7 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), ""); - runner.enqueue("The wilderness downtown.".getBytes(), attrs); + runner.enqueue("The wilderness.".getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -300,7 +300,7 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); - runner.enqueue("Try this trick and spin it.".getBytes(), attrs); + runner.enqueue("Sending with content type property.".getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -321,7 +321,7 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -346,7 +346,7 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -355,7 +355,7 @@ public class TestPostHTTP { Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); // Ensure that the request was not sent with a 'Content-Encoding' header Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); - Assert.assertEquals("6200",lastPostHeaders.get("Content-Length")); + Assert.assertEquals("2100",lastPostHeaders.get("Content-Length")); } @Test @@ -371,7 +371,7 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); @@ -394,17 +394,44 @@ public class TestPostHTTP { final Map<String, String> attrs = new HashMap<>(); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); - runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + runner.enqueue(StringUtils.repeat("Lines of sample text.", 100).getBytes(), attrs); runner.run(1); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); byte[] postValue = servlet.getLastPost(); - Assert.assertArrayEquals(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(),postValue); + Assert.assertArrayEquals(StringUtils.repeat("Lines of sample text.", 100).getBytes(),postValue); Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertNull(lastPostHeaders.get("Content-Length")); Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding")); } + + @Test + public void testSendWithThrottler() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false"); + runner.setProperty(PostHTTP.MAX_DATA_RATE, "10kb"); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + runner.enqueue(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(), attrs); + + boolean stopOnFinish = true; + runner.run(1, stopOnFinish); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + byte[] postValue = servlet.getLastPost(); + Assert.assertArrayEquals(StringUtils.repeat("This is a line of sample text. Here is another.", 100).getBytes(),postValue); + + Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + Assert.assertEquals("4700",lastPostHeaders.get("Content-Length")); + } }