This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9c40fa5fd [fix](fe) Fix broken pipe risk on stream load redirect
with unconsumed request body (#63332)
3c9c40fa5fd is described below
commit 3c9c40fa5fd3d19ec70804afafd911992967b568
Author: Wen Zhenghu <[email protected]>
AuthorDate: Sat May 30 08:28:03 2026 +0800
[fix](fe) Fix broken pipe risk on stream load redirect with unconsumed
request body (#63332)
### What problem does this PR solve?
Issue Number: close #63325
Problem Summary:
**Problem**
- Starting with Doris `3.1.3`, FE uses `Jetty 12`, which introduced a
compatibility change in the Stream Load redirect path.
- When a Stream Load request is sent to FE, FE may return `307 Temporary
Redirect` before the request body is fully consumed. Under `Jetty 12`,
this is more likely to close or reset the connection early, while the
client is still writing the request body.
- As a result, some `HTTP/1.1` streaming clients may observe errors such
as `BrokenPipeError` or `ConnectionResetError` when sending Stream Load
requests through FE.
- The problem is more visible with chunked uploads, higher network
latency, and clients that continue sending request body data before
fully processing the redirect response.
- In short, this is a compatibility regression introduced by the `Jetty
12` upgrade in Doris `3.1.3` and later.
**Fix**
- We keep the existing FE-to-BE redirect architecture unchanged, so FE
still redirects Stream Load requests to BE instead of proxying the full
request body.
- We add a bounded request-body drain step on the FE Stream Load
redirect path:
  - FE first writes the `307 Temporary Redirect` response.
  - FE then drains and discards only a bounded amount of the remaining
    request body.
- This provides a small compatibility window for in-flight client writes
and reduces the chance of early connection reset.
- We also apply the same handling to token-authenticated Stream Load
requests, so both password-authenticated and token-authenticated paths
behave consistently.
- In addition, we expose Jetty's unconsumed request content read setting
through FE configuration and apply it to HTTP connectors, so operators
can tune Jetty behavior for redirect scenarios where the request body is
not fully consumed.
- To make the compatibility path effective out of the box, this PR also
enables the bounded drain path by default with a `1GB` drain limit and a
`1000ms` idle wait window.
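
The bounded drain step can be modeled with a short Python sketch. This is an illustration only, not the FE code (which is Java, in `StreamLoadRedirectDrainUtil`). The names `bounded_drain`, `read_available`, and `DrainResult` are hypothetical, and the request body is modeled as a callable that returns bytes, `b""` when nothing is readable yet, or `None` at end of stream.

```python
import io
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DrainResult:
    drained_bytes: int
    exit_reason: str  # "EOF", "MAX_BYTES", or "IDLE_TIMEOUT"


def bounded_drain(
    read_available: Callable[[int], Optional[bytes]],
    max_bytes: int,
    max_idle_ms: int,
    idle_sleep_ms: int = 5,
) -> DrainResult:
    """Discard at most max_bytes of body; stop after max_idle_ms without data."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    drained = 0
    idle_start = None  # monotonic time at which the current idle period began
    while drained < max_bytes:
        chunk = read_available(min(8192, max_bytes - drained))
        if chunk is None:
            # End of stream: the client finished sending the body.
            return DrainResult(drained, "EOF")
        if not chunk:
            # Nothing readable yet: wait inside a bounded idle window.
            now = time.monotonic()
            if idle_start is None:
                idle_start = now
            if (now - idle_start) * 1000 >= max_idle_ms:
                return DrainResult(drained, "IDLE_TIMEOUT")
            time.sleep(idle_sleep_ms / 1000)
            continue
        idle_start = None  # data arrived, so the idle window resets
        drained += len(chunk)
    return DrainResult(drained, "MAX_BYTES")


if __name__ == "__main__":
    body = io.BytesIO(b"x" * 100)
    # read() returns b"" at end of data, which this adapter maps to None (EOF).
    print(bounded_drain(lambda n: body.read(n) or None, max_bytes=64, max_idle_ms=0))
```

The two limits are independent: the byte limit caps how much data is discarded, and the idle limit caps how long the server waits for a stalled client.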
**New Configurations**
- `jetty_server_max_unconsumed_request_content_reads`
  - Controls how many extra reads Jetty performs for unconsumed request
    content.
  - `-1` means unlimited, `0` disables extra reads, and a positive value
    sets the maximum number of read attempts.
  - Default value in this PR: `-1`.
  - This helps tune Jetty behavior after the `Jetty 12` upgrade when FE
    returns a response before the request body is fully consumed.
- `stream_load_redirect_bounded_drain_max_bytes`
  - Controls the maximum number of request body bytes FE drains after
    returning `307` for a Stream Load redirect.
  - `0` disables this compatibility logic.
  - A positive value enables bounded draining and limits how much data FE
    will discard.
  - Default value in this PR: `1GB`.
- `stream_load_redirect_bounded_drain_max_idle_time_ms`
  - Controls how long FE waits for more readable request body data during
    the bounded drain process.
  - `0` disables the extra idle wait.
  - A positive value provides a small grace window for slow clients or
    delayed body chunks, helping absorb in-flight writes without keeping the
    connection open indefinitely.
  - Default value in this PR: `1000ms`.
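
How `stream_load_redirect_bounded_drain_max_bytes` gates the drain can be sketched as a small decision function. This is a Python model of the checks in `shouldUseBoundedDrainForStreamLoad` and `decideDrainDecisionForStreamLoadRedirect` from the diff below, not the FE code itself. The function name `drain_decision` and the label `REDIRECT_WITHOUT_DRAIN` are hypothetical; the other labels follow the `DrainDecision` enum. A `content_length` of `-1` stands for a request without a `Content-Length` header.

```python
from typing import Optional

GIB = 1024 * 1024 * 1024  # default drain limit in this PR


def drain_decision(
    is_stream_load: bool,
    content_length: int,
    transfer_encoding: Optional[str],
    max_drain_bytes: int = GIB,
) -> str:
    """Decide whether the server drains the request body after writing 307."""
    if not is_stream_load or max_drain_bytes <= 0:
        # Compatibility logic is off: plain redirect, no drain.
        return "REDIRECT_WITHOUT_DRAIN"
    if content_length <= 0 and not transfer_encoding:
        # Header-only probe: there is no body to drain.
        return "SKIP_NO_REQUEST_BODY"
    if content_length > max_drain_bytes:
        # A fixed-length body larger than the limit cannot be fully drained.
        return "SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES"
    return "DRAIN"
```

Chunked requests carry no `Content-Length`, so they always reach the drain step, where the byte and idle limits apply.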
**Test Result / Validation**
- Verified the behavior with the same Python `HTTP/1.1` chunked Stream
Load reproduction used during issue analysis.
- Reproduced requests were sent to FE with `Expect: 100-continue`,
`Transfer-Encoding: chunked`, and paced body streaming to maximize the
redirect race window.
- Baseline validation on Doris `3.0` (instance `9030`, FE HTTP port `8030`):
  - `payload_mb=1`, `chunk_kb=1`, `sleep_ms=0`
  - `payload_mb=8`, `chunk_kb=16`, `sleep_ms=10`
  - Both requests returned normal `307 Temporary Redirect`.
- Validation on the fixed Doris `3.1.4` instance (instance `9034`, FE HTTP
  port `8034`):
  - Before enabling the bounded drain config, the same reproduction still
    triggered `BrokenPipeError`.
  - After enabling the FE configs below:
    - `jetty_server_max_unconsumed_request_content_reads = -1`
    - `stream_load_redirect_bounded_drain_max_bytes = 16777216`
    - `stream_load_redirect_bounded_drain_max_idle_time_ms = 1000`
  - The same two reproduction requests both returned normal `307 Temporary
    Redirect`.
  - No `BrokenPipeError` or `ConnectionResetError` was observed after the
    config took effect.
- This PR then raises the default bounded drain byte limit from `16MB`
(the `16777216` bytes used in this validation) to `1GB`, while keeping the
default idle wait at `1000ms`, so the compatibility path is enabled by
default with a more generous drain window.
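
For readers unfamiliar with the request shape used in the reproduction, the sketch below shows how a body can be framed with `Transfer-Encoding: chunked` and paced between chunks. It is not the reproduction script, which is not shown here. The names `iter_chunked` and `send_paced` are hypothetical, and `send` stands for any callable that writes bytes, such as a socket's `sendall`.

```python
import time
from typing import Callable, Iterator


def iter_chunked(payload: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield HTTP/1.1 chunked-transfer frames, ending with the zero-length chunk."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(payload), chunk_size):
        chunk = payload[offset:offset + chunk_size]
        # Each frame is: hex length, CRLF, data, CRLF.
        yield f"{len(chunk):X}\r\n".encode("ascii") + chunk + b"\r\n"
    yield b"0\r\n\r\n"


def send_paced(send: Callable[[bytes], object], payload: bytes,
               chunk_kb: int, sleep_ms: int) -> int:
    """Write the payload frame by frame, sleeping sleep_ms after each frame."""
    sent = 0
    for frame in iter_chunked(payload, chunk_kb * 1024):
        send(frame)
        sent += len(frame)
        if sleep_ms > 0:
            time.sleep(sleep_ms / 1000)
    return sent
```

A larger `sleep_ms` keeps the client writing for longer after FE has answered, which is what widens the redirect race window.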
**Performance Validation**
- I compared FE redirect response time between the Doris `3.0` baseline
instance (`9030`) and the fixed Doris `3.1.4` instance (`9034`).
- The goal was to check whether the additional bounded drain logic on FE
introduces a noticeable regression compared with the original Jetty 9
behavior.
**Test Setup**
- Reproduction tool: `tools/stream_load_redirect_repro.py`
- Target: FE endpoint on both instances
- Client mode: `httpclient`
- Common parameters:
  - `chunk_kb = 16`
  - `sleep_ms = 0`
- Payload sizes:
  - `32MB`
  - `128MB`
  - `512MB`
- Each case was executed `3` times on each instance, and the average
`elapsed_seconds` was used for comparison.
**Results**
- `32MB`
  - `9030`: `9.515s / 12.032s / 9.727s`
    - Average: `10.425s`
  - `9034`: `10.695s / 10.847s / 8.763s`
    - Average: `10.102s`
  - Difference: `9034` was `0.323s` faster, about `3.1%`
- `128MB`
  - `9030`: `37.174s / 34.111s / 37.090s`
    - Average: `36.125s`
  - `9034`: `38.910s / 36.423s / 38.337s`
    - Average: `37.890s`
  - Difference: `9034` was `1.765s` slower, about `4.9%`
- `512MB`
  - `9030`: `157.181s / 161.148s / 174.421s`
    - Average: `164.250s`
  - `9034`: `172.310s / 176.692s / 160.068s`
    - Average: `169.690s`
  - Difference: `9034` was `5.440s` slower, about `3.3%`
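
The averages and percentage differences can be recomputed from the per-run timings with a few lines of Python. This is a checking aid under the assumption that each percentage is taken relative to the `9030` baseline average; the names `RUNS` and `compare` are hypothetical.

```python
from statistics import mean

# elapsed_seconds per run, copied from the results listed above.
RUNS = {
    "32MB": {"9030": [9.515, 12.032, 9.727], "9034": [10.695, 10.847, 8.763]},
    "128MB": {"9030": [37.174, 34.111, 37.090], "9034": [38.910, 36.423, 38.337]},
    "512MB": {"9030": [157.181, 161.148, 174.421], "9034": [172.310, 176.692, 160.068]},
}


def compare(baseline, fixed):
    """Return (baseline_avg, fixed_avg, delta_seconds, delta_percent_of_baseline)."""
    base_avg = mean(baseline)
    fixed_avg = mean(fixed)
    delta = fixed_avg - base_avg  # negative means the fixed instance was faster
    return base_avg, fixed_avg, delta, delta / base_avg * 100.0


if __name__ == "__main__":
    for size, runs in RUNS.items():
        base_avg, fixed_avg, delta, pct = compare(runs["9030"], runs["9034"])
        print(f"{size}: 9030={base_avg:.3f}s 9034={fixed_avg:.3f}s "
              f"delta={delta:+.3f}s ({pct:+.1f}%)")
```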
**Conclusion**
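- Across all tested payload sizes, the difference between `9030` and
`9034` stayed within a small range, roughly `-3%` to `+5%`. In each case
the gap between the averages is smaller than the run-to-run spread on
either instance.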
- Based on these measurements, the FE bounded drain logic does not show
a significant performance regression compared with the baseline FE
redirect behavior.
- In other words, the fix improves redirect compatibility while keeping
FE redirect response time at a similar level for the tested payload sizes
(`32MB` to `512MB`).
---------
Co-authored-by: yaoxiao <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 24 ++
.../config/WebServerFactoryCustomizerConfig.java | 3 +
.../org/apache/doris/httpv2/rest/LoadAction.java | 152 ++++---
.../doris/httpv2/rest/RestBaseController.java | 51 ++-
.../httpv2/util/StreamLoadRedirectDrainUtil.java | 154 +++++++
.../apache/doris/httpv2/rest/LoadActionTest.java | 445 ++++++++++++++++++++-
.../doris/httpv2/rest/RestBaseControllerTest.java | 64 +++
.../util/StreamLoadRedirectDrainUtilTest.java | 394 ++++++++++++++++++
.../scripts/stream_load_redirect_chunked_e2e.py | 140 +++++++
...test_stream_load_fe_redirect_chunked_e2e.groovy | 117 ++++++
10 files changed, 1459 insertions(+), 85 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bdd64284c9a..121503e56c4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -380,6 +380,13 @@ public class Config extends ConfigBase {
@ConfField(description = {"The maximum HTTP POST size of Jetty, in bytes,
the default value is 100MB."})
public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;
+ @ConfField(description = {
+ "Jetty 在应用未消费完请求体时,额外尝试读取剩余内容的最大次数。"
+ + "-1 表示不限制,0 表示不额外读取,正数表示最大读取次数。",
+ "The maximum number of extra reads Jetty performs for unconsumed
request content. "
+ + "-1 means unlimited, 0 means disabled, and a positive
value limits the read attempts."})
+ public static int jetty_server_max_unconsumed_request_content_reads = -1;
+
@ConfField(description = {"The maximum HTTP header size of Jetty, in
bytes, the default value is 1MB."})
public static int jetty_server_max_http_header_size = 1048576;
@@ -3328,6 +3335,23 @@ public class Config extends ConfigBase {
+ "public-private/public/private/direct/random-be and empty
string."})
public static String streamload_redirect_policy = "";
+ @ConfField(mutable = true, description = {
+ "Stream Load redirect 场景下,FE 在返回 307 后额外丢弃请求体的最大字节数。"
+ + "0 表示关闭该兼容逻辑,正数表示最大丢弃字节数。",
+ "The maximum number of request body bytes FE drains after
returning 307 for Stream Load redirects. "
+ + "0 disables the compatibility logic, and a positive
value sets the byte limit."})
+ // Enable a generous bounded drain window by default to preserve FE
redirect compatibility on Jetty 12.
+ public static long stream_load_redirect_bounded_drain_max_bytes = 1024L *
1024 * 1024;
+
+ @ConfField(mutable = true, description = {
+ "Stream Load redirect 场景下,FE 在检测到请求体暂时无可读数据后继续等待的最大空闲时长,单位毫秒。"
+ + "0 表示不额外等待,用于给慢客户端或分段到达的数据保留一个有限的缓冲窗口。",
+ "The maximum idle wait time in milliseconds after FE detects no
readable request body bytes "
+ + "during Stream Load redirect drain. 0 disables the extra
idle wait, while a positive value "
+ + "keeps a bounded grace window for slow clients or
delayed request body chunks."})
+ // Keep a small grace period for delayed body chunks after FE has already
written the redirect.
+ public static int stream_load_redirect_bounded_drain_max_idle_time_ms =
1000;
+
@ConfField(mutable = true, description = {
"Whether to enable group commit streamload BE forward feature in
cloud mode. "
+ "Solves the issue where LB random forwarding breaks
group commit batching "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
index ead380b260e..e37e60e849e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/config/WebServerFactoryCustomizerConfig.java
@@ -50,6 +50,9 @@ public class WebServerFactoryCustomizerConfig implements
WebServerFactoryCustomi
if (httpFactory != null) {
HttpConfiguration httpConfig =
httpFactory.getHttpConfiguration();
httpConfig.setRequestHeaderSize(Config.jetty_server_max_http_header_size);
+ // Apply the unconsumed request content read limit to
every HTTP connector.
+ httpConfig.setMaxUnconsumedRequestContentReads(
+
Config.jetty_server_max_unconsumed_request_content_reads);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index bea379f4f28..5e058fcba8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.util.StreamLoadRedirectDrainUtil;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -59,8 +60,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.io.IOException;
import java.net.InetAddress;
-import java.net.URI;
import java.util.Enumeration;
import java.util.List;
import java.util.Optional;
@@ -131,7 +132,7 @@ public class LoadAction extends RestBaseController {
if (!checkClusterToken(authToken)) {
throw new UnauthorizedException("Invalid token: " + authToken);
}
- return executeWithClusterToken(request, db, table, true);
+ return executeWithClusterToken(request, response, db, table, true);
} else {
try {
executeCheckPassword(request, response);
@@ -190,8 +191,7 @@ public class LoadAction extends RestBaseController {
LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
- RedirectView redirectView = redirectTo(request, redirectAddr);
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
true, null, null, label);
} catch (Exception e) {
return new RestBaseResult(e.getMessage());
}
@@ -311,11 +311,17 @@ public class LoadAction extends RestBaseController {
redirectAddr.toString(), isStreamLoad, dbName,
tableName, label);
}
- RedirectView redirectView = redirectTo(request, redirectAddr);
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
isStreamLoad, dbName, tableName, label);
} catch (StreamLoadForwardException e) {
- // Special handling for stream load forwarding
- return e.getRedirectView();
+ // Handle IOException from redirect response generation in the
forwarding path.
+ try {
+ return createRedirectResponse(request, response,
e.getRedirectView(),
+ isStreamLoad, db, table, label);
+ } catch (IOException ioException) {
+ LOG.warn("stream load forward redirect failed, stream: {}, db:
{}, tbl: {}, label: {}, err: {}",
+ isStreamLoad, db, table, label,
ioException.getMessage());
+ return new RestBaseResult(ioException.getMessage());
+ }
} catch (Exception e) {
LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {},
err: {}",
isStreamLoad, db, table, label, e.getMessage());
@@ -592,7 +598,7 @@ public class LoadAction extends RestBaseController {
// AuditlogPlugin should be re-disigned carefully, and blow method focuses
on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
- private Object executeWithClusterToken(HttpServletRequest request, String
db,
+ private Object executeWithClusterToken(HttpServletRequest request,
HttpServletResponse response, String db,
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
@@ -635,29 +641,7 @@ public class LoadAction extends RestBaseController {
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName,
label);
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
- String userInfo = null;
-
- try {
- urlObj = new URI(urlStr);
- resultUriObj = new URI("http", userInfo,
redirectAddr.getHostname(),
- redirectAddr.getPort(), urlObj.getPath(), "", null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- String redirectUrl = resultUriObj.toASCIIString();
- if (!Strings.isNullOrEmpty(request.getQueryString())) {
- redirectUrl += request.getQueryString();
- }
- LOG.info("Redirect url: {}", "http://" +
redirectAddr.getHostname() + ":"
- + redirectAddr.getPort() + urlObj.getPath());
- RedirectView redirectView = new RedirectView(redirectUrl);
- redirectView.setContentType("text/html;charset=utf-8");
-
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
-
- return redirectView;
+ return createRedirectResponse(request, response, redirectAddr,
isStreamLoad, dbName, tableName, label);
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}",
e.getMessage(), e);
return new RestBaseResult(e.getMessage());
@@ -677,6 +661,80 @@ public class LoadAction extends RestBaseController {
return headers.toString();
}
+ private Object createRedirectResponse(HttpServletRequest request,
HttpServletResponse response,
+ TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName,
String tableName, String label)
+ throws IOException {
+ String redirectUrl = buildRedirectUrl(request, redirectAddr);
+ if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+ return redirectTo(request, redirectAddr);
+ }
+ writeTemporaryRedirect(response, redirectUrl);
+ DrainDecision drainDecision =
decideDrainDecisionForStreamLoadRedirect(request);
+ if (drainDecision != DrainDecision.DRAIN) {
+ LOG.info("skip bounded drain after stream load redirect, target:
{}, db: {}, tbl: {}, label: {},"
+ + " reason: {}",
+ redirectAddr, dbName, tableName, label, drainDecision);
+ return null;
+ }
+ drainStreamLoadRequestBodyAfterRedirect(request,
redirectAddr.toString(), dbName, tableName, label);
+ return null;
+ }
+
+ private Object createRedirectResponse(HttpServletRequest request,
HttpServletResponse response,
+ RedirectView redirectView, boolean isStreamLoad, String dbName,
String tableName, String label)
+ throws IOException {
+ if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
+ return redirectView;
+ }
+ writeTemporaryRedirect(response, redirectView.getUrl());
+ DrainDecision drainDecision =
decideDrainDecisionForStreamLoadRedirect(request);
+ if (drainDecision != DrainDecision.DRAIN) {
+ LOG.info("skip bounded drain after stream load redirect, target:
{}, db: {}, tbl: {}, label: {},"
+ + " reason: {}",
+ redirectView.getUrl(), dbName, tableName, label,
drainDecision);
+ return null;
+ }
+ drainStreamLoadRequestBodyAfterRedirect(request,
redirectView.getUrl(), dbName, tableName, label);
+ return null;
+ }
+
+ private boolean shouldUseBoundedDrainForStreamLoad(boolean isStreamLoad) {
+ return isStreamLoad &&
Config.stream_load_redirect_bounded_drain_max_bytes > 0;
+ }
+
+ // Skip the bounded drain for header-only probes and oversized
fixed-length bodies.
+ private DrainDecision
decideDrainDecisionForStreamLoadRedirect(HttpServletRequest request) {
+ long contentLength = request.getContentLengthLong();
+ String transferEncoding =
request.getHeader(HttpHeaderNames.TRANSFER_ENCODING.toString());
+ if (contentLength <= 0 && Strings.isNullOrEmpty(transferEncoding)) {
+ return DrainDecision.SKIP_NO_REQUEST_BODY;
+ }
+ if (contentLength >
Config.stream_load_redirect_bounded_drain_max_bytes) {
+ return DrainDecision.SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES;
+ }
+ return DrainDecision.DRAIN;
+ }
+
+ private void drainStreamLoadRequestBodyAfterRedirect(HttpServletRequest
request, String redirectTarget,
+ String dbName, String tableName, String label) {
+ long drainLimit = Config.stream_load_redirect_bounded_drain_max_bytes;
+ LOG.info("write stream load redirect and start bounded drain, target:
{}, db: {}, tbl: {}, label: {},"
+ + " max_drain_bytes: {}",
+ redirectTarget, dbName, tableName, label, drainLimit);
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, drainLimit);
+ LOG.info("finish bounded drain after stream load redirect, target: {},
db: {}, tbl: {}, label: {},"
+ + " drained_bytes: {}, elapsed_ms: {}, exit_reason:
{}",
+ redirectTarget, dbName, tableName, label,
drainResult.getDrainedBytes(),
+ drainResult.getElapsedMillis(), drainResult.getExitReason());
+ }
+
+ private enum DrainDecision {
+ SKIP_NO_REQUEST_BODY,
+ SKIP_CONTENT_LENGTH_EXCEEDS_MAX_BYTES,
+ DRAIN
+ }
+
private boolean isSensitiveHeader(String headerName) {
return "Authorization".equalsIgnoreCase(headerName)
|| "Proxy-Authorization".equalsIgnoreCase(headerName)
@@ -738,34 +796,14 @@ public class LoadAction extends RestBaseController {
*/
private RedirectView redirectToStreamLoadForward(HttpServletRequest
request, TNetworkAddress addr,
String forwardTarget) {
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
- String userInfo = null;
- String modifiedPath = null;
-
- if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
- ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
- userInfo = authInfo.fullUserName + ":" + authInfo.password;
- }
- try {
- urlObj = new URI(urlStr);
- // Replace _stream_load with _stream_load_forward in the path
- modifiedPath = urlObj.getPath().replace("/_stream_load",
"/_stream_load_forward");
- resultUriObj = new URI("http", userInfo, addr.getHostname(),
- addr.getPort(), modifiedPath, "", null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- String redirectUrl = resultUriObj.toASCIIString();
-
- // Add forward_to parameter (note: toASCIIString() already includes
'?' due to empty query)
+ // Replace _stream_load with _stream_load_forward in the path.
+ String modifiedPath = request.getRequestURI().replace("/_stream_load",
"/_stream_load_forward");
String queryString = request.getQueryString();
+ String redirectQuery = "forward_to=" + forwardTarget;
if (!Strings.isNullOrEmpty(queryString)) {
- redirectUrl += queryString + "&forward_to=" + forwardTarget;
- } else {
- redirectUrl += "forward_to=" + forwardTarget;
+ redirectQuery = queryString + "&" + redirectQuery;
}
+ String redirectUrl = buildRedirectUrl(request, addr, modifiedPath,
redirectQuery);
LOG.info("Redirect stream load forward url: {}, forward_to: {}",
"http://" + addr.getHostname() + ":" + addr.getPort() +
modifiedPath, forwardTarget);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index 51378a9dc1d..5d55dab3cf2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -88,36 +88,49 @@ public class RestBaseController extends BaseController {
return authInfo;
}
- public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress
addr) {
- RedirectView redirectView = new RedirectView(getRedirectUrL(request,
addr));
- redirectView.setContentType("text/html;charset=utf-8");
-
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
- return redirectView;
+ protected String buildRedirectUrl(HttpServletRequest request,
TNetworkAddress addr) {
+ return buildRedirectUrl(request, addr, request.getRequestURI(),
request.getQueryString());
}
- public String getRedirectUrL(HttpServletRequest request, TNetworkAddress
addr) {
- URI urlObj = null;
- URI resultUriObj = null;
- String urlStr = request.getRequestURI();
+ protected String buildRedirectUrl(HttpServletRequest request,
TNetworkAddress addr, String requestPath,
+ String queryString) {
String userInfo = null;
if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
userInfo = authInfo.fullUserName + ":" + authInfo.password;
}
try {
- urlObj = new URI(urlStr);
- resultUriObj = new URI(request.getScheme(), userInfo,
addr.getHostname(),
- addr.getPort(), urlObj.getPath(), "", null);
+ // Preserve the original request path to avoid re-encoding an
already encoded URI path.
+ URI authorityUri = new URI(request.getScheme(), userInfo,
addr.getHostname(),
+ addr.getPort(), null, null, null);
+ String redirectUrl = authorityUri.toASCIIString() + requestPath;
+ if (!Strings.isNullOrEmpty(queryString)) {
+ redirectUrl += "?" + queryString;
+ }
+ LOG.info("Redirect url: {}", request.getScheme() + "://" +
addr.getHostname() + ":"
+ + addr.getPort() + requestPath);
+ return redirectUrl;
} catch (Exception e) {
throw new RuntimeException(e);
}
- String redirectUrl = resultUriObj.toASCIIString();
- if (!Strings.isNullOrEmpty(request.getQueryString())) {
- redirectUrl += request.getQueryString();
- }
- LOG.info("Redirect url: {}", request.getScheme() + "://" +
addr.getHostname() + ":"
- + addr.getPort() + urlObj.getPath());
- return redirectUrl;
+ }
+
+ protected void writeTemporaryRedirect(HttpServletResponse response, String
redirectUrl) throws IOException {
+ response.setContentType("text/html;charset=utf-8");
+ response.setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+ response.setHeader("Location", redirectUrl);
+ response.flushBuffer();
+ }
+
+ public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress
addr) {
+ RedirectView redirectView = new RedirectView(buildRedirectUrl(request,
addr));
+ redirectView.setContentType("text/html;charset=utf-8");
+
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+ return redirectView;
+ }
+
+ public String getRedirectUrL(HttpServletRequest request, TNetworkAddress
addr) {
+ return buildRedirectUrl(request, addr);
}
public RedirectView redirectToObj(String sign) throws URISyntaxException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
new file mode 100644
index 00000000000..a9d345ad38d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtil.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import com.google.common.base.Preconditions;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+public final class StreamLoadRedirectDrainUtil {
+ private static final Logger LOG =
LogManager.getLogger(StreamLoadRedirectDrainUtil.class);
+
+ private static final int BUFFER_SIZE = 8 * 1024;
+ private static final int IDLE_SLEEP_MS = 5;
+
+ private StreamLoadRedirectDrainUtil() {
+ }
+
+ public static DrainResult drainRequestBodyAfterRedirect(HttpServletRequest
request, long maxBytes) {
+ try {
+ return drainRequestBodyAfterRedirect(request.getInputStream(),
maxBytes);
+ } catch (IOException e) {
+ LOG.warn("failed to get request input stream for stream load
redirect drain", e);
+ return new DrainResult(0, 0, ExitReason.ERROR);
+ }
+ }
+
+ static DrainResult drainRequestBodyAfterRedirect(ServletInputStream
inputStream, long maxBytes) {
+ Preconditions.checkArgument(maxBytes > 0, "maxBytes must be positive");
+
+ long startNanos = System.nanoTime();
+ long drainedBytes = 0;
+ long idleStartNanos = -1L;
+ byte[] buffer = new byte[(int) Math.min(BUFFER_SIZE, maxBytes)];
+
+ try {
+ while (drainedBytes < maxBytes) {
+ // Prefer an explicit EOF signal before relying on available()
as a readiness hint.
+ if (inputStream.isFinished()) {
+ return new DrainResult(drainedBytes,
elapsedMillis(startNanos), ExitReason.EOF);
+ }
+ int availableBytes = inputStream.available();
+ if (availableBytes <= 0) {
+ // Allow a bounded idle window so slow clients can still
deliver buffered bytes.
+ idleStartNanos = idleStartNanos < 0 ? System.nanoTime() :
idleStartNanos;
+ DrainResult idleWaitResult =
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+ if (idleWaitResult != null) {
+ return idleWaitResult;
+ }
+ continue;
+ }
+
+ idleStartNanos = -1L;
+ int readLimit = (int) Math.min(Math.min(maxBytes -
drainedBytes, buffer.length), availableBytes);
+ int readBytes = inputStream.read(buffer, 0, readLimit);
+ if (readBytes < 0) {
+ return new DrainResult(drainedBytes,
elapsedMillis(startNanos), ExitReason.EOF);
+ }
+ if (readBytes == 0) {
+ // Treat zero-byte reads as transient backpressure instead
of busy-spinning.
+ idleStartNanos = idleStartNanos < 0 ? System.nanoTime() :
idleStartNanos;
+ DrainResult idleWaitResult =
waitForMoreDataOrExit(idleStartNanos, drainedBytes, startNanos);
+ if (idleWaitResult != null) {
+ return idleWaitResult;
+ }
+ continue;
+ }
+ drainedBytes += readBytes;
+ }
+ return new DrainResult(drainedBytes, elapsedMillis(startNanos),
ExitReason.MAX_BYTES);
+ } catch (IOException e) {
+ LOG.warn("failed while draining request body after stream load
redirect", e);
+ return new DrainResult(drainedBytes, elapsedMillis(startNanos),
ExitReason.ERROR);
+ }
+ }
+
+ // Convert a bounded idle wait into a terminal drain result when the grace
window expires or gets interrupted.
+ private static DrainResult waitForMoreDataOrExit(long idleStartNanos, long
drainedBytes, long startNanos) {
+ if (elapsedMillis(idleStartNanos) >=
Config.stream_load_redirect_bounded_drain_max_idle_time_ms) {
+ return new DrainResult(drainedBytes, elapsedMillis(startNanos),
ExitReason.IDLE_TIMEOUT);
+ }
+ if (!sleepForIdleWindow()) {
+ return new DrainResult(drainedBytes, elapsedMillis(startNanos),
ExitReason.INTERRUPTED);
+ }
+ return null;
+ }
+
+ private static boolean sleepForIdleWindow() {
+ try {
+ Thread.sleep(IDLE_SLEEP_MS);
+ return true;
+ } catch (InterruptedException e) {
+ LOG.warn("stream load redirect drain idle wait is interrupted", e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ private static long elapsedMillis(long startNanos) {
+ return (System.nanoTime() - startNanos) / 1_000_000;
+ }
+
+ public enum ExitReason {
+ EOF,
+ MAX_BYTES,
+ IDLE_TIMEOUT,
+ INTERRUPTED,
+ ERROR
+ }
+
+ public static final class DrainResult {
+ private final long drainedBytes;
+ private final long elapsedMillis;
+ private final ExitReason exitReason;
+
+ public DrainResult(long drainedBytes, long elapsedMillis, ExitReason
exitReason) {
+ this.drainedBytes = drainedBytes;
+ this.elapsedMillis = elapsedMillis;
+ this.exitReason = exitReason;
+ }
+
+ public long getDrainedBytes() {
+ return drainedBytes;
+ }
+
+ public long getElapsedMillis() {
+ return elapsedMillis;
+ }
+
+ public ExitReason getExitReason() {
+ return exitReason;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
index 28d7cda587c..4d57c25fd71 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/LoadActionTest.java
@@ -17,20 +17,226 @@
package org.apache.doris.httpv2.rest;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServletRequest;
-import org.junit.Assert;
-import org.junit.Test;
+import jakarta.servlet.http.HttpServletResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.servlet.view.RedirectView;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
public class LoadActionTest {
+ private final boolean originalEnableDebugPoints =
Config.enable_debug_points;
+ private final String originalCloudUniqueId = Config.cloud_unique_id;
+ private final boolean originalEnableGroupCommitStreamLoadBeForward =
+ Config.enable_group_commit_streamload_be_forward;
+
+
+ @AfterEach
+ public void tearDown() {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 1024L * 1024 *
1024;
+ Config.enable_debug_points = originalEnableDebugPoints;
+ Config.cloud_unique_id = originalCloudUniqueId;
+ Config.enable_group_commit_streamload_be_forward =
originalEnableGroupCommitStreamLoadBeForward;
+ DebugPointUtil.clearDebugPoints();
+ Thread.interrupted();
+ org.apache.doris.qe.ConnectContext.remove();
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000;
+ }
+
+ @Test
+ public void
testCreateRedirectResponseReturnsRedirectViewWhenBoundedDrainDisabled() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest();
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertTrue(result instanceof RedirectView);
+ RedirectView redirectView = (RedirectView) result;
+
Assertions.assertEquals("http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar",
redirectView.getUrl());
+ Mockito.verifyNoInteractions(response);
+ }
+
+ @Test
+ public void testCreateRedirectResponseWrites307WhenBoundedDrainEnabled()
throws Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, "chunked",
true);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setContentType("text/html;charset=utf-8");
+
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ Mockito.verify(response).flushBuffer();
+ Mockito.verify(request).getInputStream();
+ }
+
+ @Test
+ public void
testCreateRedirectResponseDrainsFixedLengthRequestWithinLimit() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(4, null, true);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ Mockito.verify(request).getInputStream();
+ }
+
+ @Test
+ public void testCreateRedirectResponseSkipsDrainWithoutRequestBody()
throws Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setContentType("text/html;charset=utf-8");
+
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ Mockito.verify(response).flushBuffer();
+ Mockito.verify(request, Mockito.never()).getInputStream();
+ }
+
+ @Test
+ public void testCreateRedirectResponseSkipsDrainWhenContentLengthIsZero()
throws Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(0, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setContentType("text/html;charset=utf-8");
+
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ Mockito.verify(response).flushBuffer();
+ Mockito.verify(request, Mockito.never()).getInputStream();
+ }
+
+ @Test
+ public void
testCreateRedirectResponseSkipsDrainWhenContentLengthExceedsLimit() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(16, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), true, "db1", "tbl1",
"label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setContentType("text/html;charset=utf-8");
+
Mockito.verify(response).setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ Mockito.verify(response).flushBuffer();
+ Mockito.verify(request, Mockito.never()).getInputStream();
+ }
+
+ @Test
+ public void
testCreateRedirectResponseWithRedirectViewReturnsRedirectViewWhenDrainDisabled()
throws Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ RedirectView redirectView = new
RedirectView("http://be-host:8040/redirect");
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ redirectView, true, "db1", "tbl1", "label1");
+
+ Assertions.assertSame(redirectView, result);
+ Mockito.verifyNoInteractions(response);
+ Mockito.verify(request, Mockito.never()).getInputStream();
+ }
+
+ @Test
+ public void
testCreateRedirectResponseWithRedirectViewSkipsDrainWithoutRequestBody() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ RedirectView redirectView = new
RedirectView("http://be-host:8040/redirect");
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ redirectView, true, "db1", "tbl1", "label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/redirect");
+ Mockito.verify(request, Mockito.never()).getInputStream();
+ }
+
+ @Test
+ public void
testCreateRedirectResponseWithRedirectViewDrainsChunkedRequest() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, "chunked",
true);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ RedirectView redirectView = new
RedirectView("http://be-host:8040/redirect");
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ redirectView, true, "db1", "tbl1", "label1");
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/redirect");
+ Mockito.verify(request).getInputStream();
+ }
+
+ @Test
+ public void testCreateRedirectResponseKeepsNonStreamLoadBehavior() throws
Exception {
+ Config.stream_load_redirect_bounded_drain_max_bytes = 8;
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ LoadAction loadAction = new LoadAction();
+ HttpServletRequest request = mockStreamLoadRequest();
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+ Object result = invokeCreateRedirectResponse(loadAction, request,
response,
+ new TNetworkAddress("be-host", 8040), false, "db1", "tbl1",
"label1");
+
+ Assertions.assertTrue(result instanceof RedirectView);
+ Mockito.verifyNoInteractions(response);
+ }
@Test
public void testGetAllHeadersMasksSensitiveHeaders() throws Exception {
- LoadAction action = new LoadAction();
+ LoadAction loadAction = new LoadAction();
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
Mockito.when(request.getHeaderNames()).thenReturn(Collections.enumeration(Arrays.asList(
"Authorization", "Cookie", "Set-Cookie", "token", "label")));
@@ -38,12 +244,233 @@ public class LoadActionTest {
Method method = LoadAction.class.getDeclaredMethod("getAllHeaders",
HttpServletRequest.class);
method.setAccessible(true);
- String headers = (String) method.invoke(action, request);
+ String headers = (String) method.invoke(loadAction, request);
+
+ Assertions.assertTrue(headers.contains("Authorization:***MASKED***"));
+ Assertions.assertTrue(headers.contains("Cookie:***MASKED***"));
+ Assertions.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
+ Assertions.assertTrue(headers.contains("token:***MASKED***"));
+ Assertions.assertTrue(headers.contains("label:load_label"));
+ }
+
+ @Test
+ public void testExecuteWithoutPasswordRedirectsToBackend() throws
Exception {
+ Config.enable_debug_points = true;
+
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
1L);
+ TestLoadAction loadAction = new TestLoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ Backend backend = mockBackend("be-host", 8040, null);
+ SystemInfoService systemInfoService =
Mockito.mock(SystemInfoService.class);
+ Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+ Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+
+ org.apache.doris.qe.ConnectContext connectContext = new
org.apache.doris.qe.ConnectContext();
+ connectContext.setThreadLocalInfo();
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+ Object result = invokeExecuteWithoutPassword(loadAction, request,
response, "db1", "tbl1", true, false);
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ }
+ }
+
+ @Test
+ public void testExecuteWithClusterTokenRedirectsToBackend() throws
Exception {
+ Config.enable_debug_points = true;
+
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
1L);
+ TestLoadAction loadAction = new TestLoadAction();
+ HttpServletRequest request = mockStreamLoadRequest(-1, null, false);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ Backend backend = mockBackend("be-host", 8040, null);
+ SystemInfoService systemInfoService =
Mockito.mock(SystemInfoService.class);
+ Env env = Mockito.mock(Env.class);
+ InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
+ Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+ // Provide the default catalog name required by ConnectContext.setEnv().
+ Mockito.when(env.getInternalCatalog()).thenReturn(internalCatalog);
+ Mockito.when(internalCatalog.getName()).thenReturn("internal");
+ Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+ Mockito.when(request.getHeader("label")).thenReturn("label1");
+ Mockito.when(request.getRemoteAddr()).thenReturn("127.0.0.1");
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+ mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
+
+ Object result = invokeExecuteWithClusterToken(loadAction, request,
response, "db1", "tbl1", true);
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ }
+ }
+
+ @Test
+ public void testStreamLoadWithSqlRedirectsWhenExpectHeaderExists() {
+ Config.enable_debug_points = true;
+
DebugPointUtil.addDebugPointWithValue("LoadAction.selectRedirectBackend.backendId",
1L);
+ TestLoadAction loadAction = new TestLoadAction();
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ Backend backend = mockBackend("be-host", 8040, null);
+ SystemInfoService systemInfoService =
Mockito.mock(SystemInfoService.class);
+ Mockito.when(systemInfoService.getBackend(1L)).thenReturn(backend);
+ Mockito.when(request.getHeader("sql")).thenReturn("insert into db1.tbl1 values(1)");
+ Mockito.when(request.getHeader("expect")).thenReturn("100-continue");
+
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+ Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+ Mockito.when(request.getScheme()).thenReturn("http");
+ Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+ Mockito.when(request.getContentLengthLong()).thenReturn(-1L);
+ Mockito.when(request.getHeader("transfer-encoding")).thenReturn(null);
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+ Object result = loadAction.streamLoadWithSql(request, response);
+
+ Assertions.assertNull(result);
+ Mockito.verify(response).setHeader("Location",
"http://be-host:8040/api/db1/tbl1/_stream_load?foo=bar");
+ }
+ }
+
+ @Test
+ public void testRedirectToStreamLoadForwardBuildsForwardUrl() throws
Exception {
+ TestLoadAction loadAction = new TestLoadAction();
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+ Mockito.when(request.getQueryString()).thenReturn("k=v");
+ Mockito.when(request.getScheme()).thenReturn("http");
+ Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+ RedirectView redirectView =
invokeRedirectToStreamLoadForward(loadAction, request,
+ new TNetworkAddress("lb-host", 18040), "be-host:8040");
+
+
Assertions.assertEquals("http://lb-host:18040/api/db1/tbl1/_stream_load_forward?k=v&forward_to=be-host:8040",
+ redirectView.getUrl());
+ }
+
+ private Object invokeCreateRedirectResponse(LoadAction loadAction,
HttpServletRequest request,
+ HttpServletResponse response, TNetworkAddress redirectAddr,
boolean isStreamLoad, String dbName,
+ String tableName, String label) throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("createRedirectResponse",
+ HttpServletRequest.class, HttpServletResponse.class,
TNetworkAddress.class,
+ boolean.class, String.class, String.class, String.class);
+ method.setAccessible(true);
+ return method.invoke(loadAction, request, response, redirectAddr,
isStreamLoad, dbName, tableName, label);
+ }
+
+ private Object invokeCreateRedirectResponse(LoadAction loadAction,
HttpServletRequest request,
+ HttpServletResponse response, RedirectView redirectView, boolean
isStreamLoad, String dbName,
+ String tableName, String label) throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("createRedirectResponse",
+ HttpServletRequest.class, HttpServletResponse.class,
RedirectView.class,
+ boolean.class, String.class, String.class, String.class);
+ method.setAccessible(true);
+ return method.invoke(loadAction, request, response, redirectView,
isStreamLoad, dbName, tableName, label);
+ }
+
+ private Object invokeExecuteWithoutPassword(LoadAction loadAction,
HttpServletRequest request,
+ HttpServletResponse response, String db, String table, boolean
isStreamLoad, boolean groupCommit)
+ throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("executeWithoutPassword",
+ HttpServletRequest.class, HttpServletResponse.class,
String.class, String.class,
+ boolean.class, boolean.class);
+ method.setAccessible(true);
+ return method.invoke(loadAction, request, response, db, table,
isStreamLoad, groupCommit);
+ }
+
+ private Object invokeExecuteWithClusterToken(LoadAction loadAction,
HttpServletRequest request,
+ HttpServletResponse response, String db, String table, boolean
isStreamLoad) throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("executeWithClusterToken",
+ HttpServletRequest.class, HttpServletResponse.class,
String.class, String.class, boolean.class);
+ method.setAccessible(true);
+ return method.invoke(loadAction, request, response, db, table,
isStreamLoad);
+ }
+
+ private RedirectView invokeRedirectToStreamLoadForward(LoadAction
loadAction, HttpServletRequest request,
+ TNetworkAddress addr, String forwardTarget) throws Exception {
+ Method method =
LoadAction.class.getDeclaredMethod("redirectToStreamLoadForward",
+ HttpServletRequest.class, TNetworkAddress.class, String.class);
+ method.setAccessible(true);
+ return (RedirectView) method.invoke(loadAction, request, addr,
forwardTarget);
+ }
+
+ private HttpServletRequest mockStreamLoadRequest() throws Exception {
+ return mockStreamLoadRequest(-1, null, true);
+ }
+
+ // Provide explicit request body metadata so the drain decision can be verified in isolation.
+ private HttpServletRequest mockStreamLoadRequest(long contentLength,
String transferEncoding,
+ boolean stubInputStream) throws Exception {
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getScheme()).thenReturn("http");
+
Mockito.when(request.getRequestURI()).thenReturn("/api/db1/tbl1/_stream_load");
+ Mockito.when(request.getQueryString()).thenReturn("foo=bar");
+ Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+ Mockito.when(request.getContentLengthLong()).thenReturn(contentLength);
+
Mockito.when(request.getHeader("Transfer-Encoding")).thenReturn(transferEncoding);
+
Mockito.when(request.getHeader("transfer-encoding")).thenReturn(transferEncoding);
+ if (stubInputStream) {
+ Mockito.when(request.getInputStream()).thenReturn(new
IdleServletInputStream());
+ }
+ return request;
+ }
+
+ // Create a backend mock with only the redirect-related fields populated for lightweight controller tests.
+ private Backend mockBackend(String host, int httpPort, String
privateEndpoint) {
+ Backend backend = Mockito.mock(Backend.class);
+ Mockito.when(backend.getHost()).thenReturn(host);
+ Mockito.when(backend.getHttpPort()).thenReturn(httpPort);
+ Mockito.when(backend.getPrivateEndpoint()).thenReturn(privateEndpoint);
+ Mockito.when(backend.getPublicEndpoint()).thenReturn(null);
+ return backend;
+ }
+
+ private static class TestLoadAction extends LoadAction {
+ @Override
+ public
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo
executeCheckPassword(
+ HttpServletRequest request,
+ HttpServletResponse response) {
+ return new
org.apache.doris.httpv2.controller.BaseController.ActionAuthorizationInfo();
+ }
+
+ @Override
+ protected void checkTblAuth(org.apache.doris.analysis.UserIdentity
currentUser, String db, String tbl,
+ org.apache.doris.mysql.privilege.PrivPredicate predicate) {
+ }
+
+ @Override
+ protected boolean checkClusterToken(String token) {
+ return true;
+ }
+ }
+
+ private static class IdleServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
- Assert.assertTrue(headers.contains("Authorization:***MASKED***"));
- Assert.assertTrue(headers.contains("Cookie:***MASKED***"));
- Assert.assertTrue(headers.contains("Set-Cookie:***MASKED***"));
- Assert.assertTrue(headers.contains("token:***MASKED***"));
- Assert.assertTrue(headers.contains("label:load_label"));
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
new file mode 100644
index 00000000000..b838e25ef50
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/rest/RestBaseControllerTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.thrift.TNetworkAddress;
+
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class RestBaseControllerTest {
+
+ @Test
+ public void testBuildRedirectUrlPreservesEncodedPath() {
+ // Keep the original encoded path unchanged when rebuilding the redirect URL.
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getScheme()).thenReturn("http");
+ Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+ TestRestController controller = new TestRestController();
+ String redirectUrl = controller.buildRedirectUrlForTest(request,
+ new TNetworkAddress("be-host", 8040),
"/api/db%2Ftbl/_stream_load", "k=a%2Bb");
+
+
Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load?k=a%2Bb",
redirectUrl);
+ }
+
+ @Test
+ public void testBuildRedirectUrlWithoutQueryString() {
+ // Avoid appending a dangling question mark when the original request has no query string.
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getScheme()).thenReturn("http");
+ Mockito.when(request.getHeader("Authorization")).thenReturn(null);
+
+ TestRestController controller = new TestRestController();
+ String redirectUrl = controller.buildRedirectUrlForTest(request,
+ new TNetworkAddress("be-host", 8040),
"/api/db%2Ftbl/_stream_load", null);
+
+ Assert.assertEquals("http://be-host:8040/api/db%2Ftbl/_stream_load",
redirectUrl);
+ }
+
+ // Expose the protected helper so the redirect URL can be verified directly.
+ private static class TestRestController extends RestBaseController {
+ private String buildRedirectUrlForTest(HttpServletRequest request,
TNetworkAddress addr,
+ String requestPath, String queryString) {
+ return buildRedirectUrl(request, addr, requestPath, queryString);
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
new file mode 100644
index 00000000000..8a8d6cae165
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/httpv2/util/StreamLoadRedirectDrainUtilTest.java
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.util;
+
+import org.apache.doris.common.Config;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+public class StreamLoadRedirectDrainUtilTest {
+
+ @AfterEach
+ public void tearDown() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 1000;
+ }
+
+ @Test
+ public void testDrainRequestBodyWithinMaxBytes() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+ new
QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0), 16);
+
+ Assertions.assertEquals(5, drainResult.getDrainedBytes());
+ Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyAfterRedirectUsesRequestInputStream()
throws Exception {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getInputStream())
+ .thenReturn(new
QueueAvailableServletInputStream("hello".getBytes(), 5, 0, 0, 0));
+
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+ Assertions.assertEquals(5, drainResult.getDrainedBytes());
+ Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void
testDrainRequestBodyAfterRedirectReturnsErrorWhenGetInputStreamFails() throws
Exception {
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ Mockito.when(request.getInputStream()).thenThrow(new IOException("open error"));
+
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, 16);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+ Assertions.assertEquals(0, drainResult.getElapsedMillis());
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR,
drainResult.getExitReason());
+ }
+
+ @Test
+ // Verify delayed body chunks are still drained when they arrive within the bounded idle window.
+ public void testDrainRequestBodyAllowsDelayedArrivalWithinIdleWindow() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+ new
QueueAvailableServletInputStream("hello".getBytes(), 0, 0, 0, 0, 0, 5), 16);
+
+ Assertions.assertEquals(5, drainResult.getDrainedBytes());
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyStopsAtMaxBytes() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+ new QueueAvailableServletInputStream("hello world".getBytes(), 11), 5);
+
+ Assertions.assertEquals(5, drainResult.getDrainedBytes());
+
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.MAX_BYTES,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyIdleTimeout() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new
NeverReadyServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyUsesConfiguredIdleTime() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(
+ new
QueueAvailableServletInputStream("hello".getBytes(), 0, 5), 16);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyInterruptedWhileWaitingForMoreData() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ try {
+ Thread.currentThread().interrupt();
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new
NeverReadyServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+
Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.INTERRUPTED,
+ drainResult.getExitReason());
+ } finally {
+ // Clear the interrupt flag to avoid affecting later tests in the same thread.
+ Thread.interrupted();
+ }
+ }
+
+ @Test
+ public void testDrainRequestBodyReadError() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new
ErrorServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.ERROR,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyEof() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new
EofServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF,
drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyRejectsNonPositiveMaxBytes() {
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () ->
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new
NeverReadyServletInputStream(), 0));
+ }
+
+ @Test
+ public void testDrainRequestBodyReturnsEofWhenReadReturnsNegative() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 100;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadNegativeServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+ Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.EOF, drainResult.getExitReason());
+ }
+
+ @Test
+ public void testDrainRequestBodyReadZeroTriggersIdleTimeout() {
+ Config.stream_load_redirect_bounded_drain_max_idle_time_ms = 0;
+ StreamLoadRedirectDrainUtil.DrainResult drainResult =
+ StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(new ReadZeroServletInputStream(), 8);
+
+ Assertions.assertEquals(0, drainResult.getDrainedBytes());
+ Assertions.assertTrue(drainResult.getElapsedMillis() >= 0);
+ Assertions.assertEquals(StreamLoadRedirectDrainUtil.ExitReason.IDLE_TIMEOUT, drainResult.getExitReason());
+ }
+
+ private static class QueueAvailableServletInputStream extends ServletInputStream {
+ private final byte[] data;
+ private final Queue<Integer> availableValues = new ArrayDeque<>();
+ private int offset = 0;
+
+ QueueAvailableServletInputStream(byte[] data, int... availableValues) {
+ this.data = data;
+ for (int availableValue : availableValues) {
+ this.availableValues.add(availableValue);
+ }
+ }
+
+ @Override
+ public int read() {
+ if (offset >= data.length) {
+ return -1;
+ }
+ return data[offset++] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (offset >= data.length) {
+ return -1;
+ }
+ int readBytes = Math.min(len, data.length - offset);
+ System.arraycopy(data, offset, b, off, readBytes);
+ offset += readBytes;
+ return readBytes;
+ }
+
+ @Override
+ public int available() {
+ if (!availableValues.isEmpty()) {
+ return availableValues.poll();
+ }
+ return Math.max(0, data.length - offset);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return offset >= data.length;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ private static class ErrorServletInputStream extends ServletInputStream {
+ @Override
+ public int read() throws IOException {
+ throw new IOException("read error");
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ throw new IOException("read error");
+ }
+
+ @Override
+ public int available() {
+ return 1;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ private static class EofServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ return -1;
+ }
+
+ @Override
+ public int available() {
+ return 1;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return true;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ private static class ReadNegativeServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ return -1;
+ }
+
+ @Override
+ public int available() {
+ return 1;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ private static class ReadZeroServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return 0;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ return 0;
+ }
+
+ @Override
+ public int available() {
+ return 1;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+
+ // Keep reporting no readable bytes without reaching EOF to simulate a stalled client.
+ private static class NeverReadyServletInputStream extends ServletInputStream {
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ }
+ }
+}
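
The test cases above pin down the drain's exit conditions: EOF, read error, idle timeout, interrupt, and rejection of a non-positive byte limit. A minimal Python sketch of a bounded drain loop with most of those exits follows. It is an illustration only, not the `StreamLoadRedirectDrainUtil` implementation: `drain_bounded`, the `MAX_BYTES` reason, and the use of a `None` return from `read()` to model a stalled client are assumptions made for this sketch, and thread interruption is not modeled.

```python
import enum
import io
import time


class ExitReason(enum.Enum):
    EOF = "EOF"
    MAX_BYTES = "MAX_BYTES"        # assumed name, not taken from the commit
    IDLE_TIMEOUT = "IDLE_TIMEOUT"
    ERROR = "ERROR"


def drain_bounded(stream, max_bytes, max_idle_ms, buf_size=4096):
    """Read and discard at most max_bytes; return (drained_bytes, reason)."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    drained = 0
    last_progress = time.monotonic()
    while drained < max_bytes:
        try:
            # Never ask for more than the remaining budget.
            chunk = stream.read(min(buf_size, max_bytes - drained))
        except OSError:
            return drained, ExitReason.ERROR
        if chunk is None:
            # No data available right now: stop once the idle budget is spent.
            if (time.monotonic() - last_progress) * 1000 >= max_idle_ms:
                return drained, ExitReason.IDLE_TIMEOUT
            time.sleep(0.001)
            continue
        if not chunk:
            # An empty read means the peer finished the body.
            return drained, ExitReason.EOF
        drained += len(chunk)
        last_progress = time.monotonic()
    return drained, ExitReason.MAX_BYTES


if __name__ == "__main__":
    print(drain_bounded(io.BytesIO(b"abcdef"), 4, 100))
    print(drain_bounded(io.BytesIO(b"abc"), 8, 100))
```

The loop is bounded in two directions: the byte budget caps how much of a large body is discarded, and the idle budget caps how long a stalled client can hold the handler.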
diff --git a/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
new file mode 100644
index 00000000000..5dca0091b7e
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/scripts/stream_load_redirect_chunked_e2e.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Exercise the FE redirect path with a chunked stream load request."""
+
+import argparse
+import base64
+import http.client
+import json
+import sys
+import time
+import uuid
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ description="Send a chunked stream load request to FE and capture the redirect result."
+ )
+ parser.add_argument("--host", required=True, help="FE host")
+ parser.add_argument("--fe-http-port", required=True, type=int, help="FE HTTP port")
+ parser.add_argument("--user", required=True, help="FE HTTP user")
+ parser.add_argument("--password", default="", help="FE HTTP password")
+ parser.add_argument("--db", required=True, help="Target database")
+ parser.add_argument("--table", required=True, help="Target table")
+ parser.add_argument("--payload-mb", type=int, default=8, help="Approximate payload size in MiB")
+ parser.add_argument("--chunk-kb", type=int, default=8, help="Chunk size in KiB")
+ parser.add_argument("--sleep-ms", type=int, default=10, help="Delay between chunks in milliseconds")
+ parser.add_argument("--connect-timeout", type=int, default=5, help="Connect timeout in seconds")
+ parser.add_argument("--read-timeout", type=int, default=120, help="Read timeout in seconds")
+ return parser.parse_args()
+
+
+def build_auth_header(user, password):
+ token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
+ return f"Basic {token}"
+
+
+def build_csv_chunk(chunk_bytes, chunk_index):
+ # Generate deterministic CSV rows so the request body is valid for stream load.
+ rows = []
+ current = 0
+ row_index = chunk_index * 100000
+ payload_width = max(8, min(256, max(1, chunk_bytes // 8)))
+ while True:
+ row = f"{row_index},payload_{chunk_index}_{'x' * payload_width}\n"
+ row_size = len(row.encode("utf-8"))
+ if rows and current + row_size > chunk_bytes:
+ break
+ rows.append(row)
+ current += row_size
+ row_index += 1
+ if current >= chunk_bytes:
+ break
+ return "".join(rows).encode("utf-8")
+
+
+def chunked_csv_generator(total_bytes, chunk_bytes, sleep_ms):
+ sent = 0
+ chunk_index = 0
+ while sent < total_bytes:
+ current_size = min(chunk_bytes, total_bytes - sent)
+ yield build_csv_chunk(current_size, chunk_index)
+ sent += current_size
+ chunk_index += 1
+ if sleep_ms > 0:
+ time.sleep(sleep_ms / 1000.0)
+
+
+def main():
+ args = parse_args()
+ path = f"/api/{args.db}/{args.table}/_stream_load"
+ conn = http.client.HTTPConnection(args.host, args.fe_http_port, timeout=args.read_timeout)
+ label = f"stream_load_redirect_regression_{uuid.uuid4().hex[:12]}"
+ total_bytes = args.payload_mb * 1024 * 1024
+ chunk_bytes = args.chunk_kb * 1024
+ started = time.time()
+
+ try:
+ # Send the request body with explicit chunk framing so the client keeps writing
+ # while FE returns the redirect response.
+ conn.putrequest("PUT", path)
+ conn.putheader("Authorization", build_auth_header(args.user, args.password))
+ conn.putheader("Expect", "100-continue")
+ conn.putheader("Transfer-Encoding", "chunked")
+ conn.putheader("column_separator", ",")
+ conn.putheader("label", label)
+ conn.endheaders()
+
+ for chunk in chunked_csv_generator(total_bytes, chunk_bytes, args.sleep_ms):
+ conn.send(f"{len(chunk):X}\r\n".encode("ascii"))
+ conn.send(chunk)
+ conn.send(b"\r\n")
+ conn.send(b"0\r\n\r\n")
+
+ response = conn.getresponse()
+ body = response.read().decode("utf-8", errors="replace")
+ result = {
+ "status_code": response.status,
+ "elapsed_seconds": round(time.time() - started, 3),
+ "headers": dict(response.getheaders()),
+ "body": body[:2000],
+ "exception_type": None,
+ "exception": None,
+ "label": label,
+ }
+ print(json.dumps(result, ensure_ascii=False))
+ return 0
+ except Exception as exc:
+ result = {
+ "status_code": None,
+ "elapsed_seconds": round(time.time() - started, 3),
+ "headers": {},
+ "body": "",
+ "exception_type": type(exc).__name__,
+ "exception": repr(exc),
+ "label": label,
+ }
+ print(json.dumps(result, ensure_ascii=False))
+ return 1
+ finally:
+ conn.close()
+
+
+if __name__ == "__main__":
+ sys.exit(main())
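
The send loop in `main()` writes each chunk as a hexadecimal length line, the payload, and a trailing CRLF, then ends the body with a zero-length chunk. The sketch below expresses the same framing as pure functions, which makes the wire format easy to check without a socket. `frame_chunk`, `frame_body`, and `LAST_CHUNK` are hypothetical names used only for this sketch and are not part of the script.

```python
# Terminating zero-length chunk followed by an empty trailer section.
LAST_CHUNK = b"0\r\n\r\n"


def frame_chunk(data: bytes) -> bytes:
    """Wrap one payload piece in HTTP/1.1 chunked transfer-coding framing."""
    if not data:
        # A zero-length chunk would be read by the peer as the end of the body.
        raise ValueError("chunk payload must not be empty")
    size_line = f"{len(data):X}\r\n".encode("ascii")
    return size_line + data + b"\r\n"


def frame_body(pieces) -> bytes:
    """Frame every piece and append the terminating chunk."""
    return b"".join(frame_chunk(piece) for piece in pieces) + LAST_CHUNK


if __name__ == "__main__":
    print(frame_body([b"1,a\n", b"2,b\n"]))
```

Skipping empty pieces before framing matters because the first zero-length chunk ends the request body from the server's point of view.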
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
new file mode 100644
index 00000000000..282f14331ee
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_fe_redirect_chunked_e2e.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_fe_redirect_chunked_e2e", "p0") {
+ String dbName = context.config.getDbNameByFile(context.file)
+ String tableName = "test_stream_load_fe_redirect_chunked_e2e"
+ String helperPath = "${context.file.parent}/scripts/stream_load_redirect_chunked_e2e.py"
+ String feHttpAddress = context.config.feHttpAddress
+ int feAddressSeparator = feHttpAddress.lastIndexOf(":")
+ assertTrue(feAddressSeparator > 0)
+ String feHost = feHttpAddress.substring(0, feAddressSeparator)
+ String fePort = feHttpAddress.substring(feAddressSeparator + 1)
+
+ sql """ CREATE DATABASE IF NOT EXISTS ${dbName} """
+ sql """ DROP TABLE IF EXISTS ${dbName}.${tableName} """
+ sql """
+ CREATE TABLE ${dbName}.${tableName} (
+ k1 BIGINT,
+ v1 STRING
+ )
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Read the current FE values first so the ordinary suite can restore shared settings.
+ def getFrontendConfigValue = { configKey ->
+ def configRows = sql """ admin show frontend config """
+ def configRow = configRows.find { it[0] == configKey }
+ assertTrue(configRow != null)
+ return configRow[1].toString()
+ }
+
+ String originalDrainMaxBytes = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_bytes")
+ String originalDrainMaxIdleTimeMs = getFrontendConfigValue("stream_load_redirect_bounded_drain_max_idle_time_ms")
+
+ // Treat common client-side disconnects as the reproduced historical failure signal.
+ def reproducedExceptionTypes = ["BrokenPipeError", "ConnectionResetError"] as Set
+
+ // Keep the helper single-shot and let the regression test control retries and assertions.
+ def runChunkedStreamLoad = {
+ def command = [
+ "python3",
+ helperPath,
+ "--host", feHost,
+ "--fe-http-port", fePort,
+ "--user", context.config.feHttpUser,
+ "--password", context.config.feHttpPassword == null ? "" : context.config.feHttpPassword,
+ "--db", dbName,
+ "--table", tableName,
+ "--payload-mb", "8",
+ "--chunk-kb", "8",
+ "--sleep-ms", "10"
+ ]
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.in.text.trim()
+ def err = process.err.text.trim()
+ log.info("stream load redirect helper stdout: ${out}")
+ if (!err.isEmpty()) {
+ log.info("stream load redirect helper stderr: ${err}")
+ }
+ return [code: code, result: parseJson(out)]
+ }
+
+ try {
+ // First disable the drain and reproduce the historical client disconnect behavior.
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "0") """
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "1") """
+ boolean brokenPipeObserved = false
+ for (int i = 0; i < 5; i++) {
+ def attempt = runChunkedStreamLoad()
+ def result = attempt.result
+ if (reproducedExceptionTypes.contains(result.exception_type)) {
+ brokenPipeObserved = true
+ break
+ }
+ assertEquals(307, result.status_code as Integer)
+ assertTrue(result.exception_type == null)
+ assertTrue(result.exception == null)
+ }
+ assertTrue(brokenPipeObserved)
+
+ // Then enable the drain and verify the same request finishes with a redirect.
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "16777216") """
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "2000") """
+ for (int i = 0; i < 3; i++) {
+ def attempt = runChunkedStreamLoad()
+ def result = attempt.result
+ assertEquals(0, attempt.code as Integer)
+ assertEquals(307, result.status_code as Integer)
+ assertTrue(result.exception_type == null)
+ assertTrue(result.exception == null)
+ assertTrue(result.headers.Location.contains("/api/${dbName}/${tableName}/_stream_load"))
+ }
+ } finally {
+ // Restore the shared FE settings before leaving the ordinary regression environment.
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_bytes" = "${originalDrainMaxBytes}") """
+ sql """ admin set frontend config("stream_load_redirect_bounded_drain_max_idle_time_ms" = "${originalDrainMaxIdleTimeMs}") """
+ }
+}