Repository: nifi
Updated Branches:
  refs/heads/master 2c9fb676c -> 9ebcc9e4f


NIFI-1393 Providing the ability to send using gzip Content-Encoding in PostHTTP 
if the endpoint server supports it regardless if the processor is configured to 
send as a FlowFile This closes #175

Signed-off-by: Matt Gilman <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9ebcc9e4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9ebcc9e4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9ebcc9e4

Branch: refs/heads/master
Commit: 9ebcc9e4fa14d8ae9e6b312865f11c4661bc1a68
Parents: 2c9fb67
Author: Aldrin Piri <[email protected]>
Authored: Thu Jan 14 13:24:20 2016 -0500
Committer: Matt Gilman <[email protected]>
Committed: Wed Jan 20 16:46:57 2016 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/PostHTTP.java      | 92 ++++++++++----------
 .../processors/standard/CaptureServlet.java     |  5 +-
 .../nifi/processors/standard/TestPostHTTP.java  | 71 +++++++++++++++
 3 files changed, 122 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9ebcc9e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index aacdb6a..4aba8fe 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -141,6 +141,8 @@ public class PostHTTP extends AbstractProcessor {
     public static final String LOCATION_URI_INTENT_NAME = 
"x-location-uri-intent";
     public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
     public static final String GZIPPED_HEADER = "flowfile-gzipped";
+    public static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+    public static final String CONTENT_ENCODING_GZIP_VALUE = "gzip";
 
     public static final String PROTOCOL_VERSION_HEADER = 
"x-nifi-transfer-protocol-version";
     public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
@@ -534,12 +536,7 @@ public class PostHTTP extends AbstractProcessor {
                 destinationAccepts = config.getDestinationAccepts();
                 if (destinationAccepts == null) {
                     try {
-                        if (sendAsFlowFile) {
-                            destinationAccepts = 
getDestinationAcceptance(client, url, getLogger(), transactionId);
-                        } else {
-                            destinationAccepts = new DestinationAccepts(false, 
false, false, false, null);
-                        }
-
+                        destinationAccepts = 
getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), 
transactionId);
                         config.setDestinationAccepts(destinationAccepts);
                     } catch (final IOException e) {
                         flowFile = session.penalize(flowFile);
@@ -673,7 +670,11 @@ public class PostHTTP extends AbstractProcessor {
         post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
         post.setHeader(TRANSACTION_ID_HEADER, transactionId);
         if (compressionLevel > 0 && accepts.isGzipAccepted()) {
-            post.setHeader(GZIPPED_HEADER, "true");
+            if (sendAsFlowFile) {
+                post.setHeader(GZIPPED_HEADER, "true");
+            } else {
+                post.setHeader(CONTENT_ENCODING_HEADER, 
CONTENT_ENCODING_GZIP_VALUE);
+            }
         }
 
         // Do the actual POST
@@ -841,57 +842,58 @@ public class PostHTTP extends AbstractProcessor {
         }
     }
 
-    private DestinationAccepts getDestinationAcceptance(final HttpClient 
client, final String uri, final ProcessorLog logger, final String 
transactionId) throws IOException {
+    private DestinationAccepts getDestinationAcceptance(final boolean 
sendAsFlowFile, final HttpClient client, final String uri,
+                                                        final ProcessorLog 
logger, final String transactionId) throws IOException {
         final HttpHead head = new HttpHead(uri);
-        head.addHeader(TRANSACTION_ID_HEADER, transactionId);
+        if (sendAsFlowFile) {
+            head.addHeader(TRANSACTION_ID_HEADER, transactionId);
+        }
         final HttpResponse response = client.execute(head);
 
+        // we assume that the destination can support FlowFile v1 always when 
the processor is also configured to send as a FlowFile
+        // otherwise, we do not bother to make any determinations concerning 
this compatibility
+        final boolean acceptsFlowFileV1 = sendAsFlowFile;
+        boolean acceptsFlowFileV2 = false;
+        boolean acceptsFlowFileV3 = false;
+        boolean acceptsGzip = false;
+        Integer protocolVersion = null;
+
         final int statusCode = response.getStatusLine().getStatusCode();
         if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) {
-            // we assume that the destination can support FlowFile v1 always.
-            return new DestinationAccepts(false, false, true, false, null);
+            return new DestinationAccepts(acceptsFlowFileV3, 
acceptsFlowFileV2, acceptsFlowFileV1, false, null);
         } else if (statusCode == Status.OK.getStatusCode()) {
-            boolean acceptsFlowFileV3 = false;
-            boolean acceptsFlowFileV2 = false;
-            boolean acceptsFlowFileV1 = true;
-            boolean acceptsGzip = false;
-            Integer protocolVersion = null;
-
             Header[] headers = response.getHeaders(ACCEPT);
-            if (headers != null) {
-                for (final Header header : headers) {
-                    for (final String accepted : header.getValue().split(",")) 
{
-                        final String trimmed = accepted.trim();
-                        if (trimmed.equals(APPLICATION_FLOW_FILE_V3)) {
-                            acceptsFlowFileV3 = true;
-                        } else if (trimmed.equals(APPLICATION_FLOW_FILE_V2)) {
-                            acceptsFlowFileV2 = true;
-                        } else {
-                            // we assume that the destination accepts FlowFile 
V1 because legacy versions
-                            // of NiFi that accepted V1 did not use an Accept 
header to indicate it... or
-                            // any other header. So the bets thing we can do 
is just assume that V1 is
-                            // accepted, if we're going to send as FlowFile.
-                            acceptsFlowFileV1 = true;
+            // If configured to send as a flowfile, determine the capabilities 
of the endpoint
+            if (sendAsFlowFile) {
+                if (headers != null) {
+                    for (final Header header : headers) {
+                        for (final String accepted : 
header.getValue().split(",")) {
+                            final String trimmed = accepted.trim();
+                            if (trimmed.equals(APPLICATION_FLOW_FILE_V3)) {
+                                acceptsFlowFileV3 = true;
+                            } else if 
(trimmed.equals(APPLICATION_FLOW_FILE_V2)) {
+                                acceptsFlowFileV2 = true;
+                            }
                         }
                     }
                 }
-            }
 
-            final Header destinationVersion = 
response.getFirstHeader(PROTOCOL_VERSION_HEADER);
-            if (destinationVersion != null) {
-                try {
-                    protocolVersion = 
Integer.valueOf(destinationVersion.getValue());
-                } catch (final NumberFormatException e) {
-                    // nothing to do here really.... it's an invalid value, so 
treat the same as if not specified
+                final Header destinationVersion = 
response.getFirstHeader(PROTOCOL_VERSION_HEADER);
+                if (destinationVersion != null) {
+                    try {
+                        protocolVersion = 
Integer.valueOf(destinationVersion.getValue());
+                    } catch (final NumberFormatException e) {
+                        // nothing to do here really.... it's an invalid 
value, so treat the same as if not specified
+                    }
                 }
-            }
 
-            if (acceptsFlowFileV3) {
-                logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
-            } else if (acceptsFlowFileV2) {
-                logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
-            } else if (acceptsFlowFileV1) {
-                logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
+                if (acceptsFlowFileV3) {
+                    logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
+                } else if (acceptsFlowFileV2) {
+                    logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
+                } else if (acceptsFlowFileV1) {
+                    logger.debug("Connection to URI " + uri + " will be using 
Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
+                }
             }
 
             headers = response.getHeaders(ACCEPT_ENCODING);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9ebcc9e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
index 073ff52..b58e532 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
@@ -71,6 +71,9 @@ public class CaptureServlet extends HttpServlet {
     protected void doHead(final HttpServletRequest request, final 
HttpServletResponse response) throws ServletException, IOException {
         response.setHeader("Accept", 
"application/flowfile-v3,application/flowfile-v2");
         response.setHeader("x-nifi-transfer-protocol-version", "1");
-        response.setHeader("Accept-Encoding", "gzip");
+        // Unless an acceptGzip parameter is explicitly set to false, respond 
that this server accepts gzip
+        if 
(!Boolean.toString(false).equalsIgnoreCase(request.getParameter("acceptGzip"))) 
{
+            response.setHeader("Accept-Encoding", "gzip");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9ebcc9e4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
index 274a9ed..67bd82c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardSSLContextService;
@@ -299,4 +300,74 @@ public class TestPostHTTP {
         Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
         Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
     }
+
+    @Test
+    public void testSendWithCompressionServerAcceptGzip() throws Exception {
+        setup(null);
+
+        final String suppliedMimeType = "text/plain";
+        runner.setProperty(PostHTTP.URL, server.getUrl());
+        runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
+        runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
+
+        Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
+        Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
+        // Ensure that a 'Content-Encoding' header was set with a 'gzip' value
+        Assert.assertEquals(PostHTTP.CONTENT_ENCODING_GZIP_VALUE, 
lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
+    }
+
+    @Test
+    public void testSendWithoutCompressionServerAcceptGzip() throws Exception {
+        setup(null);
+
+        final String suppliedMimeType = "text/plain";
+        runner.setProperty(PostHTTP.URL, server.getUrl());
+        runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
+        runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "0");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
+
+        Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
+        Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
+        // Ensure that the request was not sent with a 'Content-Encoding' 
header
+        
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
+    }
+
+    @Test
+    public void testSendWithCompressionServerNotAcceptGzip() throws Exception {
+        setup(null);
+
+        final String suppliedMimeType = "text/plain";
+        // Specify a property to the URL to have the CaptureServlet specify it 
doesn't accept gzip
+        runner.setProperty(PostHTTP.URL, server.getUrl()+"?acceptGzip=false");
+        runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
+        runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        runner.enqueue(StringUtils.repeat("This is the song that never ends. 
It goes on and on my friend.", 100).getBytes(), attrs);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
+
+        Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
+        Assert.assertEquals(suppliedMimeType, 
lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
+        // Ensure that the request was not sent with a 'Content-Encoding' 
header
+        
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
+    }
 }

Reply via email to