Repository: nifi
Updated Branches:
  refs/heads/master 05dabe034 -> 0db2dc9fc


NIFI-1405 Adding option to invokeHttp to control using chunked encoding for 
http PUT and POST calls

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0db2dc9f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0db2dc9f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0db2dc9f

Branch: refs/heads/master
Commit: 0db2dc9fc3466051869bbf7ad8b2961405554d80
Parents: 05dabe0
Author: jpercivall <[email protected]>
Authored: Mon Jan 18 17:25:46 2016 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Sun Jan 24 15:52:05 2016 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/InvokeHTTP.java    | 20 +++++++-
 .../standard/util/TestInvokeHttpCommon.java     | 53 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0db2dc9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 44f76e5..7576be3 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -310,6 +310,15 @@ public final class InvokeHTTP extends AbstractProcessor {
             .allowableValues("true", "false")
             .build();
 
+    public static final PropertyDescriptor PROP_USE_CHUNKED_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Use Chunked Encoding")
+            .description("When POST'ing or PUT'ing content set this property 
to true in order to not pass the 'Content-length' header and instead send 
'Transfer-Encoding' with "
+                    + "a value of 'chunked'. This will enable the data 
transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown 
lengths in chunks.")
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
     public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
             PROP_METHOD,
             PROP_URL,
@@ -329,7 +338,8 @@ public final class InvokeHTTP extends AbstractProcessor {
             PROP_OUTPUT_RESPONSE_REGARDLESS,
             PROP_TRUSTED_HOSTNAME,
             PROP_ADD_HEADERS_TO_REQUEST,
-            PROP_CONTENT_TYPE));
+            PROP_CONTENT_TYPE,
+            PROP_USE_CHUNKED_ENCODING));
 
     // relationships
     public static final Relationship REL_SUCCESS_REQ = new 
Relationship.Builder()
@@ -398,6 +408,7 @@ public final class InvokeHTTP extends AbstractProcessor {
     }
 
     private volatile Pattern regexAttributesToSend = null;
+    private volatile boolean useChunked = false;
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
@@ -495,6 +506,8 @@ public final class InvokeHTTP extends AbstractProcessor {
             okHttpClient.setAuthenticator(new 
CachingAuthenticatorDecorator(authenticator, authCache));
         }
 
+        useChunked = 
context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean();
+
         okHttpClientAtomicReference.set(okHttpClient);
     }
 
@@ -751,6 +764,11 @@ public final class InvokeHTTP extends AbstractProcessor {
             public void writeTo(BufferedSink sink) throws IOException {
                 session.exportTo(requestFlowFile, sink.outputStream());
             }
+
+            @Override
+            public long contentLength(){
+                return useChunked ? -1 : requestFlowFile.getSize();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0db2dc9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
index 3464f14..a78fb97 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
@@ -1229,6 +1229,49 @@ public abstract class TestInvokeHttpCommon {
         Assert.assertEquals(expected1, actual1);
     }
 
+    @Test
+    public void testChunkedRequest() throws Exception {
+        MutativeMethodHandler mutativeMethodHandler = new 
MutativeMethodHandler(MutativeMethod.POST);
+        mutativeMethodHandler.setHeaderToTrack("Transfer-encoding");
+        addHandler(mutativeMethodHandler);
+
+        runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+        runner.setProperty(InvokeHTTP.PROP_METHOD,"POST");
+        runner.setProperty(InvokeHTTP.PROP_USE_CHUNKED_ENCODING,"true");
+
+        createFlowFiles(runner);
+
+        runner.run();
+
+        runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+        //expected in request status.code and status.message
+        //original flow file (+attributes)
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+        final String actual = new String(bundle.toByteArray(), 
StandardCharsets.UTF_8);
+        final String expected = "Hello";
+        Assert.assertEquals(expected, actual);
+        bundle.assertAttributeEquals("Foo", "Bar");
+
+        //expected in response
+        //status code, status message, all headers from server response --> ff 
attributes
+        //server response message body into payload of ff
+        final MockFlowFile bundle1 = 
runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+        bundle1.assertAttributeEquals("Foo", "Bar");
+
+        String header = mutativeMethodHandler.getTrackedHeaderValue();
+        Assert.assertEquals("chunked",header);
+    }
+
+
     public static void createFlowFiles(final TestRunner testRunner) throws 
UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
@@ -1260,6 +1303,8 @@ public abstract class TestInvokeHttpCommon {
     public static class MutativeMethodHandler extends AbstractHandler {
         private final MutativeMethod method;
         private final String expectedContentType;
+        private String headerToTrack;
+        private String trackedHeaderValue;
 
         public MutativeMethodHandler(final MutativeMethod method) {
             this(method, "application/plain-text");
@@ -1269,6 +1314,13 @@ public abstract class TestInvokeHttpCommon {
             this.method = method;
             this.expectedContentType = expectedContentType;
         }
+        private void setHeaderToTrack(String headerToTrack){
+            this.headerToTrack = headerToTrack;
+        }
+
+        public String getTrackedHeaderValue(){
+            return trackedHeaderValue;
+        }
 
         @Override
         public void handle(String target, Request baseRequest, 
HttpServletRequest request, HttpServletResponse response)
@@ -1279,6 +1331,7 @@ public abstract class TestInvokeHttpCommon {
             if(method.name().equals(request.getMethod())) {
                 
assertEquals(this.expectedContentType,request.getHeader("Content-Type"));
                 final String body = request.getReader().readLine();
+                this.trackedHeaderValue = 
baseRequest.getHttpFields().get(headerToTrack);
                 assertEquals("Hello", body);
             } else {
                 response.setStatus(404);

Reply via email to