This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f207976f0a2 [fix](stream_load)fix bug when stream without
content-length or chunk… #27752 (#29169)
f207976f0a2 is described below
commit f207976f0a20f2cd653580395b7496666d06bc3c
Author: xy <[email protected]>
AuthorDate: Sat Dec 30 09:16:05 2023 +0800
[fix](stream_load)fix bug when stream without content-length or chunk…
#27752 (#29169)
---
be/src/http/action/stream_load.cpp | 14 +++++
.../load_p0/stream_load/test_stream_load.groovy | 70 ++++++++++++++++++++++
2 files changed, 84 insertions(+)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index d8c49f7c147..e23a0344d64 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -285,6 +285,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req,
std::shared_ptr<Strea
}
}
+ if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
+ !ctx->is_chunked_transfer))) {
+ LOG(WARNING) << "content_length is empty and
transfer-encoding!=chunked, please set "
+ "content_length or transfer-encoding=chunked";
+ return Status::InvalidArgument(
+ "content_length is empty and transfer-encoding!=chunked,
please set content_length "
+ "or transfer-encoding=chunked");
+ } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()
&&
+ ctx->is_chunked_transfer)) {
+ LOG(WARNING) << "please do not set both content_length and
transfer-encoding";
+ return Status::InvalidArgument(
+ "please do not set both content_length and transfer-encoding");
+ }
+
if (!http_req->header(HTTP_TIMEOUT).empty()) {
try {
ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index e9a93f82f12..310e995b01a 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1044,5 +1044,75 @@ suite("test_stream_load", "p0") {
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}
+
+ def sql_result = sql """ select Host, HttpPort from backends() where alive
= true limit 1; """
+
+ log.info(sql_result[0][0].toString())
+ log.info(sql_result[0][1].toString())
+ log.info(sql_result[0].size.toString())
+
+ def beHost=sql_result[0][0]
+ def beHttpPort=sql_result[0][1]
+ log.info("${beHost}".toString())
+ log.info("${beHttpPort}".toString());
+
+ //test be : chunked transfer + Content Length
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName16} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName16} (
+ `k1` bigint(20) NULL DEFAULT "1",
+ `k2` bigint(20) NULL ,
+ `v1` tinyint(4) NULL,
+ `v2` tinyint(4) NULL,
+ `v3` tinyint(4) NULL,
+ `v4` DATETIME NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def command = "curl --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword} -H
column_separator:| -H ${db}:${tableName16} -H Content-Length:0 -H
Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T
${context.dataPath}/test_chunked_transfer.csv
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
+ log.info("test chunked transfer command: ${command}")
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ log.info("test chunked transfer result: ${out}".toString())
+ def json = parseJson(out)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("please do not set both content_length
and transfer-encoding"))
+ } finally {
+ sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+ }
+
+
+ //test be : no chunked transfer + no Content Length
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName16} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName16} (
+ `k1` bigint(20) NULL DEFAULT "1",
+ `k2` bigint(20) NULL ,
+ `v1` tinyint(4) NULL,
+ `v2` tinyint(4) NULL,
+ `v3` tinyint(4) NULL,
+ `v4` DATETIME NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def command = "curl --location-trusted -u
${context.config.feHttpUser}:${context.config.feHttpPassword} -H
column_separator:| -H ${db}:${tableName16} -H Content-Length: -H
Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
+ log.info("test chunked transfer command: ${command}")
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ log.info("test chunked transfer result: ${out}".toString())
+ def json = parseJson(out)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("content_length is empty and
transfer-encoding!=chunked, please set content_length or
transfer-encoding=chunked"))
+ } finally {
+ sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]