This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new f1b89ae2a67 branch-4.1: [fix](fe) Fix broken pipe risk on stream load
redirect with unconsumed request body (#64302)
f1b89ae2a67 is described below
commit f1b89ae2a6771d458097ce8c88c8dd6aac5cd303
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 10 11:52:03 2026 +0800
branch-4.1: [fix](fe) Fix broken pipe risk on stream load redirect with
unconsumed request body (#64302)
Pick apache/doris#63332
Co-authored-by: Wen Zhenghu <[email protected]>
Co-authored-by: yaoxiao <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 24 ++
.../config/WebServerFactoryCustomizerConfig.java | 3 +
.../org/apache/doris/httpv2/rest/LoadAction.java | 156 +++++---
.../doris/httpv2/rest/RestBaseController.java | 51 ++-
.../httpv2/util/StreamLoadRedirectDrainUtil.java | 154 +++++++
.../apache/doris/httpv2/rest/LoadActionTest.java | 445 ++++++++++++++++++++-
.../doris/httpv2/rest/RestBaseControllerTest.java | 64 +++
.../util/StreamLoadRedirectDrainUtilTest.java | 394 ++++++++++++++++++
.../scripts/stream_load_redirect_chunked_e2e.py | 140 +++++++
...test_stream_load_fe_redirect_chunked_e2e.groovy | 117 ++++++
10 files changed, 1460 insertions(+), 88 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 74a3ffa1f70..500d17382cb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -464,6 +464,13 @@ public class Config extends ConfigBase {
"The maximum HTTP POST size of Jetty, in bytes, the default value
is 100MB."})
public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;
+ // Applied to every Jetty HTTP connector through HttpConfiguration#setMaxUnconsumedRequestContentReads
+ // in WebServerFactoryCustomizerConfig. Not declared mutable, so it presumably only takes effect at FE startup.
+ @ConfField(description = {
+ "Jetty 在应用未消费完请求体时,额外尝试读取剩余内容的最大次数。"
+ + "-1 表示不限制,0 表示不额外读取,正数表示最大读取次数。",
+ "The maximum number of extra reads Jetty performs for unconsumed
request content. "
+ + "-1 means unlimited, 0 means disabled, and a positive
value limits the read attempts."})
+ public static int jetty_server_max_unconsumed_request_content_reads = -1;
+
@ConfField(description = {"Jetty 的最大 HTTP header 大小,单位是字节,默认值是 1MB。",
"The maximum HTTP header size of Jetty, in bytes, the default
value is 1MB."})
public static int jetty_server_max_http_header_size = 1048576;
@@ -3628,6 +3635,23 @@ public class Config extends ConfigBase {
+ "public-private/public/private/direct/random-be and empty
string" })
public static String streamload_redirect_policy = "";
+ @ConfField(mutable = true, description = {
+ "Stream Load redirect 场景下,FE 在返回 307 后额外丢弃请求体的最大字节数。"
+ + "0 表示关闭该兼容逻辑,正数表示最大丢弃字节数。",
+ "The maximum number of request body bytes FE drains after
returning 307 for Stream Load redirects. "
+ + "0 disables the compatibility logic, and a positive
value sets the byte limit."})
+ // Enable a generous bounded drain window by default to preserve FE
redirect compatibility on Jetty 12.
+ // Default is 1 GiB. Any value <= 0 disables the drain. A request whose Content-Length exceeds this limit is
+ // redirected without draining, while a chunked body is drained up to this many bytes.
+ public static long stream_load_redirect_bounded_drain_max_bytes = 1024L *
1024 * 1024;
+
+ @ConfField(mutable = true, description = {
+ "Stream Load redirect 场景下,FE 在检测到请求体暂时无可读数据后继续等待的最大空闲时长,单位毫秒。"
+ + "0 表示不额外等待,用于给慢客户端或分段到达的数据保留一个有限的缓冲窗口。",
+ "The maximum idle wait time in milliseconds after FE detects no
readable request body bytes "
+ + "during Stream Load redirect drain. 0 disables the extra
idle wait, while a positive value "
+ + "keeps a bounded grace window for slow clients or
delayed request body chunks."})
+ // Keep a small grace period for delayed body chunks after FE has already
written the redirect.
+ // The window is measured from the first empty poll and restarts once readable bytes appear again.
+ // The drain sleeps 5 ms between polls, and a value <= 0 ends the drain on the first empty poll.
+ public static int stream_load_redirect_bounded_drain_max_idle_time_ms =
1000;
+
@ConfField(mutable = true, description = {
"存算分离模式下是否启用 group commit 的 streamload BE 转发功能。"
+ "解决 LB 随机转发导致 group commit 攒批失效的问题,通过 BE 二次转发确保同表请求到达同一
BE 节点。",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
index 04add103e31..7f9ce0fff10 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
@@ -62,6 +62,9 @@ public class WebServerFactoryCustomizerConfig implements
WebServerFactoryCustomi
if (httpFactory != null) {
HttpConfiguration httpConfig =
httpFactory.getHttpConfiguration();
httpConfig.setRequestHeaderSize(Config.jetty_server_max_http_header_size);
+ // Apply the unconsumed request content read limit to
every HTTP connector.
+ httpConfig.setMaxUnconsumedRequestContentReads(
+
Config.jetty_server_max_unconsumed_request_content_reads);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 133e0c965bc..7e817cc202b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.qe.ComputeGroupException;
-import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -33,6 +32,7 @@ import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.util.StreamLoadRedirectDrainUtil;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -60,8 +60,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.io.IOException;
import java.net.InetAddress;
-import java.net.URI;
import java.util.Enumeration;
import java.util.List;
import java.util.Optional;
@@ -132,7 +132,7 @@ public class LoadAction extends RestBaseController {
if (!checkClusterToken(authToken)) {
throw new UnauthorizedException("Invalid token: " + authToken);
}
- return executeWithClusterToken(request, db, table, true);
+ return executeWithClusterToken(request, response, db, table, true);
} else {
try {
executeCheckPassword(request, response);
@@ -191,8 +191,7 @@ public class LoadAction extends RestBaseController {
LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
- RedirectView redirectView = redirectTo(request, redirectAddr);
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
true, null, null, label);
} catch (Exception e) {
return new RestBaseResult(e.getMessage());
}
@@ -313,11 +312,17 @@ public class LoadAction extends RestBaseController {
redirectAddr.toString(), isStreamLoad, dbName,
tableName, label);
}
- RedirectView redirectView = redirectTo(request, redirectAddr);
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
isStreamLoad, dbName, tableName, label);
} catch (StreamLoadForwardException e) {
- // Special handling for stream load forwarding
- return e.getRedirectView();
+ // Handle IOException from redirect response generation in the
forwarding path.
+ try {
+ return createRedirectResponse(request, response,
e.getRedirectView(),
+ isStreamLoad, db, table, label);
+ } catch (IOException ioException) {
+ LOG.warn("stream load forward redirect failed, stream: {}, db:
{}, tbl: {}, label: {}, err: {}",
+ isStreamLoad, db, table, label,
ioException.getMessage());
+ return new RestBaseResult(ioException.getMessage());
+ }
} catch (Exception e) {
LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {},
err: {}",
isStreamLoad, db, table, label, e.getMessage());
@@ -594,7 +599,7 @@ public class LoadAction extends RestBaseController {
// AuditlogPlugin should be re-disigned carefully, and blow method focuses
on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
- private Object executeWithClusterToken(HttpServletRequest request, String
db,
+ private Object executeWithClusterToken(HttpServletRequest request,
HttpServletResponse response, String db,
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
@@ -637,29 +642,7 @@ public class LoadAction extends RestBaseController {
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName,
label);
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
- String userInfo = null;
-
- try {
- urlObj = new URI(urlStr);
- resultUriObj = new URI("http", userInfo,
redirectAddr.getHostname(),
- redirectAddr.getPort(), urlObj.getPath(), "", null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- String redirectUrl = resultUriObj.toASCIIString();
- if (!Strings.isNullOrEmpty(request.getQueryString())) {
- redirectUrl += request.getQueryString();
- }
- LOG.info("Redirect url: {}", "http://" +
redirectAddr.getHostname() + ":"
- + redirectAddr.getPort() + urlObj.getPath());
- RedirectView redirectView = new RedirectView(redirectUrl);
- redirectView.setContentType("text/html;charset=utf-8");
-
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
isStreamLoad, dbName, tableName, label);
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}",
e.getMessage(), e);
return new RestBaseResult(e.getMessage());
@@ -679,6 +662,80 @@ public class LoadAction extends RestBaseController {
return headers.toString();
}
+    /**
+     * Builds the redirect for a load request that targets a backend address.
+     *
+     * <p>When the bounded drain is not in effect, this returns a {@link RedirectView} and lets Spring write the
+     * 307. The drain is not in effect for a non-stream-load request or when the drain limit is {@code <= 0}.
+     * Otherwise this method writes the 307 itself and may drain the unread request body. It then returns
+     * {@code null} because the response is already committed.
+     *
+     * @throws IOException if writing or flushing the redirect response fails
+     */
+    private Object createRedirectResponse(HttpServletRequest request, HttpServletResponse response,
+            TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName, String tableName, String label)
+            throws IOException {
+        // Read the mutable limit once so the enable check, the drain decision and the drain all agree; a
+        // concurrent change to 0 must not reach the drain util (which rejects non-positive limits) after the
+        // 307 has already been committed.
+        long maxDrainBytes = Config.stream_load_redirect_bounded_drain_max_bytes;
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad, maxDrainBytes)) {
+            // redirectTo builds (and logs) the URL itself, so do not build it eagerly here.
+            return redirectTo(request, redirectAddr);
+        }
+        writeRedirectAndMaybeDrain(request, response, buildRedirectUrl(request, redirectAddr),
+                redirectAddr.toString(), maxDrainBytes, dbName, tableName, label);
+        return null;
+    }
+
+    /**
+     * Same as the {@link TNetworkAddress} overload, but for a redirect that was already prepared as a
+     * {@link RedirectView} (used by the stream load forwarding path).
+     *
+     * @throws IOException if writing or flushing the redirect response fails
+     */
+    private Object createRedirectResponse(HttpServletRequest request, HttpServletResponse response,
+            RedirectView redirectView, boolean isStreamLoad, String dbName, String tableName, String label)
+            throws IOException {
+        long maxDrainBytes = Config.stream_load_redirect_bounded_drain_max_bytes;
+        if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad, maxDrainBytes)) {
+            return redirectView;
+        }
+        String redirectUrl = redirectView.getUrl();
+        writeRedirectAndMaybeDrain(request, response, redirectUrl, redirectUrl, maxDrainBytes,
+                dbName, tableName, label);
+        return null;
+    }
+
+    // The bounded drain only applies to stream load requests and only when a positive limit is configured.
+    private boolean shouldUseBoundedDrainForStreamLoad(boolean isStreamLoad, long maxDrainBytes) {
+        return isStreamLoad && maxDrainBytes > 0;
+    }
+
+    // Writes the 307 and then drains the request body, unless the request carries no body or a declared
+    // Content-Length larger than the drain limit.
+    private void writeRedirectAndMaybeDrain(HttpServletRequest request, HttpServletResponse response,
+            String redirectUrl, String redirectTarget, long maxDrainBytes, String dbName, String tableName,
+            String label) throws IOException {
+        writeTemporaryRedirect(response, redirectUrl);
+        DrainDecision drainDecision = decideDrainDecisionForStreamLoadRedirect(request, maxDrainBytes);
+        if (drainDecision != DrainDecision.DRAIN) {
+            LOG.info("skip bounded drain after stream load redirect, target: {}, db: {}, tbl: {}, label: {},"
+                    + " reason: {}",
+                    redirectTarget, dbName, tableName, label, drainDecision);
+            return;
+        }
+        drainStreamLoadRequestBodyAfterRedirect(request, redirectTarget, maxDrainBytes, dbName, tableName, label);
+    }
+
+    // Skip the bounded drain for header-only probes and for fixed-length bodies larger than the drain limit.
+    private DrainDecision decideDrainDecisionForStreamLoadRedirect(HttpServletRequest request, long maxDrainBytes) {
+        long contentLength = request.getContentLengthLong();
+        String transferEncoding = request.getHeader(HttpHeaderNames.TRANSFER_ENCODING.toString());
+        if (contentLength <= 0 && Strings.isNullOrEmpty(transferEncoding)) {
+            return DrainDecision.SKIP_NO_REQUEST_BODY;
+        }
+        if (contentLength > maxDrainBytes) {
+            return DrainDecision.SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES;
+        }
+        return DrainDecision.DRAIN;
+    }
+
+    // Drains up to drainLimit body bytes and logs how the drain ended; drain errors are reported, not thrown.
+    private void drainStreamLoadRequestBodyAfterRedirect(HttpServletRequest request, String redirectTarget,
+            long drainLimit, String dbName, String tableName, String label) {
+        LOG.info("write stream load redirect and start bounded drain, target: {}, db: {}, tbl: {}, label: {},"
+                + " max_drain_bytes: {}",
+                redirectTarget, dbName, tableName, label, drainLimit);
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, drainLimit);
+        LOG.info("finish bounded drain after stream load redirect, target: {}, db: {}, tbl: {}, label: {},"
+                + " drained_bytes: {}, elapsed_ms: {}, exit_reason: {}",
+                redirectTarget, dbName, tableName, label, drainResult.getDrainedBytes(),
+                drainResult.getElapsedMillis(), drainResult.getExitReason());
+    }
+
+    /** Outcome of deciding whether to drain the request body after writing a stream load redirect. */
+    private enum DrainDecision {
+        /** Content-Length is {@code <= 0} and no Transfer-Encoding header is present. */
+        SKIP_NO_REQUEST_BODY,
+        /** The declared Content-Length is larger than the drain limit. */
+        SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES,
+        /** The body is chunked or its declared length fits within the drain limit. */
+        DRAIN
+    }
+
private boolean isSensitiveHeader(String headerName) {
return "Authorization".equalsIgnoreCase(headerName)
|| "Proxy-Authorization".equalsIgnoreCase(headerName)
@@ -737,38 +794,17 @@ public class LoadAction extends RestBaseController {
* @param addr the endpoint address to redirect to (public/private
endpoint)
* @param forwardTarget the target BE node in "host:port" format for final
processing
* @return RedirectView configured for stream load forwarding
- */
+ */
private RedirectView redirectToStreamLoadForward(HttpServletRequest
request, TNetworkAddress addr,
String forwardTarget) {
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
- String userInfo = null;
- String modifiedPath = null;
-
- if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
- ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
- userInfo =
ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
- + ":" + authInfo.password;
- }
- try {
- urlObj = new URI(urlStr);
- // Replace _stream_load with _stream_load_forward in the path
- modifiedPath = urlObj.getPath().replace("/_stream_load",
"/_stream_load_forward");
- resultUriObj = new URI("http", userInfo, addr.getHostname(),
- addr.getPort(), modifiedPath, "", null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- String redirectUrl = resultUriObj.toASCIIString();
-
- // Add forward_to parameter (note: toASCIIString() already includes
'?' due to empty query)
+ // Replace _stream_load with _stream_load_forward in the path.
+ String modifiedPath = request.getRequestURI().replace("/_stream_load",
"/_stream_load_forward");
String queryString = request.getQueryString();
+ String redirectQuery = "forward_to=" + forwardTarget;
if (!Strings.isNullOrEmpty(queryString)) {
- redirectUrl += queryString + "&forward_to=" + forwardTarget;
- } else {
- redirectUrl += "forward_to=" + forwardTarget;
+ redirectQuery = queryString + "&" + redirectQuery;
}
+ String redirectUrl = buildRedirectUrl(request, addr, modifiedPath,
redirectQuery);
LOG.info("Redirect stream load forward url: {}, forward_to: {}",
"http://" + addr.getHostname() + ":" + addr.getPort() +
modifiedPath, forwardTarget);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index 78ac8ef31ef..29cdce15910 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -89,17 +89,12 @@ public class RestBaseController extends BaseController {
return authInfo;
}
- public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress
addr) {
- RedirectView redirectView = new RedirectView(getRedirectUrL(request,
addr));
- redirectView.setContentType("text/html;charset=utf-8");
-
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
- return redirectView;
+    /**
+     * Builds the redirect URL for {@code addr}, carrying over the incoming request's URI path and query
+     * string unchanged.
+     */
+    protected String buildRedirectUrl(HttpServletRequest request, TNetworkAddress addr) {
+        String incomingPath = request.getRequestURI();
+        String incomingQuery = request.getQueryString();
+        return buildRedirectUrl(request, addr, incomingPath, incomingQuery);
     }
- public String getRedirectUrL(HttpServletRequest request, TNetworkAddress
addr) {
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
+ protected String buildRedirectUrl(HttpServletRequest request,
TNetworkAddress addr, String requestPath,
+ String queryString) {
String userInfo = null;
if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
@@ -107,19 +102,37 @@ public class RestBaseController extends BaseController {
+ ":" + authInfo.password;
}
try {
- urlObj = new URI(urlStr);
- resultUriObj = new URI(request.getScheme(), userInfo,
addr.getHostname(),
- addr.getPort(), urlObj.getPath(), "", null);
+ // Preserve the original request path to avoid re-encoding an
already encoded URI path.
+ URI authorityUri = new URI(request.getScheme(), userInfo,
addr.getHostname(),
+ addr.getPort(), null, null, null);
+ String redirectUrl = authorityUri.toASCIIString() + requestPath;
+ if (!Strings.isNullOrEmpty(queryString)) {
+ redirectUrl += "?" + queryString;
+ }
+ LOG.info("Redirect url: {}", request.getScheme() + "://" +
addr.getHostname() + ":"
+ + addr.getPort() + requestPath);
+ return redirectUrl;
} catch (Exception e) {
throw new RuntimeException(e);
}
- String redirectUrl = resultUriObj.toASCIIString();
- if (!Strings.isNullOrEmpty(request.getQueryString())) {
- redirectUrl += request.getQueryString();
- }
- LOG.info("Redirect url: {}", request.getScheme() + "://" +
addr.getHostname() + ":"
- + addr.getPort() + urlObj.getPath());
- return redirectUrl;
+ }
+
+    /**
+     * Writes a bodiless 307 Temporary Redirect pointing at {@code redirectUrl} and flushes it right away.
+     * This lets the client see the redirect before the caller does any further work on the request.
+     *
+     * @throws IOException if flushing the response fails
+     */
+    protected void writeTemporaryRedirect(HttpServletResponse response, String redirectUrl) throws IOException {
+        final int redirectStatus = HttpStatus.TEMPORARY_REDIRECT.value();
+        response.setContentType("text/html;charset=utf-8");
+        response.setStatus(redirectStatus);
+        response.setHeader("Location", redirectUrl);
+        response.flushBuffer();
+    }
+
+    /**
+     * Returns a Spring {@link RedirectView} that answers with a 307 to the same path and query on {@code addr}.
+     */
+    public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress addr) {
+        String targetUrl = buildRedirectUrl(request, addr);
+        RedirectView view = new RedirectView(targetUrl);
+        view.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+        view.setContentType("text/html;charset=utf-8");
+        return view;
+    }
+
+    /**
+     * Returns the same URL as {@link #buildRedirectUrl(HttpServletRequest, TNetworkAddress)}. It is kept as a
+     * public entry point, presumably for existing callers of this older name.
+     */
+    public String getRedirectUrL(HttpServletRequest request, TNetworkAddress addr) {
+        final String redirectUrl = buildRedirectUrl(request, addr);
+        return redirectUrl;
     }
public RedirectView redirectToObj(String sign) throws URISyntaxException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
new file mode 100644
index 00000000000..a9d345ad38d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import com.google.common.base.Preconditions;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Best-effort, bounded drain of a request body that FE has already answered with a 307 redirect.
+ *
+ * <p>Reads and discards body bytes until one of these happens: EOF, the byte limit, the idle timeout, an
+ * interrupt, or an I/O error. The result reports which one ended the drain. I/O problems are logged and reported
+ * as {@link ExitReason#ERROR} rather than thrown.
+ *
+ * <p>NOTE(review): only the byte limit and the idle window bound the drain. A client that keeps trickling bytes
+ * just inside the idle window holds the calling thread until the byte limit is reached.
+ */
+public final class StreamLoadRedirectDrainUtil {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRedirectDrainUtil.class);
+
+    private static final int BUFFER_SIZE = 8 * 1024;
+    private static final int IDLE_SLEEP_MS = 5;
+
+    private StreamLoadRedirectDrainUtil() {
+    }
+
+    /**
+     * Drains up to {@code maxBytes} of {@code request}'s body.
+     *
+     * @param request  the request whose remaining body is discarded
+     * @param maxBytes maximum number of bytes to read; must be positive
+     * @return drained byte count, elapsed time and the reason the drain stopped
+     * @throws IllegalArgumentException if {@code maxBytes <= 0} and the input stream could be obtained
+     */
+    public static DrainResult drainRequestBodyAfterRedirect(HttpServletRequest request, long maxBytes) {
+        try {
+            return drainRequestBodyAfterRedirect(request.getInputStream(), maxBytes);
+        } catch (IOException e) {
+            LOG.warn("failed to get request input stream for stream load redirect drain", e);
+            return new DrainResult(0, 0, ExitReason.ERROR);
+        }
+    }
+
+    /**
+     * Drains up to {@code maxBytes} from {@code inputStream}. The idle window comes from
+     * {@code Config.stream_load_redirect_bounded_drain_max_idle_time_ms}, which is read once per drain.
+     */
+    static DrainResult drainRequestBodyAfterRedirect(ServletInputStream inputStream, long maxBytes) {
+        return drainRequestBodyAfterRedirect(inputStream, maxBytes,
+                Config.stream_load_redirect_bounded_drain_max_idle_time_ms);
+    }
+
+    /**
+     * Core drain loop.
+     *
+     * @param inputStream   the body stream to discard
+     * @param maxBytes      maximum number of bytes to read; must be positive
+     * @param maxIdleMillis how long to keep polling once the stream stops yielding bytes; {@code <= 0} stops at
+     *                      the first empty poll
+     * @return drained byte count, elapsed time and the reason the drain stopped
+     * @throws IllegalArgumentException if {@code maxBytes <= 0}
+     */
+    static DrainResult drainRequestBodyAfterRedirect(ServletInputStream inputStream, long maxBytes,
+            long maxIdleMillis) {
+        Preconditions.checkArgument(maxBytes > 0, "maxBytes must be positive");
+
+        long startNanos = System.nanoTime();
+        long drainedBytes = 0;
+        // nanoTime at which the current run of empty polls / zero-byte reads began, or -1 when not idle.
+        long idleStartNanos = -1L;
+        byte[] buffer = new byte[(int) Math.min(BUFFER_SIZE, maxBytes)];
+
+        try {
+            while (drainedBytes < maxBytes) {
+                // Prefer an explicit EOF signal before relying on available() as a readiness hint.
+                if (inputStream.isFinished()) {
+                    return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.EOF);
+                }
+                int availableBytes = inputStream.available();
+                int readBytes = 0;
+                if (availableBytes > 0) {
+                    int readLimit = (int) Math.min(Math.min(maxBytes - drainedBytes, buffer.length), availableBytes);
+                    readBytes = inputStream.read(buffer, 0, readLimit);
+                    if (readBytes < 0) {
+                        return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.EOF);
+                    }
+                }
+                if (readBytes > 0) {
+                    drainedBytes += readBytes;
+                    // Only real progress ends an idle run.
+                    idleStartNanos = -1L;
+                    continue;
+                }
+                // No readable bytes and zero-byte reads share one idle window. A stream that keeps reporting
+                // available() > 0 but never yields data therefore still times out instead of looping forever.
+                if (idleStartNanos < 0) {
+                    idleStartNanos = System.nanoTime();
+                }
+                DrainResult idleExit = waitForMoreDataOrExit(idleStartNanos, maxIdleMillis, drainedBytes, startNanos);
+                if (idleExit != null) {
+                    return idleExit;
+                }
+            }
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.MAX_BYTES);
+        } catch (IOException e) {
+            LOG.warn("failed while draining request body after stream load redirect", e);
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.ERROR);
+        }
+    }
+
+    // Returns a terminal result when the idle window has expired or the idle sleep was interrupted, else null.
+    private static DrainResult waitForMoreDataOrExit(long idleStartNanos, long maxIdleMillis, long drainedBytes,
+            long startNanos) {
+        if (elapsedMillis(idleStartNanos) >= maxIdleMillis) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.IDLE_TIMEOUT);
+        }
+        if (!sleepForIdleWindow()) {
+            return new DrainResult(drainedBytes, elapsedMillis(startNanos), ExitReason.INTERRUPTED);
+        }
+        return null;
+    }
+
+    // Sleeps one idle poll interval; returns false (and restores the interrupt flag) if interrupted.
+    private static boolean sleepForIdleWindow() {
+        try {
+            Thread.sleep(IDLE_SLEEP_MS);
+            return true;
+        } catch (InterruptedException e) {
+            LOG.warn("stream load redirect drain idle wait is interrupted", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    private static long elapsedMillis(long startNanos) {
+        return (System.nanoTime() - startNanos) / 1_000_000;
+    }
+
+    /** Why a drain stopped. */
+    public enum ExitReason {
+        /** The stream reported it was finished or a read returned -1. */
+        EOF,
+        /** The byte limit was reached. */
+        MAX_BYTES,
+        /** No bytes arrived within the idle window. */
+        IDLE_TIMEOUT,
+        /** The idle sleep was interrupted. */
+        INTERRUPTED,
+        /** Obtaining or reading the input stream failed with an IOException. */
+        ERROR
+    }
+
+    /** Immutable summary of one drain: bytes discarded, elapsed wall time and the exit reason. */
+    public static final class DrainResult {
+        private final long drainedBytes;
+        private final long elapsedMillis;
+        private final ExitReason exitReason;
+
+        public DrainResult(long drainedBytes, long elapsedMillis, ExitReason exitReason) {
+            this.drainedBytes = drainedBytes;
+            this.elapsedMillis = elapsedMillis;
+            this.exitReason = exitReason;
+        }
+
+        public long getDrainedBytes() {
+            return drainedBytes;
+        }
+
+        public long getElapsedMillis() {
+            return elapsedMillis;
+        }
+
+        public ExitReason getExitReason() {
+            return exitReason;
+        }
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
index 28d7cda587c..4d57c25fd71 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
@@ -17,20 +17,226 @@
package org.apache.doris.httpv2.rest;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServletRequest;
-import org.junit.Assert;
-import org.junit.Test;
+import jakarta.servlet.http.HttpServletResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.servlet.view.RedirectView;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
public class LoadActionTest {
+    // Snapshot every Config value these tests mutate, so tearDown restores the prior value instead of
+    // hard-coded copies of the defaults that could drift from Config.java.
+    private final boolean originalEnableDebugPoints = Config.enable_debug_points;
+    private final String originalCloudUniqueId = Config.cloud_unique_id;
+    private final boolean originalEnableGroupCommitStreamLoadBeForward =
+            Config.enable_group_commit_streamload_be_forward;
+    private final long originalRedirectDrainMaxBytes = Config.stream_load_redirect_bounded_drain_max_bytes;
+    private final int originalRedirectDrainMaxIdleTimeMs =
+            Config.stream_load_redirect_bounded_drain_max_idle_time_ms;
+
+    @AfterEach
+    public void tearDown() {
+        Config.stream_load_redirect_bounded_drain_max_bytes = originalRedirectDrainMaxBytes;
+        Config.enable_debug_points = originalEnableDebugPoints;
+        Config.cloud_unique_id = originalCloudUniqueId;
+        Config.enable_group_commit_streamload_be_forward = originalEnableGroupCommitStreamLoadBeForward;
+        DebugPointUtil.clearDebugPoints();
+        // Clear any interrupt flag a test may have left on the test thread.
+        Thread.interrupted();
+        org.apache.doris.qe.ConnectContext.remove();
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = originalRedirectDrainMaxIdleTimeMs;
+    }
+
+    @Test
+    public void testCreateRedirectResponseReturnsRedirectViewWhenBoundedDrainDisabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        // Drain disabled: Spring writes the redirect from the returned view; the response is untouched here.
+        Assertions.assertTrue(result instanceof RedirectView);
+        RedirectView redirectView = (RedirectView) result;
+        Assertions.assertEquals("http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar", redirectView.getUrl());
+        Mockito.verifyNoInteractions(response);
+    }
+
+    @Test
+    public void testCreateRedirectResponseWrites307WhenBoundedDrainEnabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseDrainsFixedLengthRequestWithinLimit() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(4, null, true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWithoutRequestBody() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWhenContentLengthIsZero() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(0, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseSkipsDrainWhenContentLengthExceedsLimit() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(16, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewReturnsRedirectViewWhenDrainDisabled() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertSame(redirectView, result);
+        Mockito.verifyNoInteractions(response);
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewSkipsDrainWithoutRequestBody() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/redirect");
+        Mockito.verify(request, Mockito.never()).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseWithRedirectViewDrainsChunkedRequest() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, "chunked", true);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        RedirectView redirectView = new RedirectView("http://be-host:8040/redirect");
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                redirectView, true, "db1", "tbl1", "label1");
+
+        Assertions.assertNull(result);
+        assertRedirectWritten(response, "http://be-host:8040/redirect");
+        Mockito.verify(request).getInputStream();
+    }
+
+    @Test
+    public void testCreateRedirectResponseKeepsNonStreamLoadBehavior() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        LoadAction loadAction = new LoadAction();
+        HttpServletRequest request = mockStreamLoadRequest();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+        Object result = invokeCreateRedirectResponse(loadAction, request, response,
+                new TNetworkAddress("be-host", 8040), false, "db1", "tbl1", "label1");
+
+        // Non-stream-load requests never take the bounded-drain path, even when it is enabled.
+        Assertions.assertTrue(result instanceof RedirectView);
+        Mockito.verifyNoInteractions(response);
+    }
+
+    // Verifies the controller wrote the 307 itself: content type, status, Location header and a flush.
+    private static void assertRedirectWritten(HttpServletResponse response, String location) throws Exception {
+        Mockito.verify(response).setContentType("text/html;charset=utf-8");
+        Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+        Mockito.verify(response).setHeader("Location", location);
+        Mockito.verify(response).flushBuffer();
+    }
@Test
public void testGetAllHeadersMasksSensitiveHeaders() throws Exception {
- LoadAction action = new LoadAction();
+ LoadAction loadAction = new LoadAction();
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
Mockito.when(request.getHeaderNames()).thenReturn(Collections.enumeration(Arrays.asList(
"Authorization", "Cookie", "Set-Cookie", "token", "label")));
@@ -38,12 +244,233 @@ public class LoadActionTest {
Method method = LoadAction.class.getDeclaredMethod("getAllHeaders",
HttpServletRequest.class);
method.setAccessible(true);
- String headers = (String) method.invoke(action, request);
+ String headers = (String) method.invoke(loadAction, request);
+
+ // Credential-bearing headers must be masked; the non-sensitive label header keeps its value.
+ Assertions.assertTrue(headers.contains("Authorization:***MASKED***"));
+ Assertions.assertTrue(headers.contains("Cookie:***MASKED***"));
+ Assertions.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
+ Assertions.assertTrue(headers.contains("token:***MASKED***"));
+ Assertions.assertTrue(headers.contains("label:load_label"));
+ }
+
+    @Test
+    public void testExecuteWithoutPasswordRedirectsToBackend() throws Exception {
+        // The debug point presumably pins redirect backend selection to backend 1, which is stubbed below.
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+
+        org.apache.doris.qe.ConnectContext connectContext = new org.apache.doris.qe.ConnectContext();
+        connectContext.setThreadLocalInfo();
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = invokeExecuteWithoutPassword(loadAction, request, response,
+                    "db1", "tbl1", true, false);
+
+            // Expect the redirect to be written onto the response (Location header) with a null return.
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location",
+                    "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        } finally {
+            // Do not leak the thread-local ConnectContext installed above into later tests on this thread.
+            org.apache.doris.qe.ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testExecuteWithClusterTokenRedirectsToBackend() throws Exception {
+        // The debug point presumably pins redirect backend selection to backend 1, which is stubbed below.
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Env env = Mockito.mock(Env.class);
+        InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        // Provide the default catalog name required by ConnectContext.setEnv().
+        Mockito.when(env.getInternalCatalog()).thenReturn(internalCatalog);
+        Mockito.when(internalCatalog.getName()).thenReturn("internal");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        Mockito.when(request.getHeader("label")).thenReturn("label1");
+        Mockito.when(request.getRemoteAddr()).thenReturn("127.0.0.1");
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+
+            Object result = invokeExecuteWithClusterToken(loadAction, request, response, "db1", "tbl1", true);
+
+            // Expect the redirect to be written onto the response (Location header) with a null return.
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location",
+                    "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        } finally {
+            // The cluster-token path presumably builds its own ConnectContext; clear any thread-local copy
+            // it may have installed so later tests on this thread start clean (no-op if none was set).
+            org.apache.doris.qe.ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testStreamLoadWithSqlRedirectsWhenExpectHeaderExists() {
+        // A SQL-driven stream load carrying "Expect: 100-continue" is redirected to the selected backend
+        // with the original path and query string preserved.
+        Config.enable_debug_points = true;
+        DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId", 1L);
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        Backend backend = mockBackend("be-host", 8040, null);
+        SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class);
+        Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+        Mockito.when(request.getHeader("sql")).thenReturn("insert into db1.tbl1 values(1)");
+        Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+        Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+        // Neither Content-Length nor Transfer-Encoding: the body metadata is unknown.
+        Mockito.when(request.getContentLengthLong()).thenReturn(-1L);
+        Mockito.when(request.getHeader("transfer-encoding")).thenReturn(null);
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Object result = loadAction.streamLoadWithSql(request, response);
+
+            Assertions.assertNull(result);
+            Mockito.verify(response).setHeader("Location",
+                    "http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+        }
+    }
+
+    @Test
+    public void testRedirectToStreamLoadForwardBuildsForwardUrl() throws Exception {
+        // The forward URL targets the given address, rewrites _stream_load to _stream_load_forward and
+        // appends the real backend as forward_to after the original query string.
+        TestLoadAction loadAction = new TestLoadAction();
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("k=v");
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        RedirectView redirectView = invokeRedirectToStreamLoadForward(loadAction, request,
+                new TNetworkAddress("lb-host", 18040), "be-host:8040");
+
+        Assertions.assertEquals(
+                "http://lb-host:18040/api/db1/tbl1/_stream_load_forward?k=v&forward_to=be-host:8040",
+                redirectView.getUrl());
+    }
+
+    // Reflectively call the non-public createRedirectResponse overload that takes a backend address.
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Method method = LoadAction.class.getDeclaredMethod("createRedirectResponse",
+                HttpServletRequest.class, HttpServletResponse.class, TNetworkAddress.class,
+                boolean.class, String.class, String.class, String.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, redirectAddr, isStreamLoad, dbName, tableName, label);
+    }
+
+    // Reflectively call the non-public createRedirectResponse overload that takes a prebuilt RedirectView.
+    private Object invokeCreateRedirectResponse(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, RedirectView redirectView, boolean isStreamLoad, String dbName,
+            String tableName, String label) throws Exception {
+        Method method = LoadAction.class.getDeclaredMethod("createRedirectResponse",
+                HttpServletRequest.class, HttpServletResponse.class, RedirectView.class,
+                boolean.class, String.class, String.class, String.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, redirectView, isStreamLoad, dbName, tableName, label);
+    }
+
+    // Reflectively call the non-public executeWithoutPassword entry point.
+    private Object invokeExecuteWithoutPassword(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit)
+            throws Exception {
+        Method method = LoadAction.class.getDeclaredMethod("executeWithoutPassword",
+                HttpServletRequest.class, HttpServletResponse.class, String.class, String.class,
+                boolean.class, boolean.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, db, table, isStreamLoad, groupCommit);
+    }
+
+    // Reflectively call the non-public executeWithClusterToken entry point.
+    private Object invokeExecuteWithClusterToken(LoadAction loadAction, HttpServletRequest request,
+            HttpServletResponse response, String db, String table, boolean isStreamLoad) throws Exception {
+        Method method = LoadAction.class.getDeclaredMethod("executeWithClusterToken",
+                HttpServletRequest.class, HttpServletResponse.class, String.class, String.class, boolean.class);
+        method.setAccessible(true);
+        return method.invoke(loadAction, request, response, db, table, isStreamLoad);
+    }
+
+    // Reflectively call the non-public redirectToStreamLoadForward helper and return its RedirectView.
+    private RedirectView invokeRedirectToStreamLoadForward(LoadAction loadAction, HttpServletRequest request,
+            TNetworkAddress addr, String forwardTarget) throws Exception {
+        Method method = LoadAction.class.getDeclaredMethod("redirectToStreamLoadForward",
+                HttpServletRequest.class, TNetworkAddress.class, String.class);
+        method.setAccessible(true);
+        return (RedirectView) method.invoke(loadAction, request, addr, forwardTarget);
+    }
+
+    // Default stream load request: no Content-Length (-1), no Transfer-Encoding, and a stubbed idle body.
+    private HttpServletRequest mockStreamLoadRequest() throws Exception {
+        return mockStreamLoadRequest(-1, null, true);
+    }
+
+    // Provide explicit request body metadata so the drain decision can be verified in isolation.
+    // Transfer-Encoding is stubbed under both header spellings used by callers; when stubInputStream is
+    // false, getInputStream() stays unstubbed so tests can assert it is never called.
+    private HttpServletRequest mockStreamLoadRequest(long contentLength, String transferEncoding,
+            boolean stubInputStream) throws Exception {
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+        Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+        Mockito.when(request.getContentLengthLong()).thenReturn(contentLength);
+        Mockito.when(request.getHeader("Transfer-Encoding")).thenReturn(transferEncoding);
+        Mockito.when(request.getHeader("transfer-encoding")).thenReturn(transferEncoding);
+        if (stubInputStream) {
+            Mockito.when(request.getInputStream()).thenReturn(new IdleServletInputStream());
+        }
+        return request;
+    }
+
+    // Create a backend mock with only the redirect-related fields populated for lightweight controller tests.
+    // The public endpoint is always null; the private endpoint is taken from the argument.
+    private Backend mockBackend(String host, int httpPort, String privateEndpoint) {
+        Backend backend = Mockito.mock(Backend.class);
+        Mockito.when(backend.getHost()).thenReturn(host);
+        Mockito.when(backend.getHttpPort()).thenReturn(httpPort);
+        Mockito.when(backend.getPrivateEndpoint()).thenReturn(privateEndpoint);
+        Mockito.when(backend.getPublicEndpoint()).thenReturn(null);
+        return backend;
+    }
+
+    // LoadAction subclass that bypasses the password, table-privilege and cluster-token checks so the
+    // redirect paths can be exercised without authentication fixtures.
+    private static class TestLoadAction extends LoadAction {
+        @Override
+        public org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo executeCheckPassword(
+                HttpServletRequest request, HttpServletResponse response) {
+            return new org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo();
+        }
+
+        @Override
+        protected void checkTblAuth(org.apache.doris.analysis.UserIdentity currentUser, String db, String tbl,
+                org.apache.doris.mysql.privilege.PrivPredicate predicate) {
+            // Intentionally empty: every table privilege check passes in these tests.
+        }
+
+        @Override
+        protected boolean checkClusterToken(String token) {
+            return true;
+        }
+    }
+
+ // Body stub modelling a client that has stopped sending: it reports no available bytes and never
+ // finishes; read() returns -1 if it is called.
+ private static class IdleServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
- Assert.assertTrue(headers.contains("Authorization:***MASKED***"));
- Assert.assertTrue(headers.contains("Cookie:***MASKED***"));
- Assert.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
- Assert.assertTrue(headers.contains("token:***MASKED***"));
- Assert.assertTrue(headers.contains("label:load_label"));
+ // Async reads are not used by these tests, so the listener is ignored.
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
new file mode 100644
index 00000000000..b838e25ef50
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for the redirect URL built by {@code RestBaseController#buildRedirectUrl}.
+ */
+public class RestBaseControllerTest {
+
+    @Test
+    public void testBuildRedirectUrlPreservesEncodedPath() {
+        // Keep the original encoded path and query unchanged when rebuilding the redirect URL.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), "/api/db%2Ftbl/_stream_load", "k=a%2Bb");
+
+        Assertions.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load?k=a%2Bb", redirectUrl);
+    }
+
+    @Test
+    public void testBuildRedirectUrlWithoutQueryString() {
+        // Avoid appending a dangling question mark when the original request has no query string.
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getScheme()).thenReturn("http");
+        Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+        TestRestController controller = new TestRestController();
+        String redirectUrl = controller.buildRedirectUrlForTest(request,
+                new TNetworkAddress("be-host", 8040), "/api/db%2Ftbl/_stream_load", null);
+
+        Assertions.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load", redirectUrl);
+    }
+
+    // Expose the protected helper so the redirect URL can be verified directly.
+    private static class TestRestController extends RestBaseController {
+        private String buildRedirectUrlForTest(HttpServletRequest request, TNetworkAddress addr,
+                String requestPath, String queryString) {
+            return buildRedirectUrl(request, addr, requestPath, queryString);
+        }
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
new file mode 100644
index 00000000000..8a8d6cae165
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class StreamLoadRedirectDrainUtilTest {
+
+    // Snapshot the shared idle-time config when the test instance is created, so tearDown restores the
+    // value that was active before the test instead of assuming a hard-coded default. It is held as a
+    // long and narrowed on restore so this compiles whether the Config field is declared int or long.
+    private final long originalMaxIdleTimeMs = Config.stream_load_redirect_bounded_drain_max_idle_time_ms;
+
+    @AfterEach
+    public void tearDown() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = (int) originalMaxIdleTimeMs;
+    }
+
+    @Test
+    public void testDrainRequestBodyWithinMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyAfterRedirectUsesRequestInputStream() throws Exception {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream())
+                .thenReturn(new QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyAfterRedirectReturnsErrorWhenGetInputStreamFails() throws Exception {
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        Mockito.when(request.getInputStream()).thenThrow(new IOException("open error"));
+
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(0, drainResult.getElapsedMillis());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, drainResult.getExitReason());
+    }
+
+    // Verify delayed body chunks are still drained when they arrive within the bounded idle window.
+    @Test
+    public void testDrainRequestBodyAllowsDelayedArrivalWithinIdleWindow() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 0, 0, 0, 0, 0, 5), 16);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyStopsAtMaxBytes() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello world".getBytes(), 11), 5);
+
+        Assertions.assertEquals(5, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.MAX_BYTES, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new NeverReadyServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT,
+                drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyUsesConfiguredIdleTime() {
+        // With a zero idle window, the initial "0 available" report ends the drain before any data is read.
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new QueueAvailableServletInputStream("hello".getBytes(), 0, 5), 16);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT,
+                drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyInterruptedWhileWaitingForMoreData() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        try {
+            Thread.currentThread().interrupt();
+            StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                    StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new NeverReadyServletInputStream(), 8);
+
+            Assertions.assertEquals(0, drainResult.getDrainedBytes());
+            Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.INTERRUPTED,
+                    drainResult.getExitReason());
+        } finally {
+            // Clear the interrupt flag to avoid affecting later tests in the same thread.
+            Thread.interrupted();
+        }
+    }
+
+    @Test
+    public void testDrainRequestBodyReadError() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ErrorServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyEof() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new EofServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyRejectsNonPositiveMaxBytes() {
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+                        new NeverReadyServletInputStream(), 0));
+    }
+
+    @Test
+    public void testDrainRequestBodyReturnsEofWhenReadReturnsNegative() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadNegativeServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+    }
+
+    @Test
+    public void testDrainRequestBodyReadZeroTriggersIdleTimeout() {
+        Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+        StreamLoadRedirectDrainUtil.DrainResult drainResult =
+                StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadZeroServletInputStream(), 8);
+
+        Assertions.assertEquals(0, drainResult.getDrainedBytes());
+        Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+        Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT,
+                drainResult.getExitReason());
+    }
+
+    // Serves a fixed byte array; available() first replays the scripted values (to simulate data that
+    // arrives late) and then reports the real number of remaining bytes.
+    private static class QueueAvailableServletInputStream extends ServletInputStream {
+        private final byte[] data;
+        private final Queue<Integer> availableValues = new ArrayDeque<>();
+        private int offset = 0;
+
+        QueueAvailableServletInputStream(byte[] data, int... availableValues) {
+            this.data = data;
+            for (int availableValue : availableValues) {
+                this.availableValues.add(availableValue);
+            }
+        }
+
+        @Override
+        public int read() {
+            if (offset >= data.length) {
+                return -1;
+            }
+            return data[offset++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            if (offset >= data.length) {
+                return -1;
+            }
+            int readBytes = Math.min(len, data.length - offset);
+            System.arraycopy(data, offset, b, off, readBytes);
+            offset += readBytes;
+            return readBytes;
+        }
+
+        @Override
+        public int available() {
+            if (!availableValues.isEmpty()) {
+                return availableValues.poll();
+            }
+            return Math.max(0, data.length - offset);
+        }
+
+        @Override
+        public boolean isFinished() {
+            return offset >= data.length;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Claims one readable byte but fails every read with an IOException.
+    private static class ErrorServletInputStream extends ServletInputStream {
+        @Override
+        public int read() throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            throw new IOException("read error");
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Already finished: every read returns -1.
+    private static class EofServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return true;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Not marked finished, yet reads return -1: EOF must be detected from the read result alone.
+    private static class ReadNegativeServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Claims readable data but every bulk read returns 0 bytes, so the drain makes no progress.
+    private static class ReadZeroServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return 0;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return 0;
+        }
+
+        @Override
+        public int available() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+
+    // Keep reporting no readable bytes without reaching EOF to simulate a stalled client.
+    private static class NeverReadyServletInputStream extends ServletInputStream {
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public int available() {
+            return 0;
+        }
+
+        @Override
+        public boolean isFinished() {
+            return false;
+        }
+
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+        }
+    }
+}
diff --git
a/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
new file mode 100644
index 00000000000..5dca0091b7e
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Exercise the FE redirect path with a chunked stream load request."""
+
+import argparse
+import base64
+import http.client
+import json
+import sys
+import time
+import uuid
+
+
+def parse_args():
+    """Parse and validate the command-line options of the chunked redirect e2e helper.
+
+    Exits with status 2 (argparse convention) when --chunk-kb or --payload-mb is not positive.
+    """
+    parser = argparse.ArgumentParser(
+        description="Send a chunked stream load request to FE and capture the redirect result."
+    )
+    parser.add_argument("--host", required=True, help="FE host")
+    parser.add_argument("--fe-http-port", required=True, type=int, help="FE HTTP port")
+    parser.add_argument("--user", required=True, help="FE HTTP user")
+    parser.add_argument("--password", default="", help="FE HTTP password")
+    parser.add_argument("--db", required=True, help="Target database")
+    parser.add_argument("--table", required=True, help="Target table")
+    parser.add_argument("--payload-mb", type=int, default=8, help="Approximate payload size in MiB")
+    parser.add_argument("--chunk-kb", type=int, default=8, help="Chunk size in KiB")
+    parser.add_argument("--sleep-ms", type=int, default=10, help="Delay between chunks in milliseconds")
+    parser.add_argument("--connect-timeout", type=int, default=5, help="Connect timeout in seconds")
+    parser.add_argument("--read-timeout", type=int, default=120, help="Read timeout in seconds")
+    args = parser.parse_args()
+    # A zero or negative chunk size yields degenerate (possibly never-ending) chunking, and a zero or
+    # negative payload sends no data at all, so reject both up front.
+    if args.chunk_kb <= 0:
+        parser.error("--chunk-kb must be a positive integer")
+    if args.payload_mb <= 0:
+        parser.error("--payload-mb must be a positive integer")
+    return args
+
+
+def build_auth_header(user, password):
+    """Return an HTTP Basic ``Authorization`` header value for ``user``/``password`` (UTF-8 encoded)."""
+    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
+    return f"Basic {token}"
+
+
+def build_csv_chunk(chunk_bytes, chunk_index):
+    """Build deterministic ``id,payload`` CSV rows totalling at most ``chunk_bytes`` bytes.
+
+    Row ids start at ``chunk_index * 100000`` so rows from different chunks stay distinct. At least
+    one row is always emitted, so the result may exceed ``chunk_bytes`` when that budget is smaller
+    than a single row.
+    """
+    rows = []
+    current = 0
+    row_index = chunk_index * 100000
+    # Scale the filler with the chunk size, clamped to [8, 256] characters per row.
+    payload_width = max(8, min(256, chunk_bytes // 8))
+    while True:
+        row = f"{row_index},payload_{chunk_index}_{'x' * payload_width}\n"
+        row_size = len(row.encode("utf-8"))
+        if rows and current + row_size > chunk_bytes:
+            break
+        rows.append(row)
+        current += row_size
+        row_index += 1
+        if current >= chunk_bytes:
+            break
+    return "".join(rows).encode("utf-8")
+
+
+def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
+    """Yield CSV chunks until about ``total_bytes`` bytes were produced, sleeping ``sleep_ms`` between chunks.
+
+    Progress is counted with the real length of each chunk. Every chunk is non-empty, so the loop always
+    terminates, even for a zero ``chunk_bytes``.
+    """
+    sent = 0
+    chunk_index = 0
+    while sent < total_bytes:
+        chunk = build_csv_chunk(min(chunk_bytes, total_bytes - sent), chunk_index)
+        yield chunk
+        sent += len(chunk)
+        chunk_index += 1
+        if sleep_ms > 0:
+            time.sleep(sleep_ms / 1000.0)
+
+
+def main():
+    """Send one chunked stream load through FE, print a JSON summary, and return an exit code.
+
+    Returns 0 when an HTTP response was received (whatever its status). Returns 1 when the request
+    raised, for example a connection error or a broken pipe while the body was still being streamed.
+    """
+    args = parse_args()
+    path = f"/api/{args.db}/{args.table}/_stream_load"
+    # --connect-timeout bounds establishing the TCP connection; --read-timeout is applied once connected.
+    conn = http.client.HTTPConnection(args.host, args.fe_http_port, timeout=args.connect_timeout)
+    label = f"stream_load_redirect_regression_{uuid.uuid4().hex[:12]}"
+    total_bytes = args.payload_mb * 1024 * 1024
+    chunk_bytes = args.chunk_kb * 1024
+    started = time.time()
+
+    try:
+        conn.connect()
+        conn.sock.settimeout(args.read_timeout)
+        # Send the request body with explicit chunk framing so the client keeps writing
+        # while FE returns the redirect response.
+        conn.putrequest("PUT", path)
+        conn.putheader("Authorization", build_auth_header(args.user, args.password))
+        conn.putheader("Expect", "100-continue")
+        conn.putheader("Transfer-Encoding", "chunked")
+        conn.putheader("column_separator", ",")
+        conn.putheader("label", label)
+        conn.endheaders()
+
+        for chunk in chunked_csv_generator(total_bytes, chunk_bytes, args.sleep_ms):
+            conn.send(f"{len(chunk):X}\r\n".encode("ascii"))
+            conn.send(chunk)
+            conn.send(b"\r\n")
+        conn.send(b"0\r\n\r\n")
+
+        response = conn.getresponse()
+        body = response.read().decode("utf-8", errors="replace")
+        result = {
+            "status_code": response.status,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": dict(response.getheaders()),
+            "body": body[:2000],
+            "exception_type": None,
+            "exception": None,
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 0
+    except Exception as exc:
+        # Report the failure in the same JSON shape so the calling suite can inspect it.
+        result = {
+            "status_code": None,
+            "elapsed_seconds": round(time.time() - started, 3),
+            "headers": {},
+            "body": "",
+            "exception_type": type(exc).__name__,
+            "exception": repr(exc),
+            "label": label,
+        }
+        print(json.dumps(result, ensure_ascii=False))
+        return 1
+    finally:
+        conn.close()
+
+
+if __name__ == "__main__":
+    # Propagate main()'s result (0 = response received, 1 = request failed) as the process exit status.
+    sys.exit(main())
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
new file mode 100644
index 00000000000..282f14331ee
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_fe_redirect_chunked_e2e", "p0") {
+    String dbName = context.config.getDbNameByFile(context.file)
+    String tableName = "test_stream_load_fe_redirect_chunked_e2e"
+    String helperPath = "${context.file.parent}/scripts/stream_load_redirect_chunked_e2e.py"
+    // Split feHttpAddress on its last ':' so host and port can be passed to the helper separately.
+    String feHttpAddress = context.config.feHttpAddress
+    int feAddressSeparator = feHttpAddress.lastIndexOf(":")
+    assertTrue(feAddressSeparator > 0)
+    String feHost = feHttpAddress.substring(0, feAddressSeparator)
+    String fePort = feHttpAddress.substring(feAddressSeparator + 1)
+
+    sql """ CREATE DATABASE IF NOT EXISTS ${dbName} """
+    sql """ DROP TABLE IF EXISTS ${dbName}.${tableName} """
+    sql """
+        CREATE TABLE ${dbName}.${tableName} (
+            k1 BIGINT,
+            v1 STRING
+        )
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    // Read the current FE values first so the ordinary suite can restore shared settings.
+    def getFrontendConfigValue = { configKey ->
+        def configRows = sql """ admin show frontend config """
+        def configRow = configRows.find { it[0] == configKey }
+        assertTrue(configRow != null)
+        return configRow[1].toString()
+    }
+
+    String originalDrainMaxBytes = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_bytes")
+    String originalDrainMaxIdleTimeMs = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_idle_time_ms")
+
+    // Treat common client-side disconnects as the reproduced historical failure signal.
+    def reproducedExceptionTypes = ["BrokenPipeError", "ConnectionResetError"] as Set
+
+    // Keep the helper single-shot and let the regression test control retries and assertions.
+    // Returns [code: helper exit code, result: the helper's JSON summary parsed by parseJson].
+    def runChunkedStreamLoad = {
+        def command = [
+            "python3",
+            helperPath,
+            "--host", feHost,
+            "--fe-http-port", fePort,
+            "--user", context.config.feHttpUser,
+            "--password", context.config.feHttpPassword == null ? "" : context.config.feHttpPassword,
+            "--db", dbName,
+            "--table", tableName,
+            "--payload-mb", "8",
+            "--chunk-kb", "8",
+            "--sleep-ms", "10"
+        ]
+        def process = command.execute()
+        // Drain stdout and stderr while waiting for exit: calling waitFor() before reading them
+        // may block indefinitely once the child fills an OS pipe buffer (e.g. a long traceback).
+        def stdoutBuffer = new StringBuilder()
+        def stderrBuffer = new StringBuilder()
+        process.waitForProcessOutput(stdoutBuffer, stderrBuffer)
+        def code = process.exitValue()
+        def out = stdoutBuffer.toString().trim()
+        def err = stderrBuffer.toString().trim()
+        log.info("stream load redirect helper stdout: ${out}")
+        if (!err.isEmpty()) {
+            log.info("stream load redirect helper stderr: ${err}")
+        }
+        // The helper prints one JSON summary on both its success and exception paths; an empty
+        // stdout means it exited without reaching either, so fail with the exit code and stderr
+        // instead of letting parseJson report an opaque parse error.
+        if (out.isEmpty()) {
+            throw new IllegalStateException(
+                    "stream load redirect helper exited with code ${code} without a JSON result, stderr: ${err}")
+        }
+        return [code: code, result: parseJson(out)]
+    }
+
+    try {
+        // First disable the drain and reproduce the historical client disconnect behavior.
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "0") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "1") """
+        // The disconnect need not happen on every attempt, so retry up to five times; an attempt
+        // that does not disconnect must still end in a clean 307 redirect.
+        boolean brokenPipeObserved = false
+        for (int i = 0; i < 5; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            if (reproducedExceptionTypes.contains(result.exception_type)) {
+                brokenPipeObserved = true
+                break
+            }
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+            assertEquals(0, attempt.code as Integer)
+        }
+        assertTrue(brokenPipeObserved)
+
+        // Then enable the drain and verify the same request finishes with a redirect.
+        // 16777216 bytes (16 MiB) is larger than the helper's 8 MiB payload (--payload-mb 8).
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "16777216") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "2000") """
+        for (int i = 0; i < 3; i++) {
+            def attempt = runChunkedStreamLoad()
+            def result = attempt.result
+            assertEquals(0, attempt.code as Integer)
+            assertEquals(307, result.status_code as Integer)
+            assertTrue(result.exception_type == null)
+            assertTrue(result.exception == null)
+            assertTrue(result.headers.Location.contains("/api/${dbName}/${tableName}/_stream_load"))
+        }
+    } finally {
+        // Restore the shared FE settings before leaving the ordinary regression environment.
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "${originalDrainMaxBytes}") """
+        sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "${originalDrainMaxIdleTimeMs}") """
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]