This is an automated email from the ASF dual-hosted git repository.
bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1b1d388ef7 NIFI-10530 MiNiFi: Add option to compress c2 requests
1b1d388ef7 is described below
commit 1b1d388ef778bfb69f9e58e26ca8b179f8d4ff53
Author: Ferenc Kis <[email protected]>
AuthorDate: Wed Sep 21 15:30:09 2022 +0200
NIFI-10530 MiNiFi: Add option to compress c2 requests
This closes #6439
Signed-off-by: Csaba Bejan <[email protected]>
---
.../org/apache/nifi/c2/client/C2ClientConfig.java | 13 ++-
.../apache/nifi/c2/client/http/C2HttpClient.java | 12 ++-
.../nifi/c2/client/http/C2RequestCompression.java | 84 ++++++++++++++++
.../c2/client/http/C2RequestCompressionTest.java | 107 +++++++++++++++++++++
.../main/markdown/minifi-java-agent-quick-start.md | 3 +-
.../src/main/resources/conf/bootstrap.conf | 1 +
.../java/org/apache/nifi/c2/C2NiFiProperties.java | 4 +
.../org/apache/nifi/c2/C2NifiClientService.java | 1 +
8 files changed, 219 insertions(+), 6 deletions(-)
diff --git
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index ecf677c399..90f14e8037 100644
---
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -40,7 +40,7 @@ public class C2ClientConfig {
private final long callTimeout;
private final long readTimeout;
private final long connectTimeout;
-
+ private final String c2RequestCompression;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@@ -62,6 +62,7 @@ public class C2ClientConfig {
this.truststoreType = builder.truststoreType;
this.readTimeout = builder.readTimeout;
this.connectTimeout = builder.connectTimeout;
+ this.c2RequestCompression = builder.c2RequestCompression;
}
public String getC2Url() {
@@ -140,6 +141,10 @@ public class C2ClientConfig {
return connectTimeout;
}
+ /** Returns the C2 request compression type exactly as it was supplied to the builder. */ public String getC2RequestCompression() {
+ return c2RequestCompression;
+ }
+
/**
* Builder for client configuration.
*/
@@ -164,6 +169,7 @@ public class C2ClientConfig {
private String truststoreType;
private long readTimeout;
private long connectTimeout;
+ private String c2RequestCompression;
public Builder c2Url(final String c2Url) {
this.c2Url = c2Url;
@@ -260,6 +266,11 @@ public class C2ClientConfig {
return this;
}
+ /** Sets the C2 request compression type; the value is stored as given and copied into the built config. */ public Builder c2RequestCompression(final String c2RequestCompression)
{
+ this.c2RequestCompression = c2RequestCompression;
+ return this;
+ }
+
public C2ClientConfig build() {
return new C2ClientConfig(this);
}
diff --git
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index 0d3b4e6cd3..e9871276a8 100644
---
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.c2.client.http;
import java.io.FileInputStream;
@@ -39,17 +40,17 @@ import okhttp3.logging.HttpLoggingInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.serializer.C2Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class C2HttpClient implements C2Client {
+ static final MediaType MEDIA_TYPE_APPLICATION_JSON =
MediaType.parse("application/json");
private static final Logger logger =
LoggerFactory.getLogger(C2HttpClient.class);
- private static final MediaType MEDIA_TYPE_APPLICATION_JSON =
MediaType.parse("application/json");
private final AtomicReference<OkHttpClient> httpClientReference = new
AtomicReference<>();
private final C2ClientConfig clientConfig;
@@ -126,6 +127,7 @@ public class C2HttpClient implements C2Client {
serializer.serialize(operationAck)
.map(operationAckBody -> RequestBody.create(operationAckBody,
MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new
Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
+
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
@@ -136,7 +138,9 @@ public class C2HttpClient implements C2Client {
.url(clientConfig.getC2Url())
.build();
- try (Response heartbeatResponse =
httpClientReference.get().newCall(request).execute()) {
+ Request decoratedRequest =
C2RequestCompression.forType(clientConfig.getC2RequestCompression()).compress(request);
+
+ try (Response heartbeatResponse =
httpClientReference.get().newCall(decoratedRequest).execute()) {
c2HeartbeatResponse =
getResponseBody(heartbeatResponse).flatMap(response ->
serializer.deserialize(response, C2HeartbeatResponse.class));
} catch (IOException ce) {
logger.error("Send Heartbeat failed [{}]",
clientConfig.getC2Url(), ce);
@@ -237,7 +241,7 @@ public class C2HttpClient implements C2Client {
}
private void sendAck(Request request) {
- try(Response heartbeatResponse =
httpClientReference.get().newCall(request).execute()) {
+ try (Response heartbeatResponse =
httpClientReference.get().newCall(request).execute()) {
if (!heartbeatResponse.isSuccessful()) {
logger.warn("Acknowledgement was not successful with c2 server
[{}] with status code {}", clientConfig.getC2AckUrl(),
heartbeatResponse.code());
}
diff --git
a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
new file mode 100644
index 0000000000..d12160ac51
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2RequestCompression.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.http;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.BufferedSink;
+import okio.GzipSink;
+import okio.Okio;
+
+public enum C2RequestCompression {
+ NONE("none") {
+ @Override
+ public Request compress(Request request) {
+ return request;
+ }
+ },
+ GZIP("gzip") {
+ @Override
+ public Request compress(Request request) {
+ return request.newBuilder()
+ .header(CONTENT_ENCODING_HEADER, GZIP_ENCODING)
+ .method(request.method(), toGzipRequestBody(request.body()))
+ .build();
+ }
+
+ private RequestBody toGzipRequestBody(RequestBody requestBody) {
+ return new RequestBody() {
+ @Override
+ public MediaType contentType() {
+ return requestBody.contentType();
+ }
+
+ @Override
+ public long contentLength() {
+ return -1;
+ }
+
+ @Override
+ public void writeTo(BufferedSink sink) throws IOException {
+ try (BufferedSink bufferedGzipSink = Okio.buffer(new
GzipSink(sink))) {
+ requestBody.writeTo(bufferedGzipSink);
+ }
+ }
+ };
+ }
+ };
+
+ static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+ static final String GZIP_ENCODING = "gzip";
+
+ private final String compressionType;
+
+ C2RequestCompression(String compressionType) {
+ this.compressionType = compressionType;
+ }
+
+ public static C2RequestCompression forType(String compressionType) {
+ return Stream.of(values())
+ .filter(c2RequestCompression ->
c2RequestCompression.compressionType.equalsIgnoreCase(compressionType))
+ .findAny()
+ .orElse(NONE);
+ }
+
+ public abstract Request compress(Request request);
+}
\ No newline at end of file
diff --git
a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
new file mode 100644
index 0000000000..479fe205aa
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2RequestCompressionTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.http;
+
+import static
org.apache.nifi.c2.client.http.C2HttpClient.MEDIA_TYPE_APPLICATION_JSON;
+import static
org.apache.nifi.c2.client.http.C2RequestCompression.CONTENT_ENCODING_HEADER;
+import static org.apache.nifi.c2.client.http.C2RequestCompression.GZIP;
+import static
org.apache.nifi.c2.client.http.C2RequestCompression.GZIP_ENCODING;
+import static org.apache.nifi.c2.client.http.C2RequestCompression.NONE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.Buffer;
+import okio.GzipSource;
+import okio.Okio;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Unit tests for {@link C2RequestCompression}: resolution of a strategy from its type name and
+ * the effect of each strategy on an outgoing request.
+ */
+public class C2RequestCompressionTest {
+
+    private static final String DEFAULT_C2_SERVER_URL = "http://localhost/c2";
+    private static final String DEFAULT_POST_BODY = "{ \"field\": \"value\" }";
+
+    /** Pairs of (configured type name, strategy expected from {@code forType}). */
+    private static Stream<Arguments> compressionTypes() {
+        return Stream.of(
+            Arguments.of("none", NONE),
+            Arguments.of("gzip", GZIP),
+            Arguments.of("unknown_compression_type", NONE)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionTypes")
+    public void testAppropriateCompressionTypeIsGivenBackForType(String compressionType, C2RequestCompression expectedCompression) {
+        C2RequestCompression resolved = C2RequestCompression.forType(compressionType);
+
+        assertEquals(expectedCompression, resolved);
+    }
+
+    @Test
+    public void testNoneCompressionShouldLeaveRequestBodyIntact() throws IOException {
+        // given
+        Request original = jsonPostRequest();
+
+        // when
+        Request result = NONE.compress(original);
+
+        // then
+        assertTrue(contentTypeOf(result).contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
+        assertEquals(DEFAULT_POST_BODY, writtenBody(result).readUtf8());
+    }
+
+    @Test
+    public void testGzipCompressionShouldCompressRequestBodyAndAdjustRequestHeader() throws IOException {
+        // given
+        Request original = jsonPostRequest();
+
+        // when
+        Request result = GZIP.compress(original);
+
+        // then
+        assertTrue(contentTypeOf(result).contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
+        assertEquals(GZIP_ENCODING, result.headers().get(CONTENT_ENCODING_HEADER));
+        // The body must decode back to the original payload when read through a gzip source.
+        GzipSource gunzipped = new GzipSource(writtenBody(result));
+        assertEquals(DEFAULT_POST_BODY, Okio.buffer(gunzipped).readUtf8());
+    }
+
+    /** Builds the JSON POST request used as input by the compression tests. */
+    private Request jsonPostRequest() {
+        RequestBody payload = RequestBody.create(DEFAULT_POST_BODY, MEDIA_TYPE_APPLICATION_JSON);
+        return new Request.Builder()
+            .post(payload)
+            .url(DEFAULT_C2_SERVER_URL)
+            .build();
+    }
+
+    /** Returns the textual content type of the request body. */
+    private String contentTypeOf(Request request) {
+        return request.body().contentType().toString();
+    }
+
+    /** Writes the request body into an in-memory buffer and returns that buffer. */
+    private Buffer writtenBody(Request request) throws IOException {
+        Buffer target = new Buffer();
+        request.body().writeTo(target);
+        return target;
+    }
+}
diff --git
a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
index 602686f300..3a24adf096 100644
--- a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
+++ b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
@@ -136,7 +136,8 @@ nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
4. Start MiNiFi
5. When a new flow is available on the C2 server, MiNiFi will download it via
C2 and restart itself to pick up the changes
-**Note:** Flow definitions are class based. Each class has one flow defined
for it. As a result, all the agents belonging to the same class will get the
flow at update.
+**Note:** Flow definitions are class based. Each class has one flow defined
for it. As a result, all the agents belonging to the same class will get the
flow at update.<br>
+**Note:** Compression can be enabled for C2 requests by setting
`c2.request.compression=gzip`. Compression is disabled by default: it stays off when the
parameter is omitted or when `c2.request.compression=none` is set. Enabling compression can
help prevent network saturation.
## Loading a New Dataflow
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 1233a05f1d..3606b67002 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -158,6 +158,7 @@ java.arg.14=-Djava.awt.headless=true
#c2.security.keystore.location=
#c2.security.keystore.password=
#c2.security.keystore.type=JKS
+#c2.request.compression=none
# The following ingestor configuration needs to be enabled in order to apply
configuration updates coming from C2 server
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
#nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
index 40373436df..be24847438 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
@@ -39,6 +39,7 @@ public class C2NiFiProperties {
public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX +
"agent.identifier";
public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX +
"full.heartbeat";
+ public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX +
"request.compression";
public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX +
"root.class.definitions";
public static final String C2_METRICS_NAME_KEY =
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
@@ -69,4 +70,7 @@ public class C2NiFiProperties {
public static final String C2_DEFAULT_READ_TIMEOUT = "5 sec";
// Call timeout of 10 seconds
public static final String C2_DEFAULT_CALL_TIMEOUT = "10 sec";
+
+ // C2 request compression is turned off by default
+ public static final String C2_REQUEST_COMPRESSION= "none";
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
index fe227b1f9e..35d591b192 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
@@ -115,6 +115,7 @@ public class C2NifiClientService {
.callTimeout((long)
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT),
TimeUnit.MILLISECONDS))
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
+
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY,
C2NiFiProperties.C2_REQUEST_COMPRESSION))
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY,
DEFAULT_CONF_DIR))
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY,
""))
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))