This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 21b5fb4e7e7 NIFI-15784 Forward request headers during upload 
replication (#11094)
21b5fb4e7e7 is described below

commit 21b5fb4e7e7c9311f0d5eb4d0d44fa245e55a6e5
Author: Kevin Doran <[email protected]>
AuthorDate: Thu Apr 2 14:24:46 2026 -0400

    NIFI-15784 Forward request headers during upload replication (#11094)
    
    UploadRequest replication (used for asset and NAR uploads) did not forward 
inbound servlet headers to peer nodes.
    Only three explicit headers (filename, content-type, 
cluster-id-generation-seed) were included.
    As a result, ConnectorRequestContext did not have the full original request 
context.
    
    Standard cluster replication (ThreadPoolRequestReplicator) already forwards 
all inbound headers via ApplicationResource.getHeaders(),
    with appropriate stripping of credentials, hop-by-hop headers, and 
replication protocol headers.
    Upload replication now uses the same approach for consistency.
    
    - Extract shared header-preparation logic into ReplicationHeaderUtils so 
credential stripping, proxied-entity injection, and replication header 
sanitisation cannot drift between the two replication paths
    - Add UploadRequest.forwardRequestHeaders(Map) so callers can opt in to 
forwarding inbound servlet headers
    - Rebuild outbound headers in StandardUploadRequestReplicator using a 
secure pipeline: strip replication headers, strip hop-by-hop, apply explicit 
builder headers, apply proxy/credential policy, force-set replication flags
    - Wire all current upload call sites (ConnectorResource.createAsset, 
ParameterContextResource asset upload, ControllerResource.uploadNar) to forward 
inbound headers
    - Use File.length() instead of InputStream.available() for reliable 
content-length in upload replication
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../http/replication/ReplicationHeaderUtils.java   | 154 +++++++++++
 .../StandardUploadRequestReplicator.java           |  50 ++--
 .../replication/ThreadPoolRequestReplicator.java   |  78 +-----
 .../http/replication/UploadRequest.java            |  26 ++
 .../replication/TestReplicationHeaderUtils.java    | 162 +++++++++++
 ...TestStandardUploadRequestReplicatorHeaders.java | 306 +++++++++++++++++++++
 .../org/apache/nifi/web/api/ConnectorResource.java |   1 +
 .../apache/nifi/web/api/ControllerResource.java    |   1 +
 .../nifi/web/api/ParameterContextResource.java     |   1 +
 9 files changed, 686 insertions(+), 93 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
new file mode 100644
index 00000000000..a5c4de9201c
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ReplicationHeaderUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.apache.nifi.web.security.http.SecurityCookieName;
+import org.apache.nifi.web.security.http.SecurityHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Shared header-preparation logic for cluster request replication. Both 
{@link ThreadPoolRequestReplicator}
+ * and {@link StandardUploadRequestReplicator} use these utilities so that 
credential stripping, proxied-entity
+ * injection, and replication-header sanitisation follow the same policy.
+ */
+public final class ReplicationHeaderUtils {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ReplicationHeaderUtils.class);
+
+    private static final String COOKIE_HEADER = "Cookie";
+    private static final String HOST_HEADER = "Host";
+
+    /**
+     * Headers that must not be forwarded because they are hop-by-hop, 
describe the original
+     * transport framing, or negotiate content/transfer encodings that the 
upload replication
+     * path cannot handle transparently.
+     *
+     * <p>This is intentionally a superset of
+     * {@link 
org.apache.nifi.cluster.coordination.http.replication.client.StandardHttpReplicationClient#DISALLOWED_HEADERS
+     * StandardHttpReplicationClient.DISALLOWED_HEADERS}, which omits 
encoding-negotiation
+     * headers because {@code StandardHttpReplicationClient} manages its own 
gzip pipeline.
+     * {@code StandardUploadRequestReplicator} reads responses raw, so 
encoding headers must
+     * also be stripped here.
+     */
+    static final Set<String> HOP_BY_HOP_HEADERS = Set.of(
+            "accept-encoding", "connection", "content-encoding", 
"content-length",
+            "expect", "host", "te", "transfer-encoding", "upgrade"
+    );
+
+    private ReplicationHeaderUtils() {
+    }
+
+    /**
+     * Prepares headers for a replicated request. When a non-null user is 
provided, the
+     * {@code X-ProxiedEntitiesChain} and {@code X-ProxiedEntityGroups} 
headers are set so the
+     * receiving node knows the request is on behalf of that user. When user 
is {@code null},
+     * these headers are omitted, indicating the request is made directly by 
the cluster node
+     * itself (e.g. for background connector state polling).
+     *
+     * <p>In all cases the {@code Authorization} header, auth-related cookies, 
and the
+     * {@code Host} header are removed because the replicated call uses mTLS 
for transport
+     * authentication and the proxied-entity chain for user identity.
+     *
+     * @param headers mutable map of HTTP headers to update
+     * @param user    the user on whose behalf the request is being made, or 
{@code null} for direct node requests
+     */
+    public static void applyUserProxyAndStripCredentials(final Map<String, 
String> headers, final NiFiUser user) {
+        if (user == null) {
+            logger.debug("No user provided: omitting proxied entities header 
from request");
+        } else {
+            logger.debug("NiFi User provided: adding proxied entities header 
to request");
+            headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN,
+                    
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user));
+
+            headers.put(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS,
+                    
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
+        }
+
+        removeHeader(headers, SecurityHeader.AUTHORIZATION.getHeader());
+        removeCookie(headers, 
SecurityCookieName.AUTHORIZATION_BEARER.getName());
+        removeCookie(headers, SecurityCookieName.REQUEST_TOKEN.getName());
+
+        removeHeader(headers, HOST_HEADER);
+    }
+
+    /**
+     * Removes all {@link RequestReplicationHeader} names from the map 
(case-insensitive) so
+     * that inbound client requests cannot spoof replication protocol headers.
+     */
+    public static void stripRequestReplicationHeaders(final Map<String, 
String> headers) {
+        for (final RequestReplicationHeader rh : 
RequestReplicationHeader.values()) {
+            removeHeader(headers, rh.getHeader());
+        }
+    }
+
+    /**
+     * Removes hop-by-hop / transport-framing headers that should not be 
forwarded in a
+     * replicated request (case-insensitive).
+     */
+    public static void stripHopByHopHeaders(final Map<String, String> headers) 
{
+        for (final String name : HOP_BY_HOP_HEADERS) {
+            removeHeader(headers, name);
+        }
+    }
+
+    static void removeHeader(final Map<String, String> headers, final String 
headerNameSearch) {
+        findHeaderName(headers, headerNameSearch).ifPresent(headers::remove);
+    }
+
+    static void removeCookie(final Map<String, String> headers, final String 
cookieName) {
+        final Optional<String> cookieHeaderNameFound = findHeaderName(headers, 
COOKIE_HEADER);
+
+        if (cookieHeaderNameFound.isPresent()) {
+            final String cookieHeaderName = cookieHeaderNameFound.get();
+            final String rawCookies = headers.get(cookieHeaderName);
+            final String[] rawCookieParts = rawCookies.split(";");
+            final Set<String> filteredCookieParts = Stream.of(rawCookieParts)
+                    .map(String::trim)
+                    .filter(cookie -> !cookie.startsWith(cookieName + "="))
+                    .collect(Collectors.toSet());
+
+            if (filteredCookieParts.isEmpty()) {
+                headers.remove(cookieHeaderName);
+            } else {
+                headers.put(cookieHeaderName, 
StringUtils.join(filteredCookieParts, "; "));
+            }
+        }
+    }
+
+    /**
+     * Find an HTTP header name in a map regardless of case, since HTTP/1.1 
capitalises headers
+     * but HTTP/2 returns lowercased headers.
+     */
+    static Optional<String> findHeaderName(final Map<String, String> headers, 
final String headerName) {
+        if (headerName == null || headerName.isBlank()) {
+            return Optional.empty();
+        }
+        return headers.keySet().stream()
+                .filter(headerName::equalsIgnoreCase)
+                .findFirst();
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
index 73b93acd9ba..9fbbd1c52e0 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
@@ -19,7 +19,6 @@ package org.apache.nifi.cluster.coordination.http.replication;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
-import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
@@ -28,7 +27,6 @@ import org.apache.nifi.web.client.StandardHttpUriBuilder;
 import org.apache.nifi.web.client.api.HttpRequestBodySpec;
 import org.apache.nifi.web.client.api.HttpResponseEntity;
 import org.apache.nifi.web.client.api.WebClientService;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,27 +143,20 @@ public class StandardUploadRequestReplicator implements 
UploadRequestReplicator
                 .encodedPath(exampleRequestUri.getPath())
                 .build();
 
-        final NiFiUser user = uploadRequest.getUser();
         final String filename = uploadRequest.getFilename();
 
+        final Map<String, String> outboundHeaders = 
buildOutboundHeaders(uploadRequest);
+
         try (final InputStream inputStream = new FileInputStream(contents)) {
-            final HttpRequestBodySpec request = webClientService.post()
-                    .uri(requestUri)
-                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
-                    // Special NiFi-specific headers to indicate that the 
request should be performed and not replicated to the nodes
-                    
.header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), 
Boolean.TRUE.toString())
-                    
.header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), 
Boolean.TRUE.toString())
-                    .header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, 
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
-                    .header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, 
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
-
-            final Map<String, String> additionalHeaders = 
uploadRequest.getHeaders();
-            for (Map.Entry<String, String> headerEntry : 
additionalHeaders.entrySet()) {
-                request.header(headerEntry.getKey(), headerEntry.getValue());
+            HttpRequestBodySpec request = 
webClientService.post().uri(requestUri);
+
+            for (final Map.Entry<String, String> entry : 
outboundHeaders.entrySet()) {
+                request = request.header(entry.getKey(), entry.getValue());
             }
 
             logger.debug("Replicating upload request for {} to {}", filename, 
nodeId);
 
-            try (final HttpResponseEntity response = request.retrieve()) {
+            try (final HttpResponseEntity response = request.body(inputStream, 
OptionalLong.of(contents.length())).retrieve()) {
                 final int statusCode = response.statusCode();
                 if (uploadRequest.getSuccessfulResponseStatus() != statusCode) 
{
                     final String responseMessage = 
IOUtils.toString(response.body(), StandardCharsets.UTF_8);
@@ -175,6 +166,33 @@ public class StandardUploadRequestReplicator implements 
UploadRequestReplicator
                 return objectMapper.readValue(responseBody, 
uploadRequest.getResponseClass());
             }
         }
+    }
+
+    /**
+     * Builds the complete set of outbound headers for a replicated upload 
request, following the
+     * same trust model as {@link ThreadPoolRequestReplicator}:
+     * <ol>
+     *   <li>Start with any forwarded inbound servlet headers.</li>
+     *   <li>Strip all {@link RequestReplicationHeader} names (prevent 
spoofing).</li>
+     *   <li>Strip hop-by-hop / transport-framing headers.</li>
+     *   <li>Apply explicit builder headers (filename, content-type, seed) so 
upload metadata wins.</li>
+     *   <li>Apply user proxy headers and strip credentials (Authorization, 
auth cookies, Host).</li>
+     *   <li>Force-set {@code request-replicated} and {@code 
execution-continue}.</li>
+     * </ol>
+     */
+    <T> Map<String, String> buildOutboundHeaders(final UploadRequest<T> 
uploadRequest) {
+        final Map<String, String> headers = new 
HashMap<>(uploadRequest.getForwardedRequestHeaders());
+
+        ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+        ReplicationHeaderUtils.stripHopByHopHeaders(headers);
+
+        headers.putAll(uploadRequest.getHeaders());
+
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
uploadRequest.getUser());
+
+        headers.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), 
Boolean.TRUE.toString());
+        headers.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), 
Boolean.TRUE.toString());
 
+        return headers;
     }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 6b946e2492b..fe79e356b72 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -20,7 +20,6 @@ package org.apache.nifi.cluster.coordination.http.replication;
 import jakarta.ws.rs.HttpMethod;
 import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.core.Response.Status;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -53,9 +52,6 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
-import org.apache.nifi.web.security.http.SecurityCookieName;
-import org.apache.nifi.web.security.http.SecurityHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +68,6 @@ import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -90,15 +85,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class ThreadPoolRequestReplicator implements RequestReplicator, 
Closeable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
     private static final Pattern SNIPPET_URI_PATTERN = 
Pattern.compile("/nifi-api/snippets/[a-f0-9\\-]{36}");
 
-    private static final String COOKIE_HEADER = "Cookie";
-    private static final String HOST_HEADER = "Host";
     private static final String NODE_CONTINUE = "202-Accepted";
 
     private final int maxConcurrentRequests; // maximum number of concurrent 
requests
@@ -248,30 +240,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator, Closeable
      * @param user the user on whose behalf the request is being made, or 
{@code null} for direct node requests
      */
     void updateRequestHeaders(final Map<String, String> headers, final 
NiFiUser user) {
-        if (user == null) {
-            // Background tasks where the node itself is the identity making 
the request, not a proxied user
-            logger.debug("No user provided: omitting proxied entities header 
from request");
-        } else {
-            logger.debug("NiFi User provided: adding proxied entities header 
to request");
-            // Add the user as a proxied entity so that when the receiving 
NiFi receives the request,
-            // it knows that we are acting as a proxy on behalf of the current 
user.
-            final String proxiedEntitiesChain = 
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
-            headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, 
proxiedEntitiesChain);
-
-            // Add the header containing the group information for the end 
user in the proxied entity chain, these groups would
-            // only be populated if the end user authenticated against an 
external identity provider like SAML or OIDC
-            final String proxiedEntityGroups = 
ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups());
-            headers.put(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, 
proxiedEntityGroups);
-        }
-
-        // remove the access token if present, since the user is already 
authenticated... authorization
-        // will happen when the request is replicated using the proxy chain 
above
-        removeHeader(headers, SecurityHeader.AUTHORIZATION.getHeader());
-        removeCookie(headers, 
SecurityCookieName.AUTHORIZATION_BEARER.getName());
-        removeCookie(headers, SecurityCookieName.REQUEST_TOKEN.getName());
-
-        // remove the host header
-        removeHeader(headers, HOST_HEADER);
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
user);
     }
 
     @Override
@@ -890,49 +859,4 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator, Closeable
         return responseMap.size();
     }
 
-    private void removeCookie(final Map<String, String> headers, final String 
cookieName) {
-        final Optional<String> cookieHeaderNameFound = findHeaderName(headers, 
COOKIE_HEADER);
-
-        if (cookieHeaderNameFound.isPresent()) {
-            final String cookieHeaderName = cookieHeaderNameFound.get();
-
-            final String rawCookies = headers.get(cookieHeaderName);
-            final String[] rawCookieParts = rawCookies.split(";");
-            final Set<String> filteredCookieParts = 
Stream.of(rawCookieParts).map(String::trim).filter(cookie -> 
!cookie.startsWith(cookieName + "=")).collect(Collectors.toSet());
-
-            if (filteredCookieParts.isEmpty()) {
-                headers.remove(cookieHeaderName);
-            } else {
-                final String filteredCookies = 
StringUtils.join(filteredCookieParts, "; ");
-                headers.put(cookieHeaderName, filteredCookies);
-            }
-        }
-    }
-
-    private void removeHeader(final Map<String, String> headers, final String 
headerNameSearch) {
-        final Optional<String> headerNameFound = findHeaderName(headers, 
headerNameSearch);
-        headerNameFound.ifPresent(headers::remove);
-    }
-
-    /**
-     * Find HTTP Header name in map regardless of case since HTTP/1.1 
capitalizes headers but HTTP/2 returns lowercased headers
-     *
-     * @param headers Map of header name to value
-     * @param headerName Header name to be found
-     * @return Optional match with header name from map of headers
-     */
-    private Optional<String> findHeaderName(final Map<String, String> headers, 
final String headerName) {
-        final Optional<String> headerNameFound;
-
-        if (headerName == null || headerName.isBlank()) {
-            headerNameFound = Optional.empty();
-        } else {
-            headerNameFound = headers.keySet()
-                    .stream()
-                    .filter(headerName::equalsIgnoreCase)
-                    .findFirst();
-        }
-
-        return headerNameFound;
-    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
index 00d941f9f53..cf01b2d879b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/UploadRequest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.authorization.user.NiFiUser;
 
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -37,6 +38,7 @@ public class UploadRequest<T> {
     private final String identifier;
     private final InputStream contents;
     private final Map<String, String> headers;
+    private final Map<String, String> forwardedRequestHeaders;
     private final URI exampleRequestUri;
     private final Class<T> responseClass;
     private final int successfulResponseStatus;
@@ -47,6 +49,8 @@ public class UploadRequest<T> {
         this.identifier = Objects.requireNonNull(builder.identifier);
         this.contents = Objects.requireNonNull(builder.contents);
         this.headers = Map.copyOf(builder.headers);
+        this.forwardedRequestHeaders = builder.forwardedRequestHeaders == null
+                ? Collections.emptyMap() : 
Map.copyOf(builder.forwardedRequestHeaders);
         this.exampleRequestUri = 
Objects.requireNonNull(builder.exampleRequestUri);
         this.responseClass = Objects.requireNonNull(builder.responseClass);
         this.successfulResponseStatus = builder.successfulResponseStatus;
@@ -75,6 +79,15 @@ public class UploadRequest<T> {
         return headers;
     }
 
+    /**
+     * Returns the inbound servlet request headers that should be forwarded 
during replication.
+     * These are sanitised by the replicator before being sent to target 
nodes. Empty if the
+     * caller did not supply forwarded headers.
+     */
+    public Map<String, String> getForwardedRequestHeaders() {
+        return forwardedRequestHeaders;
+    }
+
     public URI getExampleRequestUri() {
         return exampleRequestUri;
     }
@@ -96,6 +109,7 @@ public class UploadRequest<T> {
         private Class<T> responseClass;
         private int successfulResponseStatus;
         private final Map<String, String> headers = new HashMap<>();
+        private Map<String, String> forwardedRequestHeaders;
 
         public Builder<T> user(NiFiUser user) {
             this.user = user;
@@ -132,6 +146,18 @@ public class UploadRequest<T> {
             return this;
         }
 
+        /**
+         * Sets the inbound servlet request headers that should be forwarded 
during replication.
+         * The replicator sanitises these (strips credentials, replication 
protocol headers,
+         * hop-by-hop headers) before sending to target nodes.
+         *
+         * @param forwardedRequestHeaders the original inbound headers, or 
{@code null} for none
+         */
+        public Builder<T> forwardRequestHeaders(Map<String, String> 
forwardedRequestHeaders) {
+            this.forwardedRequestHeaders = forwardedRequestHeaders == null ? 
null : new HashMap<>(forwardedRequestHeaders);
+            return this;
+        }
+
         public Builder<T> header(String name, String value) {
             this.headers.put(name, value);
             return this;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
new file mode 100644
index 00000000000..09a24745ca4
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestReplicationHeaderUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestReplicationHeaderUtils {
+
+    private static final String TEST_USER_IDENTITY = "alice";
+
+    private static final String AUTHORIZATION_HEADER = "Authorization";
+    private static final String AUTHORIZATION_VALUE = "Bearer secret";
+
+    private static final String CUSTOM_HEADER = "X-Custom-Token";
+    private static final String CUSTOM_HEADER_VALUE = "custom-token-123";
+
+    private static final String COOKIE_HEADER = "Cookie";
+    private static final String HOST_HEADER = "Host";
+    private static final String HOST_VALUE = "original-host:8080";
+
+    private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
+    private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+    private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+    private static final String TE_HEADER = "TE";
+    private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
+    private static final String CONNECTION_HEADER = "Connection";
+
+    private static final String SPOOFED_VALUE = "spoofed";
+    private static final String SHOULD_SURVIVE_VALUE = "should-survive";
+
+    @Test
+    void testApplyUserProxyAndStripCredentialsSetsProxiedEntities() {
+        final NiFiUser user = new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build();
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+        headers.put(CUSTOM_HEADER, CUSTOM_HEADER_VALUE);
+
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
user);
+
+        assertNotNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+        
assertTrue(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(TEST_USER_IDENTITY));
+        assertNotNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+        assertNull(headers.get(AUTHORIZATION_HEADER));
+        assertEquals(CUSTOM_HEADER_VALUE, headers.get(CUSTOM_HEADER));
+    }
+
+    @Test
+    void testApplyUserProxyWithNullUserOmitsProxiedEntities() {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
null);
+
+        assertNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+        assertNull(headers.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+        assertNull(headers.get(AUTHORIZATION_HEADER));
+    }
+
+    @Test
+    void testStripRequestReplicationHeadersRemovesAllProtocolHeaders() {
+        final Map<String, String> headers = new HashMap<>();
+        for (final RequestReplicationHeader rh : 
RequestReplicationHeader.values()) {
+            headers.put(rh.getHeader(), SPOOFED_VALUE);
+        }
+        headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+        ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+
+        for (final RequestReplicationHeader rh : 
RequestReplicationHeader.values()) {
+            assertNull(headers.get(rh.getHeader()), "Replication header should 
have been stripped: " + rh.getHeader());
+        }
+        assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+    }
+
+    @Test
+    void testStripRequestReplicationHeadersCaseInsensitive() {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("Request-Replicated", "true");
+        headers.put("EXECUTION-CONTINUE", "true");
+
+        ReplicationHeaderUtils.stripRequestReplicationHeaders(headers);
+
+        assertFalse(headers.containsKey("Request-Replicated"));
+        assertFalse(headers.containsKey("EXECUTION-CONTINUE"));
+    }
+
+    @Test
+    void testStripHopByHopHeaders() {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(ACCEPT_ENCODING_HEADER, "gzip, deflate");
+        headers.put(CONTENT_ENCODING_HEADER, "gzip");
+        headers.put(CONTENT_LENGTH_HEADER, "12345");
+        headers.put(HOST_HEADER, HOST_VALUE);
+        headers.put(TE_HEADER, "trailers, deflate");
+        headers.put(TRANSFER_ENCODING_HEADER, "chunked");
+        headers.put(CONNECTION_HEADER, "keep-alive");
+        headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+        ReplicationHeaderUtils.stripHopByHopHeaders(headers);
+
+        assertNull(headers.get(ACCEPT_ENCODING_HEADER));
+        assertNull(headers.get(CONTENT_ENCODING_HEADER));
+        assertNull(headers.get(CONTENT_LENGTH_HEADER));
+        assertNull(headers.get(HOST_HEADER));
+        assertNull(headers.get(TE_HEADER));
+        assertNull(headers.get(TRANSFER_ENCODING_HEADER));
+        assertNull(headers.get(CONNECTION_HEADER));
+        assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+    }
+
+    @Test
+    void testStripAuthCookies() {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(COOKIE_HEADER, "__Secure-Authorization-Bearer=token123; 
__Secure-Request-Token=rt456; other=value");
+
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
null);
+
+        final String remaining = headers.get(COOKIE_HEADER);
+        assertNotNull(remaining);
+        assertFalse(remaining.contains("__Secure-Authorization-Bearer"));
+        assertFalse(remaining.contains("__Secure-Request-Token"));
+        assertTrue(remaining.contains("other=value"));
+    }
+
+    @Test
+    void testRemoveHostHeader() {
+        final Map<String, String> headers = new HashMap<>();
+        headers.put(HOST_HEADER, HOST_VALUE);
+        headers.put(CUSTOM_HEADER, SHOULD_SURVIVE_VALUE);
+
+        ReplicationHeaderUtils.applyUserProxyAndStripCredentials(headers, 
null);
+
+        assertNull(headers.get(HOST_HEADER));
+        assertEquals(SHOULD_SURVIVE_VALUE, headers.get(CUSTOM_HEADER));
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
new file mode 100644
index 00000000000..ecc8a7fc116
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestStandardUploadRequestReplicatorHeaders.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestStandardUploadRequestReplicatorHeaders {
+
+    private static final String TEST_USER_IDENTITY = "test-user";
+    private static final String REAL_USER_IDENTITY = "real-user";
+    private static final String EVIL_USER_IDENTITY = "evil-user";
+
+    private static final String TEST_FILENAME = "test.txt";
+    private static final String TEST_IDENTIFIER = "id-1";
+    private static final URI TEST_REQUEST_URI = 
URI.create("https://localhost:8443/nifi-api/test");
+    private static final int SUCCESS_STATUS = 200;
+
+    private static final String FILENAME_HEADER = "Filename";
+    private static final String CONTENT_TYPE_HEADER = "Content-Type";
+    private static final String CONTENT_TYPE_VALUE = 
"application/octet-stream";
+
+    private static final String AUTHORIZATION_HEADER = "Authorization";
+    private static final String AUTHORIZATION_VALUE = "Bearer secret-token";
+
+    private static final String CUSTOM_TOKEN_HEADER = "X-Custom-Token";
+    private static final String CUSTOM_TOKEN_VALUE = "custom-token-abc";
+    private static final String CUSTOM_TOKEN_VALUE_ALT = "keep-this";
+    private static final String CUSTOM_TOKEN_VALUE_SURVIVE = "keep-me";
+
+    private static final String CUSTOM_HEADER = "X-Custom-Header";
+    private static final String CUSTOM_HEADER_VALUE = "custom-value";
+
+    private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding";
+    private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
+    private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+    private static final String HOST_HEADER = "Host";
+    private static final String TE_HEADER = "TE";
+    private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
+    private static final String CONNECTION_HEADER = "Connection";
+
+    private static final String COOKIE_HEADER = "Cookie";
+    private static final String COOKIE_VALUE_WITH_AUTH = "__Secure-Authorization-Bearer=token123; __Secure-Request-Token=rt456; custom=value";
+    private static final String COOKIE_CUSTOM_SEGMENT = "custom=value";
+
+    private static final String SPOOFED_VALUE = "spoofed";
+    private static final String SPOOFED_TX_VALUE = "spoofed-tx";
+
+    private StandardUploadRequestReplicator replicator;
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) throws IOException {
+        final Properties props = new Properties();
+        props.setProperty(NiFiProperties.UPLOAD_WORKING_DIRECTORY, 
tempDir.toString());
+        final NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties((String) null, props);
+        replicator = new StandardUploadRequestReplicator(
+                mock(ClusterCoordinator.class),
+                mock(WebClientService.class),
+                nifiProperties
+        );
+    }
+
+    @Test
+    void testCustomHeaderPreservedFromForwardedHeaders() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE);
+        forwarded.put(CUSTOM_HEADER, CUSTOM_HEADER_VALUE);
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertEquals(CUSTOM_TOKEN_VALUE, result.get(CUSTOM_TOKEN_HEADER));
+        assertEquals(CUSTOM_HEADER_VALUE, result.get(CUSTOM_HEADER));
+    }
+
+    @Test
+    void testReplicationHeadersFromForwardedInputAreStripped() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), 
SPOOFED_VALUE);
+        forwarded.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), 
SPOOFED_VALUE);
+        
forwarded.put(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader(), 
SPOOFED_TX_VALUE);
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertEquals(Boolean.TRUE.toString(), 
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+        assertEquals(Boolean.TRUE.toString(), 
result.get(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()));
+        
assertNull(result.get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader()));
+    }
+
+    @Test
+    void testAuthorizationHeaderStripped() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(AUTHORIZATION_HEADER, AUTHORIZATION_VALUE);
+        forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE_ALT);
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertNull(result.get(AUTHORIZATION_HEADER));
+        assertEquals(CUSTOM_TOKEN_VALUE_ALT, result.get(CUSTOM_TOKEN_HEADER));
+    }
+
+    @Test
+    void testHopByHopHeadersStripped() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(ACCEPT_ENCODING_HEADER, "gzip, deflate");
+        forwarded.put(CONTENT_ENCODING_HEADER, "gzip");
+        forwarded.put(CONTENT_LENGTH_HEADER, "99999");
+        forwarded.put(HOST_HEADER, "original-host:443");
+        forwarded.put(TE_HEADER, "trailers, deflate");
+        forwarded.put(TRANSFER_ENCODING_HEADER, "chunked");
+        forwarded.put(CONNECTION_HEADER, "keep-alive");
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertNull(result.get(ACCEPT_ENCODING_HEADER));
+        assertNull(result.get(CONTENT_ENCODING_HEADER));
+        assertNull(result.get(CONTENT_LENGTH_HEADER));
+        assertNull(result.get(TE_HEADER));
+        assertNull(result.get(TRANSFER_ENCODING_HEADER));
+        assertNull(result.get(CONNECTION_HEADER));
+        assertNull(result.get(HOST_HEADER));
+    }
+
+    @Test
+    void testExplicitBuilderHeadersWinOverForwarded() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(FILENAME_HEADER, "forwarded-name.txt");
+
+        final UploadRequest<String> request = new 
UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .forwardRequestHeaders(forwarded)
+                .header(FILENAME_HEADER, "explicit-name.txt")
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+        assertEquals("explicit-name.txt", result.get(FILENAME_HEADER));
+    }
+
+    @Test
+    void testProxiedEntitiesSetFromUser() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE);
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+        
assertTrue(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(TEST_USER_IDENTITY));
+        assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS));
+    }
+
+    @Test
+    void testReplicationFlagsAlwaysSet() {
+        final UploadRequest<String> request = buildUploadRequest(new 
HashMap<>());
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertEquals(Boolean.TRUE.toString(), 
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+        assertEquals(Boolean.TRUE.toString(), 
result.get(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()));
+    }
+
+    @Test
+    void testNoForwardedHeadersStillWorks() {
+        final UploadRequest<String> request = new 
UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .header(FILENAME_HEADER, TEST_FILENAME)
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        assertEquals(TEST_FILENAME, result.get(FILENAME_HEADER));
+        assertEquals(Boolean.TRUE.toString(), 
result.get(RequestReplicationHeader.REQUEST_REPLICATED.getHeader()));
+        assertNotNull(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN));
+    }
+
+    @Test
+    void testBuilderCannotOverrideProxiedEntities() {
+        final UploadRequest<String> request = new 
UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(REAL_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, "<" + 
EVIL_USER_IDENTITY + ">")
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        
assertFalse(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(EVIL_USER_IDENTITY));
+        
assertTrue(result.get(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN).contains(REAL_USER_IDENTITY));
+    }
+
+    @Test
+    void testAuthCookiesStrippedFromForwardedHeaders() {
+        final Map<String, String> forwarded = new HashMap<>();
+        forwarded.put(COOKIE_HEADER, COOKIE_VALUE_WITH_AUTH);
+        forwarded.put(CUSTOM_TOKEN_HEADER, CUSTOM_TOKEN_VALUE_SURVIVE);
+
+        final UploadRequest<String> request = buildUploadRequest(forwarded);
+        final Map<String, String> result = 
replicator.buildOutboundHeaders(request);
+
+        final String cookies = result.get(COOKIE_HEADER);
+        assertNotNull(cookies);
+        assertFalse(cookies.contains("__Secure-Authorization-Bearer"));
+        assertFalse(cookies.contains("__Secure-Request-Token"));
+        assertTrue(cookies.contains(COOKIE_CUSTOM_SEGMENT));
+        assertEquals(CUSTOM_TOKEN_VALUE_SURVIVE, 
result.get(CUSTOM_TOKEN_HEADER));
+    }
+
+    @Test
+    void testForwardRequestHeadersNullProducesSameAsOmitted() {
+        final UploadRequest<String> withNull = new 
UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .forwardRequestHeaders(null)
+                .header(FILENAME_HEADER, TEST_FILENAME)
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+
+        final UploadRequest<String> withoutCall = new 
UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .header(FILENAME_HEADER, TEST_FILENAME)
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+
+        final Map<String, String> resultNull = 
replicator.buildOutboundHeaders(withNull);
+        final Map<String, String> resultOmitted = 
replicator.buildOutboundHeaders(withoutCall);
+
+        assertEquals(resultOmitted, resultNull);
+    }
+
+    private UploadRequest<String> buildUploadRequest(final Map<String, String> 
forwardedHeaders) {
+        return new UploadRequest.Builder<String>()
+                .user(new 
StandardNiFiUser.Builder().identity(TEST_USER_IDENTITY).build())
+                .filename(TEST_FILENAME)
+                .identifier(TEST_IDENTIFIER)
+                .contents(new ByteArrayInputStream(new byte[0]))
+                .forwardRequestHeaders(forwardedHeaders)
+                .header(FILENAME_HEADER, TEST_FILENAME)
+                .header(CONTENT_TYPE_HEADER, CONTENT_TYPE_VALUE)
+                .exampleRequestUri(TEST_REQUEST_URI)
+                .responseClass(String.class)
+                .successfulResponseStatus(SUCCESS_STATUS)
+                .build();
+    }
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 5543e2ba717..9b51c56b9aa 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -2032,6 +2032,7 @@ public class ConnectorResource extends 
ApplicationResource {
                 .filename(sanitizedAssetName)
                 .identifier(uploadRequestId)
                 .contents(maxLengthInputStream)
+                .forwardRequestHeaders(getHeaders())  // Necessary for ConnectorRequestContext when using a ConnectorConfigurationProvider
                 .header(FILENAME_HEADER, sanitizedAssetName)
                 .header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
                 
.header(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(), 
uploadRequestId)
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index a6af326fb5d..fe4687d54f9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -2868,6 +2868,7 @@ public class ControllerResource extends 
ApplicationResource {
                     .filename(filename)
                     .identifier(UUID.randomUUID().toString())
                     .contents(maxLengthInputStream)
+                    .forwardRequestHeaders(getHeaders())
                     .header(FILENAME_HEADER, filename)
                     .header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
                     .exampleRequestUri(getAbsolutePath())
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
index d8b50385495..7d7fcce0399 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
@@ -443,6 +443,7 @@ public class ParameterContextResource extends 
AbstractParameterResource {
                     .filename(sanitizedAssetName)
                     .identifier(UUID.randomUUID().toString())
                     .contents(maxLengthInputStream)
+                    .forwardRequestHeaders(getHeaders())
                     .header(FILENAME_HEADER, sanitizedAssetName)
                     .header(CONTENT_TYPE_HEADER, UPLOAD_CONTENT_TYPE)
                     .exampleRequestUri(getAbsolutePath())

Reply via email to