This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 21b5fb4e7e7 NIFI-15784 Forward request headers during upload
replication (#11094)
21b5fb4e7e7 is described below
commit 21b5fb4e7e7c9311f0d5eb4d0d44fa245e55a6e5
Author: Kevin Doran <[email protected]>
AuthorDate: Thu Apr 2 14:24:46 2026 -0400
NIFI-15784 Forward request headers during upload replication (#11094)
UploadRequest replication (used for asset and NAR uploads) did not forward
inbound servlet headers to peer nodes.
Only three explicit headers (filename, content-type,
cluster-id-generation-seed) were included.
This caused ConnectorRequestContext to not have the full original request
context.
Standard cluster replication (ThreadPoolRequestReplicator) already forwards
all inbound headers via ApplicationResource.getHeaders(),
with appropriate stripping of credentials, hop-by-hop headers, and
replication protocol headers.
Upload replication now use the same approach for consistency.
- Extract shared header-preparation logic into ReplicationHeaderUtils so
credential stripping, proxied-entity injection, and replication header
sanitisation cannot drift between the two replication paths
- Add UploadRequest.forwardRequestHeaders(Map) so callers can opt in to
forwarding inbound servlet headers
- Rebuild outbound headers in StandardUploadRequestReplicator using a
secure pipeline: strip replication headers, strip hop-by-hop, apply explicit
builder headers, apply proxy/credential policy, force-set replication flags
- Wire all current upload call sites (ConnectorResource.createAsset,
ParameterContextResource asset upload, ControllerResource.uploadNar) to forward
inbound headers
- Use File.length() instead of InputStream.available() for reliable
content-length in upload replication
Signed-off-by: David Handermann <[email protected]>
---
.../http/replication/ReplicationHeaderUtils.java | 154 +++++++++++
.../StandardUploadRequestReplicator.java | 50 ++--
.../replication/ThreadPoolRequestReplicator.java | 78 +-----
.../http/replication/UploadRequest.java | 26 ++
.../replication/TestReplicationHeaderUtils.java | 162 +++++++++++
...TestStandardUploadRequestReplicatorHeaders.java | 306 +++++++++++++++++++++
.../org/apache/nifi/web/api/ConnectorResource.java | 1 +
.../apache/nifi/web/api/ControllerResource.java | 1 +
.../nifi/web/api/ParameterContextResource.java | 1 +
9 files changed, 686 insertions(+), 93 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
new file mode 100644
index 00000000000..a5c4de9201c
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.apache.nifi.web.security.http.SecurityCookieName;
+import org.apache.nifi.web.security.http.SecurityHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Shared header-preparation logic for cluster request replication. Both
{@link ThreadPoolRequestReplicator}
+ * and {@link StandardUploadRequestReplicator} use these utilities so that
credential stripping, proxied-entity
+ * injection, and replication-header sanitisation follow the same policy.
+ */
+public final class ReplicationHeaderUtils {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicationHeaderUtils.class);
+
+ private static final String COOKIE_HEADER = "Cookie";
+ private static final String HOST_HEADER = "Host";
+
+ /**
+ * Headers that must not be forwarded because they are hop-by-hop,
describe the original
+ * transport framing, or negotiate content/transfer encodings that the
upload replication
+ * path cannot handle transparently.
+ *
+ * <p>This is intentionally a superset of
+ * {@link
org.apache.nifi.cluster.coordination.http.replication.client.StandardHttpReplicationClient#DISALLOWED_HEADERS
+ * StandardHttpReplicationClient.DISALLOWED_HEADERS}, which omits
encoding-negotiation
+ * headers because {@code StandardHttpReplicationClient} manages its own
gzip pipeline.
+ * {@code StandardUploadRequestReplicator} reads responses raw, so
encoding headers must
+ * also be stripped here.
+ */
+ static final Set<String> HOP_BY_HOP_HEADERS = Set.of(
+ "accept-encoding", "connection", "content-encoding",
"content-length",
+ "expect", "host", "te", "transfer-encoding", "upgrade"
+ );
+
+ private ReplicationHeaderUtils() {
+ }
+
+ /**
+ * Prepares headers for a replicated request. When a non-null user is
provided, the
+ * {@code X-ProxiedEntitiesChain} and {@code X-ProxiedEntityGroups}
headers are set so the
+ * receiving node knows the request is on behalf of that user. When user
is {@code null},
+ * these headers are omitted, indicating the request is made directly by
the cluster node
+ * itself (e.g. for background connector state polling).
+ *
+ * <p>In all cases the {@code Authorization} header, auth-related cookies,
and the
+ * {@code Host} header are removed because the replicated call uses mTLS
for transport
+ * authentication and the proxied-entity chain for user identity.
+ *
+ * @param headers mutable map of HTTP headers to update
+ * @param user the user on whose behalf the request is being made, or
{@code null} for direct node requests
+ */
+ public static void applyUserProxyAndStripCredentials(final Map<String,
String> headers, final NiFiUser user) {
+ if (user == null) {
+ logger.debug("No user provided: omitting proxied entities header
from request");
+ } else {
+ logger.debug("NiFi User provided: adding proxied entities header
to request");
+ headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN,
+
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user));
+
+ headers.put(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS,
+
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
+ }
+
+ removeHeader(headers, SecurityHeader.AUTHORIZATION.getHeader());
+ removeCookie(headers,
SecurityCookieName.AUTHORIZATION_BEARER.getName());
+ removeCookie(headers, SecurityCookieName.REQUEST_TOKEN.getName());
+
+ removeHeader(headers, HOST_HEADER);
+ }
+
+ /**
+ * Removes all {@link RequestReplicationHeader} names from the map
(case-insensitive) so
+ * that inbound client requests cannot spoof replication protocol headers.
+ */
+ public static void stripRequestReplicationHeaders(final Map<String,
String> headers) {
+ for (final RequestReplicationHeader rh :
RequestReplicationHeader.values()) {
+ removeHeader(headers, rh.getHeader());
+ }
+ }
+
+ /**
+ * Removes hop-by-hop / transport-framing headers that should not be
forwarded in a
+ * replicated request (case-insensitive).
+ */
+ public static void stripHopByHopHeaders(final Map<String, String> headers)
{
+ for (final String name : HOP_BY_HOP_HEADERS) {
+ removeHeader(headers, name);
+ }
+ }
+
+ static void removeHeader(final Map<String, String> headers, final String
headerNameSearch) {
+ findHeaderName(headers, headerNameSearch).ifPresent(headers::remove);
+ }
+
+ static void removeCookie(final Map<String, String> headers, final String
cookieName) {
+ final Optional<String> cookieHeaderNameFound = findHeaderName(headers,
COOKIE_HEADER);
+
+ if (cookieHeaderNameFound.isPresent()) {
+ final String cookieHeaderName = cookieHeaderNameFound.get();
+ final String rawCookies = headers.get(cookieHeaderName);
+ final String[] rawCookieParts = rawCookies.split(";");
+ final Set<String> filteredCookieParts = Stream.of(rawCookieParts)
+ .map(String::trim)
+ .filter(cookie -> !cookie.startsWith(cookieName + "="))
+ .collect(Collectors.toSet());
+
+ if (filteredCookieParts.isEmpty()) {
+ headers.remove(cookieHeaderName);
+ } else {
+ headers.put(cookieHeaderName,
StringUtils.join(filteredCookieParts, "; "));
+ }
+ }
+ }
+
+ /**
+ * Find an HTTP header name in a map regardless of case, since HTTP/1.1
capitalises headers
+ * but HTTP/2 returns lowercased headers.
+ */
+ static Optional<String> findHeaderName(final Map<String, String> headers,
final String headerName) {
+ if (headerName == null || headerName.isBlank()) {
+ return Optional.empty();
+ }
+ return headers.keySet().stream()
+ .filter(headerName::equalsIgnoreCase)
+ .findFirst();
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
index 73b93acd9ba..9fbbd1c52e0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
@@ -19,7 +19,6 @@ package org.apache.nifi.cluster.coordination.http.replication;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
-import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.util.NiFiProperties;
@@ -28,7 +27,6 @@ import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.HttpRequestBodySpec;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.WebClientService;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,27 +143,20 @@ public class StandardUploadRequestReplicator implements
UploadRequestReplicator
.encodedPath(exampleRequestUri.getPath())
.build();
- final NiFiUser user = uploadRequest.getUser();
final String filename = uploadRequest.getFilename();
+ final Map<String, String> outboundHeaders =
buildOutboundHeaders(uploadRequest);
+
try (final InputStream inputStream = new FileInputStream(contents)) {
- final HttpRequestBodySpec request = webClientService.post()
- .uri(requestUri)
- .body(inputStream,
OptionalLong.of(inputStream.available()))
- // Special NiFi-specific headers to indicate that the
request should be performed and not replicated to the nodes
-
.header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(),
Boolean.TRUE.toString())
-
.header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(),
Boolean.TRUE.toString())
- .header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN,
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
- .header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS,
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
-
- final Map<String, String> additionalHeaders =
uploadRequest.getHeaders();
- for (Map.Entry<String, String> headerEntry :
additionalHeaders.entrySet()) {
- request.header(headerEntry.getKey(), headerEntry.getValue());
+ HttpRequestBodySpec request =
webClientService.post().uri(requestUri);
+
+ for (final Map.Entry<String, String> entry :
outboundHeaders.entrySet()) {
+ request = request.header(entry.getKey(), entry.getValue());
}
logger.debug("Replicating upload request for {} to {}", filename,
nodeId);
- try (final HttpResponseEntity response = request.retrieve()) {
+ try (final HttpResponseEntity response = request.body(inputStream,
OptionalLong.of(contents.length())).retrieve()) {
final int statusCode = response.statusCode();
if (uploadRequest.getSuccessfulResponseStatus() != statusCode)
{
final String responseMessage =
IOUtils.toString(response.body(), StandardCharsets.UTF_8);
@@ -175,6 +166,33 @@ public class StandardUploadRequestReplicator implements
UploadRequestReplicator
return objectMapper.readValue(responseBody,
uploadRequest.getResponseClass());
}
}
+ }
+
+ /**
+ * Builds the complete set of outbound headers for a replicated upload
request, following the
+ * same trust model as {@link ThreadPoolRequestReplicator}:
+ * <ol>
+ * <li>Start with any forwarded inbound servlet headers.</li>
+ * <li>Strip all {@link RequestReplicationHeader} names (prevent
spoofing).</li>
+ * <li>Strip hop-by-hop / transport-framing headers.</li>
+ * <li>Apply explicit builder headers (filename, content-type, seed) so
upload metadata wins.</li>
+ * <li>Apply user proxy headers and strip credentials (Authorization,
auth cookies, Host).</li>
+ * <li>Force-set {@code request-replicated} and {@code
execution-continue}.</li>
+ * </ol>
+ */
+ <T> Map<String, String> buildOutboundHeaders(final UploadRequest<T>
uploadRequest) {
+ final Map<String, String> headers = new
HashMap<>(uploadRequest.getForwardedRequestHeaders());
+
+ ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+ ReplicationHeaderUtils.stripHopByHopHeaders(headers);
+
+ headers.putAll(uploadRequest.getHeaders());
+
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
uploadRequest.getUser());
+
+ headers.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(),
Boolean.TRUE.toString());
+ headers.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(),
Boolean.TRUE.toString());
+ return headers;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 6b946e2492b..fe79e356b72 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -20,7 +20,6 @@ package org.apache.nifi.cluster.coordination.http.replication;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -53,9 +52,6 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
-import org.apache.nifi.web.security.http.SecurityCookieName;
-import org.apache.nifi.web.security.http.SecurityHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +68,6 @@ import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -90,15 +85,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class ThreadPoolRequestReplicator implements RequestReplicator,
Closeable {
private static final Logger logger =
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
private static final Pattern SNIPPET_URI_PATTERN =
Pattern.compile("/nifi-api/snippets/[a-f0-9\\-]{36}");
- private static final String COOKIE_HEADER = "Cookie";
- private static final String HOST_HEADER = "Host";
private static final String NODE_CONTINUE = "202-Accepted";
private final int maxConcurrentRequests; // maximum number of concurrent
requests
@@ -248,30 +240,7 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
* @param user the user on whose behalf the request is being made, or
{@code null} for direct node requests
*/
void updateRequestHeaders(final Map<String, String> headers, final
NiFiUser user) {
- if (user == null) {
- // Background tasks where the node itself is the identity making
the request, not a proxied user
- logger.debug("No user provided: omitting proxied entities header
from request");
- } else {
- logger.debug("NiFi User provided: adding proxied entities header
to request");
- // Add the user as a proxied entity so that when the receiving
NiFi receives the request,
- // it knows that we are acting as a proxy on behalf of the current
user.
- final String proxiedEntitiesChain =
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
- headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN,
proxiedEntitiesChain);
-
- // Add the header containing the group information for the end
user in the proxied entity chain, these groups would
- // only be populated if the end user authenticated against an
external identity provider like SAML or OIDC
- final String proxiedEntityGroups =
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups());
- headers.put(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS,
proxiedEntityGroups);
- }
-
- // remove the access token if present, since the user is already
authenticated... authorization
- // will happen when the request is replicated using the proxy chain
above
- removeHeader(headers, SecurityHeader.AUTHORIZATION.getHeader());
- removeCookie(headers,
SecurityCookieName.AUTHORIZATION_BEARER.getName());
- removeCookie(headers, SecurityCookieName.REQUEST_TOKEN.getName());
-
- // remove the host header
- removeHeader(headers, HOST_HEADER);
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
user);
}
@Override
@@ -890,49 +859,4 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator, Closeable
return responseMap.size();
}
- private void removeCookie(final Map<String, String> headers, final String
cookieName) {
- final Optional<String> cookieHeaderNameFound = findHeaderName(headers,
COOKIE_HEADER);
-
- if (cookieHeaderNameFound.isPresent()) {
- final String cookieHeaderName = cookieHeaderNameFound.get();
-
- final String rawCookies = headers.get(cookieHeaderName);
- final String[] rawCookieParts = rawCookies.split(";");
- final Set<String> filteredCookieParts =
Stream.of(rawCookieParts).map(String::trim).filter(cookie ->
!cookie.startsWith(cookieName + "=")).collect(Collectors.toSet());
-
- if (filteredCookieParts.isEmpty()) {
- headers.remove(cookieHeaderName);
- } else {
- final String filteredCookies =
StringUtils.join(filteredCookieParts, "; ");
- headers.put(cookieHeaderName, filteredCookies);
- }
- }
- }
-
- private void removeHeader(final Map<String, String> headers, final String
headerNameSearch) {
- final Optional<String> headerNameFound = findHeaderName(headers,
headerNameSearch);
- headerNameFound.ifPresent(headers::remove);
- }
-
- /**
- * Find HTTP Header name in map regardless of case since HTTP/1.1
capitalizes headers but HTTP/2 returns lowercased headers
- *
- * @param headers Map of header name to value
- * @param headerName Header name to be found
- * @return Optional match with header name from map of headers
- */
- private Optional<String> findHeaderName(final Map<String, String> headers,
final String headerName) {
- final Optional<String> headerNameFound;
-
- if (headerName == null || headerName.isBlank()) {
- headerNameFound = Optional.empty();
- } else {
- headerNameFound = headers.keySet()
- .stream()
- .filter(headerName::equalsIgnoreCase)
- .findFirst();
- }
-
- return headerNameFound;
- }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
index 00d941f9f53..cf01b2d879b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.authorization.user.NiFiUser;
import java.io.InputStream;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -37,6 +38,7 @@ public class UploadRequest<T> {
private final String identifier;
private final InputStream contents;
private final Map<String, String> headers;
+ private final Map<String, String> forwardedRequestHeaders;
private final URI exampleRequestUri;
private final Class<T> responseClass;
private final int successfulResponseStatus;
@@ -47,6 +49,8 @@ public class UploadRequest<T> {
this.identifier = Objects.requireNonNull(builder.identifier);
this.contents = Objects.requireNonNull(builder.contents);
this.headers = Map.copyOf(builder.headers);
+ this.forwardedRequestHeaders = builder.forwardedRequestHeaders == null
+ ? Collections.emptyMap() :
Map.copyOf(builder.forwardedRequestHeaders);
this.exampleRequestUri =
Objects.requireNonNull(builder.exampleRequestUri);
this.responseClass = Objects.requireNonNull(builder.responseClass);
this.successfulResponseStatus = builder.successfulResponseStatus;
@@ -75,6 +79,15 @@ public class UploadRequest<T> {
return headers;
}
+ /**
+ * Returns the inbound servlet request headers that should be forwarded
during replication.
+ * These are sanitised by the replicator before being sent to target
nodes. Empty if the
+ * caller did not supply forwarded headers.
+ */
+ public Map<String, String> getForwardedRequestHeaders() {
+ return forwardedRequestHeaders;
+ }
+
public URI getExampleRequestUri() {
return exampleRequestUri;
}
@@ -96,6 +109,7 @@ public class UploadRequest<T> {
private Class<T> responseClass;
private int successfulResponseStatus;
private final Map<String, String> headers = new HashMap<>();
+ private Map<String, String> forwardedRequestHeaders;
public Builder<T> user(NiFiUser user) {
this.user = user;
@@ -132,6 +146,18 @@ public class UploadRequest<T> {
return this;
}
+ /**
+ * Sets the inbound servlet request headers that should be forwarded
during replication.
+ * The replicator sanitises these (strips credentials, replication
protocol headers,
+ * hop-by-hop headers) before sending to target nodes.
+ *
+ * @param forwardedRequestHeaders the original inbound headers, or
{@code null} for none
+ */
+ public Builder<T> forwardRequestHeaders(Map<String, String>
forwardedRequestHeaders) {
+ this.forwardedRequestHeaders = forwardedRequestHeaders == null ?
null : new HashMap<>(forwardedRequestHeaders);
+ return this;
+ }
+
public Builder<T> header(String name, String value) {
this.headers.put(name, value);
return this;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
new file mode 100644
index 00000000000..09a24745ca4
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestReplicationHeaderUtils {
+
+ private static final String TEST_USER_IDENTITY = "alice";
+
+ private static final String AUTHORIZATION_HEADER = "Authorization";
+ private static final String AUTHORIZATION_VALUE = "Bearer secret";
+
+ private static final String CUSTOM_HEADER = "X-Custom-Token";
+ private static final String CUSTOM_HEADER_VALUE = "custom-token-123";
+
+ private static final String COOKIE_HEADER = "Cookie";
+ private static final String HOST_HEADER = "Host";
+ private static final String HOST_VALUE = "original-host:8080";
+
+ private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
+ private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+ private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+ private static final String TE_HEADER = "TE";
+ private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
+ private static final String CONNECTION_HEADER = "Connection";
+
+ private static final String SPOOFED_VALUE = "spoofed";
+ private static final String SHOULD_SURVIVE_VALUE = "should-survive";
+
+ @Test
+ void testApplyUserProxyAndStripCredentialsSetsProxiedEntities() {
+ final NiFiUser user = new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build();
+ final Map<String, String> headers = new HashMap<>();
+ headers.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+ headers.put(CUSTOM_HEADER, CUSTOM_HEADER_VALUE);
+
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
user);
+
+ assertNotNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+
assertTrue(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(TEST_USER_IDENTITY));
+ assertNotNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+ assertNull(headers.get(AUTHORIZATION_HEADER));
+ assertEquals(CUSTOM_HEADER_VALUE, headers.get(CUSTOM_HEADER));
+ }
+
+ @Test
+ void testApplyUserProxyWithNullUserOmitsProxiedEntities() {
+ final Map<String, String> headers = new HashMap<>();
+ headers.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
null);
+
+ assertNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+ assertNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+ assertNull(headers.get(AUTHORIZATION_HEADER));
+ }
+
+ @Test
+ void testStripRequestReplicationHeadersRemovesAllProtocolHeaders() {
+ final Map<String, String> headers = new HashMap<>();
+ for (final RequestReplicationHeader rh :
RequestReplicationHeader.values()) {
+ headers.put(rh.getHeader(), SPOOFED_VALUE);
+ }
+ headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+ ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+
+ for (final RequestReplicationHeader rh :
RequestReplicationHeader.values()) {
+ assertNull(headers.get(rh.getHeader()), "Replication header should
have been stripped: " + rh.getHeader());
+ }
+ assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+ }
+
+ @Test
+ void testStripRequestReplicationHeadersCaseInsensitive() {
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("Request-Replicated", "true");
+ headers.put("EXECUTION-CONTINUE", "true");
+
+ ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+
+ assertFalse(headers.containsKey("Request-Replicated"));
+ assertFalse(headers.containsKey("EXECUTION-CONTINUE"));
+ }
+
+ @Test
+ void testStripHopByHopHeaders() {
+ final Map<String, String> headers = new HashMap<>();
+ headers.put(ACCEPT_ENCODING_HEADER, "gzip, deflate");
+ headers.put(CONTENT_ENCODING_HEADER, "gzip");
+ headers.put(CONTENT_LENGTH_HEADER, "12345");
+ headers.put(HOST_HEADER, HOST_VALUE);
+ headers.put(TE_HEADER, "trailers, deflate");
+ headers.put(TRANSFER_ENCODING_HEADER, "chunked");
+ headers.put(CONNECTION_HEADER, "keep-alive");
+ headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+ ReplicationHeaderUtils.stripHopByHopHeaders(headers);
+
+ assertNull(headers.get(ACCEPT_ENCODING_HEADER));
+ assertNull(headers.get(CONTENT_ENCODING_HEADER));
+ assertNull(headers.get(CONTENT_LENGTH_HEADER));
+ assertNull(headers.get(HOST_HEADER));
+ assertNull(headers.get(TE_HEADER));
+ assertNull(headers.get(TRANSFER_ENCODING_HEADER));
+ assertNull(headers.get(CONNECTION_HEADER));
+ assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+ }
+
+ @Test
+ void testStripAuthCookies() {
+ final Map<String, String> headers = new HashMap<>();
+ headers.put(COOKIE_HEADER, "__Secure-Authorization-Bearer=token123;
__Secure-Request-Token=rt456; other=value");
+
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
null);
+
+ final String remaining = headers.get(COOKIE_HEADER);
+ assertNotNull(remaining);
+ assertFalse(remaining.contains("__Secure-Authorization-Bearer"));
+ assertFalse(remaining.contains("__Secure-Request-Token"));
+ assertTrue(remaining.contains("other=value"));
+ }
+
+ @Test
+ void testRemoveHostHeader() {
+ final Map<String, String> headers = new HashMap<>();
+ headers.put(HOST_HEADER, HOST_VALUE);
+ headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+ ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers,
null);
+
+ assertNull(headers.get(HOST_HEADER));
+ assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
new file mode 100644
index 00000000000..ecc8a7fc116
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestStandardUploadRequestReplicatorHeaders {
+
+ private static final String TEST_USER_IDENTITY = "test-user";
+ private static final String REAL_USER_IDENTITY = "real-user";
+ private static final String EVIL_USER_IDENTITY = "evil-user";
+
+ private static final String TEST_FILENAME = "test.txt";
+ private static final String TEST_IDENTIFIER = "id-1";
+ private static final URI TEST_REQUEST_URI =
URI.create("https://localhost:8443/nifi-api/test");
+ private static final int SUCCESS_STATUS = 200;
+
+ private static final String FILENAME_HEADER = "Filename";
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+ private static final String CONTENT_TYPE_VALUE =
"application/octet-stream";
+
+ private static final String AUTHORIZATION_HEADER = "Authorization";
+ private static final String AUTHORIZATION_VALUE = "Bearer secret-token";
+
+ private static final String CUSTOM_TOKEN_HEADER = "X-Custom-Token";
+ private static final String CUSTOM_TOKEN_VALUE = "custom-token-abc";
+ private static final String CUSTOM_TOKEN_VALUE_ALT = "keep-this";
+ private static final String CUSTOM_TOKEN_VALUE_SURVIVE = "keep-me";
+
+ private static final String CUSTOM_HEADER = "X-Custom-Header";
+ private static final String CUSTOM_HEADER_VALUE = "custom-value";
+
+ private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
+ private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+ private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+ private static final String HOST_HEADER = "Host";
+ private static final String TE_HEADER = "TE";
+ private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
+ private static final String CONNECTION_HEADER = "Connection";
+
+ private static final String COOKIE_HEADER = "Cookie";
+ private static final String COOKIE_VALUE_WITH_AUTH =
"__Secure-Authorization-Bearer=token123; __Secure-Request-Token=rt456;
custom=value";
+ private static final String COOKIE_CUSTOM_SEGMENT = "custom=value";
+
+ private static final String SPOOFED_VALUE = "spoofed";
+ private static final String SPOOFED_TX_VALUE = "spoofed-tx";
+
+ private StandardUploadRequestReplicator replicator;
+
+ @BeforeEach
+ void setUp(@TempDir Path tempDir) throws IOException {
+ final Properties props = new Properties();
+ props.setProperty(NiFiProperties.UPLOAD_WORKING_DIRECTORY,
tempDir.toString());
+ final NiFiProperties nifiProperties =
NiFiProperties.createBasicNiFiProperties((String) null, props);
+ replicator = new StandardUploadRequestReplicator(
+ mock(ClusterCoordinator.class),
+ mock(WebClientService.class),
+ nifiProperties
+ );
+ }
+
+ @Test
+ void testCustomHeaderPreservedFromForwardedHeaders() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE);
+ forwarded.put(CUSTOM_HEADER, CUSTOM_HEADER_VALUE);
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertEquals(CUSTOM_TOKEN_VALUE, result.get(CUSTOM_TOKEN_HEADER));
+ assertEquals(CUSTOM_HEADER_VALUE, result.get(CUSTOM_HEADER));
+ }
+
+ @Test
+ void testReplicationHeadersFromForwardedInputAreStripped() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(),
SPOOFED_VALUE);
+ forwarded.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(),
SPOOFED_VALUE);
+
forwarded.put(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader(),
SPOOFED_TX_VALUE);
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertEquals(Boolean.TRUE.toString(),
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+ assertEquals(Boolean.TRUE.toString(),
result.get(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()));
+
assertNull(result.get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader()));
+ }
+
+ @Test
+ void testAuthorizationHeaderStripped() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+ forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE_ALT);
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertNull(result.get(AUTHORIZATION_HEADER));
+ assertEquals(CUSTOM_TOKEN_VALUE_ALT, result.get(CUSTOM_TOKEN_HEADER));
+ }
+
+ @Test
+ void testHopByHopHeadersStripped() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(ACCEPT_ENCODING_HEADER, "gzip, deflate");
+ forwarded.put(CONTENT_ENCODING_HEADER, "gzip");
+ forwarded.put(CONTENT_LENGTH_HEADER, "99999");
+ forwarded.put(HOST_HEADER, "original-host:443");
+ forwarded.put(TE_HEADER, "trailers, deflate");
+ forwarded.put(TRANSFER_ENCODING_HEADER, "chunked");
+ forwarded.put(CONNECTION_HEADER, "keep-alive");
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertNull(result.get(ACCEPT_ENCODING_HEADER));
+ assertNull(result.get(CONTENT_ENCODING_HEADER));
+ assertNull(result.get(CONTENT_LENGTH_HEADER));
+ assertNull(result.get(TE_HEADER));
+ assertNull(result.get(TRANSFER_ENCODING_HEADER));
+ assertNull(result.get(CONNECTION_HEADER));
+ assertNull(result.get(HOST_HEADER));
+ }
+
+ @Test
+ void testExplicitBuilderHeadersWinOverForwarded() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(FILENAME_HEADER, "forwarded-name.txt");
+
+ final UploadRequest<String> request = new
UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .forwardRequestHeaders(forwarded)
+ .header(FILENAME_HEADER, "explicit-name.txt")
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+ assertEquals("explicit-name.txt", result.get(FILENAME_HEADER));
+ }
+
+ @Test
+ void testProxiedEntitiesSetFromUser() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE);
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+
assertTrue(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(TEST_USER_IDENTITY));
+ assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+ }
+
+ @Test
+ void testReplicationFlagsAlwaysSet() {
+ final UploadRequest<String> request = buildUploadRequest(new
HashMap<>());
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertEquals(Boolean.TRUE.toString(),
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+ assertEquals(Boolean.TRUE.toString(),
result.get(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()));
+ }
+
+ @Test
+ void testNoForwardedHeadersStillWorks() {
+ final UploadRequest<String> request = new
UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .header(FILENAME_HEADER, TEST_FILENAME)
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ assertEquals(TEST_FILENAME, result.get(FILENAME_HEADER));
+ assertEquals(Boolean.TRUE.toString(),
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+ assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+ }
+
+ @Test
+ void testBuilderCannotOverrideProxiedEntities() {
+ final UploadRequest<String> request = new
UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(REAL_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, "<" +
EVIL_USER_IDENTITY + ">")
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+
assertFalse(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(EVIL_USER_IDENTITY));
+
assertTrue(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(REAL_USER_IDENTITY));
+ }
+
+ @Test
+ void testAuthCookiesStrippedFromForwardedHeaders() {
+ final Map<String, String> forwarded = new HashMap<>();
+ forwarded.put(COOKIE_HEADER, COOKIE_VALUE_WITH_AUTH);
+ forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE_SURVIVE);
+
+ final UploadRequest<String> request = buildUploadRequest(forwarded);
+ final Map<String, String> result =
replicator.buildOutboundHeaders(request);
+
+ final String cookies = result.get(COOKIE_HEADER);
+ assertNotNull(cookies);
+ assertFalse(cookies.contains("__Secure-Authorization-Bearer"));
+ assertFalse(cookies.contains("__Secure-Request-Token"));
+ assertTrue(cookies.contains(COOKIE_CUSTOM_SEGMENT));
+ assertEquals(CUSTOM_TOKEN_VALUE_SURVIVE,
result.get(CUSTOM_TOKEN_HEADER));
+ }
+
+ @Test
+ void testForwardRequestHeadersNullProducesSameAsOmitted() {
+ final UploadRequest<String> withNull = new
UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .forwardRequestHeaders(null)
+ .header(FILENAME_HEADER, TEST_FILENAME)
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+
+ final UploadRequest<String> withoutCall = new
UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .header(FILENAME_HEADER, TEST_FILENAME)
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+
+ final Map<String, String> resultNull =
replicator.buildOutboundHeaders(withNull);
+ final Map<String, String> resultOmitted =
replicator.buildOutboundHeaders(withoutCall);
+
+ assertEquals(resultOmitted, resultNull);
+ }
+
+ private UploadRequest<String> buildUploadRequest(final Map<String, String>
forwardedHeaders) {
+ return new UploadRequest.Builder<String>()
+ .user(new
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+ .filename(TEST_FILENAME)
+ .identifier(TEST_IDENTIFIER)
+ .contents(new ByteArrayInputStream(new byte[0]))
+ .forwardRequestHeaders(forwardedHeaders)
+ .header(FILENAME_HEADER, TEST_FILENAME)
+ .header(CONTENT_TYPE_HEADER, CONTENT_TYPE_VALUE)
+ .exampleRequestUri(TEST_REQUEST_URI)
+ .responseClass(String.class)
+ .successfulResponseStatus(SUCCESS_STATUS)
+ .build();
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 5543e2ba717..9b51c56b9aa 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -2032,6 +2032,7 @@ public class ConnectorResource extends
ApplicationResource {
.filename(sanitizedAssetName)
.identifier(uploadRequestId)
.contents(maxLengthInputStream)
+ .forwardRequestHeaders(getHeaders()) // Necessary for
ConnectorRequestContext when using a ConnectorConfigurationProvider
.header(FILENAME_HEADER, sanitizedAssetName)
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.header(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(),
uploadRequestId)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index a6af326fb5d..fe4687d54f9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -2868,6 +2868,7 @@ public class ControllerResource extends
ApplicationResource {
.filename(filename)
.identifier(UUID.randomUUID().toString())
.contents(maxLengthInputStream)
+ .forwardRequestHeaders(getHeaders())
.header(FILENAME_HEADER, filename)
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.exampleRequestUri(getAbsolutePath())
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
index d8b50385495..7d7fcce0399 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
@@ -443,6 +443,7 @@ public class ParameterContextResource extends
AbstractParameterResource {
.filename(sanitizedAssetName)
.identifier(UUID.randomUUID().toString())
.contents(maxLengthInputStream)
+ .forwardRequestHeaders(getHeaders())
.header(FILENAME_HEADER, sanitizedAssetName)
.header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
.exampleRequestUri(getAbsolutePath())