This is an automated email from the ASF dual-hosted git repository.

bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b1d388ef7 NIFI-10530 MiNiFi: Add option to compress c2 requests
1b1d388ef7 is described below

commit 1b1d388ef778bfb69f9e58e26ca8b179f8d4ff53
Author: Ferenc Kis <[email protected]>
AuthorDate: Wed Sep 21 15:30:09 2022 +0200

    NIFI-10530 MiNiFi: Add option to compress c2 requests
    
    This closes #6439
    
    Signed-off-by: Csaba Bejan <[email protected]>
---
 .../org/apache/nifi/c2/client/C2ClientConfig.java  |  13 ++-
 .../apache/nifi/c2/client/http/C2HttpClient.java   |  12 ++-
 .../nifi/c2/client/http/C2RequestCompression.java  |  84 ++++++++++++++++
 .../c2/client/http/C2RequestCompressionTest.java   | 107 +++++++++++++++++++++
 .../main/markdown/minifi-java-agent-quick-start.md |   3 +-
 .../src/main/resources/conf/bootstrap.conf         |   1 +
 .../java/org/apache/nifi/c2/C2NiFiProperties.java  |   4 +
 .../org/apache/nifi/c2/C2NifiClientService.java    |   1 +
 8 files changed, 219 insertions(+), 6 deletions(-)

diff --git 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index ecf677c399..90f14e8037 100644
--- 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++ 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -40,7 +40,7 @@ public class C2ClientConfig {
     private final long callTimeout;
     private final long readTimeout;
     private final long connectTimeout;
-
+    private final String c2RequestCompression;
 
     private C2ClientConfig(final Builder builder) {
         this.c2Url = builder.c2Url;
@@ -62,6 +62,7 @@ public class C2ClientConfig {
         this.truststoreType = builder.truststoreType;
         this.readTimeout = builder.readTimeout;
         this.connectTimeout = builder.connectTimeout;
+        this.c2RequestCompression = builder.c2RequestCompression;
     }
 
     public String getC2Url() {
@@ -140,6 +141,10 @@ public class C2ClientConfig {
         return connectTimeout;
     }
 
+    public String getC2RequestCompression() {
+        return c2RequestCompression;
+    }
+
     /**
      * Builder for client configuration.
      */
@@ -164,6 +169,7 @@ public class C2ClientConfig {
         private String truststoreType;
         private long readTimeout;
         private long connectTimeout;
+        private String c2RequestCompression;
 
         public Builder c2Url(final String c2Url) {
             this.c2Url = c2Url;
@@ -260,6 +266,11 @@ public class C2ClientConfig {
             return this;
         }
 
+        public Builder c2RequestCompression(final String c2RequestCompression) 
{
+            this.c2RequestCompression = c2RequestCompression;
+            return this;
+        }
+
         public C2ClientConfig build() {
             return new C2ClientConfig(this);
         }
diff --git 
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
 
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index 0d3b4e6cd3..e9871276a8 100644
--- 
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++ 
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.c2.client.http;
 
 import java.io.FileInputStream;
@@ -39,17 +40,17 @@ import okhttp3.logging.HttpLoggingInterceptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.c2.client.C2ClientConfig;
 import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.serializer.C2Serializer;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
 import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.serializer.C2Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class C2HttpClient implements C2Client {
 
+    static final MediaType MEDIA_TYPE_APPLICATION_JSON = 
MediaType.parse("application/json");
     private static final Logger logger = 
LoggerFactory.getLogger(C2HttpClient.class);
-    private static final MediaType MEDIA_TYPE_APPLICATION_JSON = 
MediaType.parse("application/json");
 
     private final AtomicReference<OkHttpClient> httpClientReference = new 
AtomicReference<>();
     private final C2ClientConfig clientConfig;
@@ -126,6 +127,7 @@ public class C2HttpClient implements C2Client {
         serializer.serialize(operationAck)
             .map(operationAckBody -> RequestBody.create(operationAckBody, 
MEDIA_TYPE_APPLICATION_JSON))
             .map(requestBody -> new 
Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
+            
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
             .ifPresent(this::sendAck);
     }
 
@@ -136,7 +138,9 @@ public class C2HttpClient implements C2Client {
             .url(clientConfig.getC2Url())
             .build();
 
-        try (Response heartbeatResponse = 
httpClientReference.get().newCall(request).execute()) {
+        Request decoratedRequest = 
C2RequestCompression.forType(clientConfig.getC2RequestCompression()).compress(request);
+
+        try (Response heartbeatResponse = 
httpClientReference.get().newCall(decoratedRequest).execute()) {
             c2HeartbeatResponse = 
getResponseBody(heartbeatResponse).flatMap(response -> 
serializer.deserialize(response, C2HeartbeatResponse.class));
         } catch (IOException ce) {
             logger.error("Send Heartbeat failed [{}]", 
clientConfig.getC2Url(), ce);
@@ -237,7 +241,7 @@ public class C2HttpClient implements C2Client {
     }
 
     private void sendAck(Request request) {
-        try(Response heartbeatResponse = 
httpClientReference.get().newCall(request).execute()) {
+        try (Response heartbeatResponse = 
httpClientReference.get().newCall(request).execute()) {
             if (!heartbeatResponse.isSuccessful()) {
                 logger.warn("Acknowledgement was not successful with c2 server 
[{}] with status code {}", clientConfig.getC2AckUrl(), 
heartbeatResponse.code());
             }
diff --git 
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
 
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
new file mode 100644
index 0000000000..d12160ac51
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.http;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
+
+/**
+ * Compression strategies that can be applied to an outgoing C2 HTTP request.
+ *
+ * <p>Each constant is associated with the textual compression type it is looked up by
+ * (see {@link #forType(String)}) and implements {@link #compress(Request)}.
+ */
+public enum C2RequestCompression {
+    /**
+     * No compression: the request is passed through as-is.
+     */
+    NONE("none") {
+        @Override
+        public Request compress(Request request) {
+            return request;
+        }
+    },
+    /**
+     * Gzip compression: the request body is wrapped so that it is gzipped while being written,
+     * and the {@code Content-Encoding: gzip} header is set on the request.
+     */
+    GZIP("gzip") {
+        @Override
+        public Request compress(Request request) {
+            RequestBody originalBody = request.body();
+            if (originalBody == null) {
+                // Nothing to compress. Wrapping a null body would only fail later with a
+                // NullPointerException in contentType()/writeTo(), so hand the request back unchanged.
+                return request;
+            }
+            return request.newBuilder()
+                .header(CONTENT_ENCODING_HEADER, GZIP_ENCODING)
+                .method(request.method(), toGzipRequestBody(originalBody))
+                .build();
+        }
+
+        /**
+         * Wraps the given body in a {@link RequestBody} that gzips the content on write.
+         *
+         * @param requestBody the non-null body to be compressed
+         * @return a body with the same content type whose bytes are written through a {@link GzipSink}
+         */
+        private RequestBody toGzipRequestBody(RequestBody requestBody) {
+            return new RequestBody() {
+                @Override
+                public MediaType contentType() {
+                    return requestBody.contentType();
+                }
+
+                @Override
+                public long contentLength() {
+                    // The compressed size is not known before the body is written
+                    return -1;
+                }
+
+                @Override
+                public void writeTo(BufferedSink sink) throws IOException {
+                    // Closing the buffered gzip sink at the end of the block completes the gzip stream
+                    try (BufferedSink bufferedGzipSink = Okio.buffer(new GzipSink(sink))) {
+                        requestBody.writeTo(bufferedGzipSink);
+                    }
+                }
+            };
+        }
+    };
+
+    static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+    static final String GZIP_ENCODING = "gzip";
+
+    private final String compressionType;
+
+    C2RequestCompression(String compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    /**
+     * Resolves the compression strategy for a textual compression type.
+     *
+     * <p>The comparison is case-insensitive. A {@code null} or unrecognised value
+     * resolves to {@link #NONE}.
+     *
+     * @param compressionType the configured compression type, may be {@code null}
+     * @return the matching strategy, or {@link #NONE} when nothing matches
+     */
+    public static C2RequestCompression forType(String compressionType) {
+        return Stream.of(values())
+            .filter(c2RequestCompression -> c2RequestCompression.compressionType.equalsIgnoreCase(compressionType))
+            .findAny()
+            .orElse(NONE);
+    }
+
+    /**
+     * Applies this compression strategy to the given request.
+     *
+     * @param request the request to decorate
+     * @return the request to send; either the same instance or a new one derived from it
+     */
+    public abstract Request compress(Request request);
+}
\ No newline at end of file
diff --git 
a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
 
b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
new file mode 100644
index 0000000000..479fe205aa
--- /dev/null
+++ 
b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.http;
+
+import static 
org.apache.nifi.c2.client.http.C2HttpClient.MEDIA_TYPE_APPLICATION_JSON;
+import static 
org.apache.nifi.c2.client.http.C2RequestCompression.CONTENT_ENCODING_HEADER;
+import static org.apache.nifi.c2.client.http.C2RequestCompression.GZIP;
+import static 
org.apache.nifi.c2.client.http.C2RequestCompression.GZIP_ENCODING;
+import static org.apache.nifi.c2.client.http.C2RequestCompression.NONE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.Buffer;
+import okio.GzipSource;
+import okio.Okio;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Unit tests for {@link C2RequestCompression}: lookup of a strategy by its textual type,
+ * and the effect of each strategy on a request's body and headers.
+ */
+public class C2RequestCompressionTest {
+
+    private static final String DEFAULT_C2_SERVER_URL = "http://localhost/c2";
+    private static final String DEFAULT_POST_BODY = "{ \"field\": \"value\" }";
+
+    /**
+     * Pairs of (configured compression type, expected strategy) for the lookup test.
+     * Covers exact matches, a differently-cased match, an unknown value and {@code null}.
+     */
+    private static Stream<Arguments> compressionTypes() {
+        return Stream.of(
+            Arguments.of("none", NONE),
+            Arguments.of("gzip", GZIP),
+            Arguments.of("GZIP", GZIP),
+            Arguments.of("unknown_compression_type", NONE),
+            Arguments.of(null, NONE)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionTypes")
+    public void testAppropriateCompressionTypeIsGivenBackForType(String compressionType, C2RequestCompression expectedCompression) {
+        assertEquals(expectedCompression, C2RequestCompression.forType(compressionType));
+    }
+
+    @Test
+    public void testNoneCompressionShouldLeaveRequestBodyIntact() throws IOException {
+        // given
+        Request request = new Request.Builder()
+            .post(RequestBody.create(DEFAULT_POST_BODY, MEDIA_TYPE_APPLICATION_JSON))
+            .url(DEFAULT_C2_SERVER_URL)
+            .build();
+
+        // when
+        Request result = NONE.compress(request);
+
+        // then
+        assertTrue(result.body().contentType().toString().contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
+        assertEquals(DEFAULT_POST_BODY, uncompressedRequestBodyToString(result));
+    }
+
+    @Test
+    public void testGzipCompressionShouldCompressRequestBodyAndAdjustRequestHeader() throws IOException {
+        // given
+        Request request = new Request.Builder()
+            .post(RequestBody.create(DEFAULT_POST_BODY, MEDIA_TYPE_APPLICATION_JSON))
+            .url(DEFAULT_C2_SERVER_URL)
+            .build();
+
+        // when
+        Request result = GZIP.compress(request);
+
+        // then
+        assertTrue(result.body().contentType().toString().contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
+        assertEquals(GZIP_ENCODING, result.headers().get(CONTENT_ENCODING_HEADER));
+        assertEquals(DEFAULT_POST_BODY, gzippedRequestBodyToString(result));
+    }
+
+    /** Writes the request body into a buffer and reads it back as plain UTF-8 text. */
+    private String uncompressedRequestBodyToString(Request request) throws IOException {
+        Buffer buffer = requestToBuffer(request);
+        return buffer.readUtf8();
+    }
+
+    /** Writes the request body into a buffer and reads it back through a gzip-decoding source. */
+    private String gzippedRequestBodyToString(Request request) throws IOException {
+        Buffer buffer = requestToBuffer(request);
+        return Okio.buffer(new GzipSource(buffer)).readUtf8();
+    }
+
+    /** Captures the bytes the request body writes into an in-memory {@link Buffer}. */
+    private Buffer requestToBuffer(Request request) throws IOException {
+        Buffer buffer = new Buffer();
+        request.body().writeTo(buffer);
+        return buffer;
+    }
+}
diff --git 
a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md 
b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
index 602686f300..3a24adf096 100644
--- a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
+++ b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
@@ -136,7 +136,8 @@ nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
 4. Start MiNiFi
 5. When a new flow is available on the C2 server, MiNiFi will download it via 
C2 and restart itself to pick up the changes
 
-**Note:** Flow definitions are class based. Each class has one flow defined 
for it. As a result, all the agents belonging to the same class will get the 
flow at update.
+**Note:** Flow definitions are class based. Each class has one flow defined 
for it. As a result, all the agents belonging to the same class will get the 
flow at update.<br>
+**Note:** Compression can be turned on for C2 requests by setting 
`c2.request.compression=gzip`. Compression is turned off by default when the 
parameter is omitted, or when `c2.request.compression=none` is given. It can be 
beneficial to turn compression on to prevent network saturation.
 
 ## Loading a New Dataflow
 
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 1233a05f1d..3606b67002 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -158,6 +158,7 @@ java.arg.14=-Djava.awt.headless=true
 #c2.security.keystore.location=
 #c2.security.keystore.password=
 #c2.security.keystore.type=JKS
+#c2.request.compression=none
 # The following ingestor configuration needs to be enabled in order to apply 
configuration updates coming from C2 server
 
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
 #nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
index 40373436df..be24847438 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
@@ -39,6 +39,7 @@ public class C2NiFiProperties {
     public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
     public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + 
"agent.identifier";
     public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + 
"full.heartbeat";
+    public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + 
"request.compression";
 
     public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + 
"root.class.definitions";
     public static final String C2_METRICS_NAME_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
@@ -69,4 +70,7 @@ public class C2NiFiProperties {
     public static final String C2_DEFAULT_READ_TIMEOUT = "5 sec";
     // Call timeout of 10 seconds
     public static final String C2_DEFAULT_CALL_TIMEOUT = "10 sec";
+
+    // C2 request compression is turned off by default
+    public static final String C2_REQUEST_COMPRESSION= "none";
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
index fe227b1f9e..35d591b192 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
@@ -115,6 +115,7 @@ public class C2NifiClientService {
                 .callTimeout((long) 
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
                     C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), 
TimeUnit.MILLISECONDS))
                 
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
+                
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY,
 C2NiFiProperties.C2_REQUEST_COMPRESSION))
                 
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, 
DEFAULT_CONF_DIR))
                 
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY,
 ""))
                 
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))

Reply via email to