This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d8ebfb25d0 NIFI-10161 Added Gzip Content-Encoding to InvokeHTTP and ListenHTTP
d8ebfb25d0 is described below
commit d8ebfb25d09536a9201959c0e1d4e2454ca68c64
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jun 23 11:51:17 2022 -0500
NIFI-10161 Added Gzip Content-Encoding to InvokeHTTP and ListenHTTP
Signed-off-by: Pierre Villard <[email protected]>
This closes #6150.
---
.../nifi/processors/standard/InvokeHTTP.java | 51 ++++++++--
.../standard/http/ContentEncodingStrategy.java | 49 ++++++++++
.../standard/servlets/ListenHTTPServlet.java | 6 +-
.../nifi/processors/standard/InvokeHTTPTest.java | 34 ++++++-
.../nifi/processors/standard/TestListenHTTP.java | 106 ++++++++++++++-------
5 files changed, 199 insertions(+), 47 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 839d0f5188..49fffdc48e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -73,6 +73,9 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
+import okio.Source;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
@@ -103,6 +106,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.http.CookieStrategy;
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
@@ -186,6 +190,7 @@ public class InvokeHTTP extends AbstractProcessor {
private static final Pattern DYNAMIC_FORM_PARAMETER_NAME =
Pattern.compile("post:form:(?<formDataName>.*)$");
private static final String FORM_DATA_NAME_GROUP = "formDataName";
+ private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
// properties
public static final PropertyDescriptor PROP_METHOD = new
PropertyDescriptor.Builder()
@@ -329,6 +334,15 @@ public class InvokeHTTP extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor PROP_CONTENT_ENCODING = new
PropertyDescriptor.Builder()
+ .name("Content-Encoding")
+ .displayName("Content-Encoding")
+ .description("HTTP Content-Encoding applied to request body during
transmission. The receiving server must support the selected encoding to avoid
request failures.")
+ .required(true)
+ .defaultValue(ContentEncodingStrategy.DISABLED.getValue())
+ .allowableValues(ContentEncodingStrategy.class)
+ .build();
+
public static final PropertyDescriptor PROP_CONTENT_TYPE = new
PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for when content is
being transmitted through a PUT, POST or PATCH. "
@@ -564,6 +578,7 @@ public class InvokeHTTP extends AbstractProcessor {
PROP_DIGEST_AUTH,
PROP_OUTPUT_RESPONSE_REGARDLESS,
PROP_ADD_HEADERS_TO_REQUEST,
+ PROP_CONTENT_ENCODING,
PROP_CONTENT_TYPE,
PROP_SEND_BODY,
PROP_USE_CHUNKED_ENCODING,
@@ -1112,6 +1127,12 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
+ final String contentEncoding =
context.getProperty(PROP_CONTENT_ENCODING).getValue();
+ final ContentEncodingStrategy contentEncodingStrategy =
ContentEncodingStrategy.valueOf(contentEncoding);
+ if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
+ requestBuilder.addHeader(CONTENT_ENCODING_HEADER,
ContentEncodingStrategy.GZIP.getValue().toLowerCase());
+ }
+
// set the request method
String method =
trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
switch (method) {
@@ -1119,15 +1140,15 @@ public class InvokeHTTP extends AbstractProcessor {
requestBuilder.get();
break;
case POST_METHOD:
- RequestBody requestBody = getRequestBodyToSend(session,
context, requestFlowFile);
+ RequestBody requestBody = getRequestBodyToSend(session,
context, requestFlowFile, contentEncodingStrategy);
requestBuilder.post(requestBody);
break;
case PUT_METHOD:
- requestBody = getRequestBodyToSend(session, context,
requestFlowFile);
+ requestBody = getRequestBodyToSend(session, context,
requestFlowFile, contentEncodingStrategy);
requestBuilder.put(requestBody);
break;
case PATCH_METHOD:
- requestBody = getRequestBodyToSend(session, context,
requestFlowFile);
+ requestBody = getRequestBodyToSend(session, context,
requestFlowFile, contentEncodingStrategy);
requestBuilder.patch(requestBody);
break;
case HEAD_METHOD:
@@ -1149,8 +1170,9 @@ public class InvokeHTTP extends AbstractProcessor {
}
private RequestBody getRequestBodyToSend(final ProcessSession session,
final ProcessContext context,
- final FlowFile requestFlowFile) {
-
+ final FlowFile requestFlowFile,
+ final ContentEncodingStrategy
contentEncodingStrategy
+ ) {
boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
String evalContentType = context.getProperty(PROP_CONTENT_TYPE)
@@ -1168,6 +1190,7 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
+ final boolean contentLengthUnknown = useChunked ||
ContentEncodingStrategy.GZIP == contentEncodingStrategy;
RequestBody requestBody = new RequestBody() {
@Nullable
@Override
@@ -1176,13 +1199,25 @@ public class InvokeHTTP extends AbstractProcessor {
}
@Override
- public void writeTo(BufferedSink sink) {
- session.exportTo(requestFlowFile, sink.outputStream());
+ public void writeTo(final BufferedSink sink) throws IOException {
+ final BufferedSink outputSink = (ContentEncodingStrategy.GZIP
== contentEncodingStrategy)
+ ? Okio.buffer(new GzipSink(sink))
+ : sink;
+
+ session.read(requestFlowFile, inputStream -> {
+ final Source source = Okio.source(inputStream);
+ outputSink.writeAll(source);
+ });
+
+ // Close Output Sink for gzip to write trailing bytes
+ if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
+ outputSink.close();
+ }
}
@Override
public long contentLength() {
- return useChunked ? -1 : requestFlowFile.getSize();
+ return contentLengthUnknown ? -1 : requestFlowFile.getSize();
}
};
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
new file mode 100644
index 0000000000..2fe770eacc
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.http;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * HTTP Content-Encoding configuration strategy
+ */
+public enum ContentEncodingStrategy implements DescribedValue {
+ DISABLED("Content encoding not applied during transmission"),
+
+ GZIP( "Gzip content encoding and HTTP Content-Encoding header applied
during transmission");
+
+ private final String description;
+
+ ContentEncodingStrategy(final String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return name();
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index d25879a9b2..1a2d3c6291 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -101,6 +101,7 @@ public class ListenHTTPServlet extends HttpServlet {
public static final String GZIPPED_HEADER = "flowfile-gzipped";
public static final String PROTOCOL_VERSION_HEADER =
"x-nifi-transfer-protocol-version";
public static final String PROTOCOL_VERSION = "3";
+ protected static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
private final AtomicLong filesReceived = new AtomicLong(0L);
private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
@@ -193,7 +194,10 @@ public class ListenHTTPServlet extends HttpServlet {
}
response.setHeader("Content-Type", MediaType.TEXT_PLAIN);
- final boolean contentGzipped =
Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+ final boolean flowFileGzipped =
Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+ final String contentEncoding =
request.getHeader(CONTENT_ENCODING_HEADER);
+ final boolean contentEncodingGzip =
ACCEPT_ENCODING_VALUE.equals(contentEncoding);
+ final boolean contentGzipped = flowFileGzipped ||
contentEncodingGzip;
final X509Certificate[] certs = (X509Certificate[])
request.getAttribute("javax.servlet.request.X509Certificate");
foundSubject = DEFAULT_FOUND_SUBJECT;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
index ca83fdae11..ce0c2603db 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
@@ -19,10 +19,13 @@ package org.apache.nifi.processors.standard;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
+import okio.Buffer;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.http.CookieStrategy;
import org.apache.nifi.reporting.InitializationException;
@@ -44,6 +47,7 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@@ -55,6 +59,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -101,6 +106,8 @@ public class InvokeHTTPTest {
private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+ private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String LOCATION_HEADER = "Location";
@@ -521,7 +528,7 @@ public class InvokeHTTPTest {
final String authorization = request.getHeader(AUTHORIZATION_HEADER);
assertNotNull(authorization, "Authorization Header not found");
- final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$");
+ final Pattern basicAuthPattern = Pattern.compile("^Basic \\S+$");
assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic
Authentication not matched");
}
@@ -723,6 +730,31 @@ public class InvokeHTTPTest {
assertRequestMethodSuccess(POST_METHOD);
}
+ @Test
+ public void testRunPostHttp200SuccessContentEncodingGzip() throws
InterruptedException, IOException {
+ runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+ runner.setProperty(InvokeHTTP.PROP_CONTENT_ENCODING,
ContentEncodingStrategy.GZIP.getValue());
+ runner.setProperty(InvokeHTTP.PROP_SEND_BODY, Boolean.TRUE.toString());
+
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER);
+ assertNull(contentLength, "Content-Length Request Header found");
+
+ final String contentEncoding =
request.getHeader(CONTENT_ENCODING_HEADER);
+ assertEquals(ContentEncodingStrategy.GZIP.getValue().toLowerCase(),
contentEncoding);
+
+ final Buffer body = request.getBody();
+ try (final GZIPInputStream gzipInputStream = new
GZIPInputStream(body.inputStream())) {
+ final String decompressed = IOUtils.toString(gzipInputStream,
StandardCharsets.UTF_8);
+ assertEquals(FLOW_FILE_CONTENT, decompressed);
+ }
+ }
+
@Test
public void testRunPostHttp200SuccessChunkedEncoding() throws
InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 489a4d3382..b5616d470d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -45,8 +46,12 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
+import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.HttpProtocolStrategy;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
@@ -54,6 +59,7 @@ import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.security.util.TlsPlatform;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
@@ -65,17 +71,17 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.ThreadPool;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import org.mockito.Mockito;
import static
org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestListenHTTP {
@@ -114,7 +120,11 @@ public class TestListenHTTP {
private int availablePort;
- @BeforeClass
+ static boolean isTls13Supported() {
+ return TLS_1_3.equals(TlsPlatform.getLatestProtocol());
+ }
+
+ @BeforeAll
public static void setUpSuite() throws GeneralSecurityException {
// generate new keystore and truststore
final TlsConfiguration tlsConfiguration = new
TemporaryKeyStoreBuilder().build();
@@ -172,7 +182,7 @@ public class TestListenHTTP {
);
}
- @Before
+ @BeforeEach
public void setup() throws IOException {
proc = new ListenHTTP();
runner = TestRunners.newTestRunner(proc);
@@ -181,7 +191,7 @@ public class TestListenHTTP {
runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
}
- @After
+ @AfterEach
public void shutdownServer() {
proc.shutdownHttpServer();
}
@@ -360,13 +370,14 @@ public class TestListenHTTP {
startSecureServer();
final SSLSocketFactory sslSocketFactory =
trustStoreSslContext.getSocketFactory();
- final SSLSocket sslSocket = (SSLSocket)
sslSocketFactory.createSocket(LOCALHOST, availablePort);
- final String currentProtocol =
serverNoTruststoreConfiguration.getProtocol();
- sslSocket.setEnabledProtocols(new String[]{currentProtocol});
+ try (final SSLSocket sslSocket = (SSLSocket)
sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
+ final String currentProtocol =
serverNoTruststoreConfiguration.getProtocol();
+ sslSocket.setEnabledProtocols(new String[]{currentProtocol});
- sslSocket.startHandshake();
- final SSLSession sslSession = sslSocket.getSession();
- assertEquals("SSL Session Protocol not matched", currentProtocol,
sslSession.getProtocol());
+ sslSocket.startHandshake();
+ final SSLSession sslSession = sslSocket.getSession();
+ assertEquals(currentProtocol, sslSession.getProtocol());
+ }
}
@Test
@@ -384,12 +395,9 @@ public class TestListenHTTP {
assertEquals(HttpServletResponse.SC_NO_CONTENT, responseCode);
}
+ @EnabledIf(value = "isTls13Supported", disabledReason = "TLSv1.3 is not
supported")
@Test
public void testSecureServerRejectsUnsupportedTlsProtocolVersion() throws
Exception {
- final String currentProtocol =
TlsConfiguration.getHighestCurrentSupportedTlsProtocolVersion();
- final String protocolMessage = String.format("TLS Protocol required
[%s] found [%s]", TLS_1_3, currentProtocol);
- Assume.assumeTrue(protocolMessage, TLS_1_3.equals(currentProtocol));
-
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.AUTO,
serverTls_1_3_Configuration);
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
@@ -399,68 +407,58 @@ public class TestListenHTTP {
startWebServer();
final SSLSocketFactory sslSocketFactory =
trustStoreSslContext.getSocketFactory();
- final SSLSocket sslSocket = (SSLSocket)
sslSocketFactory.createSocket(LOCALHOST, availablePort);
- sslSocket.setEnabledProtocols(new String[]{TLS_1_2});
+ try (final SSLSocket sslSocket = (SSLSocket)
sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
+ sslSocket.setEnabledProtocols(new String[]{TLS_1_2});
- assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
+ assertThrows(SSLHandshakeException.class,
sslSocket::startHandshake);
+ }
}
@Test
public void testMaxThreadPoolSizeTooLow() {
- // GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7");
- // THEN
runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeTooHigh() {
- // GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001");
- // THEN
runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeOkLowerBound() {
- // GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8");
- // THEN
runner.assertValid();
}
@Test
public void testMaxThreadPoolSizeOkUpperBound() {
- // GIVEN, WHEN
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000");
- // THEN
runner.assertValid();
}
@Test
public void
testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
- // GIVEN
int maxThreadPoolSize = 201;
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE,
Integer.toString(maxThreadPoolSize));
- // WHEN
startWebServer();
- // THEN
Server server = proc.getServer();
ThreadPool threadPool = server.getThreadPool();
ThreadPool.SizedThreadPool sizedThreadPool =
(ThreadPool.SizedThreadPool) threadPool;
@@ -518,6 +516,40 @@ public class TestListenHTTP {
runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
}
+ @Test
+ public void testPostContentEncodingGzipAccepted() throws IOException {
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.setProperty(ListenHTTP.RETURN_CODE,
Integer.toString(HttpServletResponse.SC_NO_CONTENT));
+
+ startWebServer();
+
+ final OkHttpClient okHttpClient = getOkHttpClient(false, false);
+ final Request.Builder requestBuilder = new Request.Builder();
+ final String url = buildUrl(false);
+ requestBuilder.url(url);
+
+ final String message = String.class.getSimpleName();
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final BufferedSink gzipSink = Okio.buffer(new
GzipSink(Okio.sink(outputStream)));
+ gzipSink.write(message.getBytes(StandardCharsets.UTF_8));
+ gzipSink.close();
+ final byte[] compressed = outputStream.toByteArray();
+
+ final RequestBody requestBody = RequestBody.create(compressed,
APPLICATION_OCTET_STREAM);
+ final Request request = requestBuilder.post(requestBody)
+ .addHeader("Content-Encoding",
ContentEncodingStrategy.GZIP.getValue().toLowerCase())
+ .build();
+
+ try (final Response response =
okHttpClient.newCall(request).execute()) {
+ assertTrue(response.isSuccessful());
+
+ runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
+ final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).iterator().next();
+ flowFile.assertContentEquals(message);
+ }
+ }
+
private MockRecordParser setupRecordReaderTest() throws
InitializationException {
final MockRecordParser parser = new MockRecordParser();
final MockRecordWriter writer = new MockRecordWriter();
@@ -636,7 +668,7 @@ public class TestListenHTTP {
for (final String message : messages) {
final int statusCode = postMessage(message, secure,
clientAuthRequired);
- assertEquals("HTTP Status Code not matched", expectedStatusCode,
statusCode);
+ assertEquals(expectedStatusCode, statusCode, "HTTP Status Code not
matched");
}
}
@@ -690,7 +722,7 @@ public class TestListenHTTP {
try (Response response = client.newCall(request).execute()) {
Files.deleteIfExists(Paths.get(String.valueOf(file1)));
Files.deleteIfExists(Paths.get(String.valueOf(file2)));
- Assert.assertTrue(String.format("Unexpected code: %s, body: %s",
response.code(), response.body()), response.isSuccessful());
+ assertTrue(response.isSuccessful(), String.format("Unexpected
code: %s, body: %s", response.code(), response.body()));
}
runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS,
5);