This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d8ebfb25d0 NIFI-10161 Added Gzip Content-Encoding to InvokeHTTP and 
ListenHTTP
d8ebfb25d0 is described below

commit d8ebfb25d09536a9201959c0e1d4e2454ca68c64
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jun 23 11:51:17 2022 -0500

    NIFI-10161 Added Gzip Content-Encoding to InvokeHTTP and ListenHTTP
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #6150.
---
 .../nifi/processors/standard/InvokeHTTP.java       |  51 ++++++++--
 .../standard/http/ContentEncodingStrategy.java     |  49 ++++++++++
 .../standard/servlets/ListenHTTPServlet.java       |   6 +-
 .../nifi/processors/standard/InvokeHTTPTest.java   |  34 ++++++-
 .../nifi/processors/standard/TestListenHTTP.java   | 106 ++++++++++++++-------
 5 files changed, 199 insertions(+), 47 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 839d0f5188..49fffdc48e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -73,6 +73,9 @@ import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
 import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
+import okio.Source;
 import org.apache.commons.io.input.TeeInputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperties;
@@ -103,6 +106,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
 import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
 import org.apache.nifi.processors.standard.http.CookieStrategy;
 import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
@@ -186,6 +190,7 @@ public class InvokeHTTP extends AbstractProcessor {
 
     private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = 
Pattern.compile("post:form:(?<formDataName>.*)$");
     private static final String FORM_DATA_NAME_GROUP = "formDataName";
+    private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
 
     // properties
     public static final PropertyDescriptor PROP_METHOD = new 
PropertyDescriptor.Builder()
@@ -329,6 +334,15 @@ public class InvokeHTTP extends AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PROP_CONTENT_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Content-Encoding")
+            .displayName("Content-Encoding")
+            .description("HTTP Content-Encoding applied to request body during 
transmission. The receiving server must support the selected encoding to avoid 
request failures.")
+            .required(true)
+            .defaultValue(ContentEncodingStrategy.DISABLED.getValue())
+            .allowableValues(ContentEncodingStrategy.class)
+            .build();
+
     public static final PropertyDescriptor PROP_CONTENT_TYPE = new 
PropertyDescriptor.Builder()
             .name("Content-Type")
             .description("The Content-Type to specify for when content is 
being transmitted through a PUT, POST or PATCH. "
@@ -564,6 +578,7 @@ public class InvokeHTTP extends AbstractProcessor {
             PROP_DIGEST_AUTH,
             PROP_OUTPUT_RESPONSE_REGARDLESS,
             PROP_ADD_HEADERS_TO_REQUEST,
+            PROP_CONTENT_ENCODING,
             PROP_CONTENT_TYPE,
             PROP_SEND_BODY,
             PROP_USE_CHUNKED_ENCODING,
@@ -1112,6 +1127,12 @@ public class InvokeHTTP extends AbstractProcessor {
             }
         }
 
+        final String contentEncoding = 
context.getProperty(PROP_CONTENT_ENCODING).getValue();
+        final ContentEncodingStrategy contentEncodingStrategy = 
ContentEncodingStrategy.valueOf(contentEncoding);
+        if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
+            requestBuilder.addHeader(CONTENT_ENCODING_HEADER, 
ContentEncodingStrategy.GZIP.getValue().toLowerCase());
+        }
+
         // set the request method
         String method = 
trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
         switch (method) {
@@ -1119,15 +1140,15 @@ public class InvokeHTTP extends AbstractProcessor {
                 requestBuilder.get();
                 break;
             case POST_METHOD:
-                RequestBody requestBody = getRequestBodyToSend(session, 
context, requestFlowFile);
+                RequestBody requestBody = getRequestBodyToSend(session, 
context, requestFlowFile, contentEncodingStrategy);
                 requestBuilder.post(requestBody);
                 break;
             case PUT_METHOD:
-                requestBody = getRequestBodyToSend(session, context, 
requestFlowFile);
+                requestBody = getRequestBodyToSend(session, context, 
requestFlowFile, contentEncodingStrategy);
                 requestBuilder.put(requestBody);
                 break;
             case PATCH_METHOD:
-                requestBody = getRequestBodyToSend(session, context, 
requestFlowFile);
+                requestBody = getRequestBodyToSend(session, context, 
requestFlowFile, contentEncodingStrategy);
                 requestBuilder.patch(requestBody);
                 break;
             case HEAD_METHOD:
@@ -1149,8 +1170,9 @@ public class InvokeHTTP extends AbstractProcessor {
     }
 
     private RequestBody getRequestBodyToSend(final ProcessSession session, 
final ProcessContext context,
-                                             final FlowFile requestFlowFile) {
-
+                                             final FlowFile requestFlowFile,
+                                             final ContentEncodingStrategy 
contentEncodingStrategy
+    ) {
         boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
 
         String evalContentType = context.getProperty(PROP_CONTENT_TYPE)
@@ -1168,6 +1190,7 @@ public class InvokeHTTP extends AbstractProcessor {
             }
         }
 
+        final boolean contentLengthUnknown = useChunked || 
ContentEncodingStrategy.GZIP == contentEncodingStrategy;
         RequestBody requestBody = new RequestBody() {
             @Nullable
             @Override
@@ -1176,13 +1199,25 @@ public class InvokeHTTP extends AbstractProcessor {
             }
 
             @Override
-            public void writeTo(BufferedSink sink) {
-                session.exportTo(requestFlowFile, sink.outputStream());
+            public void writeTo(final BufferedSink sink) throws IOException {
+                final BufferedSink outputSink = (ContentEncodingStrategy.GZIP 
== contentEncodingStrategy)
+                        ? Okio.buffer(new GzipSink(sink))
+                        : sink;
+
+                session.read(requestFlowFile, inputStream -> {
+                    final Source source = Okio.source(inputStream);
+                    outputSink.writeAll(source);
+                });
+
+                // Close Output Sink for gzip to write trailing bytes
+                if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
+                    outputSink.close();
+                }
             }
 
             @Override
             public long contentLength() {
-                return useChunked ? -1 : requestFlowFile.getSize();
+                return contentLengthUnknown ? -1 : requestFlowFile.getSize();
             }
         };
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
new file mode 100644
index 0000000000..2fe770eacc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/http/ContentEncodingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.http;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * HTTP Content-Encoding configuration strategy
+ */
+public enum ContentEncodingStrategy implements DescribedValue {
+    DISABLED("Content encoding not applied during transmission"),
+
+    GZIP( "Gzip content encoding and HTTP Content-Encoding header applied 
during transmission");
+
+    private final String description;
+
+    ContentEncodingStrategy(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index d25879a9b2..1a2d3c6291 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -101,6 +101,7 @@ public class ListenHTTPServlet extends HttpServlet {
     public static final String GZIPPED_HEADER = "flowfile-gzipped";
     public static final String PROTOCOL_VERSION_HEADER = 
"x-nifi-transfer-protocol-version";
     public static final String PROTOCOL_VERSION = "3";
+    protected static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
 
     private final AtomicLong filesReceived = new AtomicLong(0L);
     private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
@@ -193,7 +194,10 @@ public class ListenHTTPServlet extends HttpServlet {
             }
             response.setHeader("Content-Type", MediaType.TEXT_PLAIN);
 
-            final boolean contentGzipped = 
Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+            final boolean flowFileGzipped = 
Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+            final String contentEncoding = 
request.getHeader(CONTENT_ENCODING_HEADER);
+            final boolean contentEncodingGzip = 
ACCEPT_ENCODING_VALUE.equals(contentEncoding);
+            final boolean contentGzipped = flowFileGzipped || 
contentEncodingGzip;
 
             final X509Certificate[] certs = (X509Certificate[]) 
request.getAttribute("javax.servlet.request.X509Certificate");
             foundSubject = DEFAULT_FOUND_SUBJECT;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
index ca83fdae11..ce0c2603db 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
@@ -19,10 +19,13 @@ package org.apache.nifi.processors.standard;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import okio.Buffer;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
 import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
 import org.apache.nifi.processors.standard.http.CookieStrategy;
 import org.apache.nifi.reporting.InitializationException;
@@ -44,6 +47,7 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
@@ -55,6 +59,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -101,6 +106,8 @@ public class InvokeHTTPTest {
 
     private static final String CONTENT_LENGTH_HEADER = "Content-Length";
 
+    private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+
     private static final String CONTENT_TYPE_HEADER = "Content-Type";
 
     private static final String LOCATION_HEADER = "Location";
@@ -521,7 +528,7 @@ public class InvokeHTTPTest {
         final String authorization = request.getHeader(AUTHORIZATION_HEADER);
         assertNotNull(authorization, "Authorization Header not found");
 
-        final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$");
+        final Pattern basicAuthPattern = Pattern.compile("^Basic \\S+$");
         assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic 
Authentication not matched");
     }
 
@@ -723,6 +730,31 @@ public class InvokeHTTPTest {
         assertRequestMethodSuccess(POST_METHOD);
     }
 
+    @Test
+    public void testRunPostHttp200SuccessContentEncodingGzip() throws 
InterruptedException, IOException {
+        runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+        runner.setProperty(InvokeHTTP.PROP_CONTENT_ENCODING, 
ContentEncodingStrategy.GZIP.getValue());
+        runner.setProperty(InvokeHTTP.PROP_SEND_BODY, Boolean.TRUE.toString());
+
+        enqueueResponseCodeAndRun(HTTP_OK);
+
+        assertResponseSuccessRelationships();
+        assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+        final RecordedRequest request = takeRequestCompleted();
+        final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER);
+        assertNull(contentLength, "Content-Length Request Header found");
+
+        final String contentEncoding = 
request.getHeader(CONTENT_ENCODING_HEADER);
+        assertEquals(ContentEncodingStrategy.GZIP.getValue().toLowerCase(), 
contentEncoding);
+
+        final Buffer body = request.getBody();
+        try (final GZIPInputStream gzipInputStream = new 
GZIPInputStream(body.inputStream())) {
+            final String decompressed = IOUtils.toString(gzipInputStream, 
StandardCharsets.UTF_8);
+            assertEquals(FLOW_FILE_CONTENT, decompressed);
+        }
+    }
+
     @Test
     public void testRunPostHttp200SuccessChunkedEncoding() throws 
InterruptedException {
         runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 489a4d3382..b5616d470d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -45,8 +46,12 @@ import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
+import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
 import org.apache.nifi.processors.standard.http.HttpProtocolStrategy;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
@@ -54,6 +59,7 @@ import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.StandardTlsConfiguration;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
 import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.security.util.TlsPlatform;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -65,17 +71,17 @@ import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.util.thread.ThreadPool;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 import org.mockito.Mockito;
 
 import static 
org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestListenHTTP {
 
@@ -114,7 +120,11 @@ public class TestListenHTTP {
 
     private int availablePort;
 
-    @BeforeClass
+    static boolean isTls13Supported() {
+        return TLS_1_3.equals(TlsPlatform.getLatestProtocol());
+    }
+
+    @BeforeAll
     public static void setUpSuite() throws GeneralSecurityException {
         // generate new keystore and truststore
         final TlsConfiguration tlsConfiguration = new 
TemporaryKeyStoreBuilder().build();
@@ -172,7 +182,7 @@ public class TestListenHTTP {
         );
     }
 
-    @Before
+    @BeforeEach
     public void setup() throws IOException {
         proc = new ListenHTTP();
         runner = TestRunners.newTestRunner(proc);
@@ -181,7 +191,7 @@ public class TestListenHTTP {
         runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
     }
 
-    @After
+    @AfterEach
     public void shutdownServer() {
         proc.shutdownHttpServer();
     }
@@ -360,13 +370,14 @@ public class TestListenHTTP {
         startSecureServer();
 
         final SSLSocketFactory sslSocketFactory = 
trustStoreSslContext.getSocketFactory();
-        final SSLSocket sslSocket = (SSLSocket) 
sslSocketFactory.createSocket(LOCALHOST, availablePort);
-        final String currentProtocol = 
serverNoTruststoreConfiguration.getProtocol();
-        sslSocket.setEnabledProtocols(new String[]{currentProtocol});
+        try (final SSLSocket sslSocket = (SSLSocket) 
sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
+            final String currentProtocol = 
serverNoTruststoreConfiguration.getProtocol();
+            sslSocket.setEnabledProtocols(new String[]{currentProtocol});
 
-        sslSocket.startHandshake();
-        final SSLSession sslSession = sslSocket.getSession();
-        assertEquals("SSL Session Protocol not matched", currentProtocol, 
sslSession.getProtocol());
+            sslSocket.startHandshake();
+            final SSLSession sslSession = sslSocket.getSession();
+            assertEquals(currentProtocol, sslSession.getProtocol());
+        }
     }
 
     @Test
@@ -384,12 +395,9 @@ public class TestListenHTTP {
         assertEquals(HttpServletResponse.SC_NO_CONTENT, responseCode);
     }
 
+    @EnabledIf(value = "isTls13Supported", disabledReason = "TLSv1.3 is not 
supported")
     @Test
     public void testSecureServerRejectsUnsupportedTlsProtocolVersion() throws 
Exception {
-        final String currentProtocol = 
TlsConfiguration.getHighestCurrentSupportedTlsProtocolVersion();
-        final String protocolMessage = String.format("TLS Protocol required 
[%s] found [%s]", TLS_1_3, currentProtocol);
-        Assume.assumeTrue(protocolMessage, TLS_1_3.equals(currentProtocol));
-
         
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.AUTO, 
serverTls_1_3_Configuration);
 
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
@@ -399,68 +407,58 @@ public class TestListenHTTP {
 
         startWebServer();
         final SSLSocketFactory sslSocketFactory = 
trustStoreSslContext.getSocketFactory();
-        final SSLSocket sslSocket = (SSLSocket) 
sslSocketFactory.createSocket(LOCALHOST, availablePort);
-        sslSocket.setEnabledProtocols(new String[]{TLS_1_2});
+        try (final SSLSocket sslSocket = (SSLSocket) 
sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
+            sslSocket.setEnabledProtocols(new String[]{TLS_1_2});
 
-        assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
+            assertThrows(SSLHandshakeException.class, 
sslSocket::startHandshake);
+        }
     }
 
     @Test
     public void testMaxThreadPoolSizeTooLow() {
-        // GIVEN, WHEN
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
         runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7");
 
-        // THEN
         runner.assertNotValid();
     }
 
     @Test
     public void testMaxThreadPoolSizeTooHigh() {
-        // GIVEN, WHEN
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
         runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001");
 
-        // THEN
         runner.assertNotValid();
     }
 
     @Test
     public void testMaxThreadPoolSizeOkLowerBound() {
-        // GIVEN, WHEN
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
         runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8");
 
-        // THEN
         runner.assertValid();
     }
 
     @Test
     public void testMaxThreadPoolSizeOkUpperBound() {
-        // GIVEN, WHEN
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
         runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000");
 
-        // THEN
         runner.assertValid();
     }
 
     @Test
     public void 
testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
-        // GIVEN
         int maxThreadPoolSize = 201;
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
         runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, 
Integer.toString(maxThreadPoolSize));
 
-        // WHEN
         startWebServer();
 
-        // THEN
         Server server = proc.getServer();
         ThreadPool threadPool = server.getThreadPool();
         ThreadPool.SizedThreadPool sizedThreadPool = 
(ThreadPool.SizedThreadPool) threadPool;
@@ -518,6 +516,40 @@ public class TestListenHTTP {
         runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
     }
 
+    @Test
+    public void testPostContentEncodingGzipAccepted() throws IOException {
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.setProperty(ListenHTTP.RETURN_CODE, 
Integer.toString(HttpServletResponse.SC_NO_CONTENT));
+
+        startWebServer();
+
+        final OkHttpClient okHttpClient = getOkHttpClient(false, false);
+        final Request.Builder requestBuilder = new Request.Builder();
+        final String url = buildUrl(false);
+        requestBuilder.url(url);
+
+        final String message = String.class.getSimpleName();
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final BufferedSink gzipSink = Okio.buffer(new 
GzipSink(Okio.sink(outputStream)));
+        gzipSink.write(message.getBytes(StandardCharsets.UTF_8));
+        gzipSink.close();
+        final byte[] compressed = outputStream.toByteArray();
+
+        final RequestBody requestBody = RequestBody.create(compressed, 
APPLICATION_OCTET_STREAM);
+        final Request request = requestBuilder.post(requestBody)
+                .addHeader("Content-Encoding", 
ContentEncodingStrategy.GZIP.getValue().toLowerCase())
+                .build();
+
+        try (final Response response = 
okHttpClient.newCall(request).execute()) {
+            assertTrue(response.isSuccessful());
+
+            runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).iterator().next();
+            flowFile.assertContentEquals(message);
+        }
+    }
+
     private MockRecordParser setupRecordReaderTest() throws 
InitializationException {
         final MockRecordParser parser = new MockRecordParser();
         final MockRecordWriter writer = new MockRecordWriter();
@@ -636,7 +668,7 @@ public class TestListenHTTP {
 
         for (final String message : messages) {
             final int statusCode = postMessage(message, secure, 
clientAuthRequired);
-            assertEquals("HTTP Status Code not matched", expectedStatusCode, 
statusCode);
+            assertEquals(expectedStatusCode, statusCode, "HTTP Status Code not 
matched");
         }
     }
 
@@ -690,7 +722,7 @@ public class TestListenHTTP {
         try (Response response = client.newCall(request).execute()) {
             Files.deleteIfExists(Paths.get(String.valueOf(file1)));
             Files.deleteIfExists(Paths.get(String.valueOf(file2)));
-            Assert.assertTrue(String.format("Unexpected code: %s, body: %s", 
response.code(), response.body()), response.isSuccessful());
+            assertTrue(response.isSuccessful(), String.format("Unexpected 
code: %s, body: %s", response.code(), response.body()));
         }
 
         runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 
5);

Reply via email to