This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new f1b89ae2a67 branch-4.1: [fix](fe) Fix broken pipe risk on stream load redirect with unconsumed request body (#64302)
f1b89ae2a67 is described below

commit f1b89ae2a6771d458097ce8c88c8dd6aac5cd303
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 10 11:52:03 2026 +0800

    branch-4.1: [fix](fe) Fix broken pipe risk on stream load redirect with unconsumed request body (#64302)
    
    Pick apache/doris#63332
    
    Co-authored-by: Wen Zhenghu <[email protected]>
    Co-authored-by: yaoxiao <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  24 ++
 .../config/WebServerFactoryCustomizerConfig.java   |   3 +
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 156 +++++---
 .../doris/httpv2/rest/RestBaseController.java      |  51 ++-
 .../httpv2/util/StreamLoadRedirectDrainUtil.java   | 154 +++++++
 .../apache/doris/httpv2/rest/LoadActionTest.java   | 445 ++++++++++++++++++++-
 .../doris/httpv2/rest/RestBaseControllerTest.java  |  64 +++
 .../util/StreamLoadRedirectDrainUtilTest.java      | 394 ++++++++++++++++++
 .../scripts/stream_load_redirect_chunked_e2e.py    | 140 +++++++
 ...test_stream_load_fe_redirect_chunked_e2e.groovy | 117 ++++++
 10 files changed, 1460 insertions(+), 88 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 74a3ffa1f70..500d17382cb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -464,6 +464,13 @@ public class Config extends ConfigBase {
             "The maximum HTTP POST size of Jetty, in bytes, the default value 
is 100MB."})
     public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;
 
+    @ConfField(description = {
+            "Jetty 在应用未消费完请求体时,额外尝试读取剩余内容的最大次数。"
+                    + "-1 表示不限制,0 表示不额外读取,正数表示最大读取次数。",
+            "The maximum number of extra reads Jetty performs for unconsumed 
request content. "
+                    + "-1 means unlimited, 0 means disabled, and a positive 
value limits the read attempts."})
+    public static int jetty_server_max_unconsumed_request_content_reads = -1;
+
     @ConfField(description = {"Jetty 的最大 HTTP header 大小,单位是字节,默认值是 1MB。",
             "The maximum HTTP header size of Jetty, in bytes, the default 
value is 1MB."})
     public static int jetty_server_max_http_header_size = 1048576;
@@ -3628,6 +3635,23 @@ public class Config extends ConfigBase {
             + "public-private/public/private/direct/random-be and empty 
string" })
     public static String streamload_redirect_policy = "";
 
+    @ConfField(mutable = true, description = {
+            "Stream Load redirect 场景下,FE 在返回 307 后额外丢弃请求体的最大字节数。"
+                    + "0 表示关闭该兼容逻辑,正数表示最大丢弃字节数。",
+            "The maximum number of request body bytes FE drains after 
returning 307 for Stream Load redirects. "
+                    + "0 disables the compatibility logic, and a positive 
value sets the byte limit."})
+    // Enable a generous bounded drain window by default to preserve FE 
redirect compatibility on Jetty 12.
+    public static long stream_load_redirect_bounded_drain_max_bytes = 1024L * 
1024 * 1024;
+
+    @ConfField(mutable = true, description = {
+            "Stream Load redirect 场景下,FE 在检测到请求体暂时无可读数据后继续等待的最大空闲时长,单位毫秒。"
+                    + "0 表示不额外等待,用于给慢客户端或分段到达的数据保留一个有限的缓冲窗口。",
+            "The maximum idle wait time in milliseconds after FE detects no 
readable request body bytes "
+                    + "during Stream Load redirect drain. 0 disables the extra 
idle wait, while a positive value "
+                    + "keeps a bounded grace window for slow clients or 
delayed request body chunks."})
+    // Keep a small grace period for delayed body chunks after FE has already 
written the redirect.
+    public static int stream_load_redirect_bounded_drain_max_idle_time_ms = 
1000;
+
     @ConfField(mutable = true, description = {
             "存算分离模式下是否启用 group commit 的 streamload BE 转发功能。"
                     + "解决 LB 随机转发导致 group commit 攒批失效的问题,通过 BE 二次转发确保同表请求到达同一 
BE 节点。",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
index 04add103e31..7f9ce0fff10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
@@ -62,6 +62,9 @@ public class WebServerFactoryCustomizerConfig implements 
WebServerFactoryCustomi
                     if (httpFactory != null) {
                         HttpConfiguration httpConfig = 
httpFactory.getHttpConfiguration();
                         
httpConfig.setRequestHeaderSize(Config.jetty_server_max_http_header_size);
+                        // Apply the unconsumed request content read limit to 
every HTTP connector.
+                        httpConfig.setMaxUnconsumedRequestContentReads(
+                                
Config.jetty_server_max_unconsumed_request_content_reads);
                     }
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 133e0c965bc..7e817cc202b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.cloud.qe.ComputeGroupException;
-import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -33,6 +32,7 @@ import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.util.StreamLoadRedirectDrainUtil;
 import org.apache.doris.load.StreamLoadHandler;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.GroupCommitPlanner;
@@ -60,8 +60,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
+import java.io.IOException;
 import java.net.InetAddress;
-import java.net.URI;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Optional;
@@ -132,7 +132,7 @@ public class LoadAction extends RestBaseController {
             if (!checkClusterToken(authToken)) {
                 throw new UnauthorizedException("Invalid token: " + authToken);
             }
-            return executeWithClusterToken(request, db, table, true);
+            return executeWithClusterToken(request, response, db, table, true);
         } else {
             try {
                 executeCheckPassword(request, response);
@@ -191,8 +191,7 @@ public class LoadAction extends RestBaseController {
             LOG.info("redirect load action to destination={}, label: {}",
                     redirectAddr.toString(), label);
 
-            RedirectView redirectView = redirectTo(request, redirectAddr);
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
true, null, null, label);
         } catch (Exception e) {
             return new RestBaseResult(e.getMessage());
         }
@@ -313,11 +312,17 @@ public class LoadAction extends RestBaseController {
                         redirectAddr.toString(), isStreamLoad, dbName, 
tableName, label);
             }
 
-            RedirectView redirectView = redirectTo(request, redirectAddr);
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
isStreamLoad, dbName, tableName, label);
         } catch (StreamLoadForwardException e) {
-            // Special handling for stream load forwarding
-            return e.getRedirectView();
+            // Handle IOException from redirect response generation in the 
forwarding path.
+            try {
+                return createRedirectResponse(request, response, 
e.getRedirectView(),
+                        isStreamLoad, db, table, label);
+            } catch (IOException ioException) {
+                LOG.warn("stream load forward redirect failed, stream: {}, db: 
{}, tbl: {}, label: {}, err: {}",
+                        isStreamLoad, db, table, label, 
ioException.getMessage());
+                return new RestBaseResult(ioException.getMessage());
+            }
         } catch (Exception e) {
             LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, 
err: {}",
                     isStreamLoad, db, table, label, e.getMessage());
@@ -594,7 +599,7 @@ public class LoadAction extends RestBaseController {
     // AuditlogPlugin should be re-disigned carefully, and blow method focuses 
on
     // temporarily addressing the users' needs for audit logs.
     // So this function is not widely tested under general scenario
-    private Object executeWithClusterToken(HttpServletRequest request, String 
db,
+    private Object executeWithClusterToken(HttpServletRequest request, 
HttpServletResponse response, String db,
             String table, boolean isStreamLoad) {
         try {
             ConnectContext ctx = new ConnectContext();
@@ -637,29 +642,7 @@ public class LoadAction extends RestBaseController {
                             + "stream: {}, db: {}, tbl: {}, label: {}",
                     redirectAddr.toString(), isStreamLoad, dbName, tableName, 
label);
 
-            URI urlObj = null;
-            URI resultUriObj = null;
-            String urlStr = request.getRequestURI();
-            String userInfo = null;
-
-            try {
-                urlObj = new URI(urlStr);
-                resultUriObj = new URI("http", userInfo, 
redirectAddr.getHostname(),
-                        redirectAddr.getPort(), urlObj.getPath(), "", null);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            String redirectUrl = resultUriObj.toASCIIString();
-            if (!Strings.isNullOrEmpty(request.getQueryString())) {
-                redirectUrl += request.getQueryString();
-            }
-            LOG.info("Redirect url: {}", "http://" + redirectAddr.getHostname() + ":"
-                    + redirectAddr.getPort() + urlObj.getPath());
-            RedirectView redirectView = new RedirectView(redirectUrl);
-            redirectView.setContentType("text/html;charset=utf-8");
-            
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-
-            return redirectView;
+            return createRedirectResponse(request, response, redirectAddr, 
isStreamLoad, dbName, tableName, label);
         } catch (Exception e) {
             LOG.warn("Failed to execute stream load with cluster token, {}", 
e.getMessage(), e);
             return new RestBaseResult(e.getMessage());
@@ -679,6 +662,80 @@ public class LoadAction extends RestBaseController {
         return headers.toString();
     }
 
+    private Object createRedirectResponse(HttpServletRequest request, 
HttpServletResponse response,
+            TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName, 
String tableName, String label)
+            throws IOException {
+        String redirectUrl = buildRedirectUrl(request, redirectAddr);
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+            return redirectTo(request, redirectAddr);
+        }
+        writeTemporaryRedirect(response, redirectUrl);
+        DrainDecision drainDecision = 
decideDrainDecisionForStreamLoadRedirect(request);
+        if (drainDecision != DrainDecision.DRAIN) {
+            LOG.info("skip bounded drain after stream load redirect, target: 
{}, db: {}, tbl: {}, label: {},"
+                            + " reason: {}",
+                    redirectAddr, dbName, tableName, label, drainDecision);
+            return null;
+        }
+        drainStreamLoadRequestBodyAfterRedirect(request, 
redirectAddr.toString(), dbName, tableName, label);
+        return null;
+    }
+
+    private Object createRedirectResponse(HttpServletRequest request, 
HttpServletResponse response,
+            RedirectView redirectView, boolean isStreamLoad, String dbName, 
String tableName, String label)
+            throws IOException {
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+            return redirectView;
+        }
+        writeTemporaryRedirect(response, redirectView.getUrl());
+        DrainDecision drainDecision = 
decideDrainDecisionForStreamLoadRedirect(request);
+        if (drainDecision != DrainDecision.DRAIN) {
+            LOG.info("skip bounded drain after stream load redirect, target: 
{}, db: {}, tbl: {}, label: {},"
+                            + " reason: {}",
+                    redirectView.getUrl(), dbName, tableName, label, 
drainDecision);
+            return null;
+        }
+        drainStreamLoadRequestBodyAfterRedirect(request, 
redirectView.getUrl(), dbName, tableName, label);
+        return null;
+    }
+
+    private boolean shouldUseBoundedDrainForStreamLoad(boolean isStreamLoad) {
+        return isStreamLoad && 
Config.stream_load_redirect_bounded_drain_max_bytes > 0;
+    }
+
+    // Skip the bounded drain for header-only probes and oversized 
fixed-length bodies.
+    private DrainDecision 
decideDrainDecisionForStreamLoadRedirect(HttpServletRequest request) {
+        long contentLength = request.getContentLengthLong();
+        String transferEncoding = 
request.getHeader(HttpHeaderNames.TRANSFER_ENCODING.toString());
+        if (contentLength <= 0 && Strings.isNullOrEmpty(transferEncoding)) {
+            return DrainDecision.SKIP_NO_REQUEST_BODY;
+        }
+        if (contentLength > 
Config.stream_load_redirect_bounded_drain_max_bytes) {
+            return DrainDecision.SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES;
+        }
+        return DrainDecision.DRAIN;
+    }
+
+    private void drainStreamLoadRequestBodyAfterRedirect(HttpServletRequest 
request, String redirectTarget,
+            String dbName, String tableName, String label) {
+        long drainLimit = Config.stream_load_redirect_bounded_drain_max_bytes;
+        LOG.info("write stream load redirect and start bounded drain, target: 
{}, db: {}, tbl: {}, label: {},"
+                        + " max_drain_bytes: {}",
+                redirectTarget, dbName, tableName, label, drainLimit);
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, drainLimit);
+        LOG.info("finish bounded drain after stream load redirect, target: {}, 
db: {}, tbl: {}, label: {},"
+                        + " drained_bytes: {}, elapsed_ms: {}, exit_reason: 
{}",
+                redirectTarget, dbName, tableName, label, 
drainResult.getDrainedBytes(),
+                drainResult.getElapsedMillis(), drainResult.getExitReason());
+    }
+
+    private enum DrainDecision {
+        SKIP_NO_REQUEST_BODY,
+        SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES,
+        DRAIN
+    }
+
     private boolean isSensitiveHeader(String headerName) {
         return "Authorization".equalsIgnoreCase(headerName)
                 || "Proxy-Authorization".equalsIgnoreCase(headerName)
@@ -737,38 +794,17 @@ public class LoadAction extends RestBaseController {
      * @param addr the endpoint address to redirect to (public/private 
endpoint)
      * @param forwardTarget the target BE node in "host:port" format for final 
processing
      * @return RedirectView configured for stream load forwarding
-     */
+    */
     private RedirectView redirectToStreamLoadForward(HttpServletRequest 
request, TNetworkAddress addr,
             String forwardTarget) {
-        URI urlObj = null;
-        URI resultUriObj = null;
-        String urlStr = request.getRequestURI();
-        String userInfo = null;
-        String modifiedPath = null;
-
-        if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
-            ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
-            userInfo = 
ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
-                    + ":" + authInfo.password;
-        }
-        try {
-            urlObj = new URI(urlStr);
-            // Replace _stream_load with _stream_load_forward in the path
-            modifiedPath = urlObj.getPath().replace("/_stream_load", 
"/_stream_load_forward");
-            resultUriObj = new URI("http", userInfo, addr.getHostname(),
-                    addr.getPort(), modifiedPath, "", null);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        String redirectUrl = resultUriObj.toASCIIString();
-
-        // Add forward_to parameter (note: toASCIIString() already includes 
'?' due to empty query)
+        // Replace _stream_load with _stream_load_forward in the path.
+        String modifiedPath = request.getRequestURI().replace("/_stream_load", 
"/_stream_load_forward");
         String queryString = request.getQueryString();
+        String redirectQuery = "forward_to=" + forwardTarget;
         if (!Strings.isNullOrEmpty(queryString)) {
-            redirectUrl += queryString + "&forward_to=" + forwardTarget;
-        } else {
-            redirectUrl += "forward_to=" + forwardTarget;
+            redirectQuery = queryString + "&" + redirectQuery;
         }
+        String redirectUrl = buildRedirectUrl(request, addr, modifiedPath, 
redirectQuery);
 
         LOG.info("Redirect stream load forward url: {}, forward_to: {}",
                 "http://" + addr.getHostname() + ":" + addr.getPort() + modifiedPath, forwardTarget);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index 78ac8ef31ef..29cdce15910 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -89,17 +89,12 @@ public class RestBaseController extends BaseController {
         return authInfo;
     }
 
-    public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress 
addr) {
-        RedirectView redirectView = new RedirectView(getRedirectUrL(request, 
addr));
-        redirectView.setContentType("text/html;charset=utf-8");
-        
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-        return redirectView;
+    protected String buildRedirectUrl(HttpServletRequest request, 
TNetworkAddress addr) {
+        return buildRedirectUrl(request, addr, request.getRequestURI(), 
request.getQueryString());
     }
 
-    public String getRedirectUrL(HttpServletRequest request, TNetworkAddress 
addr) {
-        URI urlObj = null;
-        URI resultUriObj = null;
-        String urlStr = request.getRequestURI();
+    protected String buildRedirectUrl(HttpServletRequest request, 
TNetworkAddress addr, String requestPath,
+            String queryString) {
         String userInfo = null;
         if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
             ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
@@ -107,19 +102,37 @@ public class RestBaseController extends BaseController {
                     + ":" + authInfo.password;
         }
         try {
-            urlObj = new URI(urlStr);
-            resultUriObj = new URI(request.getScheme(), userInfo, 
addr.getHostname(),
-                    addr.getPort(), urlObj.getPath(), "", null);
+            // Preserve the original request path to avoid re-encoding an 
already encoded URI path.
+            URI authorityUri = new URI(request.getScheme(), userInfo, 
addr.getHostname(),
+                    addr.getPort(), null, null, null);
+            String redirectUrl = authorityUri.toASCIIString() + requestPath;
+            if (!Strings.isNullOrEmpty(queryString)) {
+                redirectUrl += "?" + queryString;
+            }
+            LOG.info("Redirect url: {}", request.getScheme() + "://" + 
addr.getHostname() + ":"
+                    + addr.getPort() + requestPath);
+            return redirectUrl;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        String redirectUrl = resultUriObj.toASCIIString();
-        if (!Strings.isNullOrEmpty(request.getQueryString())) {
-            redirectUrl += request.getQueryString();
-        }
-        LOG.info("Redirect url: {}", request.getScheme() + "://" + 
addr.getHostname() + ":"
-                + addr.getPort() + urlObj.getPath());
-        return redirectUrl;
+    }
+
+    protected void writeTemporaryRedirect(HttpServletResponse response, String 
redirectUrl) throws IOException {
+        response.setContentType("text/html;charset=utf-8");
+        response.setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        response.setHeader("Location", redirectUrl);
+        response.flushBuffer();
+    }
+
+    public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress 
addr) {
+        RedirectView redirectView = new RedirectView(buildRedirectUrl(request, 
addr));
+        redirectView.setContentType("text/html;charset=utf-8");
+        
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+        return redirectView;
+    }
+
+    public String getRedirectUrL(HttpServletRequest request, TNetworkAddress 
addr) {
+        return buildRedirectUrl(request, addr);
     }
 
     public RedirectView redirectToObj(String sign) throws URISyntaxException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
new file mode 100644
index 00000000000..a9d345ad38d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import com.google.common.base.Preconditions;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+public final class StreamLoadRedirectDrainUtil {
+    private static final Logger LOG = 
LogManager.getLogger(StreamLoadRedirectDrainUtil.class);
+
+    private static final int BUFFER_SIZE = 8 * 1024;
+    private static final int IDLE_SLEEP_MS = 5;
+
+    private StreamLoadRedirectDrainUtil() {
+    }
+
+    public static DrainResult drainRequestBodyAfterRedirect(HttpServletRequest 
request, long maxBytes) {
+        try {
+            return drainRequestBodyAfterRedirect(request.getInputStream(), 
maxBytes);
+        } catch (IOException e) {
+            LOG.warn("failed to get request input stream for stream load 
redirect drain", e);
+            return new DrainResult(0, 0, ExitReason.ERROR);
+        }
+    }
+
+    static DrainResult drainRequestBodyAfterRedirect(ServletInputStream 
inputStream, long maxBytes) {
+        Preconditions.checkArgument(maxBytes > 0, "maxBytes must be positive");
+
+        long startNanos = System.nanoTime();
+        long drainedBytes = 0;
+        long idleStartNanos = -1L;
+        byte[] buffer = new byte[(int) Math.min(BUFFER_SIZE, maxBytes)];
+
+        try {
+            while (drainedBytes < maxBytes) {
+                // Prefer an explicit EOF signal before relying on available() 
as a readiness hint.
+                if (inputStream.isFinished()) {
+                    return new DrainResult(drainedBytes, 
elapsedMillis(startNanos), ExitReason.EOF);
+                }
+                int availableBytes = inputStream.available();
+                if (availableBytes <= 0) {
+                    // Allow a bounded idle window so slow clients can still 
deliver buffered bytes.
+                    idleStartNanos = idleStartNanos < 0 ? System.nanoTime() : 
idleStartNanos;
+                    DrainResult idleWaitResult = 
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+                    if (idleWaitResult != null) {
+                        return idleWaitResult;
+                    }
+                    continue;
+                }
+
+                idleStartNanos = -1L;
+                int readLimit = (int) Math.min(Math.min(maxBytes - 
drainedBytes, buffer.length), availableBytes);
+                int readBytes = inputStream.read(buffer, 0, readLimit);
+                if (readBytes < 0) {
+                    return new DrainResult(drainedBytes, 
elapsedMillis(startNanos), ExitReason.EOF);
+                }
+                if (readBytes == 0) {
+                    // Treat zero-byte reads as transient backpressure instead 
of busy-spinning.
+                    idleStartNanos = idleStartNanos < 0 ? System.nanoTime() : 
idleStartNanos;
+                    DrainResult idleWaitResult = 
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+                    if (idleWaitResult != null) {
+                        return idleWaitResult;
+                    }
+                    continue;
+                }
+                drainedBytes += readBytes;
+            }
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.MAX_BYTES);
+        } catch (IOException e) {
+            LOG.warn("failed while draining request body after stream load 
redirect", e);
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.ERROR);
+        }
+    }
+
+    // Convert a bounded idle wait into a terminal drain result when the grace 
window expires or gets interrupted.
+    private static DrainResult waitForMoreDataOrExit(long idleStartNanos, long 
drainedBytes, long startNanos) {
+        if (elapsedMillis(idleStartNanos) >= 
Config.stream_load_redirect_bounded_drain_max_idle_time_ms) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.IDLE_TIMEOUT);
+        }
+        if (!sleepForIdleWindow()) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), 
ExitReason.INTERRUPTED);
+        }
+        return null;
+    }
+
+    private static boolean sleepForIdleWindow() {
+        try {
+            Thread.sleep(IDLE_SLEEP_MS);
+            return true;
+        } catch (InterruptedException e) {
+            LOG.warn("stream load redirect drain idle wait is interrupted", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    private static long elapsedMillis(long startNanos) {
+        return (System.nanoTime() - startNanos) / 1_000_000;
+    }
+
+    public enum ExitReason {
+        EOF,
+        MAX_BYTES,
+        IDLE_TIMEOUT,
+        INTERRUPTED,
+        ERROR
+    }
+
+    public static final class DrainResult {
+        private final long drainedBytes;
+        private final long elapsedMillis;
+        private final ExitReason exitReason;
+
+        public DrainResult(long drainedBytes, long elapsedMillis, ExitReason 
exitReason) {
+            this.drainedBytes = drainedBytes;
+            this.elapsedMillis = elapsedMillis;
+            this.exitReason = exitReason;
+        }
+
+        public long getDrainedBytes() {
+            return drainedBytes;
+        }
+
+        public long getElapsedMillis() {
+            return elapsedMillis;
+        }
+
+        public ExitReason getExitReason() {
+            return exitReason;
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
index 28d7cda587c..4d57c25fd71 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
@@ -17,20 +17,226 @@
 
 package org.apache.doris.httpv2.rest;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
 import jakarta.servlet.http.HttpServletRequest;
-import org.junit.Assert;
-import org.junit.Test;
+import jakarta.servlet.http.HttpServletResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.servlet.view.RedirectView;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
 
 public class LoadActionTest {
+    private final boolean originalEnableDebugPoints = 
Config.enable_debug_points; // snapshot taken when the test instance is created; restored in tearDown()
+    private final String originalCloudUniqueId = Config.cloud_unique_id; // restored in tearDown()
+    private final boolean originalEnableGroupCommitStreamLoadBeForward =
+            Config.enable_group_commit_streamload_be_forward; // restored in tearDown()
+
+
+    @AfterEach
+    public void tearDown() { // resets the global state that individual tests modify
+        Config.stream_load_redirect_bounded_drain_max_bytes = 1024L * 1024 * 
1024; // NOTE(review): literal, not a saved original -- confirm it matches the Config default
+        Config.enable_debug_points = originalEnableDebugPoints;
+        Config.cloud_unique_id = originalCloudUniqueId;
+        Config.enable_group_commit_streamload_be_forward = 
originalEnableGroupCommitStreamLoadBeForward;
+        DebugPointUtil.clearDebugPoints();
+        Thread.interrupted(); // clears this thread's interrupt flag so it cannot leak into the next test
+        org.apache.doris.qe.ConnectContext.remove(); // presumably drops the context installed via setThreadLocalInfo()
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000; // NOTE(review): literal as well -- confirm default
+    }
+
+    // With the drain byte limit set to 0 (the "disabled" case of this test), createRedirectResponse must
+    // return a RedirectView carrying the backend URL and must not touch the servlet response at all.
+    @Test
+    public void testCreateRedirectResponseReturnsRedirectViewWhenBoundedDrainDisabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertTrue(result instanceof RedirectView);
+        RedirectView redirectView = (RedirectView) result;
+        Assertions.assertEquals("http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar", redirectView.getUrl());
+        Mockito.verifyNoInteractions(response);
+    }
+
+    // With a non-zero drain limit and a chunked request, createRedirectResponse returns null and writes the
+    // redirect itself: content type, 307 status, Location header, flush, and then reads the request stream.
+    @Test
+    public void testCreateRedirectResponseWrites307WhenBoundedDrainEnabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request).getInputStream();
+    }
+
+    // A fixed-length request whose Content-Length (4) is below the drain limit (8) must still be redirected
+    // via the response (null return, Location header) and its input stream must be opened for draining.
+    @Test
+    public void testCreateRedirectResponseDrainsFixedLengthRequestWithinLimit() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(4, null, true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request).getInputStream();
+    }
+
+    // A request with no Content-Length (-1) and no Transfer-Encoding has no body to drain: the 307 is still
+    // written to the response, but getInputStream() must never be called.
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWithoutRequestBody() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    // Content-Length 0 means an empty body: the 307 is written to the response, but the request input
+    // stream must never be opened.
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWhenContentLengthIsZero() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(0, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    // A Content-Length (16) above the drain limit (8) must not be drained: the 307 is written to the
+    // response, but the request input stream must never be opened.
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWhenContentLengthExceedsLimit() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(16, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(response).flushBuffer();
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    // RedirectView overload with the drain limit set to 0: the very same RedirectView instance must come
+    // back, with no interaction on the response and no read of the request stream.
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewReturnsRedirectViewWhenDrainDisabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertSame(redirectView, result);
+        Mockito.verifyNoInteractions(response);
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    // RedirectView overload with draining enabled but no request body: the redirect is written to the
+    // response using the view's URL (null return) and the request stream is never opened.
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewSkipsDrainWithoutRequestBody() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/redirect");
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    // RedirectView overload with a chunked request: the redirect is written to the response using the
+    // view's URL (null return) and the request input stream is opened for draining.
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewDrainsChunkedRequest() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        Mockito.verify(response).setHeader("Location", "http://be-host:8040/redirect");
+        Mockito.verify(request).getInputStream();
+    }
+
+    // When isStreamLoad is false the drain settings must not matter: a RedirectView is returned and the
+    // servlet response is left untouched.
+    @Test
+    public void testCreateRedirectResponseKeepsNonStreamLoadBehavior() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction action = new LoadAction();
+        HttpServletRequest servletRequest = mockStreamLoadRequest();
+        HttpServletResponse servletResponse = Mockito.mock(HttpServletResponse.class);
+        TNetworkAddress backendAddress = new TNetworkAddress("be-host", 8040);
+
+        Object redirect = invokeCreateRedirectResponse(action, servletRequest, servletResponse,
+                backendAddress, false, "db1", "tbl1", "label1");
+
+        Assertions.assertTrue(redirect instanceof RedirectView);
+        Mockito.verifyNoInteractions(servletResponse);
+    }
 
     @Test
     public void testGetAllHeadersMasksSensitiveHeaders() throws Exception {
-        LoadAction action = new LoadAction();
+        LoadAction loadAction = new LoadAction();
         HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
         
Mockito.when(request.getHeaderNames()).thenReturn(Collections.enumeration(Arrays.asList(
                 "Authorization", "Cookie", "Set-Cookie", "token", "label")));
@@ -38,12 +244,233 @@ public class LoadActionTest {
 
         Method method = LoadAction.class.getDeclaredMethod("getAllHeaders", 
HttpServletRequest.class);
         method.setAccessible(true);
-        String headers = (String) method.invoke(action, request);
+        String headers = (String) method.invoke(loadAction, request);
+
+        Assertions.assertTrue(headers.contains("Authorization:***MASKED***"));
+        Assertions.assertTrue(headers.contains("Cookie:***MASKED***"));
+        Assertions.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
+        Assertions.assertTrue(headers.contains("token:***MASKED***"));
+        Assertions.assertTrue(headers.contains("label:load_label"));
+    }
+
+    // Drives executeWithoutPassword for a stream load with the "LoadAction.selectRedirectBackend.backendId"
+    // debug point set to 1 and Env.getCurrentSystemInfo() mocked to return that backend. Expects a null
+    // return and a Location header that points at the mocked backend.
+    @Test
+    public void testExecuteWithoutPasswordRedirectsToBackend() throws Exception {
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+
+        org.apache.doris.qe.ConnectContext connectContext = new org.apache.doris.qe.ConnectContext();
+        connectContext.setThreadLocalInfo();
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = invokeExecuteWithoutPassword(loadAction, request, response, "db1", "tbl1", true, false);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        }
+    }
+
+    // Drives executeWithClusterToken for a stream load with the backend debug point set to 1 and both
+    // Env.getCurrentSystemInfo() and Env.getCurrentEnv() mocked. Expects a null return and a Location header
+    // that points at the mocked backend.
+    @Test
+    public void testExecuteWithClusterTokenRedirectsToBackend() throws Exception {
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Env env = Mockito.mock(Env.class);
+        InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        // Provide the default catalog name required by ConnectContext.setEnv().
+        Mockito.when(env.getInternalCatalog()).thenReturn(internalCatalog);
+        Mockito.when(internalCatalog.getName()).thenReturn("internal");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        Mockito.when(request.getHeader("label")).thenReturn("label1");
+        Mockito.when(request.getRemoteAddr()).thenReturn("127.0.0.1");
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+
+            Object result = invokeExecuteWithClusterToken(loadAction, request, response, "db1", "tbl1", true);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        }
+    }
+
+    // Calls streamLoadWithSql with a "sql" header and an "expect: 100-continue" header on a request that has
+    // no body metadata. Expects a null return and a Location header that points at the mocked backend.
+    @Test
+    public void testStreamLoadWithSqlRedirectsWhenExpectHeaderExists() {
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("sql")).thenReturn("insert into db1.tbl1 values(1)");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+        Mockito.when(request.getContentLengthLong()).thenReturn(-1L);
+        Mockito.when(request.getHeader("transfer-encoding")).thenReturn(null);
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = loadAction.streamLoadWithSql(request, response);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location", "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        }
+    }
+
+    // redirectToStreamLoadForward must target the given address, rewrite the path to _stream_load_forward,
+    // keep the original query string and append the forward_to parameter.
+    @Test
+    public void testRedirectToStreamLoadForwardBuildsForwardUrl() throws Exception {
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("k=v");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        RedirectView redirectView = invokeRedirectToStreamLoadForward(loadAction, request,
+                new TNetworkAddress("lb-host", 18040), "be-host:8040");
+
+        Assertions.assertEquals("http://lb-host:18040/api/db1/tbl1/_stream_load_forward?k=v&forward_to=be-host:8040",
+                redirectView.getUrl());
+    }
+
+    // Reflectively calls the LoadAction.createRedirectResponse overload that takes a TNetworkAddress
+    // (looked up with getDeclaredMethod and made accessible) and returns whatever it returns.
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Class<?>[] parameterTypes = {
+            HttpServletRequest.class, HttpServletResponse.class, TNetworkAddress.class,
+            boolean.class, String.class, String.class, String.class
+        };
+        Object[] arguments = {request, response, redirectAddr, isStreamLoad, dbName, tableName, label};
+        Method target = LoadAction.class.getDeclaredMethod("createRedirectResponse", parameterTypes);
+        target.setAccessible(true);
+        return target.invoke(loadAction, arguments);
+    }
+
+    // Reflectively calls the LoadAction.createRedirectResponse overload that takes a ready-made RedirectView
+    // (looked up with getDeclaredMethod and made accessible) and returns whatever it returns.
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, RedirectView redirectView, boolean isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Class<?>[] parameterTypes = {
+            HttpServletRequest.class, HttpServletResponse.class, RedirectView.class,
+            boolean.class, String.class, String.class, String.class
+        };
+        Object[] arguments = {request, response, redirectView, isStreamLoad, dbName, tableName, label};
+        Method target = LoadAction.class.getDeclaredMethod("createRedirectResponse", parameterTypes);
+        target.setAccessible(true);
+        return target.invoke(loadAction, arguments);
+    }
+
+    // Reflectively calls LoadAction.executeWithoutPassword (looked up with getDeclaredMethod and made
+    // accessible) and returns whatever it returns.
+    private Object invokeExecuteWithoutPassword(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit)
+            throws Exception {
+        Class<?>[] parameterTypes = {
+            HttpServletRequest.class, HttpServletResponse.class, String.class, String.class,
+            boolean.class, boolean.class
+        };
+        Object[] arguments = {request, response, db, table, isStreamLoad, groupCommit};
+        Method target = LoadAction.class.getDeclaredMethod("executeWithoutPassword", parameterTypes);
+        target.setAccessible(true);
+        return target.invoke(loadAction, arguments);
+    }
+
+    // Reflectively calls LoadAction.executeWithClusterToken (looked up with getDeclaredMethod and made
+    // accessible) and returns whatever it returns.
+    private Object invokeExecuteWithClusterToken(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean isStreamLoad) throws Exception {
+        Class<?>[] parameterTypes = {
+            HttpServletRequest.class, HttpServletResponse.class, String.class, String.class, boolean.class
+        };
+        Object[] arguments = {request, response, db, table, isStreamLoad};
+        Method target = LoadAction.class.getDeclaredMethod("executeWithClusterToken", parameterTypes);
+        target.setAccessible(true);
+        return target.invoke(loadAction, arguments);
+    }
+
+    // Reflectively calls LoadAction.redirectToStreamLoadForward (looked up with getDeclaredMethod and made
+    // accessible) and casts the result to RedirectView.
+    private RedirectView invokeRedirectToStreamLoadForward(LoadAction loadAction, HttpServletRequest request,
+            TNetworkAddress addr, String forwardTarget) throws Exception {
+        Class<?>[] parameterTypes = {HttpServletRequest.class, TNetworkAddress.class, String.class};
+        Object[] arguments = {request, addr, forwardTarget};
+        Method target = LoadAction.class.getDeclaredMethod("redirectToStreamLoadForward", parameterTypes);
+        target.setAccessible(true);
+        Object forwarded = target.invoke(loadAction, arguments);
+        return (RedirectView) forwarded;
+    }
+
+    private HttpServletRequest mockStreamLoadRequest() throws Exception { // defaults: length -1, no encoding, stream stubbed
+        return mockStreamLoadRequest(-1, null, true);
+    }
+
+    // Builds a mocked stream load request with caller-chosen body metadata: the content length and the
+    // Transfer-Encoding header (stubbed under both spellings). getInputStream() is stubbed with an
+    // IdleServletInputStream only when stubInputStream is true, so the drain decision can be verified
+    // in isolation.
+    private HttpServletRequest mockStreamLoadRequest(long contentLength, String transferEncoding,
+            boolean stubInputStream) throws Exception {
+        HttpServletRequest mockedRequest = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(mockedRequest.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(mockedRequest.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(mockedRequest.getScheme()).thenReturn("http");
+        Mockito.when(mockedRequest.getHeader("Authorization")).thenReturn(null);
+        Mockito.when(mockedRequest.getHeader("Transfer-Encoding")).thenReturn(transferEncoding);
+        Mockito.when(mockedRequest.getHeader("transfer-encoding")).thenReturn(transferEncoding);
+        Mockito.when(mockedRequest.getContentLengthLong()).thenReturn(contentLength);
+        if (!stubInputStream) {
+            return mockedRequest;
+        }
+        Mockito.when(mockedRequest.getInputStream()).thenReturn(new IdleServletInputStream());
+        return mockedRequest;
+    }
+
+    // Builds a Backend mock that answers only getHost, getHttpPort, getPrivateEndpoint and
+    // getPublicEndpoint (the last one always null), enough for the lightweight controller tests here.
+    private Backend mockBackend(String host, int httpPort, String privateEndpoint) {
+        Backend stubbedBackend = Mockito.mock(Backend.class);
+        Mockito.when(stubbedBackend.getPublicEndpoint()).thenReturn(null);
+        Mockito.when(stubbedBackend.getPrivateEndpoint()).thenReturn(privateEndpoint);
+        Mockito.when(stubbedBackend.getHttpPort()).thenReturn(httpPort);
+        Mockito.when(stubbedBackend.getHost()).thenReturn(host);
+        return stubbedBackend;
+    }
+
+    private static class TestLoadAction extends LoadAction { // LoadAction with its three auth hooks overridden
+        @Override
+        public 
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo 
executeCheckPassword(
+                HttpServletRequest request,
+                HttpServletResponse response) { // ignores the request; always returns a fresh ActionAuthorizationInfo
+            return new 
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo();
+        }
+
+        @Override
+        protected void checkTblAuth(org.apache.doris.analysis.UserIdentity 
currentUser, String db, String tbl,
+                org.apache.doris.mysql.privilege.PrivPredicate predicate) { // intentionally empty: never rejects
+        }
+
+        @Override
+        protected boolean checkClusterToken(String token) { // accepts any token
+            return true;
+        }
+    }
+
+    private static class IdleServletInputStream extends ServletInputStream { // stub stream that never yields data
+        @Override
+        public int read() { // always signals end of stream
+            return -1;
+        }
+
+        @Override
+        public int available() { // never reports buffered bytes
+            return 0;
+        }
+
+        @Override
+        public boolean isFinished() { // NOTE(review): stays false although read() returns -1 -- confirm this is intended
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
 
-        Assert.assertTrue(headers.contains("Authorization:***MASKED***"));
-        Assert.assertTrue(headers.contains("Cookie:***MASKED***"));
-        Assert.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
-        Assert.assertTrue(headers.contains("token:***MASKED***"));
-        Assert.assertTrue(headers.contains("label:load_label"));
+        @Override
+        public void setReadListener(ReadListener readListener) { // intentionally empty: the listener is ignored
+        }
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
new file mode 100644
index 00000000000..b838e25ef50
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+// Unit tests for RestBaseController.buildRedirectUrl, reached through a small subclass that exposes it.
+public class RestBaseControllerTest {
+
+    @Test
+    public void testBuildRedirectUrlPreservesEncodedPath() {
+        // Keep the original encoded path unchanged when rebuilding the redirect URL.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), "/api/db%2Ftbl/_stream_load", "k=a%2Bb");
+
+        Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load?k=a%2Bb", redirectUrl);
+    }
+
+    @Test
+    public void testBuildRedirectUrlWithoutQueryString() {
+        // Avoid appending a dangling question mark when the original request has no query string.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), "/api/db%2Ftbl/_stream_load", null);
+
+        Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load", redirectUrl);
+    }
+
+    // Expose the protected helper so the redirect URL can be verified directly.
+    private static class TestRestController extends RestBaseController {
+        private String buildRedirectUrlForTest(HttpServletRequest request, TNetworkAddress addr,
+                String requestPath, String queryString) {
+            return buildRedirectUrl(request, addr, requestPath, queryString);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
new file mode 100644
index 00000000000..8a8d6cae165
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class StreamLoadRedirectDrainUtilTest {
+
+    @AfterEach
+    public void tearDown() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000;
+    }
+
+    @Test
+    public void testDrainRequestBodyWithinMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyAfterRedirectUsesRequestInputStream() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream())
+                .thenReturn(new QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyAfterRedirectReturnsErrorWhenGetInputStreamFails() throws Exception {
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream()).thenThrow(new IOException("open error"));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(0, drainResult.getElapsedMillis());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, drainResult.getExitReason());
+    }
+
+    @Test
+    // Verify delayed body chunks are still drained when they arrive within the bounded idle window.
+    public void testDrainRequestBodyAllowsDelayedArrivalWithinIdleWindow() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 0, 0, 0, 0, 0, 5), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyStopsAtMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello world".getBytes(), 11), 5);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.MAX_BYTES, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new NeverReadyServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyUsesConfiguredIdleTime() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 0, 5), 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyInterruptedWhileWaitingForMoreData() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        try {
+            Thread.currentThread().interrupt();
+            StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                    StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new NeverReadyServletInputStream(), 8);
+
+            Assertions.assertEquals(0, drainResult.getDrainedBytes());
+            Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.INTERRUPTED,
+                    drainResult.getExitReason());
+        } finally {
+            // Clear the interrupt flag to avoid affecting later tests in the same thread.
+            Thread.interrupted();
+        }
+    }
+
+    @Test
+    public void testDrainRequestBodyReadError() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ErrorServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyEof() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new EofServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyRejectsNonPositiveMaxBytes() {
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new NeverReadyServletInputStream(), 0));
+    }
+
+    @Test
+    public void testDrainRequestBodyReturnsEofWhenReadReturnsNegative() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadNegativeServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyReadZeroTriggersIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadZeroServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, drainResult.getExitReason());
+    }
+
+    private static class QueueAvailableServletInputStream extends ServletInputStream {
+        private final byte[] data;
+        private final Queue<Integer> availableValues = new ArrayDeque<>();
+        private int offset = 0;
+
+        QueueAvailableServletInputStream(byte[] data, int... availableValues) {
+            this.data = data;
+            for (int availableValue : availableValues) {
+                this.availableValues.add(availableValue);
+            }
+        }
+
+        @Override
+        public int read() {
+            if (offset >= data.length) {
+                return -1;
+            }
+            return data[offset++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            if (offset >= data.length) {
+                return -1;
+            }
+            int readBytes = Math.min(len, data.length - offset);
+            System.arraycopy(data, offset, b, off, readBytes);
+            offset += readBytes;
+            return readBytes;
+        }
+
+        @Override
+        public int available() {
+            if (!availableValues.isEmpty()) {
+                return availableValues.poll();
+            }
+            return Math.max(0, data.length - offset);
+        }
+
+        @Override
+        public boolean isFinished() {
+            return offset >= data.length;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ErrorServletInputStream extends ServletInputStream {
+        @Override
+        public int read() throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class EofServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return true;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ReadNegativeServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    private static class ReadZeroServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return 0;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return 0;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Keep reporting no readable bytes without reaching EOF to simulate a stalled client.
+    private static class NeverReadyServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 0;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+}
diff --git a/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
new file mode 100644
index 00000000000..5dca0091b7e
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Exercise the FE redirect path with a chunked stream load request."""
+
+import argparse
+import base64
+import http.client
+import json
+import sys
+import time
+import uuid
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Send a chunked stream load request to FE and capture the redirect result."
+    )
+    parser.add_argument("--host", required=True, help="FE host")
+    parser.add_argument("--fe-http-port", required=True, type=int, help="FE HTTP port")
+    parser.add_argument("--user", required=True, help="FE HTTP user")
+    parser.add_argument("--password", default="", help="FE HTTP password")
+    parser.add_argument("--db", required=True, help="Target database")
+    parser.add_argument("--table", required=True, help="Target table")
+    parser.add_argument("--payload-mb", type=int, default=8, help="Approximate payload size in MiB")
+    parser.add_argument("--chunk-kb", type=int, default=8, help="Chunk size in KiB")
+    parser.add_argument("--sleep-ms", type=int, default=10, help="Delay between chunks in milliseconds")
+    parser.add_argument("--connect-timeout", type=int, default=5, help="Connect timeout in seconds")
+    parser.add_argument("--read-timeout", type=int, default=120, help="Read timeout in seconds")
+    return parser.parse_args()
+
+
+def build_auth_header(user, password):
+    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
+    return f"Basic {token}"
+
+
+def build_csv_chunk(chunk_bytes, chunk_index):
+    # Generate deterministic CSV rows so the request body is valid for stream load.
+    rows = []
+    current = 0
+    row_index = chunk_index * 100000
+    payload_width = max(8, min(256, max(1, chunk_bytes // 8)))
+    while True:
+        row = f"{row_index},payload_{chunk_index}_{'x' * payload_width}\n"
+        row_size = len(row.encode("utf-8"))
+        if rows and current + row_size > chunk_bytes:
+            break
+        rows.append(row)
+        current += row_size
+        row_index += 1
+        if current >= chunk_bytes:
+            break
+    return "".join(rows).encode("utf-8")
+
+
+def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
+    sent = 0
+    chunk_index = 0
+    while sent < total_bytes:
+        current_size = min(chunk_bytes, total_bytes - sent)
+        yield build_csv_chunk(current_size, chunk_index)
+        sent += current_size
+        chunk_index += 1
+        if sleep_ms > 0:
+            time.sleep(sleep_ms / 1000.0)
+
+
+def main():
+    args = parse_args()
+    path = f"/api/{args.db}/{args.table}/_stream_load"
+    conn = http.client.HTTPConnection(args.host, args.fe_http_port, timeout=args.read_timeout)
+    label = f"stream_load_redirect_regression_{uuid.uuid4().hex[:12]}"
+    total_bytes = args.payload_mb * 1024 * 1024
+    chunk_bytes = args.chunk_kb * 1024
+    started = time.time()
+
+    try:
+        # Send the request body with explicit chunk framing so the client keeps writing
+        # while FE returns the redirect response.
+        conn.putrequest("PUT", path)
+        conn.putheader("Authorization", build_auth_header(args.user, args.password))
+        conn.putheader("Expect", "100-continue")
+        conn.putheader("Transfer-Encoding", "chunked")
+        conn.putheader("column_separator", ",")
+        conn.putheader("label", label)
+        conn.endheaders()
+
+        for chunk in chunked_csv_generator(total_bytes, chunk_bytes, args.sleep_ms):
+            conn.send(f"{len(chunk):X}\r\n".encode("ascii"))
+            conn.send(chunk)
+            conn.send(b"\r\n")
+        conn.send(b"0\r\n\r\n")
+
+        response = conn.getresponse()
+        body = response.read().decode("utf-8", errors="replace")
+        result = {
+            "status_code": response.status,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": dict(response.getheaders()),
+            "body": body[:2000],
+            "exception_type": None,
+            "exception": None,
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 0
+    except Exception as exc:
+        result = {
+            "status_code": None,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": {},
+            "body": "",
+            "exception_type": type(exc).__name__,
+            "exception": repr(exc),
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 1
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
new file mode 100644
index 00000000000..282f14331ee
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_fe_redirect_chunked_e2e", "p0") {
+    String dbName = context.config.getDbNameByFile(context.file)
+    String tableName = "test_stream_load_fe_redirect_chunked_e2e"
+    String helperPath = "${context.file.parent}/scripts/stream_load_redirect_chunked_e2e.py"
+    String feHttpAddress = context.config.feHttpAddress
+    int feAddressSeparator = feHttpAddress.lastIndexOf(":")
+    assertTrue(feAddressSeparator > 0)
+    String feHost = feHttpAddress.substring(0, feAddressSeparator)
+    String fePort = feHttpAddress.substring(feAddressSeparator + 1)
+
+    sql """ CREATE DATABASE IF NOT EXISTS ${dbName} """
+    sql """ DROP TABLE IF EXISTS ${dbName}.${tableName} """
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            k1 BIGINT,
+            v1 STRING
+        )
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    // Read the current FE values first so the ordinary suite can restore shared settings.
+    def getFrontendConfigValue = { configKey ->
+        def configRows = sql """ admin show frontend config """
+        def configRow = configRows.find { it[0] == configKey }
+        assertTrue(configRow != null)
+        return configRow[1].toString()
+    }
+
+    String originalDrainMaxBytes = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_bytes")
+    String originalDrainMaxIdleTimeMs = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_idle_time_ms")
+
+    // Treat common client-side disconnects as the reproduced historical failure signal.
+    def reproducedExceptionTypes = ["BrokenPipeError", "ConnectionResetError"] as Set
+
+    // Keep the helper single-shot and let the regression test control retries and assertions.
+    def runChunkedStreamLoad = {
+        def command = [
+            "python3",
+            helperPath,
+            "--host", feHost,
+            "--fe-http-port", fePort,
+            "--user", context.config.feHttpUser,
+            "--password", context.config.feHttpPassword == null ? "" : context.config.feHttpPassword,
+            "--db", dbName,
+            "--table", tableName,
+            "--payload-mb", "8",
+            "--chunk-kb", "8",
+            "--sleep-ms", "10"
+        ]
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.in.text.trim()
+        def err = process.err.text.trim()
+        log.info("stream load redirect helper stdout: ${out}")
+        if (!err.isEmpty()) {
+            log.info("stream load redirect helper stderr: ${err}")
+        }
+        return [code: code, result: parseJson(out)]
+    }
+
+    try {
+        // First disable the drain and reproduce the historical client disconnect behavior.
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "0") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "1") """
+        boolean brokenPipeObserved = false
+        for (int i = 0; i < 5; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            if (reproducedExceptionTypes.contains(result.exception_type)) {
+                brokenPipeObserved = true
+                break
+            }
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+        }
+        assertTrue(brokenPipeObserved)
+
+        // Then enable the drain and verify the same request finishes with a redirect.
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "16777216") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "2000") """
+        for (int i = 0; i < 3; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            assertEquals(0, attempt.code as Integer)
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+            assertTrue(result.headers.Location.contains("/api/${dbName}/${tableName}/_stream_load"))
+        }
+    } finally {
+        // Restore the shared FE settings before leaving the ordinary regression environment.
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "${originalDrainMaxBytes}") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "${originalDrainMaxIdleTimeMs}") """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to