This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d0cd535cb9a [improvement](insert) refactor group commit stream load (#25560)
d0cd535cb9a is described below

commit d0cd535cb9a615d837d41dc6c09d02ba4db1565e
Author: meiyi <[email protected]>
AuthorDate: Fri Oct 20 13:27:30 2023 +0800

    [improvement](insert) refactor group commit stream load (#25560)
---
 be/src/common/config.cpp                           |   2 +-
 be/src/common/config.h                             |   2 +-
 be/src/exec/data_sink.cpp                          |  15 ++
 be/src/http/action/http_stream.cpp                 |   6 -
 be/src/http/action/stream_load.cpp                 |   7 -
 .../exec/group_commit_block_sink_operator.h        |  50 ++++
 be/src/pipeline/pipeline_fragment_context.cpp      |   6 +
 be/src/runtime/group_commit_mgr.cpp                | 297 +++++++--------------
 be/src/runtime/group_commit_mgr.h                  |  26 +-
 .../runtime/stream_load/stream_load_executor.cpp   |  12 +-
 be/src/vec/core/future_block.cpp                   |   6 +-
 be/src/vec/core/future_block.h                     |   7 +-
 be/src/vec/sink/group_commit_block_sink.cpp        |  70 ++++-
 be/src/vec/sink/group_commit_block_sink.h          |  18 +-
 .../apache/doris/analysis/NativeInsertStmt.java    |  20 +-
 .../apache/doris/planner/GroupCommitBlockSink.java |  34 +--
 .../org/apache/doris/planner/OlapTableSink.java    |   1 +
 .../apache/doris/planner/StreamLoadPlanner.java    |  23 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  21 +-
 .../GroupCommitTableValuedFunction.java            |   4 +-
 .../java/org/apache/doris/task/StreamLoadTask.java |  11 +
 gensrc/thrift/DataSinks.thrift                     |   2 +
 .../insert_group_commit_into_duplicate.out         |   3 +
 .../insert_group_commit_into_duplicate.groovy      |  78 +++++-
 .../test_group_commit_http_stream.groovy           |  78 +++---
 .../test_group_commit_stream_load.groovy           |  75 +++---
 26 files changed, 495 insertions(+), 379 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1634ea575f7..58121768723 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1099,7 +1099,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10");
 
 // the count of thread to group commit insert
 DEFINE_Int32(group_commit_insert_threads, "10");
-DEFINE_mInt32(group_commit_interval_seconds, "10");
+DEFINE_mInt32(group_commit_interval_ms, "10000");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e17e4b5f670..f8350d87316 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1162,7 +1162,7 @@ DECLARE_Int32(group_commit_sync_wal_batch);
 
 // This config can be set to limit thread number in group commit insert thread 
pool.
 DECLARE_mInt32(group_commit_insert_threads);
-DECLARE_mInt32(group_commit_interval_seconds);
+DECLARE_mInt32(group_commit_interval_ms);
 
 // The configuration item is used to lower the priority of the scanner thread,
 // typically employed to ensure CPU scheduling for write operations.
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index e160e9ebeec..1b9653e8b8a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -31,6 +31,7 @@
 
 #include "common/config.h"
 #include "vec/sink/async_writer_sink.h"
+#include "vec/sink/group_commit_block_sink.h"
 #include "vec/sink/multi_cast_data_stream_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vmemory_scratch_sink.h"
@@ -163,6 +164,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         RETURN_IF_ERROR(status);
         break;
     }
+    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+        Status status = Status::OK();
+        DCHECK(thrift_sink.__isset.olap_table_sink);
+        sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, 
output_exprs, &status));
+        RETURN_IF_ERROR(status);
+        break;
+    }
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
         return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support 
in pipeline engine");
     }
@@ -319,6 +327,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         RETURN_IF_ERROR(status);
         break;
     }
+    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+        Status status = Status::OK();
+        DCHECK(thrift_sink.__isset.olap_table_sink);
+        sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, 
output_exprs, &status));
+        RETURN_IF_ERROR(status);
+        break;
+    }
     default: {
         std::stringstream error_msg;
         std::map<int, const char*>::const_iterator i =
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 2a2ddd8ad44..067f8c5d28d 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -324,12 +324,6 @@ Status HttpStreamAction::_process_put(HttpRequest* 
http_req,
     ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
 
-    if (ctx->group_commit) {
-        ctx->db_id = ctx->put_result.db_id;
-        ctx->table_id = ctx->put_result.table_id;
-        ctx->schema_version = ctx->put_result.base_schema_version;
-        return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
-    }
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index be4d8703236..55541843241 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -601,13 +601,6 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
         return Status::OK();
     }
 
-    if (ctx->group_commit) {
-        ctx->db_id = ctx->put_result.db_id;
-        ctx->table_id = ctx->put_result.table_id;
-        ctx->schema_version = ctx->put_result.base_schema_version;
-        return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
-    }
-
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
 }
 
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h
new file mode 100644
index 00000000000..0cf36818db1
--- /dev/null
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/sink/group_commit_block_sink.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class GroupCommitBlockSinkOperatorBuilder final
+        : public DataSinkOperatorBuilder<vectorized::GroupCommitBlockSink> {
+public:
+    GroupCommitBlockSinkOperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "GroupCommitBlockSinkOperator", 
sink) {}
+
+    OperatorPtr build_operator() override;
+};
+
+class GroupCommitBlockSinkOperator final
+        : public DataSinkOperator<GroupCommitBlockSinkOperatorBuilder> {
+public:
+    GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, 
DataSink* sink)
+            : DataSinkOperator(operator_builder, sink) {}
+
+    bool can_write() override { return true; } // TODO: need use mem_limit
+};
+
+OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() {
+    return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 22a533c233e..6ba3ba941aa 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -57,6 +57,7 @@
 #include "pipeline/exec/empty_source_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/group_commit_block_sink_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_sink.h"
@@ -770,6 +771,11 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
         }
         break;
     }
+    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
+        sink_ = 
std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
+                                                                      
_sink.get());
+        break;
+    }
     case TDataSinkType::MYSQL_TABLE_SINK:
     case TDataSinkType::JDBC_TABLE_SINK:
     case TDataSinkType::ODBC_TABLE_SINK: {
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 9c08ccaf8d0..3c919158874 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -48,11 +48,6 @@ Status 
LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
     if (block->rows() > 0) {
         _block_queue.push_back(block);
     }
-    if (block->is_eos()) {
-        _load_ids.erase(block->get_load_id());
-    } else if (block->is_first()) {
-        _load_ids.emplace(block->get_load_id());
-    }
     _cv->notify_one();
     return Status::OK();
 }
@@ -62,31 +57,31 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
     *eos = false;
     std::unique_lock l(*_mutex);
     if (!need_commit) {
-        auto left_seconds = config::group_commit_interval_seconds -
-                            std::chrono::duration_cast<std::chrono::seconds>(
-                                    std::chrono::steady_clock::now() - 
_start_time)
-                                    .count();
-        if (left_seconds <= 0) {
+        auto left_milliseconds = config::group_commit_interval_ms -
+                                 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                         std::chrono::steady_clock::now() - 
_start_time)
+                                         .count();
+        if (left_milliseconds <= 0) {
             need_commit = true;
         }
     }
     while (_status.ok() && _block_queue.empty() &&
            (!need_commit || (need_commit && !_load_ids.empty()))) {
-        auto left_seconds = config::group_commit_interval_seconds;
+        auto left_milliseconds = config::group_commit_interval_ms;
         if (!need_commit) {
-            left_seconds = config::group_commit_interval_seconds -
-                           std::chrono::duration_cast<std::chrono::seconds>(
-                                   std::chrono::steady_clock::now() - 
_start_time)
-                                   .count();
-            if (left_seconds <= 0) {
+            left_milliseconds = config::group_commit_interval_ms -
+                                
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                        std::chrono::steady_clock::now() - 
_start_time)
+                                        .count();
+            if (left_milliseconds <= 0) {
                 need_commit = true;
                 break;
             }
         }
 #if !defined(USE_BTHREAD_SCANNER)
-        _cv->wait_for(l, std::chrono::seconds(left_seconds));
+        _cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
 #else
-        _cv->wait_for(l, left_seconds * 1000000);
+        _cv->wait_for(l, left_milliseconds * 1000);
 #endif
     }
     if (!_block_queue.empty()) {
@@ -96,12 +91,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
         *find_block = true;
         _block_queue.pop_front();
     }
-    if (_block_queue.empty()) {
-        if (need_commit && _load_ids.empty()) {
-            *eos = true;
-        } else {
-            *eos = false;
-        }
+    if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+        *eos = true;
+    } else {
+        *eos = false;
     }
     return Status::OK();
 }
@@ -114,6 +107,16 @@ void LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
     }
 }
 
+Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
+    std::unique_lock l(*_mutex);
+    if (need_commit) {
+        return Status::InternalError("block queue is set need commit, id=" +
+                                     load_instance_id.to_string());
+    }
+    _load_ids.emplace(load_id);
+    return Status::OK();
+}
+
 void LoadBlockQueue::cancel(const Status& st) {
     DCHECK(!st.ok());
     std::unique_lock l(*_mutex);
@@ -133,59 +136,62 @@ Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
         std::shared_ptr<LoadBlockQueue>& load_block_queue) {
     DCHECK(table_id == _table_id);
-    DCHECK(block->is_first() == true);
+    auto base_schema_version = block->get_schema_version();
     {
         std::unique_lock l(_lock);
-        for (auto it = _load_block_queues.begin(); it != 
_load_block_queues.end(); ++it) {
-            // TODO if block schema version is less than fragment schema 
version, return error
-            if (!it->second->need_commit &&
-                it->second->schema_version == block->get_schema_version()) {
-                if (block->get_schema_version() == it->second->schema_version) 
{
-                    load_block_queue = it->second;
-                    break;
-                } else if (block->get_schema_version() < 
it->second->schema_version) {
-                    return Status::DataQualityError("schema version not 
match");
-                }
-            }
-        }
-    }
-    if (load_block_queue == nullptr) {
-        Status st = Status::OK();
-        for (int i = 0; i < 3; ++i) {
-            std::unique_lock l(_request_fragment_mutex);
-            // check if there is a re-usefully fragment
-            {
-                std::unique_lock l1(_lock);
-                for (auto it = _load_block_queues.begin(); it != 
_load_block_queues.end(); ++it) {
-                    // TODO if block schema version is less than fragment 
schema version, return error
-                    if (!it->second->need_commit) {
-                        if (block->get_schema_version() == 
it->second->schema_version) {
+        for (int i = 0; i < 3; i++) {
+            bool is_schema_version_match = true;
+            for (auto it = _load_block_queues.begin(); it != 
_load_block_queues.end(); ++it) {
+                if (!it->second->need_commit) {
+                    if (base_schema_version == it->second->schema_version) {
+                        if 
(it->second->add_load_id(block->get_load_id()).ok()) {
                             load_block_queue = it->second;
-                            break;
-                        } else if (block->get_schema_version() < 
it->second->schema_version) {
-                            return Status::DataQualityError("schema version 
not match");
+                            return Status::OK();
                         }
+                    } else if (base_schema_version < 
it->second->schema_version) {
+                        is_schema_version_match = false;
                     }
                 }
             }
-            if (load_block_queue == nullptr) {
-                st = _create_group_commit_load(table_id, load_block_queue);
-                if (LIKELY(st.ok())) {
-                    break;
+            if (!is_schema_version_match) {
+                return Status::DataQualityError("schema version not match");
+            }
+            if (!_need_plan_fragment) {
+                _need_plan_fragment = true;
+                RETURN_IF_ERROR(_thread_pool->submit_func([&] {
+                    [[maybe_unused]] auto st = 
_create_group_commit_load(load_block_queue);
+                }));
+            }
+#if !defined(USE_BTHREAD_SCANNER)
+            _cv.wait_for(l, std::chrono::seconds(4));
+#else
+            _cv.wait_for(l, 4 * 1000000);
+#endif
+            if (load_block_queue != nullptr) {
+                if (load_block_queue->schema_version == base_schema_version) {
+                    if 
(load_block_queue->add_load_id(block->get_load_id()).ok()) {
+                        return Status::OK();
+                    }
+                } else if (base_schema_version < 
load_block_queue->schema_version) {
+                    return Status::DataQualityError("schema version not 
match");
                 }
+                load_block_queue.reset();
             }
         }
-        RETURN_IF_ERROR(st);
-        if (load_block_queue->schema_version != block->get_schema_version()) {
-            // TODO check this is the first block
-            return Status::DataQualityError("schema version not match");
-        }
     }
-    return Status::OK();
+    return Status::InternalError("can not get a block queue");
 }
 
 Status GroupCommitTable::_create_group_commit_load(
-        int64_t table_id, std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+        std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+    Status st = Status::OK();
+    std::unique_ptr<int, std::function<void(int*)>> 
remove_pipe_func((int*)0x01, [&](int*) {
+        if (!st.ok()) {
+            std::unique_lock l(_lock);
+            _need_plan_fragment = false;
+            _cv.notify_all();
+        }
+    });
     TStreamLoadPutRequest request;
     UniqueId load_id = UniqueId::gen_uid();
     TUniqueId tload_id;
@@ -194,8 +200,8 @@ Status GroupCommitTable::_create_group_commit_load(
     std::regex reg("-");
     std::string label = "group_commit_" + 
std::regex_replace(load_id.to_string(), reg, "_");
     std::stringstream ss;
-    ss << "insert into table_id(" << table_id << ") WITH LABEL " << label
-       << " select * from group_commit(\"table_id\"=\"" << table_id << "\")";
+    ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label
+       << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
     request.__set_load_sql(ss.str());
     request.__set_loadId(tload_id);
     request.__set_label(label);
@@ -209,13 +215,14 @@ Status GroupCommitTable::_create_group_commit_load(
     }
     TStreamLoadPutResult result;
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+    st = ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
             [&result, &request](FrontendServiceConnection& client) {
                 client->streamLoadPut(result, request);
             },
-            10000L));
-    Status st = Status::create(result.status);
+            10000L);
+    RETURN_IF_ERROR(st);
+    st = Status::create(result.status);
     if (!st.ok()) {
         LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
     }
@@ -236,7 +243,7 @@ Status GroupCommitTable::_create_group_commit_load(
         DCHECK(pipeline_params.local_params.size() == 1);
         instance_id = pipeline_params.local_params[0].fragment_instance_id;
     }
-    VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << 
table_id
+    VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << 
_table_id
                << ", schema version=" << schema_version << ", label=" << label
                << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id)
                << ", is_pipeline=" << is_pipeline;
@@ -245,11 +252,13 @@ Status GroupCommitTable::_create_group_commit_load(
                 std::make_shared<LoadBlockQueue>(instance_id, label, txn_id, 
schema_version);
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
+        _need_plan_fragment = false;
+        _cv.notify_all();
     }
-    params.__set_import_label(label);
-    st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline, 
params, pipeline_params);
+    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
params,
+                             pipeline_params);
     if (!st.ok()) {
-        static_cast<void>(_finish_group_commit_load(_db_id, table_id, label, 
txn_id, instance_id,
+        static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, 
txn_id, instance_id,
                                                     st, true, nullptr));
     }
     return st;
@@ -346,6 +355,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     if (state && !(state->get_error_log_file_path().empty())) {
         ss << ", error_url=" << state->get_error_log_file_path();
     }
+    ss << ", rows=" << state->num_rows_load_success();
     LOG(INFO) << ss.str();
     return st;
 }
@@ -384,6 +394,10 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : 
_exec_env(exec_env) {
                               
.set_min_threads(config::group_commit_insert_threads)
                               
.set_max_threads(config::group_commit_insert_threads)
                               .build(&_insert_into_thread_pool));
+    static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
+                              .set_min_threads(1)
+                              
.set_max_threads(config::group_commit_insert_threads)
+                              .build(&_thread_pool));
 }
 
 GroupCommitMgr::~GroupCommitMgr() {
@@ -392,6 +406,7 @@ GroupCommitMgr::~GroupCommitMgr() {
 
 void GroupCommitMgr::stop() {
     _insert_into_thread_pool->shutdown();
+    _thread_pool->shutdown();
     LOG(INFO) << "GroupCommitMgr is stopped";
 }
 
@@ -456,17 +471,16 @@ Status GroupCommitMgr::group_commit_insert(int64_t 
table_id, const TPlan& plan,
         std::unique_ptr<doris::vectorized::Block> _block =
                 doris::vectorized::Block::create_unique();
         bool eof = false;
-        bool first = true;
         while (!eof) {
             // TODO what to do if read one block error
             RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), 
_block.get(), &eof));
             std::shared_ptr<doris::vectorized::FutureBlock> future_block =
                     std::make_shared<doris::vectorized::FutureBlock>();
             future_block->swap(*(_block.get()));
-            future_block->set_info(request->base_schema_version(), load_id, 
first, eof);
+            future_block->set_info(request->base_schema_version(), load_id);
             if (load_block_queue == nullptr) {
-                RETURN_IF_ERROR(_get_first_block_load_queue(request->db_id(), 
table_id,
-                                                            future_block, 
load_block_queue));
+                RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(), 
table_id, future_block,
+                                                           load_block_queue));
                 response->set_label(load_block_queue->label);
                 response->set_txn_id(load_block_queue->txn_id);
             }
@@ -475,7 +489,6 @@ Status GroupCommitMgr::group_commit_insert(int64_t 
table_id, const TPlan& plan,
                 future_blocks.emplace_back(future_block);
             }
             RETURN_IF_ERROR(load_block_queue->add_block(future_block));
-            first = false;
         }
         if (!runtime_state->get_error_log_file_path().empty()) {
             LOG(INFO) << "id=" << print_id(load_id)
@@ -515,139 +528,15 @@ Status 
GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
     return Status::OK();
 }
 
-Status 
GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> 
ctx) {
-    return _insert_into_thread_pool->submit_func([ctx, this] {
-        Status st = _group_commit_stream_load(ctx);
-        if (!st.ok()) {
-            ctx->promise.set_value(st);
-        }
-    });
-}
-
-Status 
GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> 
ctx) {
-    auto& fragment_params = ctx->put_result.params;
-    auto& tdesc_tbl = fragment_params.desc_tbl;
-    DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
-    DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() 
== 1);
-    auto& tscan_range_params = 
fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
-    auto& nodes = fragment_params.fragment.plan.nodes;
-    DCHECK(nodes.size() > 0);
-    auto& plan_node = nodes.at(0);
-
-    std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
-    {
-        std::shared_ptr<LoadBlockQueue> load_block_queue;
-        // 1. FileScanNode consumes data from the pipe.
-        std::unique_ptr<RuntimeState> runtime_state = 
RuntimeState::create_unique();
-        TUniqueId load_id;
-        load_id.hi = ctx->id.hi;
-        load_id.lo = ctx->id.lo;
-        TQueryOptions query_options;
-        query_options.query_type = TQueryType::LOAD;
-        TQueryGlobals query_globals;
-        static_cast<void>(runtime_state->init(load_id, query_options, 
query_globals, _exec_env));
-        
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
-                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", 
ctx->id.to_string()), -1));
-        DescriptorTbl* desc_tbl = nullptr;
-        RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), 
tdesc_tbl, &desc_tbl));
-        runtime_state->set_desc_tbl(desc_tbl);
-        auto file_scan_node =
-                vectorized::NewFileScanNode(runtime_state->obj_pool(), 
plan_node, *desc_tbl);
-        Status status = Status::OK();
-        auto sink = stream_load::GroupCommitBlockSink(
-                runtime_state->obj_pool(), file_scan_node.row_desc(),
-                fragment_params.fragment.output_exprs, &status);
-        std::unique_ptr<int, std::function<void(int*)>> 
close_scan_node_func((int*)0x01, [&](int*) {
-            if (load_block_queue != nullptr) {
-                load_block_queue->remove_load_id(load_id);
-            }
-            static_cast<void>(file_scan_node.close(runtime_state.get()));
-            static_cast<void>(sink.close(runtime_state.get(), status));
-        });
-        RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
-        RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
-        std::vector<TScanRangeParams> params_vector;
-        params_vector.emplace_back(tscan_range_params);
-        file_scan_node.set_scan_ranges(params_vector);
-        RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
-
-        RETURN_IF_ERROR(status);
-        RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
-        RETURN_IF_ERROR(sink.open(runtime_state.get()));
-
-        // 2. Put the block into block queue.
-        std::unique_ptr<doris::vectorized::Block> _block =
-                doris::vectorized::Block::create_unique();
-        bool first = true;
-        bool eof = false;
-        while (!eof) {
-            // TODO what to do if scan one block error
-            RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), 
_block.get(), &eof));
-            RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
-            std::shared_ptr<doris::vectorized::FutureBlock> future_block =
-                    std::make_shared<doris::vectorized::FutureBlock>();
-            future_block->swap(*(_block.get()));
-            future_block->set_info(ctx->schema_version, load_id, first, eof);
-            // TODO what to do if add one block error
-            if (load_block_queue == nullptr) {
-                RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, 
ctx->table_id, future_block,
-                                                            load_block_queue));
-                ctx->label = load_block_queue->label;
-                ctx->txn_id = load_block_queue->txn_id;
-            }
-            if (future_block->rows() > 0) {
-                future_blocks.emplace_back(future_block);
-            }
-            RETURN_IF_ERROR(load_block_queue->add_block(future_block));
-            first = false;
-        }
-        ctx->number_unselected_rows = 
runtime_state->num_rows_load_unselected();
-        ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
-        ctx->error_url = runtime_state->get_error_log_file_path();
-        if (!runtime_state->get_error_log_file_path().empty()) {
-            LOG(INFO) << "id=" << print_id(load_id)
-                      << ", url=" << runtime_state->get_error_log_file_path()
-                      << ", load rows=" << runtime_state->num_rows_load_total()
-                      << ", filter rows=" << 
runtime_state->num_rows_load_filtered()
-                      << ", unselect rows=" << 
runtime_state->num_rows_load_unselected()
-                      << ", success rows=" << 
runtime_state->num_rows_load_success();
-        }
-    }
-
-    int64_t total_rows = 0;
-    int64_t loaded_rows = 0;
-    // 3. wait to wal
-    for (const auto& future_block : future_blocks) {
-        std::unique_lock<doris::Mutex> l(*(future_block->lock));
-        if (!future_block->is_handled()) {
-            future_block->cv->wait(l);
-        }
-        // future_block->get_status()
-        total_rows += future_block->get_total_rows();
-        loaded_rows += future_block->get_loaded_rows();
-    }
-    ctx->number_total_rows = total_rows + ctx->number_unselected_rows + 
ctx->number_filtered_rows;
-    ctx->number_loaded_rows = loaded_rows;
-    ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
-    ctx->promise.set_value(Status::OK());
-    VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
-               << ", total rows=" << ctx->number_total_rows
-               << ", loaded rows=" << ctx->number_loaded_rows
-               << ", filtered rows=" << ctx->number_filtered_rows
-               << ", unselected rows=" << ctx->number_unselected_rows;
-    return Status::OK();
-}
-
-Status GroupCommitMgr::_get_first_block_load_queue(
+Status GroupCommitMgr::get_first_block_load_queue(
         int64_t db_id, int64_t table_id, 
std::shared_ptr<vectorized::FutureBlock> block,
         std::shared_ptr<LoadBlockQueue>& load_block_queue) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
         if (_table_map.find(table_id) == _table_map.end()) {
-            _table_map.emplace(table_id,
-                               std::make_shared<GroupCommitTable>(_exec_env, 
db_id, table_id));
+            _table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
+                                                 _exec_env, 
_thread_pool.get(), db_id, table_id));
         }
         group_commit_table = _table_map[table_id];
     }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 01a0905c404..aa8d05534ca 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -53,6 +53,7 @@ public:
 
     Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
     Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
+    Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
 
@@ -76,8 +77,9 @@ private:
 
 class GroupCommitTable {
 public:
-    GroupCommitTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-            : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {};
+    GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, 
int64_t db_id,
+                     int64_t table_id)
+            : _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id), 
_table_id(table_id) {};
     Status get_first_block_load_queue(int64_t table_id,
                                       std::shared_ptr<vectorized::FutureBlock> 
block,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
@@ -85,8 +87,7 @@ public:
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
 private:
-    Status _create_group_commit_load(int64_t table_id,
-                                     std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+    Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
     Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const 
std::string& label,
                                int64_t txn_id, bool is_pipeline,
                                const TExecPlanFragmentParams& params,
@@ -96,13 +97,14 @@ private:
                                      bool prepare_failed, RuntimeState* state);
 
     ExecEnv* _exec_env;
+    ThreadPool* _thread_pool;
     int64_t _db_id;
     int64_t _table_id;
     doris::Mutex _lock;
+    doris::ConditionVariable _cv;
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
-
-    doris::Mutex _request_fragment_mutex;
+    bool _need_plan_fragment = false;
 };
 
 class GroupCommitMgr {
@@ -119,22 +121,17 @@ public:
                                const PGroupCommitInsertRequest* request,
                                PGroupCommitInsertResponse* response);
 
-    // stream load
-    Status group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
-
     // used when init group_commit_scan_node
     Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+    Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
+                                      std::shared_ptr<vectorized::FutureBlock> 
block,
+                                      std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
 private:
     // used by insert into
     Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
                        const PGroupCommitInsertRequest* request);
-    // used by stream load
-    Status _group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
-    Status _get_first_block_load_queue(int64_t db_id, int64_t table_id,
-                                       
std::shared_ptr<vectorized::FutureBlock> block,
-                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
     ExecEnv* _exec_env;
 
@@ -144,6 +141,7 @@ private:
 
     // thread pool to handle insert into: append data to pipe
     std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
+    std::unique_ptr<doris::ThreadPool> _thread_pool;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index b85d72b3b2a..32e4d76dc7c 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -74,6 +74,10 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     if (ctx->put_result.__isset.params) {
         st = _exec_env->fragment_mgr()->exec_plan_fragment(
                 ctx->put_result.params, [ctx, this](RuntimeState* state, 
Status* status) {
+                    if (ctx->group_commit) {
+                        ctx->label = state->import_label();
+                        ctx->txn_id = state->wal_id();
+                    }
                     ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
                     ctx->commit_infos = 
std::move(state->tablet_commit_infos());
                     if (status->ok()) {
@@ -84,7 +88,7 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
 
                         int64_t num_selected_rows =
                                 ctx->number_total_rows - 
ctx->number_unselected_rows;
-                        if (num_selected_rows > 0 &&
+                        if (!ctx->group_commit && num_selected_rows > 0 &&
                             (double)ctx->number_filtered_rows / 
num_selected_rows >
                                     ctx->max_filter_ratio) {
                             // NOTE: Do not modify the error message here, for 
historical reasons,
@@ -147,6 +151,10 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     } else {
         st = _exec_env->fragment_mgr()->exec_plan_fragment(
                 ctx->put_result.pipeline_params, [ctx, this](RuntimeState* 
state, Status* status) {
+                    if (ctx->group_commit) {
+                        ctx->label = state->import_label();
+                        ctx->txn_id = state->wal_id();
+                    }
                     ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
                     ctx->commit_infos = 
std::move(state->tablet_commit_infos());
                     if (status->ok()) {
@@ -157,7 +165,7 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
 
                         int64_t num_selected_rows =
                                 ctx->number_total_rows - 
ctx->number_unselected_rows;
-                        if (num_selected_rows > 0 &&
+                        if (!ctx->group_commit && num_selected_rows > 0 &&
                             (double)ctx->number_filtered_rows / 
num_selected_rows >
                                     ctx->max_filter_ratio) {
                             // NOTE: Do not modify the error message here, for 
historical reasons,
diff --git a/be/src/vec/core/future_block.cpp b/be/src/vec/core/future_block.cpp
index 3f3f59c3446..19cb09163a4 100644
--- a/be/src/vec/core/future_block.cpp
+++ b/be/src/vec/core/future_block.cpp
@@ -21,11 +21,9 @@
 
 namespace doris::vectorized {
 
-void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id, 
bool first, bool eos) {
+void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id) {
     this->_schema_version = schema_version;
     this->_load_id = load_id;
-    this->_first = first;
-    this->_eos = eos;
 }
 
 void FutureBlock::set_result(Status status, int64_t total_rows, int64_t 
loaded_rows) {
@@ -35,7 +33,7 @@ void FutureBlock::set_result(Status status, int64_t 
total_rows, int64_t loaded_r
 
 void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
     Block::swap(*other.get());
-    set_info(other->_schema_version, other->_load_id, other->_first, 
other->_eos);
+    set_info(other->_schema_version, other->_load_id);
     lock = other->lock;
     cv = other->cv;
     _result = other->_result;
diff --git a/be/src/vec/core/future_block.h b/be/src/vec/core/future_block.h
index ca63fa3799e..ee943b3f79c 100644
--- a/be/src/vec/core/future_block.h
+++ b/be/src/vec/core/future_block.h
@@ -30,12 +30,9 @@ class FutureBlock : public Block {
 public:
     FutureBlock() : Block() {};
     void swap_future_block(std::shared_ptr<FutureBlock> other);
-    void set_info(int64_t block_schema_version, const TUniqueId& load_id, bool 
first,
-                  bool block_eos);
+    void set_info(int64_t block_schema_version, const TUniqueId& load_id);
     int64_t get_schema_version() { return _schema_version; }
     TUniqueId get_load_id() { return _load_id; }
-    bool is_first() { return _first; }
-    bool is_eos() { return _eos; }
 
     // hold lock before call this function
     void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows 
= 0);
@@ -50,8 +47,6 @@ public:
 private:
     int64_t _schema_version;
     TUniqueId _load_id;
-    bool _first = false;
-    bool _eos = false;
 
     std::shared_ptr<std::tuple<bool, Status, int64_t, int64_t>> _result =
             std::make_shared<std::tuple<bool, Status, int64_t, 
int64_t>>(false, Status::OK(), 0, 0);
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index 08f6d87ade7..d7df3a2e698 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -26,7 +26,7 @@
 
 namespace doris {
 
-namespace stream_load {
+namespace vectorized {
 
 GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
                                            const std::vector<TExpr>& texprs, 
Status* status)
@@ -44,6 +44,9 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
     _tuple_desc_id = table_sink.tuple_id;
     _schema.reset(new OlapTableSchemaParam());
     RETURN_IF_ERROR(_schema->init(table_sink.schema));
+    _db_id = table_sink.db_id;
+    _table_id = table_sink.table_id;
+    _base_schema_version = table_sink.base_schema_version;
     return Status::OK();
 }
 
@@ -77,6 +80,31 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
     return vectorized::VExpr::open(_output_vexpr_ctxs, state);
 }
 
+Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
+    if (_load_block_queue) {
+        _load_block_queue->remove_load_id(_load_id);
+    }
+    RETURN_IF_ERROR(DataSink::close(state, close_status));
+    RETURN_IF_ERROR(close_status);
+    // wait to wal
+    int64_t total_rows = 0;
+    int64_t loaded_rows = 0;
+    for (const auto& future_block : _future_blocks) {
+        std::unique_lock<doris::Mutex> l(*(future_block->lock));
+        if (!future_block->is_handled()) {
+            future_block->cv->wait(l);
+        }
+        // future_block->get_status()
+        loaded_rows += future_block->get_loaded_rows();
+        total_rows += future_block->get_total_rows();
+    }
+    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() 
+ total_rows -
+                                         loaded_rows);
+    state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
+                                   state->num_rows_load_filtered());
+    return Status::OK();
+}
+
 Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* 
input_block, bool eos) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
@@ -97,9 +125,43 @@ Status GroupCommitBlockSink::send(RuntimeState* state, 
vectorized::Block* input_
     bool has_filtered_rows = false;
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             state, input_block, block, _output_vexpr_ctxs, rows, 
has_filtered_rows));
-    block->swap(*input_block);
-    return Status::OK();
+    // add block into block queue
+    return _add_block(state, block);
+}
+
+Status GroupCommitBlockSink::_add_block(RuntimeState* state,
+                                        std::shared_ptr<vectorized::Block> 
block) {
+    if (block->rows() == 0) {
+        return Status::OK();
+    }
+    // add block to queue
+    auto _cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
+    {
+        vectorized::IColumn::Selector selector;
+        for (auto i = 0; i < block->rows(); i++) {
+            selector.emplace_back(i);
+        }
+        block->append_to_block_by_selector(_cur_mutable_block.get(), selector);
+    }
+    std::shared_ptr<vectorized::Block> output_block =
+            
std::make_shared<vectorized::Block>(_cur_mutable_block->to_block());
+
+    std::shared_ptr<doris::vectorized::FutureBlock> future_block =
+            std::make_shared<doris::vectorized::FutureBlock>();
+    future_block->swap(*(output_block.get()));
+    TUniqueId load_id;
+    load_id.__set_hi(load_id.hi);
+    load_id.__set_lo(load_id.lo);
+    future_block->set_info(_base_schema_version, load_id);
+    if (_load_block_queue == nullptr) {
+        
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+                _db_id, _table_id, future_block, _load_block_queue));
+        state->set_import_label(_load_block_queue->label);
+        state->set_wal_id(_load_block_queue->txn_id);
+    }
+    _future_blocks.emplace_back(future_block);
+    return _load_block_queue->add_block(future_block);
 }
 
-} // namespace stream_load
+} // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.h 
b/be/src/vec/sink/group_commit_block_sink.h
index a309413f5ad..ff798ffb000 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -24,8 +24,11 @@ namespace doris {
 
 class OlapTableSchemaParam;
 class MemTracker;
+class LoadBlockQueue;
 
-namespace stream_load {
+namespace vectorized {
+
+class FutureBlock;
 
 class GroupCommitBlockSink : public DataSink {
 public:
@@ -42,7 +45,11 @@ public:
 
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
 
+    Status close(RuntimeState* state, Status close_status) override;
+
 private:
+    Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> 
block);
+
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
     int _tuple_desc_id = -1;
@@ -53,7 +60,14 @@ private:
     // this is tuple descriptor of destination OLAP table
     TupleDescriptor* _output_tuple_desc = nullptr;
     std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
+
+    int64_t _db_id;
+    int64_t _table_id;
+    int64_t _base_schema_version = 0;
+    UniqueId _load_id;
+    std::shared_ptr<LoadBlockQueue> _load_block_queue;
+    std::vector<std::shared_ptr<vectorized::FutureBlock>> _future_blocks;
 };
 
-} // namespace stream_load
+} // namespace vectorized
 } // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 637462931b6..dad7dc46015 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -48,6 +48,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ExportSink;
+import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.planner.GroupCommitOlapTableSink;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.StreamLoadPlanner;
@@ -167,6 +168,7 @@ public class NativeInsertStmt extends InsertStmt {
     private long tableId = -1;
     // true if be generates an insert from group commit tvf stmt and executes 
to load data
     public boolean isGroupCommitTvf = false;
+    public boolean isGroupCommitStreamLoadSql = false;
 
     private boolean isFromDeleteOrUpdateStmt = false;
 
@@ -933,10 +935,17 @@ public class NativeInsertStmt extends InsertStmt {
         }
         if (targetTable instanceof OlapTable) {
             checkInnerGroupCommit();
-            OlapTableSink sink = isGroupCommitTvf ? new 
GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
-                    targetPartitionIds, 
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
-                    : new OlapTableSink((OlapTable) targetTable, olapTuple, 
targetPartitionIds,
-                            
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+            OlapTableSink sink;
+            if (isGroupCommitTvf) {
+                sink = new GroupCommitOlapTableSink((OlapTable) targetTable, 
olapTuple,
+                        targetPartitionIds, 
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+            } else if (isGroupCommitStreamLoadSql) {
+                sink = new GroupCommitBlockSink((OlapTable) targetTable, 
olapTuple,
+                        targetPartitionIds, 
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+            } else {
+                sink = new OlapTableSink((OlapTable) targetTable, olapTuple, 
targetPartitionIds,
+                        
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+            }
             dataSink = sink;
             sink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateCols);
             dataPartition = dataSink.getOutputPartition();
@@ -1092,7 +1101,8 @@ public class NativeInsertStmt extends InsertStmt {
         streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
                 .setTbl(getTbl())
                 
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId);
+                
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
+                .setGroupCommit(true);
         StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
         StreamLoadPlanner planner = new StreamLoadPlanner((Database) 
getDbObj(), olapTable, streamLoadTask);
         // Will using load id as query id in fragment
diff --git a/be/src/vec/core/future_block.cpp 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
similarity index 50%
copy from be/src/vec/core/future_block.cpp
copy to 
fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 3f3f59c3446..63ab187335c 100644
--- a/be/src/vec/core/future_block.cpp
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -15,30 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/core/future_block.h"
+package org.apache.doris.planner;
 
-#include <tuple>
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.thrift.TDataSinkType;
 
-namespace doris::vectorized {
+import java.util.List;
 
-void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id, 
bool first, bool eos) {
-    this->_schema_version = schema_version;
-    this->_load_id = load_id;
-    this->_first = first;
-    this->_eos = eos;
-}
+public class GroupCommitBlockSink extends OlapTableSink {
 
-void FutureBlock::set_result(Status status, int64_t total_rows, int64_t 
loaded_rows) {
-    auto result = std::make_tuple(true, status, total_rows, loaded_rows);
-    result.swap(*_result);
-}
+    public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor 
tupleDescriptor, List<Long> partitionIds,
+            boolean singleReplicaLoad) {
+        super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
+    }
 
-void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
-    Block::swap(*other.get());
-    set_info(other->_schema_version, other->_load_id, other->_first, 
other->_eos);
-    lock = other->lock;
-    cv = other->cv;
-    _result = other->_result;
+    protected TDataSinkType getDataSinkType() {
+        return TDataSinkType.GROUP_COMMIT_BLOCK_SINK;
+    }
 }
-
-} // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index bd1f6c8bff2..23bec446f4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -125,6 +125,7 @@ public class OlapTableSink extends DataSink {
         tSink.setLoadId(loadId);
         tSink.setTxnId(txnId);
         tSink.setDbId(dbId);
+        tSink.setBaseSchemaVersion(dstTable.getBaseSchemaVersion());
         tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
         tSink.setSendBatchParallelism(sendBatchParallelism);
         this.isStrictMode = isStrictMode;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index a6e47f33f26..934bca7ac0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -48,6 +48,7 @@ import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadTaskInfo;
+import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -254,10 +255,15 @@ public class StreamLoadPlanner {
 
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
-        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds,
-                Config.enable_single_replica_load);
-        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
-                taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
+        OlapTableSink olapTableSink;
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) 
taskInfo).isGroupCommit()) {
+            olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, 
partitionIds,
+                    Config.enable_single_replica_load);
+        } else {
+            olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds, Config.enable_single_replica_load);
+        }
+        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, 
taskInfo.getSendBatchParallelism(),
+                taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
         olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateInputColumns);
         olapTableSink.complete(analyzer);
 
@@ -463,8 +469,13 @@ public class StreamLoadPlanner {
 
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
-        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds,
-                Config.enable_single_replica_load);
+        OlapTableSink olapTableSink;
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) 
taskInfo).isGroupCommit()) {
+            olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, 
partitionIds,
+                    Config.enable_single_replica_load);
+        } else {
+            olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds, Config.enable_single_replica_load);
+        }
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
                 taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
         olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateInputColumns);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7d8626c1ea0..d9d0a03f3d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2090,8 +2090,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             NativeInsertStmt parsedStmt = (NativeInsertStmt) 
SqlParserUtils.getFirstStmt(parser);
             parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
             parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
-            if (request.isGroupCommit() && parsedStmt.getLabel() != null) {
-                throw new AnalysisException("label and group_commit can't be 
set at the same time");
+            if (request.isGroupCommit()) {
+                if (parsedStmt.getLabel() != null) {
+                    throw new AnalysisException("label and group_commit can't 
be set at the same time");
+                }
+                parsedStmt.isGroupCommitStreamLoadSql = true;
             }
             StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
             ctx.setExecutor(executor);
@@ -2235,13 +2238,15 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             StreamLoadPlanner planner = new StreamLoadPlanner(db, table, 
streamLoadTask);
             TPipelineFragmentParams plan = 
planner.planForPipeline(streamLoadTask.getId(),
                     multiTableFragmentInstanceIdIndex);
-            // add table indexes to transaction state
-            TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
-                    .getTransactionState(db.getId(), request.getTxnId());
-            if (txnState == null) {
-                throw new UserException("txn does not exist: " + 
request.getTxnId());
+            if (!request.isGroupCommit()) {
+                // add table indexes to transaction state
+                TransactionState txnState = 
Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionState(db.getId(), request.getTxnId());
+                if (txnState == null) {
+                    throw new UserException("txn does not exist: " + 
request.getTxnId());
+                }
+                txnState.addTableIndexes(table);
             }
-            txnState.addTableIndexes(table);
             return plan;
         } finally {
             table.readUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3abcbee06ec..0b08f8c988c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -69,10 +69,10 @@ public class GroupCommitTableValuedFunction extends 
ExternalFileTableValuedFunct
         Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
         List<Column> tableColumns = table.getBaseSchema(false);
         for (int i = 1; i <= tableColumns.size(); i++) {
-            fileColumns.add(new Column("c" + i, tableColumns.get(i - 
1).getDataType(), true));
+            fileColumns.add(new Column("c" + i, tableColumns.get(i - 
1).getType(), true));
         }
         if (deleteSignColumn != null) {
-            fileColumns.add(new Column("c" + (tableColumns.size() + 1), 
deleteSignColumn.getDataType(), true));
+            fileColumns.add(new Column("c" + (tableColumns.size() + 1), 
deleteSignColumn.getType(), true));
         }
         return fileColumns;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 4bcf28da156..485a3599b38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -94,6 +94,8 @@ public class StreamLoadTask implements LoadTaskInfo {
 
     private byte escape = 0;
 
+    private boolean groupCommit = false;
+
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, 
TFileFormatType formatType,
             TFileCompressType compressType) {
         this.id = id;
@@ -312,6 +314,7 @@ public class StreamLoadTask implements LoadTaskInfo {
                 request.getFileType(), request.getFormatType(),
                 request.getCompressType());
         streamLoadTask.setOptionalFromTSLPutRequest(request);
+        streamLoadTask.setGroupCommit(request.isGroupCommit());
         if (request.isSetFileSize()) {
             streamLoadTask.fileSize = request.getFileSize();
         }
@@ -519,5 +522,13 @@ public class StreamLoadTask implements LoadTaskInfo {
     public double getMaxFilterRatio() {
         return maxFilterRatio;
     }
+
+    public void setGroupCommit(boolean groupCommit) {
+        this.groupCommit = groupCommit;
+    }
+
+    public boolean isGroupCommit() {
+        return groupCommit;
+    }
 }
 
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index f1cad4cf266..91458072d3f 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -37,6 +37,7 @@ enum TDataSinkType {
     JDBC_TABLE_SINK,
     MULTI_CAST_DATA_STREAM_SINK,
     GROUP_COMMIT_OLAP_TABLE_SINK,
+    GROUP_COMMIT_BLOCK_SINK,
 }
 
 enum TResultSinkType {
@@ -255,6 +256,7 @@ struct TOlapTableSink {
     18: optional Descriptors.TOlapTableLocationParam slave_location
     19: optional i64 txn_timeout_s // timeout of load txn in second
     20: optional bool write_file_cache
+    21: optional i64 base_schema_version
 }
 
 struct TDataSink {
diff --git 
a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out 
b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
index 5ee9ddd0be0..97fc1897552 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out
@@ -91,3 +91,6 @@ q     50
 q      50
 q      50
 
+-- !sql --
+0      service_46da0dab-e27d-4820-aea2-9bfc15741615    1697032066304   0       
3229b7cd-f3a2-4359-aa24-946388c9cc54    0       
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
 [...]
+
diff --git 
a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
index eed1a26f143..c5ee05ac1e4 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
@@ -53,6 +53,20 @@ suite("insert_group_commit_into_duplicate") {
         return false
     }
 
+    def group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql}  """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " 
+ expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'PREPARE'"))
+        assertTrue(serverInfo.contains("'label':'group_commit_"))
+    }
+
     try {
         // create table
         sql """ drop table if exists ${table}; """
@@ -74,20 +88,6 @@ suite("insert_group_commit_into_duplicate") {
         );
         """
 
-        def group_commit_insert = { sql, expected_row_count ->
-            def stmt = prepareStatement """ ${sql}  """
-            def result = stmt.executeUpdate()
-            logger.info("insert result: " + result)
-            def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
-            logger.info("result server info: " + serverInfo)
-            if (result != expected_row_count) {
-                logger.warn("insert result: " + result + ", 
expected_row_count: " + expected_row_count + ", sql: " + sql)
-            }
-            // assertEquals(result, expected_row_count)
-            assertTrue(serverInfo.contains("'status':'PREPARE'"))
-            assertTrue(serverInfo.contains("'label':'group_commit_"))
-        }
-
         connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
             sql """ set enable_insert_group_commit = true; """
             // TODO
@@ -178,4 +178,54 @@ suite("insert_group_commit_into_duplicate") {
     } finally {
         // try_sql("DROP TABLE ${table}")
     }
+
+    // table with array type
+    tableName = "insert_group_commit_into_duplicate_array"
+    table = dbName + "." + tableName
+    try {
+        // create table
+        sql """ drop table if exists ${table}; """
+
+        sql """
+        CREATE table ${table} (
+            teamID varchar(255),
+            service_id varchar(255),
+            start_time BigInt,
+            time_bucket BigInt ,
+            segment_id String ,
+            trace_id String ,
+            data_binary String ,
+            end_time BigInt ,
+            endpoint_id String ,
+            endpoint_name String ,
+            is_error Boolean ,
+            latency Int ,
+            service_instance_id String ,
+            statement String ,
+            tags Array<String>
+        ) UNIQUE key (`teamID`,`service_id`, `start_time`)
+        DISTRIBUTED BY hash(`start_time`)
+        BUCKETS 1
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+        """
+
+        connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            sql """ set enable_insert_group_commit = true; """
+            // TODO
+            sql """ set enable_nereids_dml = false; """
+
+            // 1. insert into
+            group_commit_insert """ 
+            INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, 
`endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, 
`service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, 
`time_bucket`, `trace_id`) 
+            VALUES 
+            
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2
 [...]
+            1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', 
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, 
'3229b7cd-f3a2-4359-aa24-946388c9cc54', 
'service_46da0dab-e27d-4820-aea2-9bfc15741615', 
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, 
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, 
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=t 
[...]
+            """, 1
+
+            getRowCount(1)
+            qt_sql """ select * from ${table}; """
+        }
+    } finally {
+        // try_sql("DROP TABLE ${table}")
+    }
 }
diff --git 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b1c51286e00..818749f2ffc 100644
--- 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -49,6 +49,26 @@ suite("test_group_commit_http_stream") {
         return false
     }
 
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
     try {
         // create table
         sql """ drop table if exists ${tableName}; """
@@ -85,6 +105,10 @@ suite("test_group_commit_http_stream") {
                 unset 'label'
 
                 time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    checkStreamLoadResult(exception, result, 4, 4, 0, 0)
+                }
             }
         }
 
@@ -101,6 +125,10 @@ suite("test_group_commit_http_stream") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with different column order
@@ -116,6 +144,10 @@ suite("test_group_commit_http_stream") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with where condition
@@ -133,17 +165,8 @@ suite("test_group_commit_http_stream") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertTrue(json.GroupCommit)
-                // assertEquals(2, json.NumberTotalRows)
-                assertEquals(1, json.NumberLoadedRows)
-                assertEquals(0, json.NumberFilteredRows)
-                // assertEquals(1, json.NumberUnselectedRows)
+                // TODO different with stream load: 2, 1, 0, 1
+                checkStreamLoadResult(exception, result, 1, 1, 0, 0)
             }
         }
 
@@ -160,6 +183,10 @@ suite("test_group_commit_http_stream") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with filtered rows
@@ -179,18 +206,8 @@ suite("test_group_commit_http_stream") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertTrue(json.GroupCommit)
-                // assertEquals(6, json.NumberTotalRows)
-                // assertEquals(2, json.NumberLoadedRows)
-                // assertEquals(3, json.NumberFilteredRows)
-                // assertEquals(1, json.NumberUnselectedRows)
-                // assertFalse(json.ErrorURL.isEmpty())
+                // TODO different with stream load: 6, 2, 3, 1
+                checkStreamLoadResult(exception, result, 6, 4, 2, 0)
             }
         }
 
@@ -217,7 +234,7 @@ suite("test_group_commit_http_stream") {
             }
         }
 
-        getRowCount(7)
+        getRowCount(23)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
     } finally {
         // try_sql("DROP TABLE ${tableName}")
@@ -301,18 +318,7 @@ suite("test_group_commit_http_stream") {
                 // if declared a check callback, the default check condition 
will ignore.
                 // So you must check all condition
                 check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load ${i}, result: ${result}")
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
-                    if (json.NumberLoadedRows != 600572) {
-                        logger.warn("Stream load ${i}, loaded rows: 
${json.NumberLoadedRows}")
-                    }
-                    assertTrue(json.LoadBytes > 0)
-                    assertTrue(json.GroupCommit)
+                    checkStreamLoadResult(exception, result, 600572, 600572, 
0, 0)
                 }
             }
         }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 6034e35a109..b5f46f29225 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -48,6 +48,26 @@ suite("test_group_commit_stream_load") {
         return false
     }
 
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, 
filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
     try {
         // create table
         sql """ drop table if exists ${tableName}; """
@@ -84,6 +104,10 @@ suite("test_group_commit_stream_load") {
                 unset 'label'
 
                 time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    checkStreamLoadResult(exception, result, 4, 4, 0, 0)
+                }
             }
         }
 
@@ -98,6 +122,10 @@ suite("test_group_commit_stream_load") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with different column order
@@ -111,6 +139,10 @@ suite("test_group_commit_stream_load") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with where condition
@@ -127,17 +159,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertTrue(json.GroupCommit)
-                assertEquals(2, json.NumberTotalRows)
-                assertEquals(1, json.NumberLoadedRows)
-                assertEquals(0, json.NumberFilteredRows)
-                assertEquals(1, json.NumberUnselectedRows)
+                checkStreamLoadResult(exception, result, 2, 1, 0, 1)
             }
         }
 
@@ -152,6 +174,10 @@ suite("test_group_commit_stream_load") {
             unset 'label'
 
             time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                checkStreamLoadResult(exception, result, 2, 2, 0, 0)
+            }
         }
 
         // stream load with filtered rows
@@ -168,18 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertTrue(json.GroupCommit)
-                assertEquals(6, json.NumberTotalRows)
-                assertEquals(2, json.NumberLoadedRows)
-                assertEquals(3, json.NumberFilteredRows)
-                assertEquals(1, json.NumberUnselectedRows)
-                assertFalse(json.ErrorURL.isEmpty())
+                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
             }
         }
 
@@ -286,19 +301,7 @@ suite("test_group_commit_stream_load") {
                 // if declared a check callback, the default check condition 
will ignore.
                 // So you must check all condition
                 check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load ${i}, result: ${result}")
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
-                    if (json.NumberLoadedRows != 600572) {
-                       logger.warn("Stream load ${i}, loaded rows: 
${json.NumberLoadedRows}")
-                    }
-                    // assertEquals(json.NumberLoadedRows, 600572)
-                    assertTrue(json.LoadBytes > 0)
-                    assertTrue(json.GroupCommit)
+                    checkStreamLoadResult(exception, result, 600572, 600572, 
0, 0)
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to