This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6e5a276cb2 NIFI-13718 Switched Request Replication to Web Client Service (#9234)
6e5a276cb2 is described below
commit 6e5a276cb2166eea1f11dafd702c609dca216464
Author: David Handermann <[email protected]>
AuthorDate: Thu Sep 12 12:50:03 2024 -0500
NIFI-13718 Switched Request Replication to Web Client Service (#9234)
- Added Request Replication Header enumeration with lowercased header names for HTTP/2
---
.../nifi-framework/nifi-framework-cluster/pom.xml | 26 +-
.../http/StandardHttpResponseMapper.java | 4 +-
.../http/replication/HttpReplicationClient.java | 3 +-
.../http/replication/RequestReplicationHeader.java | 78 +++++
.../http/replication/RequestReplicator.java | 42 ---
.../StandardUploadRequestReplicator.java | 4 +-
.../replication/ThreadPoolRequestReplicator.java | 25 +-
.../PreparedRequestHeader.java} | 25 +-
.../client/StandardHttpReplicationClient.java | 376 +++++++++++++++++++++
.../StandardPreparedRequest.java} | 36 +-
.../{okhttp => io}/EntitySerializer.java | 2 +-
.../{okhttp => io}/JacksonResponse.java | 2 +-
.../{okhttp => io}/JsonEntitySerializer.java | 2 +-
.../{okhttp => io}/XmlEntitySerializer.java | 2 +-
.../http/replication/okhttp/CallEventListener.java | 161 ---------
.../okhttp/OkHttpReplicationClient.java | 327 ------------------
.../okhttp/RequestReplicationEventListener.java | 174 ----------
.../apache/nifi/cluster/manager/NodeResponse.java | 7 -
.../FrameworkClusterConfiguration.java | 6 +-
.../TestThreadPoolRequestReplicator.java | 10 +-
.../client/TestStandardHttpReplicationClient.java | 258 ++++++++++++++
.../{okhttp => io}/TestJsonEntitySerializer.java | 2 +-
.../replication/util/MockReplicationClient.java | 5 +-
.../FlowAnalysisResultEntityMergerTest.java | 8 -
.../org/apache/nifi/asset/AssetsRestApiClient.java | 2 +-
.../org/apache/nifi/client/NiFiRestApiClient.java | 2 +-
.../java/org/apache/nifi/nar/NarRestApiClient.java | 2 +-
.../apache/nifi/web/StandardNiFiContentAccess.java | 3 +-
.../apache/nifi/web/api/ApplicationResource.java | 75 ++--
.../nifi/web/api/ProvenanceEventResource.java | 4 +-
.../security/csrf/SkipReplicatedCsrfFilter.java | 4 +-
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 2 +-
32 files changed, 827 insertions(+), 852 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 695caa2f46..41ad7c6040 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -38,6 +38,11 @@
<artifactId>nifi-framework-components</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-api</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client</artifactId>
@@ -95,11 +100,6 @@
<version>2.0.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.jetbrains</groupId>
- <artifactId>annotations</artifactId>
- <version>24.1.0</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>c2-protocol-component-api</artifactId>
@@ -207,10 +207,6 @@
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
- <dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>okhttp</artifactId>
- </dependency>
<!-- spring dependencies -->
<dependency>
@@ -251,6 +247,18 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <!-- Add Implementation-Version for User-Agent Header in replicated requests -->
+ <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index fb7d2ba0ba..6da275481a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -95,7 +95,6 @@ import
org.apache.nifi.cluster.coordination.http.endpoints.UserGroupEndpointMerg
import
org.apache.nifi.cluster.coordination.http.endpoints.UserGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger;
import
org.apache.nifi.cluster.coordination.http.endpoints.VerifyConfigEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.FormatUtils;
@@ -104,6 +103,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -282,7 +282,7 @@ public class StandardHttpResponseMapper implements
HttpResponseMapper {
responses.stream()
.parallel() // "parallelize" the draining of the responses,
since we have multiple streams to consume
.filter(response -> response != exclude) // don't include the
explicitly excluded node
- .filter(response -> response.getStatus() !=
RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue
responses because they contain no content
+ .filter(response -> response.getStatus() !=
HttpURLConnection.HTTP_ACCEPTED) // don't include any continue responses
because they contain no content
.forEach(this::drainResponse); // drain all node responses
that didn't get filtered out
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
index 0571e66e3c..1b71736175 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
@@ -18,6 +18,7 @@
package org.apache.nifi.cluster.coordination.http.replication;
import java.io.IOException;
+import java.net.URI;
import java.util.Map;
import jakarta.ws.rs.core.Response;
@@ -26,6 +27,6 @@ public interface HttpReplicationClient {
PreparedRequest prepareRequest(String method, Map<String, String> headers,
Object entity);
- Response replicate(PreparedRequest request, String uri) throws IOException;
+ Response replicate(PreparedRequest request, URI uri) throws IOException;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java
new file mode 100644
index 0000000000..3da60e5c06
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+/**
+ * Enumeration of HTTP headers for Request Replication with lowercasing for
compatibility with HTTP/2
+ */
+public enum RequestReplicationHeader {
+ /**
+ * Indicator to cancel transaction processing
+ */
+ CANCEL_TRANSACTION("cancel-transaction"),
+
+ /**
+ * Seed for deterministic cluster identifier generation
+ */
+ CLUSTER_ID_GENERATION_SEED("cluster-id-generation-seed"),
+
+ /**
+ * Indicator to continue transaction processing
+ */
+ EXECUTION_CONTINUE("execution-continue"),
+
+ /**
+ * When replicating a request to the cluster coordinator, it may be useful
to denote that the request should
+ * be replicated only to a single node. This happens, for instance, when
retrieving a Provenance Event that
+ * we know lives on a specific node. This request must still be replicated
through the cluster coordinator.
+ * This header tells the cluster coordinator the UUID's (comma-separated
list, possibly with spaces between)
+ * of the nodes that the request should be replicated to.
+ */
+ REPLICATION_TARGET_ID("replication-target-id"),
+
+ /**
+ * When we replicate a request across the cluster, we replicate it only
from the cluster coordinator.
+ * If the request needs to be replicated by another node, it first
replicates the request to the coordinator,
+ * which then replicates the request on the node's behalf. This header
name and value are used to denote
+ * that the request has already been to the cluster coordinator, and the
cluster coordinator is the one replicating
+ * the request. This allows us to know that the request should be
serviced, rather than proxied back to the
+ * cluster coordinator.
+ */
+ REQUEST_REPLICATED("request-replicated"),
+
+ /**
+ * Transaction Identifier for replicated requests
+ */
+ REQUEST_TRANSACTION_ID("request-transaction-id"),
+
+ /**
+ * The HTTP header that the requestor specifies to ask a node if they are
able to process a given request.
+ * The value is always 202-Accepted. The node will respond with 202
ACCEPTED if it is able to
+ * process the request, 417 EXPECTATION_FAILED otherwise.
+ */
+ VALIDATION_EXPECTS("validation-expects");
+
+ private final String header;
+
+ RequestReplicationHeader(final String header) {
+ this.header = header;
+ }
+
+ public String getHeader() {
+ return header;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index 1965ad98b6..c6d8bb82a9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -26,48 +26,6 @@ import java.util.Map;
import java.util.Set;
public interface RequestReplicator {
-
- public static final String REQUEST_TRANSACTION_ID_HEADER =
"X-RequestTransactionId";
- public static final String CLUSTER_ID_GENERATION_SEED_HEADER =
"X-Cluster-Id-Generation-Seed";
-
- /**
- * The HTTP header that the requestor specifies to ask a node if they are
able to process a given request.
- * The value is always 202-Accepted. The node will respond with 202
ACCEPTED if it is able to
- * process the request, 417 EXPECTATION_FAILED otherwise.
- */
- public static final String REQUEST_VALIDATION_HTTP_HEADER =
"X-Validation-Expects";
- public static final String NODE_CONTINUE = "202-Accepted";
- public static final int NODE_CONTINUE_STATUS_CODE = 202;
-
- /**
- * Indicates that the request is intended to cancel a transaction that was
previously created without performing the action
- */
- public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER =
"X-Cancel-Transaction";
-
- /**
- * Indicates that this is the second phase of the two phase commit and the
execution of the action should proceed.
- */
- public static final String REQUEST_EXECUTION_HTTP_HEADER =
"X-Execution-Continue";
-
- /**
- * When we replicate a request across the cluster, we replicate it only
from the cluster coordinator.
- * If the request needs to be replicated by another node, it first
replicates the request to the coordinator,
- * which then replicates the request on the node's behalf. This header
name and value are used to denote
- * that the request has already been to the cluster coordinator, and the
cluster coordinator is the one replicating
- * the request. This allows us to know that the request should be
serviced, rather than proxied back to the
- * cluster coordinator.
- */
- public static final String REPLICATION_INDICATOR_HEADER =
"X-Request-Replicated";
-
- /**
- * When replicating a request to the cluster coordinator, it may be useful
to denote that the request should
- * be replicated only to a single node. This happens, for instance, when
retrieving a Provenance Event that
- * we know lives on a specific node. This request must still be replicated
through the cluster coordinator.
- * This header tells the cluster coordinator the UUID's (comma-separated
list, possibly with spaces between)
- * of the nodes that the request should be replicated to.
- */
- public static final String REPLICATION_TARGET_NODE_UUID_HEADER =
"X-Replication-Target-Id";
-
/**
* Stops the instance from replicating requests. Calling this method on a
stopped instance has no effect.
*/
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
index 25dfaac465..73b93acd9b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
@@ -153,8 +153,8 @@ public class StandardUploadRequestReplicator implements
UploadRequestReplicator
.uri(requestUri)
.body(inputStream,
OptionalLong.of(inputStream.available()))
// Special NiFi-specific headers to indicate that the
request should be performed and not replicated to the nodes
- .header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER,
"true")
- .header(RequestReplicator.REPLICATION_INDICATOR_HEADER,
"true")
+
.header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(),
Boolean.TRUE.toString())
+
.header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(),
Boolean.TRUE.toString())
.header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN,
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
.header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS,
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index ab150aa531..ae4344af32 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -61,6 +61,7 @@ import jakarta.ws.rs.core.Response.Status;
import java.io.Closeable;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -98,6 +99,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
private static final String COOKIE_HEADER = "Cookie";
private static final String HOST_HEADER = "Host";
+ private static final String NODE_CONTINUE = "202-Accepted";
private final int maxConcurrentRequests; // maximum number of concurrent
requests
private final HttpResponseMapper responseMapper;
@@ -276,9 +278,9 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
final boolean indicateReplicated,
final boolean performVerification) {
final Map<String, String> updatedHeaders = new HashMap<>(headers);
-
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER,
ComponentIdGenerator.generateId().toString());
+
updatedHeaders.put(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(),
ComponentIdGenerator.generateId().toString());
if (indicateReplicated) {
- updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER,
"true");
+
updatedHeaders.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(),
Boolean.TRUE.toString());
}
// include the proxied entities header
@@ -380,7 +382,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
- final String requestId =
updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key ->
UUID.randomUUID().toString());
+ final String requestId = computeRequestId(updatedHeaders);
long verifyClusterStateNanos = -1;
if (performVerification) {
@@ -458,7 +460,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
// instruct the node to actually perform the underlying action
if (mutableRequest && executionPhase) {
- updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
+
updatedHeaders.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(),
"true");
}
// replicate the request to all nodes
@@ -486,13 +488,16 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
}
}
+ private String computeRequestId(final Map<String, String> headers) {
+ return
headers.computeIfAbsent(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader(),
header -> UUID.randomUUID().toString());
+ }
private void performVerification(final Set<NodeIdentifier> nodeIds, final
String method, final URI uri, final Object entity, final Map<String, String>
headers,
final StandardAsyncClusterResponse clusterResponse, final boolean
merge, final Object monitor) {
logger.debug("Verifying that mutable request {} {} can be made",
method, uri.getPath());
final Map<String, String> validationHeaders = new HashMap<>(headers);
- validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
+
validationHeaders.put(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader(),
NODE_CONTINUE);
final long startNanos = System.nanoTime();
final int numNodes = nodeIds.size();
@@ -523,7 +528,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
clusterResponse.addTiming("Verification Completed",
"All Nodes", nanos);
// Check if we have any requests that do not have a
202-Accepted status code.
- final long dissentingCount =
nodeResponses.stream().filter(p -> p.getStatus() !=
NODE_CONTINUE_STATUS_CODE).count();
+ final long dissentingCount =
nodeResponses.stream().filter(p -> p.getStatus() !=
HttpURLConnection.HTTP_ACCEPTED).count();
// If all nodes responded with 202-Accepted, then we
can replicate the original request
// to all nodes and we are finished.
@@ -535,7 +540,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
try {
final Map<String, String> cancelLockHeaders = new
HashMap<>(headers);
-
cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
+
cancelLockHeaders.put(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader(),
"true");
final Thread cancelLockThread = new Thread(new
Runnable() {
@Override
public void run() {
@@ -554,7 +559,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
// Add a NodeResponse for each node to the Cluster
Response
// Check that all nodes responded successfully.
for (final NodeResponse response : nodeResponses) {
- if (response.getStatus() !=
NODE_CONTINUE_STATUS_CODE) {
+ if (response.getStatus() !=
HttpURLConnection.HTTP_ACCEPTED) {
final Response clientResponse =
response.getClientResponse();
final String message;
@@ -645,7 +650,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
logger.debug("Replicating request to {} {}, request ID = {}, headers =
{}", request.getMethod(), uri, requestId, request.getHeaders());
// invoke the request
- response = httpClient.replicate(request, uri.toString());
+ response = httpClient.replicate(request, uri);
final long nanos = System.nanoTime() - startNanos;
clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(),
nanos);
@@ -866,7 +871,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
try {
// create and send the request
- final String requestId =
request.getHeaders().get("x-nifi-request-id");
+ final String requestId =
request.getHeaders().get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
logger.debug("Replicating request {} {} to {}", method,
uri.getPath(), nodeId);
nodeResponse = replicateRequest(request, nodeId, uri,
requestId, clusterResponse);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java
similarity index 60%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
copy to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java
index 0571e66e3c..09b175b437 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java
@@ -14,18 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.nifi.cluster.coordination.http.replication.client;
-package org.apache.nifi.cluster.coordination.http.replication;
+/**
+ * Enumeration of HTTP headers for preparing replicated requests
+ */
+enum PreparedRequestHeader {
+ ACCEPT_ENCODING("accept-encoding"),
+
+ CONTENT_ENCODING("content-encoding"),
-import java.io.IOException;
-import java.util.Map;
+ CONTENT_LENGTH("content-length"),
-import jakarta.ws.rs.core.Response;
+ CONTENT_TYPE("content-type"),
-public interface HttpReplicationClient {
+ USER_AGENT("user-agent");
- PreparedRequest prepareRequest(String method, Map<String, String> headers,
Object entity);
+ private final String header;
- Response replicate(PreparedRequest request, String uri) throws IOException;
+ PreparedRequestHeader(final String header) {
+ this.header = header;
+ }
+ String getHeader() {
+ return header;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java
new file mode 100644
index 0000000000..522f235f87
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication.client;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import
com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
+import jakarta.ws.rs.core.MultivaluedHashMap;
+import jakarta.ws.rs.core.MultivaluedMap;
+import jakarta.ws.rs.core.Response;
+
+import
org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
+import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
+import
org.apache.nifi.cluster.coordination.http.replication.io.EntitySerializer;
+import
org.apache.nifi.cluster.coordination.http.replication.io.JacksonResponse;
+import
org.apache.nifi.cluster.coordination.http.replication.io.JsonEntitySerializer;
+import
org.apache.nifi.cluster.coordination.http.replication.io.XmlEntitySerializer;
+import org.apache.nifi.web.client.api.HttpEntityHeaders;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestMethod;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Standard HTTP Replication Client based on Web Client Service
+ */
+public class StandardHttpReplicationClient implements HttpReplicationClient {
+ private static final Set<String> REQUEST_BODY_METHODS = Set.of("PATCH",
"POST", "PUT");
+
+ private static final Set<String> DISALLOWED_HEADERS = Set.of("connection",
"content-length", "expect", "host", "upgrade");
+
+ private static final char PSEUDO_HEADER_PREFIX = ':';
+
+ private static final String GZIP_ENCODING = "gzip";
+
+ private static final String QUERY_SEPARATOR = "&";
+
+ private static final String QUERY_NAME_VALUE_SEPARATOR = "=";
+
+ private static final String APPLICATION_JSON_CONTENT_TYPE =
"application/json";
+
+ private static final String APPLICATION_XML_CONTENT_TYPE =
"application/xml";
+
+ private static final String USER_AGENT_PRODUCT = "Apache NiFi";
+
+ private static final String USER_AGENT_FORMAT = "%s/%s";
+
+ private static final String USER_AGENT_VERSION = "SNAPSHOT";
+
+ private static final String USER_AGENT;
+
+ private static final Logger logger =
LoggerFactory.getLogger(StandardHttpReplicationClient.class);
+
+ static {
+ final Package clientPackage =
StandardHttpReplicationClient.class.getPackage();
+ final String userAgentVersion;
+ if (clientPackage == null || clientPackage.getImplementationVersion()
== null) {
+ userAgentVersion = USER_AGENT_VERSION;
+ } else {
+ // Set User Agent Version from JAR MANIFEST.MF Version when found
+ userAgentVersion = clientPackage.getImplementationVersion();
+ }
+ USER_AGENT = USER_AGENT_FORMAT.formatted(USER_AGENT_PRODUCT,
userAgentVersion);
+ }
+
+ private final WebClientService webClientService;
+
+ private final Supplier<HttpUriBuilder> httpUriBuilderSupplier;
+
+ private final EntitySerializer jsonSerializer;
+
+ private final EntitySerializer xmlSerializer;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public StandardHttpReplicationClient(final WebClientService
webClientService, final Supplier<HttpUriBuilder> httpUriBuilderSupplier) {
+ this.webClientService = Objects.requireNonNull(webClientService, "Web
Client Service required");
+ this.httpUriBuilderSupplier =
Objects.requireNonNull(httpUriBuilderSupplier, "HTTP URI Builder supplier
required");
+
+
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL,
JsonInclude.Include.ALWAYS));
+ objectMapper.setAnnotationIntrospector(new
JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory()));
+
+ jsonSerializer = new JsonEntitySerializer(objectMapper);
+ xmlSerializer = new XmlEntitySerializer();
+ }
+
+ /**
+ * Prepare Request for Replication with serialized Request Entity
+ *
+ * @param method HTTP Method
+ * @param headers HTTP Request Headers
+ * @param requestEntity Request Entity to be serialized
+ * @return Prepared Request for replication
+ */
+ @Override
+ public PreparedRequest prepareRequest(final String method, final
Map<String, String> headers, final Object requestEntity) {
+ final Map<String, String> preparedHeaders =
getPreparedHeaders(headers, method);
+ final byte[] requestBody = getRequestBody(requestEntity,
preparedHeaders);
+ return new StandardPreparedRequest(method, preparedHeaders,
requestEntity, requestBody);
+ }
+
+ /**
+ * Replicate Prepared HTTP Request to destination URI
+ *
+ * @param request Prepared HTTP Request for replication
+ * @param uri Destination URI for sending the request
+ * @return Jakarta REST Response
+ * @throws IOException Thrown on communication failures sending requests
or retrieving responses
+ */
+ @Override
+ public Response replicate(final PreparedRequest request, final URI uri)
throws IOException {
+ if (request instanceof StandardPreparedRequest preparedRequest) {
+ return replicate(preparedRequest, uri);
+ } else {
+ throw new IllegalArgumentException("HTTP Prepared Request not
provided");
+ }
+ }
+
+    /**
+     * Build the headers for a replicated request from the headers of the original request
+     *
+     * Header names are lowercased with Locale.ROOT so that the result does not depend on the JVM default
+     * locale; the default-locale form of toLowerCase() maps 'I' to a dotless 'ı' under Turkish locales,
+     * which would corrupt names such as If-Match. The Accept-Encoding value from the original request is
+     * replaced with gzip, a default Content-Type is applied for methods with a request body, and the
+     * User-Agent is overwritten.
+     *
+     * @param headers HTTP Request Headers from the original request
+     * @param method HTTP Method used to decide whether a default Content-Type applies
+     * @return Mutable map of prepared headers in insertion order with lowercased names
+     */
+    private Map<String, String> getPreparedHeaders(final Map<String, String> headers, final String method) {
+        final Map<String, String> preparedHeaders = new LinkedHashMap<>();
+        final String acceptEncodingHeader = PreparedRequestHeader.ACCEPT_ENCODING.getHeader();
+
+        for (final Map.Entry<String, String> header : headers.entrySet()) {
+            // Locale-independent lowercasing; fully qualified to avoid requiring a new import
+            final String headerName = header.getKey().toLowerCase(java.util.Locale.ROOT);
+            if (acceptEncodingHeader.equals(headerName)) {
+                // Remove Accept-Encoding from original client request in favor of specific value for replication
+                continue;
+            }
+
+            preparedHeaders.put(headerName, header.getValue());
+        }
+
+        // Set Accept-Encoding to request gzip encoded responses
+        preparedHeaders.put(acceptEncodingHeader, GZIP_ENCODING);
+
+        processContentType(method, preparedHeaders);
+        processUserAgent(preparedHeaders);
+        return preparedHeaders;
+    }
+
+    /**
+     * Build the Web Client request for a Prepared Request and send it to the destination
+     *
+     * Headers listed in DISALLOWED_HEADERS are not forwarded. The comparison uses a Locale.ROOT lowercased
+     * name so that matching does not depend on the JVM default locale. A request body is attached only for
+     * methods listed in REQUEST_BODY_METHODS.
+     *
+     * @param preparedRequest Prepared Request with headers and serialized body
+     * @param location Destination URI
+     * @return Jakarta REST Response
+     * @throws IOException Thrown on communication failures sending requests or retrieving responses
+     */
+    private Response replicate(final StandardPreparedRequest preparedRequest, final URI location) throws IOException {
+        final HttpRequestMethod requestMethod = getRequestMethod(preparedRequest);
+        final URI requestUri = getRequestUri(preparedRequest, location);
+
+        final HttpRequestBodySpec httpRequestBodySpec = webClientService.method(requestMethod).uri(requestUri);
+
+        for (final Map.Entry<String, String> requestHeader : preparedRequest.headers().entrySet()) {
+            final String headerName = requestHeader.getKey();
+            // Locale-independent lowercasing; fully qualified to avoid requiring a new import
+            final String headerNameLowerCased = headerName.toLowerCase(java.util.Locale.ROOT);
+            if (!DISALLOWED_HEADERS.contains(headerNameLowerCased)) {
+                httpRequestBodySpec.header(headerName, requestHeader.getValue());
+            }
+        }
+
+        if (REQUEST_BODY_METHODS.contains(requestMethod.getMethod())) {
+            final byte[] requestBody = preparedRequest.requestBody();
+            final ByteArrayInputStream body = new ByteArrayInputStream(requestBody);
+            // Content length is known up front because the body was serialized during preparation
+            final OptionalLong contentLength = OptionalLong.of(requestBody.length);
+            httpRequestBodySpec.body(body, contentLength);
+        }
+
+        // The original location, not the rebuilt request URI, is passed on for logging and the response
+        return replicate(httpRequestBodySpec, preparedRequest.method(), location);
+    }
+
+ /**
+ * Retrieve the response for the configured request and convert it to a JacksonResponse
+ *
+ * The response headers are filtered and the response body is read into a byte array while the
+ * response entity is still open; the entity is closed by try-with-resources before returning.
+ *
+ * @param httpRequestBodySpec Configured request specification
+ * @param method HTTP Method, used for logging only
+ * @param location Destination URI, used for logging and passed to the JacksonResponse
+ * @return Jakarta REST Response holding the buffered body, filtered headers, and status code
+ * @throws IOException Thrown on failures retrieving or reading the response
+ */
+ private Response replicate(final HttpRequestBodySpec httpRequestBodySpec,
final String method, final URI location) throws IOException {
+ final long started = System.currentTimeMillis();
+
+ try (HttpResponseEntity responseEntity =
httpRequestBodySpec.retrieve()) {
+ final int statusCode = responseEntity.statusCode();
+ final HttpEntityHeaders headers = responseEntity.headers();
+ final MultivaluedMap<String, String> responseHeaders =
getResponseHeaders(headers);
+ final byte[] responseBody = getResponseBody(responseEntity.body(),
headers);
+
+ // Elapsed time includes reading the complete response body
+ final long elapsed = System.currentTimeMillis() - started;
+ logger.debug("Replicated {} {} HTTP {} in {} ms", method,
location, statusCode, elapsed);
+
+ // NOTE(review): the last argument is null; the removed OkHttp client passed a close callback in this
+ // position, which appears unnecessary here because the entity is closed on exit - confirm
+ return new JacksonResponse(objectMapper, responseBody,
responseHeaders, location, statusCode, null);
+ }
+ }
+
+    /**
+     * Build the request URI from the destination location and optional query parameters in the Request Entity
+     *
+     * Scheme, host, port, and path are copied from the location. Query parameters from the location are
+     * added first, followed by parameters from the Request Entity when it is a MultivaluedMap.
+     *
+     * @param preparedRequest Prepared Request with optional MultivaluedMap entity of query parameters
+     * @param location Destination URI
+     * @return Request URI for the Web Client
+     */
+    private URI getRequestUri(final StandardPreparedRequest preparedRequest, final URI location) {
+        final HttpUriBuilder httpUriBuilder = httpUriBuilderSupplier.get();
+
+        httpUriBuilder.scheme(location.getScheme());
+        httpUriBuilder.host(location.getHost());
+        httpUriBuilder.port(location.getPort());
+        // encodedPath expects an already encoded path: getPath() returns the decoded form, which would turn
+        // escaped characters such as %2F or %25 into different path content, so use the raw path
+        httpUriBuilder.encodedPath(location.getRawPath());
+
+        final String query = location.getQuery();
+        if (query != null) {
+            for (final String parameter : query.split(QUERY_SEPARATOR)) {
+                // Limit of 2 keeps separator characters inside the value, instead of dropping the parameter
+                final String[] parameterNameValue = parameter.split(QUERY_NAME_VALUE_SEPARATOR, 2);
+                final String parameterName = parameterNameValue[0];
+                final String parameterValue = parameterNameValue.length == 2 ? parameterNameValue[1] : null;
+                httpUriBuilder.addQueryParameter(parameterName, parameterValue);
+            }
+        }
+
+        final Object requestEntity = preparedRequest.entity();
+        if (requestEntity instanceof MultivaluedMap<?, ?> parameterEntity) {
+            for (final Object key : parameterEntity.keySet()) {
+                final String parameterName = key.toString();
+                final Object parameterValues = parameterEntity.get(parameterName);
+                if (parameterValues instanceof List<?> values) {
+                    for (final Object value : values) {
+                        // Pass a null element through as a null value, as done for name-only parameters above,
+                        // instead of failing on toString()
+                        final String parameterValue = value == null ? null : value.toString();
+                        httpUriBuilder.addQueryParameter(parameterName, parameterValue);
+                    }
+                }
+            }
+        }
+
+        return httpUriBuilder.build();
+    }
+
+ /**
+ * Adapt the method of the Prepared Request to an HttpRequestMethod
+ *
+ * @param preparedRequest Prepared Request providing the HTTP Method
+ * @return HttpRequestMethod whose getMethod() and toString() both return the method string unchanged
+ */
+ private HttpRequestMethod getRequestMethod(final PreparedRequest
preparedRequest) {
+ final String method = preparedRequest.getMethod();
+ return new HttpRequestMethod() {
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public String toString() {
+ return method;
+ }
+ };
+ }
+
+    /**
+     * Copy response headers to a MultivaluedMap, leaving out headers that do not apply to the buffered body
+     *
+     * @param responseHeaders Headers from the HTTP response
+     * @return Response headers without pseudo-headers, Content-Encoding, and Content-Length
+     */
+    private MultivaluedMap<String, String> getResponseHeaders(final HttpEntityHeaders responseHeaders) {
+        final String contentEncodingHeader = PreparedRequestHeader.CONTENT_ENCODING.getHeader();
+        final String contentLengthHeader = PreparedRequestHeader.CONTENT_LENGTH.getHeader();
+
+        final MultivaluedMap<String, String> filtered = new MultivaluedHashMap<>();
+        for (final String headerName : responseHeaders.getHeaderNames()) {
+            final boolean excluded =
+                    // Remove pseudo-headers returned from HTTP/2 responses
+                    headerName.charAt(0) == PSEUDO_HEADER_PREFIX
+                    // Remove Content-Encoding Response Header to align with gzip decoding of Response Body
+                    || contentEncodingHeader.equalsIgnoreCase(headerName)
+                    // Remove Content-Length Response Header to align with gzip decoding of Response Body
+                    || contentLengthHeader.equalsIgnoreCase(headerName);
+
+            if (!excluded) {
+                filtered.addAll(headerName, responseHeaders.getHeader(headerName));
+            }
+        }
+        return filtered;
+    }
+
+    /**
+     * Read the complete response body, decoding gzip when indicated by the Content-Encoding header
+     *
+     * An empty body is returned as-is even when the response is labelled gzip. Responses without content,
+     * such as those for HEAD requests, can carry Content-Encoding: gzip, and GZIPInputStream throws
+     * EOFException from its constructor when no gzip header is available to read.
+     *
+     * @param inputStream Response body stream, closed by this method
+     * @param responseHeaders Response headers used to detect gzip encoding
+     * @return Decoded response body bytes
+     * @throws IOException Thrown on failures reading or decoding the response body
+     */
+    private byte[] getResponseBody(final InputStream inputStream, final HttpEntityHeaders responseHeaders) throws IOException {
+        final boolean gzipEncoded = isGzipEncoded(responseHeaders);
+
+        final byte[] encodedBody;
+        try (inputStream) {
+            encodedBody = inputStream.readAllBytes();
+        }
+
+        if (!gzipEncoded || encodedBody.length == 0) {
+            return encodedBody;
+        }
+
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        try (InputStream responseBodyStream = new GZIPInputStream(new ByteArrayInputStream(encodedBody))) {
+            responseBodyStream.transferTo(outputStream);
+        }
+        return outputStream.toByteArray();
+    }
+
+    /**
+     * Serialize the Request Entity using the serializer selected from the Content-Type header
+     *
+     * @param requestEntity Request Entity to be serialized
+     * @param headers Prepared headers searched for Content-Type, with JSON used when none is found
+     * @return Serialized Request Entity bytes
+     * @throws UncheckedIOException Thrown when serialization fails
+     */
+    private byte[] getRequestBody(final Object requestEntity, final Map<String, String> headers) {
+        final String contentType = getContentType(headers).orElse(APPLICATION_JSON_CONTENT_TYPE);
+        final EntitySerializer entitySerializer = getSerializer(contentType);
+
+        final ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+        try {
+            entitySerializer.serialize(requestEntity, serialized);
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Request Entity serialization failed", e);
+        }
+        return serialized.toByteArray();
+    }
+
+    /**
+     * Add a JSON Content-Type header for methods with a request body when no Content-Type is present
+     *
+     * @param method HTTP Method checked against REQUEST_BODY_METHODS
+     * @param headers Mutable prepared headers, updated in place
+     */
+    private void processContentType(final String method, final Map<String, String> headers) {
+        if (!REQUEST_BODY_METHODS.contains(method)) {
+            return;
+        }
+
+        final boolean contentTypeMissing = getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE).isEmpty();
+        if (contentTypeMissing) {
+            // Set default Content-Type to JSON
+            headers.put(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON_CONTENT_TYPE);
+        }
+    }
+
+ /**
+ * Set the User-Agent header to the USER_AGENT value of this client, replacing any existing value
+ *
+ * An existing header name found with a case-insensitive match is reused as the key so that the value is
+ * overwritten instead of adding a second entry; otherwise the standard header name is used.
+ *
+ * @param headers Mutable prepared headers, updated in place
+ */
+ private void processUserAgent(final Map<String, String> headers) {
+ final Optional<String> userAgentHeaderFound = getHeaderName(headers,
PreparedRequestHeader.USER_AGENT);
+ final String userAgentHeader =
userAgentHeaderFound.orElseGet(PreparedRequestHeader.USER_AGENT::getHeader);
+ headers.put(userAgentHeader, USER_AGENT);
+ }
+
+    /**
+     * Select the Entity Serializer for a Content-Type
+     *
+     * @param contentType Content-Type compared to the XML media type without regard to case
+     * @return XML serializer for an exact XML Content-Type match, otherwise the JSON serializer
+     */
+    private EntitySerializer getSerializer(final String contentType) {
+        final boolean xmlRequested = APPLICATION_XML_CONTENT_TYPE.equalsIgnoreCase(contentType);
+        return xmlRequested ? xmlSerializer : jsonSerializer;
+    }
+
+    /**
+     * Determine whether the response headers declare gzip Content-Encoding
+     *
+     * @param headers Response headers
+     * @return true when the first Content-Encoding header with a value matches gzip without regard to case
+     */
+    private boolean isGzipEncoded(final HttpEntityHeaders headers) {
+        final String contentEncodingHeader = PreparedRequestHeader.CONTENT_ENCODING.getHeader();
+
+        for (final String headerName : headers.getHeaderNames()) {
+            if (!contentEncodingHeader.equalsIgnoreCase(headerName)) {
+                continue;
+            }
+
+            final Optional<String> contentEncoding = headers.getFirstHeader(headerName);
+            if (contentEncoding.isPresent()) {
+                // Only the first header with a value is considered
+                return GZIP_ENCODING.equalsIgnoreCase(contentEncoding.get());
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Find the Content-Type header value using a case-insensitive header name match
+     *
+     * @param headers Request headers
+     * @return Content-Type value, or empty when the header is absent or mapped to null
+     */
+    private Optional<String> getContentType(final Map<String, String> headers) {
+        // Optional.map yields empty when the header name is mapped to a null value
+        return getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE).map(headers::get);
+    }
+
+    /**
+     * Find the header name as it is spelled in the map, matching the expected name without regard to case
+     *
+     * @param headers Request headers
+     * @param httpHeader Header to find
+     * @return First matching key from the map in iteration order, or empty when none matches
+     */
+    private Optional<String> getHeaderName(final Map<String, String> headers, final PreparedRequestHeader httpHeader) {
+        final String expectedName = httpHeader.getHeader();
+        for (final String headerName : headers.keySet()) {
+            if (expectedName.equalsIgnoreCase(headerName)) {
+                return Optional.of(headerName);
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
similarity index 58%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
index 9b9a699edc..96006cc131 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import java.util.Map;
+package org.apache.nifi.cluster.coordination.http.replication.client;
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
-import okhttp3.RequestBody;
-
-public class OkHttpPreparedRequest implements PreparedRequest {
- private final String method;
- private final Map<String, String> headers;
- private final Object entity;
- private final RequestBody requestBody;
+import java.util.Map;
- public OkHttpPreparedRequest(final String method, final Map<String,
String> headers, final Object entity, final RequestBody requestBody) {
- this.method = method;
- this.headers = headers;
- this.entity = entity;
- this.requestBody = requestBody;
- }
+/**
+ * Standard record implementation of Request prepared for Replication
+ *
+ * @param method HTTP Method
+ * @param headers Map of HTTP Request Headers
+ * @param entity HTTP Request Entity
+ * @param requestBody Serialized Request Body
+ */
+record StandardPreparedRequest(String method, Map<String, String> headers,
Object entity, byte[] requestBody) implements PreparedRequest {
@Override
public String getMethod() {
@@ -50,13 +45,4 @@ public class OkHttpPreparedRequest implements
PreparedRequest {
public Object getEntity() {
return entity;
}
-
- public RequestBody getRequestBody() {
- return requestBody;
- }
-
- @Override
- public String toString() {
- return "OkHttpPreparedRequest[method=" + method + ", headers=" +
headers + "]";
- }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
similarity index 93%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
index 19028e393c..703503aa26 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
similarity index 98%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
index be7b1eebb4..7a65846bfc 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
similarity index 95%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
index d652291430..8884bc07fa 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
similarity index 97%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
index e8a005cc4a..122131a95f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
deleted file mode 100644
index 2ca8596147..0000000000
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import okhttp3.Call;
-
-import java.net.SocketAddress;
-import java.text.NumberFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CallEventListener {
- private final Call call;
- private final Map<String, Timing> dnsTimings = new HashMap<>();
- private final Map<String, Timing> establishConnectionTiming = new
HashMap<>();
- private long callStart;
- private long callEnd;
- private long responseBodyStart;
- private long responseBodyEnd;
- private long responseHeaderStart;
- private long responseHeaderEnd;
- private long requestHeaderStart;
- private long requestHeaderEnd;
- private long requestBodyStart;
- private long requestBodyEnd;
- private long secureConnectStart;
- private long secureConnectEnd;
-
-
- public CallEventListener(final Call call) {
- this.call = call;
- }
-
- public void callStart() {
- callStart = System.nanoTime();
- }
-
- public void callEnd() {
- callEnd = System.nanoTime();
- }
-
- public void dnsStart(final String domainName) {
- dnsTimings.computeIfAbsent(domainName, k -> new
Timing(domainName)).start();
- }
-
- public void dnsEnd(final String domainName) {
- dnsTimings.computeIfAbsent(domainName, k -> new
Timing(domainName)).end();
- }
-
- public void responseBodyStart() {
- responseBodyStart = System.nanoTime();
- }
-
- public void responseBodyEnd() {
- responseBodyEnd = System.nanoTime();
- }
-
- public void responseHeaderStart() {
- responseHeaderStart = System.nanoTime();
- }
-
- public void responseHeaderEnd() {
- responseHeaderEnd = System.nanoTime();
- }
-
- public void requestHeaderStart() {
- requestHeaderStart = System.nanoTime();
- }
-
- public void requestHeaderEnd() {
- requestHeaderEnd = System.nanoTime();
- }
-
- public void requestBodyStart() {
- requestBodyStart = System.nanoTime();
- }
-
- public void requestBodyEnd() {
- requestBodyEnd = System.nanoTime();
- }
-
- public void connectStart(final SocketAddress address) {
- establishConnectionTiming.computeIfAbsent(address.toString(),
Timing::new).start();
- }
-
- public void connectionAcquired(final SocketAddress address) {
- establishConnectionTiming.computeIfAbsent(address.toString(),
Timing::new).end();
- }
-
- public void secureConnectStart() {
- secureConnectStart = System.nanoTime();
- }
-
- public void secureConnectEnd() {
- secureConnectEnd = System.nanoTime();
- }
-
- public Call getCall() {
- return call;
- }
-
- @Override
- public String toString() {
- final NumberFormat numberFormat = NumberFormat.getInstance();
-
- return "CallEventListener{" +
- "url=" + call.request().url() +
- ", dnsTimings=" + dnsTimings.values() +
- ", establishConnectionTiming=" +
establishConnectionTiming.values() +
- ", tlsInitialization=" + numberFormat.format(secureConnectEnd -
secureConnectStart) + " nanos" +
- ", writeRequestHeaders=" + numberFormat.format(requestHeaderEnd -
requestHeaderStart) + " nanos" +
- ", writeRequestBody=" + numberFormat.format(requestBodyEnd -
requestBodyStart) + " nanos" +
- ", readResponseHeaders=" + numberFormat.format(responseHeaderEnd -
responseHeaderStart) + " nanos" +
- ", readResponseBody=" + numberFormat.format(responseBodyEnd -
responseBodyStart) + " nanos" +
- ", callTime=" + numberFormat.format(callEnd - callStart) + "
nanos" +
- '}';
- }
-
- private static class Timing {
- private final String address;
- private long start;
- private long nanos;
-
- public Timing(final String address) {
- this.address = address;
- }
-
- public String getAddress() {
- return address;
- }
-
- public void start() {
- start = System.nanoTime();
- }
-
- public void end() {
- if (start > 0) {
- nanos += (System.nanoTime() - start);
- }
- }
-
- public String toString() {
- return "{address=" + address + ", nanos=" +
NumberFormat.getInstance().format(nanos) + "}";
- }
- }
-}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
deleted file mode 100644
index 91b7532b9d..0000000000
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonInclude.Value;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import
com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.zip.GZIPInputStream;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.X509TrustManager;
-import jakarta.ws.rs.HttpMethod;
-import jakarta.ws.rs.core.MultivaluedHashMap;
-import jakarta.ws.rs.core.MultivaluedMap;
-import jakarta.ws.rs.core.Response;
-import okhttp3.Call;
-import okhttp3.ConnectionPool;
-import okhttp3.Headers;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
-import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
-import org.apache.nifi.remote.protocol.http.HttpHeaders;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.StreamUtils;
-
-public class OkHttpReplicationClient implements HttpReplicationClient {
- private static final Logger logger =
LoggerFactory.getLogger(OkHttpReplicationClient.class);
- private static final Set<String> gzipEncodings = Stream.of("gzip",
"x-gzip").collect(Collectors.toSet());
-
- private final EntitySerializer jsonSerializer;
- private final EntitySerializer xmlSerializer;
-
- private final ObjectMapper jsonCodec = new ObjectMapper();
- private final OkHttpClient okHttpClient;
- private final SSLContext sslContext;
- private final X509TrustManager trustManager;
-
- public OkHttpReplicationClient(
- final NiFiProperties properties,
- final SSLContext sslContext,
- final X509TrustManager trustManager
- ) {
-
jsonCodec.setDefaultPropertyInclusion(Value.construct(Include.NON_NULL,
Include.ALWAYS));
- jsonCodec.setAnnotationIntrospector(new
JakartaXmlBindAnnotationIntrospector(jsonCodec.getTypeFactory()));
-
- jsonSerializer = new JsonEntitySerializer(jsonCodec);
- xmlSerializer = new XmlEntitySerializer();
-
- this.sslContext = sslContext;
- this.trustManager = trustManager;
- okHttpClient = createOkHttpClient(properties);
- }
-
- @Override
- public PreparedRequest prepareRequest(final String method, final
Map<String, String> headers, final Object entity) {
- final boolean gzip = isUseGzip(headers);
- checkContentLengthHeader(method, headers);
- final RequestBody requestBody = createRequestBody(headers, entity,
gzip);
-
- final Map<String, String> updatedHeaders = gzip ?
updateHeadersForGzip(headers) : headers;
- return new OkHttpPreparedRequest(method, updatedHeaders, entity,
requestBody);
- }
-
- /**
- * Checks the content length header on DELETE requests to ensure it is set
to '0', avoiding request timeouts on replicated requests.
- *
- * @param method the HTTP method of the request
- * @param headers the header keys and values
- */
- private void checkContentLengthHeader(String method, Map<String, String>
headers) {
- // Only applies to DELETE requests
- if (HttpMethod.DELETE.equalsIgnoreCase(method)) {
- // Find the Content-Length header if present
- final String CONTENT_LENGTH_HEADER_KEY = "Content-Length";
- Map.Entry<String, String> contentLengthEntry =
headers.entrySet().stream().filter(entry ->
entry.getKey().equalsIgnoreCase(CONTENT_LENGTH_HEADER_KEY)).findFirst().orElse(null);
- // If no CL header, do nothing
- if (contentLengthEntry != null) {
- // If the provided CL value is non-zero, override it
- if (contentLengthEntry.getValue() != null &&
!contentLengthEntry.getValue().equalsIgnoreCase("0")) {
- logger.warn("This is a DELETE request; the provided
Content-Length was {}; setting Content-Length to 0",
contentLengthEntry.getValue());
- headers.put(CONTENT_LENGTH_HEADER_KEY, "0");
- }
- }
- }
- }
-
- @Override
- public Response replicate(final PreparedRequest request, final String uri)
throws IOException {
- if (!(Objects.requireNonNull(request) instanceof
OkHttpPreparedRequest)) {
- throw new IllegalArgumentException("Replication Client is only
able to replicate requests that the client itself has prepared");
- }
-
- return replicate((OkHttpPreparedRequest) request, uri);
- }
-
- private Response replicate(final OkHttpPreparedRequest request, final
String uri) throws IOException {
- logger.debug("Replicating request {} to {}", request, uri);
- final Call call = createCall(request, uri);
- final okhttp3.Response callResponse = call.execute();
-
- final byte[] responseBytes = getResponseBytes(callResponse);
- final MultivaluedMap<String, String> responseHeaders =
getHeaders(callResponse);
- logger.debug("Received response code {} with headers {} for request {}
to {}", callResponse.code(), responseHeaders, request, uri);
-
- final Response response = new JacksonResponse(jsonCodec,
responseBytes, responseHeaders, URI.create(uri), callResponse.code(),
callResponse::close);
- return response;
- }
-
- private MultivaluedMap<String, String> getHeaders(final okhttp3.Response
callResponse) {
- final Headers headers = callResponse.headers();
- final MultivaluedMap<String, String> headerMap = new
MultivaluedHashMap<>();
- for (final String name : headers.names()) {
- final List<String> values = headers.values(name);
- headerMap.addAll(name, values);
- }
-
- return headerMap;
- }
-
- private byte[] getResponseBytes(final okhttp3.Response callResponse)
throws IOException {
- final byte[] rawBytes = callResponse.body().bytes();
-
- final String contentEncoding = callResponse.header("Content-Encoding");
- if (gzipEncodings.contains(contentEncoding)) {
- try (final InputStream gzipIn = new GZIPInputStream(new
ByteArrayInputStream(rawBytes));
- final ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
-
- StreamUtils.copy(gzipIn, baos);
- return baos.toByteArray();
- }
- } else {
- return rawBytes;
- }
- }
-
- private Call createCall(final OkHttpPreparedRequest request, final String
uri) {
- Request.Builder requestBuilder = new Request.Builder();
-
- final HttpUrl url = buildUrl(request, uri);
- requestBuilder = requestBuilder.url(url);
-
- // build the request body
- final String method = request.getMethod().toUpperCase();
- switch (method) {
- case "POST":
- case "PUT":
- case "PATCH":
- requestBuilder = requestBuilder.method(method,
request.getRequestBody());
- break;
- default:
- requestBuilder = requestBuilder.method(method, null);
- break;
- }
-
- // Add appropriate headers
- for (final Map.Entry<String, String> header :
request.getHeaders().entrySet()) {
- requestBuilder = requestBuilder.addHeader(header.getKey(),
header.getValue());
- }
-
- // Build the request
- final Request okHttpRequest = requestBuilder.build();
- final Call call = okHttpClient.newCall(okHttpRequest);
- return call;
- }
-
-
- @SuppressWarnings("unchecked")
- private HttpUrl buildUrl(final OkHttpPreparedRequest request, final String
uri) {
- HttpUrl.Builder urlBuilder = HttpUrl.parse(uri).newBuilder();
- switch (request.getMethod().toUpperCase()) {
- case HttpMethod.DELETE:
- case HttpMethod.HEAD:
- case HttpMethod.GET:
- case HttpMethod.OPTIONS:
- if (request.getEntity() instanceof MultivaluedMap) {
- final MultivaluedMap<String, String> entityMap =
(MultivaluedMap<String, String>) request.getEntity();
-
- for (final Entry<String, List<String>> queryEntry :
entityMap.entrySet()) {
- final String queryName = queryEntry.getKey();
- for (final String queryValue : queryEntry.getValue()) {
- urlBuilder =
urlBuilder.addQueryParameter(queryName, queryValue);
- }
- }
- }
-
- break;
- }
-
- return urlBuilder.build();
- }
-
- private RequestBody createRequestBody(final Map<String, String> headers,
final Object entity, final boolean gzip) {
- final String contentType = getContentType(headers, "application/json");
- final byte[] serialized = serializeEntity(entity, contentType, gzip);
-
- final MediaType mediaType = MediaType.parse(contentType);
- return RequestBody.create(serialized, mediaType);
- }
-
- private String getContentType(final Map<String, String> headers, final
String defaultValue) {
- for (final Map.Entry<String, String> entry : headers.entrySet()) {
- if (entry.getKey().equalsIgnoreCase("content-type")) {
- return entry.getValue();
- }
- }
-
- return defaultValue;
- }
-
- private byte[] serializeEntity(final Object entity, final String
contentType, final boolean gzip) {
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final OutputStream out = gzip ? new GZIPOutputStream(baos, 1) :
baos) {
-
- getSerializer(contentType).serialize(entity, out);
- out.close();
-
- return baos.toByteArray();
- } catch (final IOException e) {
- // This should never happen with a ByteArrayOutputStream
- throw new RuntimeException("Failed to serialize entity for cluster
replication", e);
- }
- }
-
- private EntitySerializer getSerializer(final String contentType) {
- switch (contentType.toLowerCase()) {
- case "application/xml":
- return xmlSerializer;
- case "application/json":
- default:
- return jsonSerializer;
- }
- }
-
-
- private Map<String, String> updateHeadersForGzip(final Map<String, String>
headers) {
- final String encodingHeader = headers.get("Content-Encoding");
- if (gzipEncodings.contains(encodingHeader)) {
- return headers;
- }
-
- final Map<String, String> updatedHeaders = new HashMap<>(headers);
- updatedHeaders.put("Content-Encoding", "gzip");
- return updatedHeaders;
- }
-
-
- private boolean isUseGzip(final Map<String, String> headers) {
- String rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING);
-
- if (rawAcceptEncoding == null) {
- rawAcceptEncoding =
headers.get(HttpHeaders.ACCEPT_ENCODING.toLowerCase());
- }
-
- if (rawAcceptEncoding == null) {
- return false;
- } else {
- final String[] acceptEncodingTokens = rawAcceptEncoding.split(",");
- return Stream.of(acceptEncodingTokens)
- .map(String::trim)
- .filter(StringUtils::isNotEmpty)
- .map(String::toLowerCase)
- .anyMatch(gzipEncodings::contains);
- }
- }
-
- private OkHttpClient createOkHttpClient(final NiFiProperties properties) {
- final String connectionTimeout =
properties.getClusterNodeConnectionTimeout();
- final long connectionTimeoutMs = (long)
FormatUtils.getPreciseTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
- final String readTimeout = properties.getClusterNodeReadTimeout();
- final long readTimeoutMs = (long)
FormatUtils.getPreciseTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
-
- OkHttpClient.Builder okHttpClientBuilder = new
OkHttpClient().newBuilder();
- okHttpClientBuilder.connectTimeout(connectionTimeoutMs,
TimeUnit.MILLISECONDS);
- okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
- okHttpClientBuilder.followRedirects(true);
- final int connectionPoolSize =
properties.getClusterNodeMaxConcurrentRequests();
- okHttpClientBuilder.connectionPool(new
ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
- okHttpClientBuilder.eventListener(new
RequestReplicationEventListener());
-
- if (sslContext != null) {
- final SSLSocketFactory sslSocketFactory =
sslContext.getSocketFactory();
- okHttpClientBuilder.sslSocketFactory(sslSocketFactory,
trustManager);
- }
-
- return okHttpClientBuilder.build();
- }
-}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
deleted file mode 100644
index 209a10b5ec..0000000000
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import okhttp3.Call;
-import okhttp3.Connection;
-import okhttp3.EventListener;
-import okhttp3.Handshake;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RequestReplicationEventListener extends EventListener {
- private static final Logger logger =
LoggerFactory.getLogger(RequestReplicationEventListener.class);
-
- private final ConcurrentMap<Call, CallEventListener> eventListeners = new
ConcurrentHashMap<>();
-
- private CallEventListener getListener(final Call call) {
- return eventListeners.computeIfAbsent(call, CallEventListener::new);
- }
-
- @Override
- public void dnsStart(@NotNull final Call call, @NotNull final String
domainName) {
- super.dnsStart(call, domainName);
- getListener(call).dnsStart(domainName);
- }
-
- @Override
- public void dnsEnd(@NotNull final Call call, @NotNull final String
domainName, @NotNull final List<InetAddress> inetAddressList) {
- super.dnsEnd(call, domainName, inetAddressList);
- getListener(call).dnsEnd(domainName);
- }
-
- @Override
- public void callStart(@NotNull final Call call) {
- super.callStart(call);
- getListener(call).callStart();
- }
-
- @Override
- public void callEnd(@NotNull final Call call) {
- super.callEnd(call);
- final CallEventListener callListener = getListener(call);
- callListener.callEnd();
-
- logTimingInfo(callListener);
- eventListeners.remove(call);
- }
-
- @Override
- public void callFailed(@NotNull final Call call, @NotNull final
IOException ioe) {
- super.callFailed(call, ioe);
-
- final CallEventListener callListener = getListener(call);
- callListener.callEnd();
-
- logTimingInfo(callListener);
- eventListeners.remove(call);
- }
-
- @Override
- public void responseBodyStart(@NotNull final Call call) {
- super.responseBodyStart(call);
- getListener(call).responseBodyStart();
- }
-
- @Override
- public void responseBodyEnd(@NotNull final Call call, final long
byteCount) {
- super.responseBodyEnd(call, byteCount);
- getListener(call).responseBodyEnd();
- }
-
- @Override
- public void responseFailed(@NotNull final Call call, @NotNull final
IOException ioe) {
- super.responseFailed(call, ioe);
- getListener(call).responseBodyEnd();
- }
-
- @Override
- public void responseHeadersStart(@NotNull final Call call) {
- super.responseHeadersStart(call);
- getListener(call).responseHeaderStart();
- }
-
- @Override
- public void responseHeadersEnd(@NotNull final Call call, @NotNull final
Response response) {
- super.responseHeadersEnd(call, response);
- getListener(call).responseHeaderEnd();
- }
-
- @Override
- public void requestHeadersStart(@NotNull final Call call) {
- super.requestHeadersStart(call);
- getListener(call).requestHeaderStart();
- }
-
- @Override
- public void requestHeadersEnd(@NotNull final Call call, @NotNull final
Request request) {
- super.requestHeadersEnd(call, request);
- getListener(call).requestHeaderEnd();
- }
-
- @Override
- public void requestBodyStart(@NotNull final Call call) {
- super.requestBodyStart(call);
- getListener(call).requestBodyStart();
- }
-
- @Override
- public void requestBodyEnd(@NotNull final Call call, final long byteCount)
{
- super.requestBodyEnd(call, byteCount);
- getListener(call).requestBodyEnd();
- }
-
- @Override
- public void requestFailed(@NotNull final Call call, @NotNull final
IOException ioe) {
- super.requestFailed(call, ioe);
- getListener(call).requestBodyEnd();
- }
-
- @Override
- public void connectStart(@NotNull final Call call, @NotNull final
InetSocketAddress inetSocketAddress, @NotNull final Proxy proxy) {
- super.connectStart(call, inetSocketAddress, proxy);
- getListener(call).connectStart(inetSocketAddress);
- }
-
- @Override
- public void connectionAcquired(@NotNull final Call call, @NotNull final
Connection connection) {
- super.connectionAcquired(call, connection);
-
getListener(call).connectionAcquired(connection.socket().getRemoteSocketAddress());
- }
-
- @Override
- public void secureConnectStart(@NotNull final Call call) {
- super.secureConnectStart(call);
- getListener(call).secureConnectStart();
- }
-
- @Override
- public void secureConnectEnd(@NotNull final Call call, @Nullable final
Handshake handshake) {
- super.secureConnectEnd(call, handshake);
- getListener(call).secureConnectEnd();
- }
-
- private void logTimingInfo(final CallEventListener eventListener) {
- logger.debug("Timing information {}", eventListener);
- }
-}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 8135b444cd..d0ce10d6f5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -245,13 +245,6 @@ public class NodeResponse {
* the content-length. Let the outgoing response builder
determine it.
*/
continue;
- } else if (key.equals("X-ClusterContext")) {
- /*
- * do not copy the cluster context to the response because
- * this information is private and should not be sent to
- * the client
- */
- continue;
}
responseBuilder.header(key, value);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
index 10ecf5ccd7..ad4ded6321 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
@@ -19,15 +19,17 @@ package org.apache.nifi.framework.configuration;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.StandardClusterDetailsFactory;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import
org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
import
org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import
org.apache.nifi.cluster.coordination.http.replication.StandardUploadRequestReplicator;
import
org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator;
import
org.apache.nifi.cluster.coordination.http.replication.UploadRequestReplicator;
-import
org.apache.nifi.cluster.coordination.http.replication.okhttp.OkHttpReplicationClient;
+import
org.apache.nifi.cluster.coordination.http.replication.client.StandardHttpReplicationClient;
import org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.WebClientService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -87,7 +89,7 @@ public class FrameworkClusterConfiguration {
if (clusterCoordinator == null) {
replicator = null;
} else {
- final OkHttpReplicationClient replicationClient = new
OkHttpReplicationClient(properties, sslContext, trustManager);
+ final HttpReplicationClient replicationClient = new
StandardHttpReplicationClient(webClientService, StandardHttpUriBuilder::new);
replicator = new ThreadPoolRequestReplicator(
properties.getClusterNodeProtocolMaxPoolSize(),
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 6524edec96..0db0287e62 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -72,6 +72,8 @@ import static org.mockito.Mockito.when;
public class TestThreadPoolRequestReplicator {
+ private static final String NODE_CONTINUE = "202-Accepted";
+
@BeforeAll
public static void setupClass() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH,
"src/test/resources/conf/nifi.properties");
@@ -278,11 +280,11 @@ public class TestThreadPoolRequestReplicator {
protected NodeResponse replicateRequest(final PreparedRequest
request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final
StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so
we are using Mockito's Whitebox class to extract them.
- final Object expectsHeader =
request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final Object expectsHeader =
request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
final int statusCode;
if (requestCount.incrementAndGet() == 1) {
- assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE,
expectsHeader);
+ assertEquals(NODE_CONTINUE, expectsHeader);
statusCode = Status.ACCEPTED.getStatusCode();
} else {
assertNull(expectsHeader);
@@ -343,10 +345,10 @@ public class TestThreadPoolRequestReplicator {
protected NodeResponse replicateRequest(final PreparedRequest
request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final
StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so
we are using Mockito's Whitebox class to extract them.
- final Object expectsHeader =
request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final Object expectsHeader =
request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
final int requestIndex = requestCount.incrementAndGet();
- assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE,
expectsHeader);
+ assertEquals(NODE_CONTINUE, expectsHeader);
if (requestIndex == 1) {
final Response clientResponse = mock(Response.class);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java
new file mode 100644
index 0000000000..485011309a
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication.client;
+
+import jakarta.ws.rs.core.MultivaluedHashMap;
+import jakarta.ws.rs.core.MultivaluedMap;
+import jakarta.ws.rs.core.Response;
+import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
+import org.apache.nifi.web.client.StandardHttpUriBuilder;
+import org.apache.nifi.web.client.api.HttpEntityHeaders;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestStandardHttpReplicationClient {
+
+ private static final String GET_METHOD = "GET";
+
+ private static final String POST_METHOD = "POST";
+
+ private static final byte[] EMPTY_MAP_SERIALIZED = new byte[]{123, 125};
+
+ private static final String CONTENT_TYPE_LOWERCASED = "content-type";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final URI REPLICATE_URI =
URI.create("http://localhost/nifi-api/flow/current-user");
+
+ private static final String URI_QUERY = "recursive=false";
+
+ private static final URI REPLICATE_URI_QUERY =
URI.create("http://localhost/nifi-api/flow/process-groups/root/status?%s".formatted(URI_QUERY));
+
+ private static final String QUERY_PARAMETER_NAME = "revision";
+
+ private static final String QUERY_PARAMETER_VALUE = "1";
+
+ private static final String QUERY_EXPECTED =
"%s=%s".formatted(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
+
+ private static final String STATUS_PSEUDO_HEADER = ":status";
+
+ @Mock
+ private WebClientService webClientService;
+
+ @Mock
+ private HttpRequestUriSpec httpRequestUriSpec;
+
+ @Mock
+ private HttpRequestBodySpec httpRequestBodySpec;
+
+ @Mock
+ private HttpResponseEntity httpResponseEntity;
+
+ @Mock
+ private HttpEntityHeaders httpResponseHeaders;
+
+ @Captor
+ private ArgumentCaptor<URI> uriCaptor;
+
+ private StandardHttpReplicationClient client;
+
+ @BeforeEach
+ void setClient() {
+ client = new StandardHttpReplicationClient(webClientService,
StandardHttpUriBuilder::new);
+ }
+
+ @Test
+ void testPrepareRequest() {
+ final Map<String, String> headers = Collections.emptyMap();
+ final Map<String, String> requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest =
client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ assertNotNull(preparedRequest);
+ assertInstanceOf(StandardPreparedRequest.class, preparedRequest);
+
+ assertEquals(GET_METHOD, preparedRequest.getMethod());
+ assertNotEquals(headers, preparedRequest.getHeaders());
+ assertEquals(requestEntity, preparedRequest.getEntity());
+
+ final StandardPreparedRequest standardPreparedRequest =
(StandardPreparedRequest) preparedRequest;
+ assertArrayEquals(EMPTY_MAP_SERIALIZED,
standardPreparedRequest.requestBody());
+ }
+
+ @Test
+ void testReplicateIllegalArgumentException() {
+ assertThrows(IllegalArgumentException.class, () ->
client.replicate(null, REPLICATE_URI));
+ }
+
+ @Test
+ void testReplicate() throws IOException {
+ final Map<String, String> headers = Map.of(CONTENT_TYPE_LOWERCASED,
APPLICATION_JSON);
+ final Map<String, String> requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest =
client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(),
anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final Set<String> responseHeaderNames = Set.of(
+ PreparedRequestHeader.CONTENT_TYPE.getHeader(),
+ PreparedRequestHeader.CONTENT_ENCODING.getHeader(),
+ PreparedRequestHeader.CONTENT_LENGTH.getHeader(),
+ STATUS_PSEUDO_HEADER
+ );
+
when(httpResponseHeaders.getHeaderNames()).thenReturn(responseHeaderNames);
+
when(httpResponseHeaders.getHeader(eq(PreparedRequestHeader.CONTENT_TYPE.getHeader()))).thenReturn(List.of(APPLICATION_JSON));
+
+ final ByteArrayInputStream responseBody = new
ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest,
REPLICATE_URI);
+
+ assertResponseFound(response);
+
+ final String responseContentType =
response.getHeaderString(PreparedRequestHeader.CONTENT_TYPE.getHeader());
+ assertEquals(APPLICATION_JSON, responseContentType);
+
+ final String responseStatusHeader =
response.getHeaderString(STATUS_PSEUDO_HEADER);
+ assertNull(responseStatusHeader);
+
+ final String contentEncodingHeader =
response.getHeaderString(PreparedRequestHeader.CONTENT_ENCODING.getHeader());
+ assertNull(contentEncodingHeader);
+
+ final String contentLengthHeader =
response.getHeaderString(PreparedRequestHeader.CONTENT_LENGTH.getHeader());
+ assertNull(contentLengthHeader);
+ }
+
+ @Test
+ void testReplicatePostBody() throws IOException {
+ final Map<String, String> headers = Map.of(CONTENT_TYPE_LOWERCASED,
APPLICATION_JSON);
+ final Map<String, String> requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest =
client.prepareRequest(POST_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(),
anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new
ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest,
REPLICATE_URI);
+
+ assertResponseFound(response);
+ }
+
+ @Test
+ void testReplicateGetMultivaluedMap() throws IOException {
+ final Map<String, String> headers =
Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
+
+ final MultivaluedMap<String, String> requestEntity = new
MultivaluedHashMap<>();
+ requestEntity.add(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
+ final PreparedRequest preparedRequest =
client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(),
anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new
ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest,
REPLICATE_URI);
+
+ assertResponseFound(response);
+
+ verify(httpRequestUriSpec).uri(uriCaptor.capture());
+
+ final URI requestUri = uriCaptor.getValue();
+ assertEquals(QUERY_EXPECTED, requestUri.getQuery());
+ }
+
+ @Test
+ void testReplicateGetUriQuery() throws IOException {
+ final Map<String, String> headers =
Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
+
+ final PreparedRequest preparedRequest =
client.prepareRequest(GET_METHOD, headers, Collections.emptyMap());
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(),
anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new
ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest,
REPLICATE_URI_QUERY);
+
+ assertResponseFound(response);
+
+ verify(httpRequestUriSpec).uri(uriCaptor.capture());
+
+ final URI requestUri = uriCaptor.getValue();
+ assertEquals(URI_QUERY, requestUri.getQuery());
+ }
+
+ private void assertResponseFound(final Response response) {
+ assertNotNull(response);
+ assertEquals(HTTP_OK, response.getStatus());
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
similarity index 98%
rename from
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java
rename to
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
index 982735edab..9f138724a6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
index fbda29d9de..922e26c951 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
@@ -17,7 +17,6 @@
package org.apache.nifi.cluster.coordination.http.replication.util;
-import java.io.IOException;
import java.lang.annotation.Annotation;
import java.net.URI;
import java.util.Collections;
@@ -71,7 +70,7 @@ public class MockReplicationClient implements
HttpReplicationClient {
}
@Override
- public Response replicate(PreparedRequest request, String uri) throws
IOException {
+ public Response replicate(PreparedRequest request, URI uri) {
return new Response() {
@Override
@@ -173,7 +172,7 @@ public class MockReplicationClient implements
HttpReplicationClient {
@Override
public URI getLocation() {
- return URI.create(uri);
+ return uri;
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
index a76daf61d7..ff2c9843f0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
@@ -22,7 +22,6 @@ import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
import org.apache.nifi.web.api.dto.FlowAnalysisRuleViolationDTO;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -187,13 +186,11 @@ public class FlowAnalysisResultEntityMergerTest {
));
}
- @NotNull
private static NodeIdentifier nodeIdOf(String nodeId) {
NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId,
"unimportant", 1, "unimportant", 1, "unimportant", 1, 1, false);
return nodeIdentifier;
}
- @NotNull
private static FlowAnalysisRuleDTO ruleOf(String ruleId) {
FlowAnalysisRuleDTO rule = new FlowAnalysisRuleDTO();
@@ -202,7 +199,6 @@ public class FlowAnalysisResultEntityMergerTest {
return rule;
}
- @NotNull
private static FlowAnalysisRuleViolationDTO ruleViolationOf(
String ruleId,
boolean canRead,
@@ -216,7 +212,6 @@ public class FlowAnalysisResultEntityMergerTest {
return ruleViolation;
}
- @NotNull
private static PermissionsDTO permissionOf(boolean canRead, boolean
canWrite) {
PermissionsDTO subjectPermissionDto = new PermissionsDTO();
@@ -226,7 +221,6 @@ public class FlowAnalysisResultEntityMergerTest {
return subjectPermissionDto;
}
- @NotNull
private static FlowAnalysisResultEntity
resultEntityOf(List<FlowAnalysisRuleDTO> rules,
List<FlowAnalysisRuleViolationDTO> ruleViolations) {
FlowAnalysisResultEntity clientEntity = new FlowAnalysisResultEntity();
@@ -236,7 +230,6 @@ public class FlowAnalysisResultEntityMergerTest {
return clientEntity;
}
- @NotNull
private static Map<NodeIdentifier, FlowAnalysisResultEntity>
resultEntityMapOf(FlowAnalysisResultEntity clientEntity1,
FlowAnalysisResultEntity clientEntity2) {
Map<NodeIdentifier, FlowAnalysisResultEntity> entityMap = new
HashMap<>();
@@ -246,7 +239,6 @@ public class FlowAnalysisResultEntityMergerTest {
return entityMap;
}
- @NotNull
private static <T> List<T> listOf(T... items) {
List<T> itemSet = new ArrayList<>();
for (T item : items) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
index b53a2387a2..b8f2052dd5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
@@ -63,7 +63,7 @@ class AssetsRestApiClient extends NiFiRestApiClient {
final HttpRequestBodySpec requestBodySpec = webClientService.get()
.uri(requestUri)
.header(ACCEPT_HEADER, APPLICATION_JSON)
- .header(X_REQUEST_REPLICATED_HEADER, "true");
+ .header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
return executeEntityRequest(requestUri, requestBodySpec,
AssetsEntity.class);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
index 18171eef69..7c6fe765b9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
@@ -42,7 +42,7 @@ public abstract class NiFiRestApiClient {
private static final String HTTPS_SCHEME = "https";
protected static final String ACCEPT_HEADER = "Accept";
- protected static final String X_REQUEST_REPLICATED_HEADER =
"X-Request-Replicated";
+ protected static final String REQUEST_REPLICATED_HEADER =
"request-replicated";
protected static final String APPLICATION_JSON = "application/json";
protected static final String APPLICATION_OCTET_STREAM =
"application/octet-stream";
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
index e82bd68751..d3a05518ba 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
@@ -65,7 +65,7 @@ public class NarRestApiClient extends NiFiRestApiClient {
final HttpRequestBodySpec requestBodySpec = webClientService.get()
.uri(requestUri)
.header(ACCEPT_HEADER, APPLICATION_JSON)
- .header(X_REQUEST_REPLICATED_HEADER, "true");
+ .header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
return executeEntityRequest(requestUri, requestBodySpec,
NarSummariesEntity.class);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index cfa7f63ce3..83b8250736 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import
org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
@@ -91,7 +92,7 @@ public class StandardNiFiContentAccess implements
ContentAccess {
// replicate the request to the cluster coordinator, indicating
the target node
NodeResponse nodeResponse;
try {
-
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER,
nodeId.getId());
+
headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(),
nodeId.getId());
final NodeIdentifier coordinatorNode =
clusterCoordinator.getElectedActiveCoordinatorNode();
if (coordinatorNode == null) {
throw new NoClusterCoordinatorException();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index cb0cf76d33..759eecc92b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -32,6 +32,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import
org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -87,6 +88,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.net.ssl.SSLPeerUnverifiedException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
@@ -222,7 +224,7 @@ public abstract class ApplicationResource {
}
protected Optional<String> getIdGenerationSeed() {
- final String idGenerationSeed =
httpServletRequest.getHeader(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER);
+ final String idGenerationSeed =
httpServletRequest.getHeader(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader());
if (StringUtils.isBlank(idGenerationSeed)) {
return Optional.empty();
}
@@ -268,7 +270,7 @@ public abstract class ApplicationResource {
* @return a 202 Accepted (Node Continue) response to be used within the
cluster request handshake
*/
protected ResponseBuilder generateContinueResponse() {
- return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);
+ return Response.status(HttpURLConnection.HTTP_ACCEPTED);
}
protected URI getAbsolutePath() {
@@ -317,50 +319,15 @@ public abstract class ApplicationResource {
}
}
- // if the scheme is not set by the client, include the details from
this request but don't override
- final String proxyScheme =
getFirstHeaderValue(ProxyHeader.PROXY_SCHEME.getHeader(),
ProxyHeader.FORWARDED_PROTO.getHeader());
- if (proxyScheme == null) {
- result.put(ProxyHeader.PROXY_SCHEME.getHeader(),
httpServletRequest.getScheme());
- }
-
- // if the host is not set by the client, include the details from this
request but don't override
- final String proxyHost =
getFirstHeaderValue(ProxyHeader.PROXY_HOST.getHeader(),
ProxyHeader.FORWARDED_HOST.getHeader());
- if (proxyHost == null) {
- result.put(ProxyHeader.PROXY_HOST.getHeader(),
httpServletRequest.getServerName());
- }
-
- // if the port is not set by the client, include the details from this
request but don't override
- final String proxyPort =
getFirstHeaderValue(ProxyHeader.PROXY_PORT.getHeader(),
ProxyHeader.FORWARDED_PORT.getHeader());
- if (proxyPort == null) {
- result.put(ProxyHeader.PROXY_PORT.getHeader(),
String.valueOf(httpServletRequest.getServerPort()));
- }
+ final URI requestUri =
RequestUriBuilder.fromHttpServletRequest(httpServletRequest).build();
+ // Set Proxy Headers based on resolved URI from supported values
+ result.put(ProxyHeader.PROXY_SCHEME.getHeader(),
requestUri.getScheme());
+ result.put(ProxyHeader.PROXY_HOST.getHeader(), requestUri.getHost());
+ result.put(ProxyHeader.PROXY_PORT.getHeader(),
Integer.toString(requestUri.getPort()));
return result;
}
- /**
- * Returns the value for the first key discovered when inspecting the
current request. Will
- * return null if there are no keys specified or if none of the specified
keys are found.
- *
- * @param keys http header keys
- * @return the value for the first key found
- */
- private String getFirstHeaderValue(final String... keys) {
- if (keys == null) {
- return null;
- }
-
- for (final String key : keys) {
- final String value = httpServletRequest.getHeader(key);
-
- if (value != null) {
- return value;
- }
- }
-
- return null;
- }
-
/**
* Checks whether the request is part of a two-phase commit style request
(either phase 1 or phase 2)
*
@@ -368,7 +335,7 @@ public abstract class ApplicationResource {
* @return <code>true</code> if the request represents a two-phase commit
style request
*/
protected boolean isTwoPhaseRequest(final HttpServletRequest
httpServletRequest) {
- final String transactionId =
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId =
httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
return transactionId != null && isClustered();
}
@@ -383,15 +350,15 @@ public abstract class ApplicationResource {
* first of the two phases.
*/
protected boolean isValidationPhase(final HttpServletRequest
httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER)
!= null;
+ return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader())
!= null;
}
protected boolean isExecutionPhase(final HttpServletRequest
httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER)
!= null;
+ return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader())
!= null;
}
protected boolean isCancellationPhase(final HttpServletRequest
httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER)
!= null;
+ return isTwoPhaseRequest(httpServletRequest) &&
httpServletRequest.getHeader(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader())
!= null;
}
/**
@@ -412,9 +379,9 @@ public abstract class ApplicationResource {
return false;
}
- // Check if the X-Request-Replicated header is set. If so, the request
has already been replicated,
+ // Check if the replicated header is set. If so, the request has
already been replicated,
// so we need to service the request locally. If not, then replicate
the request to the entire cluster.
- final String header =
httpServletRequest.getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
+ final String header =
httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_REPLICATED.getHeader());
return header == null;
}
@@ -719,7 +686,7 @@ public abstract class ApplicationResource {
}
// get the transaction id
- final String transactionId =
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId =
httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction
Id missing.");
}
@@ -739,7 +706,7 @@ public abstract class ApplicationResource {
private <T extends Entity> Request<T> phaseTwoVerifyTransaction() {
// get the transaction id
- final String transactionId =
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId =
httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction
Id missing.");
}
@@ -775,7 +742,7 @@ public abstract class ApplicationResource {
private void cancelTransaction() {
// get the transaction id
- final String transactionId =
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId =
httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction
Id missing.");
}
@@ -891,7 +858,7 @@ public abstract class ApplicationResource {
final Set<NodeIdentifier> targetNodes =
Collections.singleton(nodeId);
return requestReplicator.replicate(targetNodes, method, path,
entity, headers, true, true).awaitMergedResponse().getResponse();
} else {
-
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER,
nodeId.getId());
+
headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(),
nodeId.getId());
return
requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method,
path, entity, headers).awaitMergedResponse().getResponse();
}
} catch (final InterruptedException ie) {
@@ -936,7 +903,7 @@ public abstract class ApplicationResource {
final Set<NodeIdentifier> nodeIds =
Collections.singleton(targetNode);
return getRequestReplicator().replicate(nodeIds, method,
getAbsolutePath(), entity, getHeaders(), true,
true).awaitMergedResponse().getResponse();
} else {
- final Map<String, String> headers =
getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER,
targetNode.getId()));
+ final Map<String, String> headers =
getHeaders(Collections.singletonMap(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(),
targetNode.getId()));
return
requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method,
getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse();
}
} catch (final InterruptedException ie) {
@@ -1049,7 +1016,7 @@ public abstract class ApplicationResource {
}
} finally {
final long replicateNanos = System.nanoTime() - replicateStart;
- final String transactionId =
headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId =
headers.get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
final String requestId = transactionId == null ? "Request with no
ID" : transactionId;
logger.debug("Took a total of {} millis to {} for {}",
TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
index ff72e78535..ffd3b72d28 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
@@ -45,7 +45,7 @@ import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import
org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.web.DownloadableContent;
@@ -451,7 +451,7 @@ public class ProvenanceEventResource extends
ApplicationResource {
}
// handle expects request (usually from the cluster manager)
- final String expects =
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final String expects =
httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
if (expects != null) {
return generateContinueResponse().build();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
index 2f22cd5412..e617e1c869 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
@@ -33,8 +33,8 @@ import java.io.IOException;
* Skip Replicated Cross-Site Request Forgery Filter disables subsequent
filtering for matched requests
*/
public class SkipReplicatedCsrfFilter extends OncePerRequestFilter {
- /** RequestReplicator.REQUEST_TRANSACTION_ID_HEADER applied to replicated
cluster requests */
- protected static final String REPLICATED_REQUEST_HEADER =
"X-RequestTransactionId";
+ /** Replication HTTP Header applied to replicated cluster requests */
+ protected static final String REPLICATED_REQUEST_HEADER =
"request-transaction-id";
/** Requests containing replicated header and not containing authorization
cookies will be skipped */
private static final RequestMatcher REQUEST_MATCHER = new
AndRequestMatcher(
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index bb9b88ccfb..8bd2d5063f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -77,7 +77,7 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
// Group ID | Source
Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s |
%2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
- public static final RequestConfig DO_NOT_REPLICATE = () ->
Collections.singletonMap("X-Request-Replicated", "value");
+ public static final RequestConfig DO_NOT_REPLICATE = () ->
Collections.singletonMap("request-replicated", Boolean.TRUE.toString());
public static final int CLUSTERED_CLIENT_API_BASE_PORT = 5671;
public static final int STANDALONE_CLIENT_API_BASE_PORT = 5670;