This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 607a5d25f18 [feature](streamload) support HTTP request with chunked transfer (#26520)
607a5d25f18 is described below
commit 607a5d25f183f85bc68b5438f3612d76bdfe959d
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Nov 8 10:07:05 2023 +0800

    [feature](streamload) support HTTP request with chunked transfer (#26520)
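
    Before this change, stream load always created its StreamLoadPipe with
    the request's content length as the total length, so an HTTP request
    using chunked transfer encoding (which has no known body size) was not
    supported. Now _on_header() checks whether the Transfer-Encoding header
    contains "chunked" and records it in the stream load context, and
    _process_put() creates the pipe without a total length;
    read_one_message() then returns one whole buffer per call (the same
    path routine load uses) instead of reading a fixed number of bytes.

    For example, a client can now stream a file without declaring its size
    up front (user, password, host, port, database, and table below are
    placeholders):

        curl --location-trusted -u user:passwd \
             -H "Transfer-Encoding: chunked" \
             -H "column_separator:|" \
             -T test_chunked_transfer.csv \
             http://fe_host:8030/api/db/tbl/_stream_load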
---
 be/src/http/action/stream_load.cpp                | 21 ++++++++++++---
 be/src/io/fs/stream_load_pipe.cpp                 |  2 +-
 be/src/runtime/stream_load/stream_load_context.h  |  1 +
 .../load_p0/stream_load/test_chunked_transfer.csv |  2 ++
 .../data/load_p0/stream_load/test_stream_load.out |  4 +++
 .../load_p0/stream_load/test_stream_load.groovy   | 30 ++++++++++++++++++++++
6 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 99a98afbc37..d16d41795a8 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -76,6 +76,9 @@
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
+static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
+static const string CHUNK = "chunked";
+
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
@@ -292,6 +295,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
 #endif
     }
+    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
+        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
+            ctx->is_chunked_transfer = true;
+        }
+    }
+
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
         try {
             ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
@@ -369,9 +378,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<io::StreamLoadPipe>(
-                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */);
+        std::shared_ptr<io::StreamLoadPipe> pipe;
+        if (ctx->is_chunked_transfer) {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+        } else {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
+                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
+        }
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 25be598e90e..00fa038f98e 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -90,7 +90,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
     return Status::OK();
 }
 
-// If _total_length == -1, this should be a Kafka routine load task,
+// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
 // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
 // Otherwise, this should be a stream load task that needs to read the specified amount of data.
 Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
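
As context for the comment above, here is a minimal, self-contained sketch of
the two read modes it describes. PipeSketch, append(), and the byte-vector
queue are illustrative stand-ins, not the actual StreamLoadPipe API:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <deque>
    #include <memory>
    #include <vector>

    class PipeSketch {
    public:
        // total_length == -1 means "unknown body size" (routine load or
        // chunked transfer); otherwise the exact number of bytes to read.
        explicit PipeSketch(int64_t total_length = -1) : _total_length(total_length) {}

        // Producer side: one append() per complete message (e.g. one HTTP chunk).
        void append(std::vector<uint8_t> buf) { _queue.push_back(std::move(buf)); }

        bool read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
            if (_total_length == -1) {
                // Unknown length: hand back the next whole buffer, since one
                // buffer contains a complete piece of data.
                if (_queue.empty()) return false;
                std::vector<uint8_t> buf = std::move(_queue.front());
                _queue.pop_front();
                *length = buf.size();
                data->reset(new uint8_t[*length]);
                if (!buf.empty()) std::memcpy(data->get(), buf.data(), *length);
                return true;
            }
            // Known length: drain exactly _total_length bytes across buffers.
            *length = static_cast<size_t>(_total_length);
            data->reset(new uint8_t[*length]);
            size_t copied = 0;
            while (copied < *length && !_queue.empty()) {
                std::vector<uint8_t>& front = _queue.front();
                size_t n = std::min(front.size(), *length - copied);
                std::memcpy(data->get() + copied, front.data(), n);
                copied += n;
                if (n == front.size()) {
                    _queue.pop_front();
                } else {
                    front.erase(front.begin(), front.begin() + static_cast<std::ptrdiff_t>(n));
                }
            }
            return copied == *length;
        }

    private:
        int64_t _total_length;
        std::deque<std::vector<uint8_t>> _queue;
    };

With total_length == -1, each append() corresponds to exactly one
read_one_message() call, which is why each buffer must hold a complete message.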
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index ffbed37fe3a..2b8d271157d 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -161,6 +161,7 @@ public:
     // only used to check if we receive whole body
     size_t body_bytes = 0;
     size_t receive_bytes = 0;
+    bool is_chunked_transfer = false;
 
     int64_t txn_id = default_txn_id;
diff --git a/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
new file mode 100644
index 00000000000..26831f07e76
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
@@ -0,0 +1,2 @@
+2|-50|1|44|1
+1|-50|1|2|1
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out
index a1971041f0e..8c88c4f5dc2 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -113,3 +113,7 @@
 1	-50	1	2	1	\N
 2	-50	1	44	1	\N
 
+-- !sql_chunked_transfer --
+1	-50	1	2	1	\N
+2	-50	1	44	1	\N
+
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index f4ac161f570..8f4801e9cfe 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1134,5 +1134,35 @@ suite("test_stream_load", "p0") {
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName15} FORCE"""
     }
+
+    // test chunked transfer
+    def tableName16 = "test_chunked_transfer"
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${context.config.feHttpAddress}/api/${db}/${tableName16}/_stream_load"
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("test chunked transfer result: ${out}".toString())
+
+        qt_sql_chunked_transfer "select * from ${tableName16} order by k1"
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]