This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 607a5d25f18 [feature](streamload) support HTTP request with chunked transfer (#26520)
607a5d25f18 is described below

commit 607a5d25f183f85bc68b5438f3612d76bdfe959d
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Nov 8 10:07:05 2023 +0800

    [feature](streamload) support HTTP request with chunked transfer (#26520)
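
    With chunked transfer encoding the client sends no Content-Length, so the
    BE now detects the Transfer-Encoding header and builds the stream load
    pipe without a fixed total_length. A minimal client-side sketch mirroring
    the regression test added below (user, password, host, port, database,
    and table names are placeholders):

        curl --location-trusted -u user:passwd \
            -H "Transfer-Encoding: chunked" \
            -H "column_separator:|" \
            -T data.csv \
            http://fe_host:8030/api/db_name/table_name/_stream_load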
---
 be/src/http/action/stream_load.cpp                 | 21 ++++++++++++---
 be/src/io/fs/stream_load_pipe.cpp                  |  2 +-
 be/src/runtime/stream_load/stream_load_context.h   |  1 +
 .../load_p0/stream_load/test_chunked_transfer.csv  |  2 ++
 .../data/load_p0/stream_load/test_stream_load.out  |  4 +++
 .../load_p0/stream_load/test_stream_load.groovy    | 30 ++++++++++++++++++++++
 6 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 99a98afbc37..d16d41795a8 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -76,6 +76,9 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
 
+static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
+static const string CHUNK = "chunked";
+
 #ifdef BE_TEST
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
@@ -292,6 +295,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
 #endif
     }
 
+    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
+        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
+            ctx->is_chunked_transfer = true;
+        }
+    }
+
     if (!http_req->header(HTTP_TIMEOUT).empty()) {
         try {
             ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
@@ -369,9 +378,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<io::StreamLoadPipe>(
-                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */);
+        std::shared_ptr<io::StreamLoadPipe> pipe;
+        if (ctx->is_chunked_transfer) {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+        } else {
+            pipe = std::make_shared<io::StreamLoadPipe>(
+                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
+                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
+        }
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 25be598e90e..00fa038f98e 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -90,7 +90,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
     return Status::OK();
 }
 
-// If _total_length == -1, this should be a Kafka routine load task,
+// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
 // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
 // Otherwise, this should be a stream load task that needs to read the specified amount of data.
 Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
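
The comment above distinguishes the two read paths: with a known Content-Length the
pipe is created with total_length = body_bytes and read_one_message reads exactly that
many bytes, while with chunked transfer _total_length stays -1 and each buffer is
handed over whole. A hedged illustration of how a client selects each path (host,
port, and credentials are placeholders):

    # Content-Length set by curl from the file size: fixed-length read path
    curl --location-trusted -u user:passwd -T data.csv \
        http://fe_host:8030/api/db_name/table_name/_stream_load

    # Chunked transfer, no Content-Length: whole-buffer read path
    curl --location-trusted -u user:passwd -H "Transfer-Encoding: chunked" \
        -T data.csv \
        http://fe_host:8030/api/db_name/table_name/_stream_load
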
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index ffbed37fe3a..2b8d271157d 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -161,6 +161,7 @@ public:
     // only used to check if we receive whole body
     size_t body_bytes = 0;
     size_t receive_bytes = 0;
+    bool is_chunked_transfer = false;
 
     int64_t txn_id = default_txn_id;
 
diff --git a/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
new file mode 100644
index 00000000000..26831f07e76
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_chunked_transfer.csv
@@ -0,0 +1,2 @@
+2|-50|1|44|1
+1|-50|1|2|1
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out
index a1971041f0e..8c88c4f5dc2 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -113,3 +113,7 @@
 1      -50     1       2       1       \N
 2      -50     1       44      1       \N
 
+-- !sql_chunked_transfer --
+1      -50     1       2       1       \N
+2      -50     1       44      1       \N
+
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index f4ac161f570..8f4801e9cfe 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1134,5 +1134,35 @@ suite("test_stream_load", "p0") {
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName15} FORCE"""
     }
+
+    // test chunked transfer
+    def tableName16 = "test_chunked_transfer"
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3  -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${context.config.feHttpAddress}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("test chunked transfer result: ${out}".toString())
+
+        qt_sql_chunked_transfer "select * from ${tableName16} order by k1"
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
 }
 

