Repository: nifi Updated Branches: refs/heads/master 05dabe034 -> 0db2dc9fc
NIFI-1405 Adding option to invokeHttp to control using chunked encoding for http PUT and POST calls Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0db2dc9f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0db2dc9f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0db2dc9f Branch: refs/heads/master Commit: 0db2dc9fc3466051869bbf7ad8b2961405554d80 Parents: 05dabe0 Author: jpercivall <[email protected]> Authored: Mon Jan 18 17:25:46 2016 -0500 Committer: Aldrin Piri <[email protected]> Committed: Sun Jan 24 15:52:05 2016 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/InvokeHTTP.java | 20 +++++++- .../standard/util/TestInvokeHttpCommon.java | 53 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0db2dc9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 44f76e5..7576be3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -310,6 +310,15 @@ public final class InvokeHTTP extends AbstractProcessor { .allowableValues("true", "false") .build(); + public static final PropertyDescriptor PROP_USE_CHUNKED_ENCODING = new PropertyDescriptor.Builder() + .name("Use Chunked Encoding") + .description("When POST'ing or PUT'ing content set this property to true in order to not pass the 'Content-length' header and instead send 'Transfer-Encoding' with " + + "a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown lengths in chunks.") + .required(true) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, PROP_URL, @@ -329,7 +338,8 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_OUTPUT_RESPONSE_REGARDLESS, PROP_TRUSTED_HOSTNAME, PROP_ADD_HEADERS_TO_REQUEST, - PROP_CONTENT_TYPE)); + PROP_CONTENT_TYPE, + PROP_USE_CHUNKED_ENCODING)); // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() @@ -398,6 +408,7 @@ public final class InvokeHTTP extends AbstractProcessor { } private volatile Pattern regexAttributesToSend = null; + private volatile boolean useChunked = false; @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { @@ -495,6 +506,8 @@ public final class InvokeHTTP extends AbstractProcessor { okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache)); } + useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean(); + okHttpClientAtomicReference.set(okHttpClient); } @@ -751,6 +764,11 @@ public final class InvokeHTTP extends AbstractProcessor { public void writeTo(BufferedSink sink) throws IOException { session.exportTo(requestFlowFile, sink.outputStream()); } + + @Override + public long contentLength(){ + return useChunked ? -1 : requestFlowFile.getSize(); + } }; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0db2dc9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index 3464f14..a78fb97 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -1229,6 +1229,49 @@ public abstract class TestInvokeHttpCommon { Assert.assertEquals(expected1, actual1); } + @Test + public void testChunkedRequest() throws Exception { + MutativeMethodHandler mutativeMethodHandler = new MutativeMethodHandler(MutativeMethod.POST); + mutativeMethodHandler.setHeaderToTrack("Transfer-encoding"); + addHandler(mutativeMethodHandler); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD,"POST"); + runner.setProperty(InvokeHTTP.PROP_USE_CHUNKED_ENCODING,"true"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + //expected in request status.code and status.message + //original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + //expected in response + //status code, status message, all headers from server response --> ff attributes + //server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + + String header = mutativeMethodHandler.getTrackedHeaderValue(); + Assert.assertEquals("chunked",header); + } + + public static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { final Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); @@ -1260,6 +1303,8 @@ public abstract class TestInvokeHttpCommon { public static class MutativeMethodHandler extends AbstractHandler { private final MutativeMethod method; private final String expectedContentType; + private String headerToTrack; + private String trackedHeaderValue; public MutativeMethodHandler(final MutativeMethod method) { this(method, "application/plain-text"); @@ -1269,6 +1314,13 @@ public abstract class TestInvokeHttpCommon { this.method = method; this.expectedContentType = expectedContentType; } + private void setHeaderToTrack(String headerToTrack){ + this.headerToTrack = headerToTrack; + } + + public String getTrackedHeaderValue(){ + return trackedHeaderValue; + } @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) @@ -1279,6 +1331,7 @@ public abstract class TestInvokeHttpCommon { if(method.name().equals(request.getMethod())) { assertEquals(this.expectedContentType,request.getHeader("Content-Type")); final String body = request.getReader().readLine(); + this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack); assertEquals("Hello", body); } else { response.setStatus(404);
