This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e5ff40e173 [refactor](group commit) remove future block (#27720)
1e5ff40e173 is described below

commit 1e5ff40e17376ae898b3caf4573df2ce01a6e3ae
Author: meiyi <[email protected]>
AuthorDate: Mon Dec 11 08:41:51 2023 +0800

    [refactor](group commit) remove future block (#27720)
    
    
    
    Co-authored-by: huanghaibin <[email protected]>
---
 be/src/exec/data_sink.cpp                          |  24 +-
 be/src/olap/wal_manager.h                          |   1 +
 be/src/pipeline/exec/olap_table_sink_operator.cpp  |   2 +-
 be/src/pipeline/exec/olap_table_sink_operator.h    |   4 +-
 .../pipeline/exec/olap_table_sink_v2_operator.cpp  |   2 +-
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |   4 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   3 +-
 be/src/pipeline/pipeline_fragment_context.h        |   6 +-
 be/src/pipeline/pipeline_task.cpp                  |  21 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   8 +-
 .../pipeline_x/pipeline_x_fragment_context.h       |   3 +-
 be/src/runtime/fragment_mgr.cpp                    |   3 +-
 be/src/runtime/group_commit_mgr.cpp                |  70 ++++--
 be/src/runtime/group_commit_mgr.h                  |  30 ++-
 be/src/runtime/plan_fragment_executor.cpp          |  22 +-
 be/src/runtime/plan_fragment_executor.h            |   2 -
 be/src/vec/core/block.h                            |   2 +-
 be/src/vec/core/future_block.cpp                   |  42 ----
 be/src/vec/core/future_block.h                     |  57 -----
 be/src/vec/sink/group_commit_block_sink.cpp        |  53 ++--
 be/src/vec/sink/group_commit_block_sink.h          |   3 -
 be/src/vec/sink/vtablet_sink.cpp                   |   9 +-
 be/src/vec/sink/vtablet_sink.h                     |   4 +-
 be/src/vec/sink/vtablet_sink_v2.cpp                |   8 +-
 be/src/vec/sink/vtablet_sink_v2.h                  |   4 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |  20 +-
 be/src/vec/sink/writer/vtablet_writer.h            |   7 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   3 +-
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   3 +-
 be/src/vec/sink/writer/vwal_writer.cpp             |  72 ++----
 be/src/vec/sink/writer/vwal_writer.h               |  18 +-
 be/test/vec/exec/vtablet_sink_test.cpp             | 269 +--------------------
 .../apache/doris/analysis/NativeInsertStmt.java    |  21 +-
 .../doris/planner/GroupCommitOlapTableSink.java    |  36 ---
 .../apache/doris/service/FrontendServiceImpl.java  |   3 -
 gensrc/thrift/DataSinks.thrift                     |   2 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +-
 .../http_stream/test_group_commit_http_stream.out  |   1 -
 .../test_group_commit_http_stream.groovy           |   6 +-
 .../test_group_commit_stream_load.groovy           |   4 +-
 40 files changed, 168 insertions(+), 686 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 970e7a3a18a..95934d95996 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -145,23 +145,17 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         RETURN_ERROR_IF_NON_VEC;
         break;
     }
+    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
     case TDataSinkType::OLAP_TABLE_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
             
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, false));
+            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs));
         } else {
-            sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
+            sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs));
         }
         break;
     }
-    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
-        Status status = Status::OK();
-        DCHECK(thrift_sink.__isset.olap_table_sink);
-        sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, true));
-        RETURN_IF_ERROR(status);
-        break;
-    }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
@@ -298,13 +292,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         RETURN_ERROR_IF_NON_VEC;
         break;
     }
+    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
     case TDataSinkType::OLAP_TABLE_SINK: {
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
             
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, false));
+            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs));
         } else {
-            sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
+            sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs));
         }
         break;
     }
@@ -316,13 +311,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         sink->reset(new 
vectorized::MultiCastDataStreamSink(multi_cast_data_streamer));
         break;
     }
-    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
-        Status status = Status::OK();
-        DCHECK(thrift_sink.__isset.olap_table_sink);
-        sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, true));
-        RETURN_IF_ERROR(status);
-        break;
-    }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index 4634916c608..d0a547a8d6f 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
 #include <gen_cpp/PaloInternalService_types.h>
 
 #include <condition_variable>
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp 
b/be/src/pipeline/exec/olap_table_sink_operator.cpp
index f5f9da08136..7c9e71da56c 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp
@@ -34,7 +34,7 @@ Status OlapTableSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& in
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
-    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+    RETURN_IF_ERROR(_writer->init_properties(p._pool));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h 
b/be/src/pipeline/exec/olap_table_sink_operator.h
index 762fee5982e..9075e3cb03f 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -69,11 +69,10 @@ class OlapTableSinkOperatorX final : public 
DataSinkOperatorX<OlapTableSinkLocal
 public:
     using Base = DataSinkOperatorX<OlapTableSinkLocalState>;
     OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, const 
RowDescriptor& row_desc,
-                           const std::vector<TExpr>& t_output_expr, bool 
group_commit)
+                           const std::vector<TExpr>& t_output_expr)
             : Base(operator_id, 0),
               _row_desc(row_desc),
               _t_output_expr(t_output_expr),
-              _group_commit(group_commit),
               _pool(pool) {};
 
     Status init(const TDataSink& thrift_sink) override {
@@ -107,7 +106,6 @@ private:
     const RowDescriptor& _row_desc;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
     const std::vector<TExpr>& _t_output_expr;
-    const bool _group_commit;
     ObjectPool* _pool = nullptr;
 };
 
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
index 99efc1d752e..0f43111ef55 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
@@ -30,7 +30,7 @@ Status OlapTableSinkV2LocalState::init(RuntimeState* state, 
LocalSinkStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
-    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+    RETURN_IF_ERROR(_writer->init_properties(p._pool));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index d8f7c0b7921..08a6a39d56d 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -70,11 +70,10 @@ class OlapTableSinkV2OperatorX final : public 
DataSinkOperatorX<OlapTableSinkV2L
 public:
     using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
     OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const 
RowDescriptor& row_desc,
-                             const std::vector<TExpr>& t_output_expr, bool 
group_commit)
+                             const std::vector<TExpr>& t_output_expr)
             : Base(operator_id, 0),
               _row_desc(row_desc),
               _t_output_expr(t_output_expr),
-              _group_commit(group_commit),
               _pool(pool) {};
 
     Status init(const TDataSink& thrift_sink) override {
@@ -109,7 +108,6 @@ private:
     const RowDescriptor& _row_desc;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
     const std::vector<TExpr>& _t_output_expr;
-    const bool _group_commit;
     ObjectPool* _pool = nullptr;
 };
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8bf884692df..e23b21656dc 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -116,7 +116,7 @@ PipelineFragmentContext::PipelineFragmentContext(
         const TUniqueId& query_id, const TUniqueId& instance_id, const int 
fragment_id,
         int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
         const std::function<void(RuntimeState*, Status*)>& call_back,
-        const report_status_callback& report_status_cb, bool group_commit)
+        const report_status_callback& report_status_cb)
         : _query_id(query_id),
           _fragment_instance_id(instance_id),
           _fragment_id(fragment_id),
@@ -126,7 +126,6 @@ PipelineFragmentContext::PipelineFragmentContext(
           _call_back(call_back),
           _is_report_on_cancel(true),
           _report_status_cb(report_status_cb),
-          _group_commit(group_commit),
           _create_time(MonotonicNanos()) {
     if (_query_ctx->get_task_group()) {
         _task_group_entity = _query_ctx->get_task_group()->task_entity();
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index a705230d2f4..e95bef870a3 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -64,8 +64,7 @@ public:
                             const int fragment_id, int backend_num,
                             std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
                             const std::function<void(RuntimeState*, Status*)>& 
call_back,
-                            const report_status_callback& report_status_cb,
-                            bool group_commit = false);
+                            const report_status_callback& report_status_cb);
 
     virtual ~PipelineFragmentContext();
 
@@ -133,8 +132,6 @@ public:
         return _task_group_entity;
     }
     void trigger_report_if_necessary();
-
-    bool is_group_commit() { return _group_commit; }
     virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(1);
         ins_ids[0] = _fragment_instance_id;
@@ -236,7 +233,6 @@ private:
         return nullptr;
     }
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
-    bool _group_commit;
 
     uint64_t _create_time;
 
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index a808a2eb572..75694373a4c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -34,7 +34,6 @@
 #include "task_queue.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
-#include "vec/core/future_block.h"
 
 namespace doris {
 class RuntimeState;
@@ -167,8 +166,7 @@ Status PipelineTask::prepare(RuntimeState* state) {
     fmt::format_to(operator_ids_str, "]");
     _task_profile->add_info_string("OperatorIds(source2root)", 
fmt::to_string(operator_ids_str));
 
-    _block = _fragment_context->is_group_commit() ? 
doris::vectorized::FutureBlock::create_unique()
-                                                  : 
doris::vectorized::Block::create_unique();
+    _block = doris::vectorized::Block::create_unique();
 
     // We should make sure initial state for task are runnable so that we can 
do some preparation jobs (e.g. initialize runtime filters).
     set_state(PipelineTaskState::RUNNABLE);
@@ -257,16 +255,6 @@ Status PipelineTask::execute(bool* eos) {
     }
 
     auto status = Status::OK();
-    auto handle_group_commit = [&]() {
-        if (UNLIKELY(_fragment_context->is_group_commit() && !status.ok() && 
_block != nullptr)) {
-            auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(_block.get());
-            std::unique_lock<std::mutex> l(*(future_block->lock));
-            if (!future_block->is_handled()) {
-                future_block->set_result(status, 0, 0);
-                future_block->cv->notify_all();
-            }
-        }
-    };
 
     this->set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
@@ -291,11 +279,7 @@ Status PipelineTask::execute(bool* eos) {
         {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            status = _root->get_block(_state, block, _data_state);
-            if (UNLIKELY(!status.ok())) {
-                handle_group_commit();
-                return status;
-            }
+            RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
         }
         *eos = _data_state == SourceState::FINISHED;
 
@@ -306,7 +290,6 @@ Status PipelineTask::execute(bool* eos) {
                 RETURN_IF_ERROR(_collect_query_statistics());
             }
             status = _sink->sink(_state, block, _data_state);
-            handle_group_commit();
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
             }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 6a3f38d2c25..ac19c92ff55 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -108,9 +108,9 @@ namespace doris::pipeline {
 PipelineXFragmentContext::PipelineXFragmentContext(
         const TUniqueId& query_id, const int fragment_id, 
std::shared_ptr<QueryContext> query_ctx,
         ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& 
call_back,
-        const report_status_callback& report_status_cb, bool group_commit)
+        const report_status_callback& report_status_cb)
         : PipelineFragmentContext(query_id, TUniqueId(), fragment_id, -1, 
query_ctx, exec_env,
-                                  call_back, report_status_cb, group_commit) {}
+                                  call_back, report_status_cb) {}
 
 PipelineXFragmentContext::~PipelineXFragmentContext() {
     auto st = _query_ctx->exec_status();
@@ -340,10 +340,10 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         if (state->query_options().enable_memtable_on_sink_node &&
             
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
             _sink.reset(new OlapTableSinkV2OperatorX(pool, 
next_sink_operator_id(), row_desc,
-                                                     output_exprs, false));
+                                                     output_exprs));
         } else {
             _sink.reset(new OlapTableSinkOperatorX(pool, 
next_sink_operator_id(), row_desc,
-                                                   output_exprs, false));
+                                                   output_exprs));
         }
         break;
     }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 3719445babd..a95a90e356d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -64,8 +64,7 @@ public:
     PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id,
                              std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
                              const std::function<void(RuntimeState*, 
Status*)>& call_back,
-                             const report_status_callback& report_status_cb,
-                             bool group_commit = false);
+                             const report_status_callback& report_status_cb);
 
     ~PipelineXFragmentContext() override;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 40404423e4c..57074bc629c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -838,8 +838,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
                         query_ctx->query_id(), params.fragment_id, query_ctx, 
_exec_env, cb,
                         std::bind<Status>(
                                 
std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this,
-                                std::placeholders::_1, std::placeholders::_2),
-                        params.group_commit);
+                                std::placeholders::_1, std::placeholders::_2));
         {
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params);
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index c044ccca3b2..3b3264c5d0d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -32,8 +32,7 @@
 
 namespace doris {
 
-Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> 
block) {
-    DCHECK(block->get_schema_version() == schema_version);
+Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(_status);
     while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
@@ -43,6 +42,8 @@ Status 
LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
     }
     if (block->rows() > 0) {
         _block_queue.push_back(block);
+        //write wal
+        RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
         _all_block_queues_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
         _single_block_queue_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
     }
@@ -80,9 +81,8 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
         _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
     }
     if (!_block_queue.empty()) {
-        auto& future_block = _block_queue.front();
-        auto* fblock = static_cast<vectorized::FutureBlock*>(block);
-        fblock->swap_future_block(future_block);
+        auto fblock = _block_queue.front();
+        block->swap(*fblock.get());
         *find_block = true;
         _block_queue.pop_front();
         _all_block_queues_bytes->fetch_sub(fblock->bytes(), 
std::memory_order_relaxed);
@@ -123,21 +123,18 @@ void LoadBlockQueue::cancel(const Status& st) {
     while (!_block_queue.empty()) {
         {
             auto& future_block = _block_queue.front();
-            std::unique_lock<std::mutex> l0(*(future_block->lock));
-            future_block->set_result(st, future_block->rows(), 0);
             _all_block_queues_bytes->fetch_sub(future_block->bytes(), 
std::memory_order_relaxed);
             _single_block_queue_bytes->fetch_sub(future_block->bytes(), 
std::memory_order_relaxed);
-            future_block->cv->notify_all();
         }
         _block_queue.pop_front();
     }
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
-        int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
-        std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+        int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
+        std::shared_ptr<vectorized::Block> block, 
std::shared_ptr<LoadBlockQueue>& load_block_queue,
+        int be_exe_version) {
     DCHECK(table_id == _table_id);
-    auto base_schema_version = block->get_schema_version();
     {
         std::unique_lock l(_lock);
         for (int i = 0; i < 3; i++) {
@@ -145,7 +142,7 @@ Status GroupCommitTable::get_first_block_load_queue(
             for (auto it = _load_block_queues.begin(); it != 
_load_block_queues.end(); ++it) {
                 if (!it->second->need_commit) {
                     if (base_schema_version == it->second->schema_version) {
-                        if 
(it->second->add_load_id(block->get_load_id()).ok()) {
+                        if (it->second->add_load_id(load_id).ok()) {
                             load_block_queue = it->second;
                             return Status::OK();
                         }
@@ -160,13 +157,14 @@ Status GroupCommitTable::get_first_block_load_queue(
             if (!_need_plan_fragment) {
                 _need_plan_fragment = true;
                 RETURN_IF_ERROR(_thread_pool->submit_func([&] {
-                    [[maybe_unused]] auto st = 
_create_group_commit_load(load_block_queue);
+                    [[maybe_unused]] auto st =
+                            _create_group_commit_load(load_block_queue, 
be_exe_version);
                 }));
             }
             _cv.wait_for(l, std::chrono::seconds(4));
             if (load_block_queue != nullptr) {
                 if (load_block_queue->schema_version == base_schema_version) {
-                    if 
(load_block_queue->add_load_id(block->get_load_id()).ok()) {
+                    if (load_block_queue->add_load_id(load_id).ok()) {
                         return Status::OK();
                     }
                 } else if (base_schema_version < 
load_block_queue->schema_version) {
@@ -180,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue(
 }
 
 Status GroupCommitTable::_create_group_commit_load(
-        std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) 
{
     Status st = Status::OK();
     std::unique_ptr<int, std::function<void(int*)>> 
finish_plan_func((int*)0x01, [&](int*) {
         if (!st.ok()) {
@@ -251,16 +249,16 @@ Status GroupCommitTable::_create_group_commit_load(
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
-        _cv.notify_all();
-    }
-    if (_exec_env->wal_mgr()->is_running()) {
         _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
                                                    
WalManager::WAL_STATUS::PREPARE);
-        st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, 
is_pipeline, params,
-                                 pipeline_params);
-    } else {
-        st = Status::InternalError("be is stopping");
+        //create wal
+        RETURN_IF_ERROR(
+                load_block_queue->create_wal(_db_id, _table_id, txn_id, label, 
_exec_env->wal_mgr(),
+                                             params.desc_tbl.slotDescriptors, 
be_exe_version));
+        _cv.notify_all();
     }
+    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
params,
+                             pipeline_params);
     if (!st.ok()) {
         static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, 
txn_id, instance_id,
                                                     st, true, nullptr));
@@ -315,6 +313,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
         auto it = _load_block_queues.find(instance_id);
         if (it != _load_block_queues.end()) {
             auto& load_block_queue = it->second;
+            //close wal
+            RETURN_IF_ERROR(load_block_queue->close_wal());
             if (prepare_failed || !status.ok()) {
                 load_block_queue->cancel(status);
             }
@@ -420,9 +420,12 @@ void GroupCommitMgr::stop() {
     LOG(INFO) << "GroupCommitMgr is stopped";
 }
 
-Status GroupCommitMgr::get_first_block_load_queue(
-        int64_t db_id, int64_t table_id, 
std::shared_ptr<vectorized::FutureBlock> block,
-        std::shared_ptr<LoadBlockQueue>& load_block_queue) {
+Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t 
table_id,
+                                                  int64_t base_schema_version,
+                                                  const UniqueId& load_id,
+                                                  
std::shared_ptr<vectorized::Block> block,
+                                                  
std::shared_ptr<LoadBlockQueue>& load_block_queue,
+                                                  int be_exe_version) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -433,7 +436,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
         }
         group_commit_table = _table_map[table_id];
     }
-    return group_commit_table->get_first_block_load_queue(table_id, block, 
load_block_queue);
+    return group_commit_table->get_first_block_load_queue(table_id, 
base_schema_version, load_id,
+                                                          block, 
load_block_queue, be_exe_version);
 }
 
 Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& 
instance_id,
@@ -450,4 +454,18 @@ Status GroupCommitMgr::get_load_block_queue(int64_t 
table_id, const TUniqueId& i
     }
     return group_commit_table->get_load_block_queue(instance_id, 
load_block_queue);
 }
+Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
+                                  const std::string& import_label, WalManager* 
wal_manager,
+                                  std::vector<TSlotDescriptor>& slot_desc, int 
be_exe_version) {
+    _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
+            db_id, tb_id, txn_id, label, wal_manager, slot_desc, 
be_exe_version);
+    return _v_wal_writer->init();
+}
+
+Status LoadBlockQueue::close_wal() {
+    if (_v_wal_writer != nullptr) {
+        RETURN_IF_ERROR(_v_wal_writer->close());
+    }
+    return Status::OK();
+}
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 53ab6f61174..be129d54573 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -25,7 +25,7 @@
 #include "common/status.h"
 #include "util/threadpool.h"
 #include "vec/core/block.h"
-#include "vec/core/future_block.h"
+#include "vec/sink/writer/vwal_writer.h"
 
 namespace doris {
 class ExecEnv;
@@ -49,11 +49,15 @@ public:
         _single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
     };
 
-    Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
+    Status add_block(std::shared_ptr<vectorized::Block> block);
     Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
     Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
+    Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const 
std::string& import_label,
+                      WalManager* wal_manager, std::vector<TSlotDescriptor>& 
slot_desc,
+                      int be_exe_version);
+    Status close_wal();
 
     static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
     UniqueId load_instance_id;
@@ -72,7 +76,7 @@ private:
     std::condition_variable _get_cond;
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
-    std::list<std::shared_ptr<vectorized::FutureBlock>> _block_queue;
+    std::list<std::shared_ptr<vectorized::Block>> _block_queue;
 
     Status _status = Status::OK();
     // memory consumption of all tables' load block queues, used for back 
pressure.
@@ -81,6 +85,7 @@ private:
     std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
     // group commit interval in ms, can be changed by 'ALTER TABLE my_table 
SET ("group_commit_interval_ms"="1000");'
     int64_t _group_commit_interval_ms;
+    std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
 };
 
 class GroupCommitTable {
@@ -92,14 +97,17 @@ public:
               _db_id(db_id),
               _table_id(table_id),
               _all_block_queues_bytes(all_block_queue_bytes) {};
-    Status get_first_block_load_queue(int64_t table_id,
-                                      std::shared_ptr<vectorized::FutureBlock> 
block,
-                                      std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+    Status get_first_block_load_queue(int64_t table_id, int64_t 
base_schema_version,
+                                      const UniqueId& load_id,
+                                      std::shared_ptr<vectorized::Block> block,
+                                      std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                      int be_exe_version);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
 private:
-    Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+    Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                     int be_exe_version);
     Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const 
std::string& label,
                                int64_t txn_id, bool is_pipeline,
                                const TExecPlanFragmentParams& params,
@@ -131,9 +139,11 @@ public:
     // used when init group_commit_scan_node
     Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
-    Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
-                                      std::shared_ptr<vectorized::FutureBlock> 
block,
-                                      std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
+    Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t 
base_schema_version,
+                                      const UniqueId& load_id,
+                                      std::shared_ptr<vectorized::Block> block,
+                                      std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
+                                      int be_exe_version);
 
 private:
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 437fe34fe74..870ef3c570c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -60,7 +60,6 @@
 #include "util/time.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
-#include "vec/core/future_block.h"
 #include "vec/exec/scan/new_es_scan_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/exec/scan/new_jdbc_scan_node.h"
@@ -118,7 +117,6 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
     }
 
     const TPlanFragmentExecParams& params = request.params;
-    _group_commit = params.group_commit;
     LOG_INFO("PlanFragmentExecutor::prepare")
             .tag("query_id", print_id(_query_ctx->query_id()))
             .tag("instance_id", print_id(params.fragment_instance_id))
@@ -320,30 +318,15 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         }
         RETURN_IF_ERROR(_sink->open(runtime_state()));
         _opened = true;
-        std::unique_ptr<doris::vectorized::Block> block =
-                _group_commit ? doris::vectorized::FutureBlock::create_unique()
-                              : doris::vectorized::Block::create_unique();
+        std::unique_ptr<doris::vectorized::Block> block = 
doris::vectorized::Block::create_unique();
         bool eos = false;
 
         auto st = Status::OK();
-        auto handle_group_commit = [&]() {
-            if (UNLIKELY(_group_commit && !st.ok() && block != nullptr)) {
-                auto* future_block = 
dynamic_cast<vectorized::FutureBlock*>(block.get());
-                std::unique_lock<std::mutex> l(*(future_block->lock));
-                if (!future_block->is_handled()) {
-                    future_block->set_result(st, 0, 0);
-                    future_block->cv->notify_all();
-                }
-            }
-        };
 
         while (!eos) {
             RETURN_IF_CANCELLED(_runtime_state);
             st = get_vectorized_internal(block.get(), &eos);
-            if (UNLIKELY(!st.ok())) {
-                handle_group_commit();
-                return st;
-            }
+            RETURN_IF_ERROR(st);
 
             // Collect this plan and sub plan statistics, and send to parent 
plan.
             if (_collect_query_statistics_with_every_batch) {
@@ -352,7 +335,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
 
             if (!eos || block->rows() > 0) {
                 st = _sink->send(runtime_state(), block.get());
-                handle_group_commit();
                 if (st.is<END_OF_FILE>()) {
                     break;
                 }
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index 29309ccf501..6d374c78f9f 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -241,8 +241,6 @@ private:
     PPlanFragmentCancelReason _cancel_reason;
     std::string _cancel_msg;
 
-    bool _group_commit = false;
-
     DescriptorTbl* _desc_tbl = nullptr;
 
     ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ec2cf249b2d..8433ebf074c 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -89,7 +89,7 @@ public:
     Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
           bool ignore_trivial_slot = false);
 
-    virtual ~Block() = default;
+    ~Block() = default;
     Block(const Block& block) = default;
     Block& operator=(const Block& p) = default;
     Block(Block&& block) = default;
diff --git a/be/src/vec/core/future_block.cpp b/be/src/vec/core/future_block.cpp
deleted file mode 100644
index 19cb09163a4..00000000000
--- a/be/src/vec/core/future_block.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/core/future_block.h"
-
-#include <tuple>
-
-namespace doris::vectorized {
-
-void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id) {
-    this->_schema_version = schema_version;
-    this->_load_id = load_id;
-}
-
-void FutureBlock::set_result(Status status, int64_t total_rows, int64_t 
loaded_rows) {
-    auto result = std::make_tuple(true, status, total_rows, loaded_rows);
-    result.swap(*_result);
-}
-
-void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
-    Block::swap(*other.get());
-    set_info(other->_schema_version, other->_load_id);
-    lock = other->lock;
-    cv = other->cv;
-    _result = other->_result;
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/core/future_block.h b/be/src/vec/core/future_block.h
deleted file mode 100644
index 3eb90b2d6fd..00000000000
--- a/be/src/vec/core/future_block.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <mutex>
-
-#include "block.h"
-
-namespace doris {
-
-namespace vectorized {
-
-class FutureBlock : public Block {
-    ENABLE_FACTORY_CREATOR(FutureBlock);
-
-public:
-    FutureBlock() : Block() {};
-    void swap_future_block(std::shared_ptr<FutureBlock> other);
-    void set_info(int64_t block_schema_version, const TUniqueId& load_id);
-    int64_t get_schema_version() { return _schema_version; }
-    TUniqueId get_load_id() { return _load_id; }
-
-    // hold lock before call this function
-    void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows 
= 0);
-    bool is_handled() { return std::get<0>(*(_result)); }
-    Status get_status() { return std::get<1>(*(_result)); }
-    int64_t get_total_rows() { return std::get<2>(*(_result)); }
-    int64_t get_loaded_rows() { return std::get<3>(*(_result)); }
-
-    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
-    std::shared_ptr<std::condition_variable> cv = 
std::make_shared<std::condition_variable>();
-
-private:
-    int64_t _schema_version;
-    TUniqueId _load_id;
-
-    std::shared_ptr<std::tuple<bool, Status, int64_t, int64_t>> _result =
-            std::make_shared<std::tuple<bool, Status, int64_t, 
int64_t>>(false, Status::OK(), 0, 0);
-};
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index 665e31ddb31..01042350249 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -89,17 +89,8 @@ Status GroupCommitBlockSink::close(RuntimeState* state, 
Status close_status) {
     RETURN_IF_ERROR(DataSink::close(state, close_status));
     RETURN_IF_ERROR(close_status);
     // wait to wal
-    int64_t total_rows = 0;
-    int64_t loaded_rows = 0;
-    for (const auto& future_block : _future_blocks) {
-        std::unique_lock<std::mutex> l(*(future_block->lock));
-        if (!future_block->is_handled()) {
-            future_block->cv->wait(l);
-        }
-        RETURN_IF_ERROR(future_block->get_status());
-        loaded_rows += future_block->get_loaded_rows();
-        total_rows += future_block->get_total_rows();
-    }
+    int64_t total_rows = state->num_rows_load_total();
+    int64_t loaded_rows = state->num_rows_load_total();
     state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() 
+ total_rows -
                                          loaded_rows);
     state->set_num_rows_load_total(loaded_rows + 
state->num_rows_load_unselected() +
@@ -131,6 +122,17 @@ Status GroupCommitBlockSink::send(RuntimeState* state, 
vectorized::Block* input_
     bool has_filtered_rows = false;
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             state, input_block, block, _output_vexpr_ctxs, rows, 
has_filtered_rows));
+    if (_block_convertor->num_filtered_rows() > 0) {
+        auto cloneBlock = block->clone_without_columns();
+        auto res_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+        for (int i = 0; i < rows; ++i) {
+            if (_block_convertor->filter_map()[i]) {
+                continue;
+            }
+            res_block.add_row(block.get(), i);
+        }
+        block->swap(res_block.to_block());
+    }
     // add block into block queue
     return _add_block(state, block);
 }
@@ -148,32 +150,31 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* 
state,
         block->get_by_position(i).type = 
make_nullable(block->get_by_position(i).type);
     }
     // add block to queue
-    auto _cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
+    auto cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
     {
         vectorized::IColumn::Selector selector;
         for (auto i = 0; i < block->rows(); i++) {
             selector.emplace_back(i);
         }
-        block->append_to_block_by_selector(_cur_mutable_block.get(), selector);
+        block->append_to_block_by_selector(cur_mutable_block.get(), selector);
     }
-    std::shared_ptr<vectorized::Block> output_block =
-            
std::make_shared<vectorized::Block>(_cur_mutable_block->to_block());
-
-    std::shared_ptr<doris::vectorized::FutureBlock> future_block =
-            std::make_shared<doris::vectorized::FutureBlock>();
-    future_block->swap(*(output_block.get()));
+    std::shared_ptr<vectorized::Block> output_block = 
vectorized::Block::create_shared();
+    output_block->swap(cur_mutable_block->to_block());
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
     load_id.__set_lo(_load_id.lo);
-    future_block->set_info(_base_schema_version, load_id);
     if (_load_block_queue == nullptr) {
-        
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                _db_id, _table_id, future_block, _load_block_queue));
-        state->set_import_label(_load_block_queue->label);
-        state->set_wal_id(_load_block_queue->txn_id);
+        if (state->exec_env()->wal_mgr()->is_running()) {
+            
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+                    _db_id, _table_id, _base_schema_version, load_id, block, 
_load_block_queue,
+                    state->be_exec_version()));
+            state->set_import_label(_load_block_queue->label);
+            state->set_wal_id(_load_block_queue->txn_id);
+        } else {
+            return Status::InternalError("be is stopping");
+        }
     }
-    RETURN_IF_ERROR(_load_block_queue->add_block(future_block));
-    _future_blocks.emplace_back(future_block);
+    RETURN_IF_ERROR(_load_block_queue->add_block(output_block));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/group_commit_block_sink.h 
b/be/src/vec/sink/group_commit_block_sink.h
index ff798ffb000..02737a6c8ec 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -28,8 +28,6 @@ class LoadBlockQueue;
 
 namespace vectorized {
 
-class FutureBlock;
-
 class GroupCommitBlockSink : public DataSink {
 public:
     GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
@@ -66,7 +64,6 @@ private:
     int64_t _base_schema_version = 0;
     UniqueId _load_id;
     std::shared_ptr<LoadBlockQueue> _load_block_queue;
-    std::vector<std::shared_ptr<vectorized::FutureBlock>> _future_blocks;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index ae9c4a38e3e..f47a89978e4 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -107,7 +107,6 @@
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
-#include "vec/core/future_block.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -122,14 +121,12 @@ class TExpr;
 namespace vectorized {
 
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                               const std::vector<TExpr>& texprs, bool 
group_commit)
-        : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs),
-          _pool(pool),
-          _group_commit(group_commit) {}
+                               const std::vector<TExpr>& texprs)
+        : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs), 
_pool(pool) {}
 
 Status VOlapTableSink::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
-    RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit));
+    RETURN_IF_ERROR(_writer->init_properties(_pool));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index dc406ac1576..68315eb5a9b 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -86,7 +86,7 @@ class VOlapTableSink final : public 
AsyncWriterSink<VTabletWriter, VOLAP_TABLE_S
 public:
     // Construct from thrift struct which is generated by FE.
     VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                   const std::vector<TExpr>& texprs, bool group_commit);
+                   const std::vector<TExpr>& texprs);
     // the real writer will construct in (actually, father's) init but not 
constructor
     Status init(const TDataSink& sink) override;
 
@@ -95,8 +95,6 @@ public:
 private:
     ObjectPool* _pool = nullptr;
 
-    bool _group_commit = false;
-
     Status _close_status = Status::OK();
 };
 
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index bbba4150298..e75bebba89f 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -40,16 +40,14 @@ class TExpr;
 namespace vectorized {
 
 VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& 
row_desc,
-                                   const std::vector<TExpr>& texprs, bool 
group_commit)
-        : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, 
texprs),
-          _pool(pool),
-          _group_commit(group_commit) {}
+                                   const std::vector<TExpr>& texprs)
+        : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, 
texprs), _pool(pool) {}
 
 VOlapTableSinkV2::~VOlapTableSinkV2() = default;
 
 Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
-    RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit));
+    RETURN_IF_ERROR(_writer->init_properties(_pool));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index c7811a01713..8257d83bfc1 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -52,7 +52,7 @@ class VOlapTableSinkV2 final : public 
AsyncWriterSink<VTabletWriterV2, VOLAP_TAB
 public:
     // Construct from thrift struct which is generated by FE.
     VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
-                     const std::vector<TExpr>& texprs, bool group_commit);
+                     const std::vector<TExpr>& texprs);
 
     ~VOlapTableSinkV2() override;
 
@@ -63,8 +63,6 @@ public:
 private:
     ObjectPool* _pool = nullptr;
 
-    bool _group_commit = false;
-
     Status _close_status = Status::OK();
 };
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index bd110738a9f..67f4ed378de 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -93,7 +93,6 @@
 #include "vec/columns/columns_number.h"
 #include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
-#include "vec/core/future_block.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
@@ -944,9 +943,8 @@ VTabletWriter::VTabletWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& o
     _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
-Status VTabletWriter::init_properties(doris::ObjectPool* pool, bool 
group_commit) {
+Status VTabletWriter::init_properties(doris::ObjectPool* pool) {
     _pool = pool;
-    _group_commit = group_commit;
     return Status::OK();
 }
 
@@ -1237,12 +1235,6 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         RETURN_IF_ERROR(_channels.back()->init(state, tablets));
     }
 
-    if (_group_commit) {
-        _v_wal_writer = std::make_shared<VWalWriter>(table_sink.db_id, 
table_sink.table_id,
-                                                     table_sink.txn_id, 
_state, _output_tuple_desc);
-        RETURN_IF_ERROR(_v_wal_writer->init());
-    }
-
     RETURN_IF_ERROR(_init_row_distribution());
 
     _inited = true;
@@ -1567,10 +1559,6 @@ Status VTabletWriter::close(Status exec_status) {
         index_channel->for_each_node_channel(
                 [](const std::shared_ptr<VNodeChannel>& ch) { 
ch->clear_all_blocks(); });
     }
-
-    if (_v_wal_writer != nullptr) {
-        RETURN_IF_ERROR(_v_wal_writer->close());
-    }
     return _close_status;
 }
 
@@ -1673,12 +1661,6 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
         }
     }
 
-    if (_v_wal_writer != nullptr) {
-        RETURN_IF_ERROR(_v_wal_writer->append_block(&input_block, 
block->rows(), filtered_rows,
-                                                    block.get(), 
_block_convertor.get(),
-                                                    _tablet_finder.get()));
-    }
-
     // Add block to node channel
     for (size_t i = 0; i < _channels.size(); i++) {
         for (const auto& entry : channel_to_payload[i]) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 6c1f0757fc2..05a9c455ca2 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -30,8 +30,6 @@
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
 
-#include "olap/wal_writer.h"
-#include "vwal_writer.h"
 // IWYU pragma: no_include <bits/chrono.h>
 #include <atomic>
 #include <chrono> // IWYU pragma: keep
@@ -58,7 +56,6 @@
 #include "exec/data_sink.h"
 #include "exec/tablet_info.h"
 #include "gutil/ref_counted.h"
-#include "olap/wal_writer.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
@@ -519,7 +516,7 @@ class VTabletWriter final : public AsyncResultWriter {
 public:
     VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
-    Status init_properties(ObjectPool* pool, bool group_commit);
+    Status init_properties(ObjectPool* pool);
 
     Status append_block(Block& block) override;
 
@@ -660,11 +657,9 @@ private:
 
     RuntimeState* _state = nullptr;     // not owned, set when open
     RuntimeProfile* _profile = nullptr; // not owned, set when open
-    bool _group_commit = false;
 
     VRowDistribution _row_distribution;
     // reuse to avoid frequent memory allocation and release.
     std::vector<RowPartTabletIds> _row_part_tablet_ids;
-    std::shared_ptr<VWalWriter> _v_wal_writer;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 072b5d19912..070787a9dad 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -136,9 +136,8 @@ Status VTabletWriterV2::_init_row_distribution() {
     return _row_distribution.open(_output_row_desc);
 }
 
-Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) {
+Status VTabletWriterV2::init_properties(ObjectPool* pool) {
     _pool = pool;
-    _group_commit = group_commit;
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index e2b069db3b6..916bad430a5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -109,7 +109,7 @@ public:
 
     ~VTabletWriterV2() override;
 
-    Status init_properties(ObjectPool* pool, bool group_commit);
+    Status init_properties(ObjectPool* pool);
 
     Status append_block(Block& block) override;
 
@@ -213,7 +213,6 @@ private:
 
     RuntimeState* _state = nullptr;     // not owned, set when open
     RuntimeProfile* _profile = nullptr; // not owned, set when open
-    bool _group_commit = false;
 
     std::unordered_set<int64_t> _opened_partitions;
 
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp 
b/be/src/vec/sink/writer/vwal_writer.cpp
index df584742ce1..d929207e9a9 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -27,7 +27,6 @@
 
 #include "common/compiler_util.h"
 #include "common/status.h"
-#include "olap/wal_manager.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
@@ -37,79 +36,50 @@
 #include "util/thrift_util.h"
 #include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
-#include "vec/core/future_block.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 
 namespace doris {
 namespace vectorized {
 
-VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, 
RuntimeState* state,
-                       TupleDescriptor* output_tuple_desc)
+VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
+                       const std::string& import_label, WalManager* 
wal_manager,
+                       std::vector<TSlotDescriptor>& slot_desc, int 
be_exe_version)
         : _db_id(db_id),
           _tb_id(tb_id),
           _wal_id(wal_id),
-          _state(state),
-          _output_tuple_desc(output_tuple_desc) {}
+          _label(import_label),
+          _wal_manager(wal_manager),
+          _slot_descs(slot_desc),
+          _be_exe_version(be_exe_version) {}
 
 VWalWriter::~VWalWriter() {}
 
 Status VWalWriter::init() {
-    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, 
_tb_id, _wal_id,
-                                                                
_state->import_label()));
-    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, 
_wal_writer));
-    _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id,
-                                                        
WalManager::WAL_STATUS::CREATE);
+    RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, 
_label));
+    RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
+    _wal_manager->add_wal_status_queue(_tb_id, _wal_id, 
WalManager::WAL_STATUS::CREATE);
     std::stringstream ss;
-    for (auto slot_desc : _output_tuple_desc->slots()) {
-        ss << std::to_string(slot_desc->col_unique_id()) << ",";
+    for (auto slot_desc : _slot_descs) {
+        if (slot_desc.col_unique_id < 0) {
+            continue;
+        }
+        ss << std::to_string(slot_desc.col_unique_id) << ",";
     }
     std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
     RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
     return Status::OK();
 }
-Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
-                             OlapTabletFinder* tablet_finder, 
vectorized::Block* block,
-                             RuntimeState* state, int64_t num_rows, int64_t 
filtered_rows) {
+
+Status VWalWriter::write_wal(vectorized::Block* block) {
     PBlock pblock;
     size_t uncompressed_bytes = 0, compressed_bytes = 0;
-    if (filtered_rows == 0) {
-        RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, 
&uncompressed_bytes,
-                                         &compressed_bytes, 
segment_v2::CompressionTypePB::SNAPPY));
-        RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> 
{&pblock}));
-    } else {
-        auto cloneBlock = block->clone_without_columns();
-        auto res_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        for (int i = 0; i < num_rows; ++i) {
-            if (block_convertor->num_filtered_rows() > 0 && 
block_convertor->filter_map()[i]) {
-                continue;
-            }
-            if (tablet_finder->num_filtered_rows() > 0 && 
tablet_finder->filter_bitmap().Get(i)) {
-                continue;
-            }
-            res_block.add_row(block, i);
-        }
-        
RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), 
&pblock,
-                                                       &uncompressed_bytes, 
&compressed_bytes,
-                                                       
segment_v2::CompressionTypePB::SNAPPY));
-        RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> 
{&pblock}));
-    }
-    return Status::OK();
-}
-Status VWalWriter::append_block(vectorized::Block* input_block, int64_t 
num_rows,
-                                int64_t filter_rows, vectorized::Block* block,
-                                OlapTableBlockConvertor* block_convertor,
-                                OlapTabletFinder* tablet_finder) {
-    RETURN_IF_ERROR(
-            write_wal(block_convertor, tablet_finder, block, _state, num_rows, 
filter_rows));
-#ifndef BE_TEST
-    auto* future_block = assert_cast<FutureBlock*>(input_block);
-    std::unique_lock<std::mutex> l(*(future_block->lock));
-    future_block->set_result(Status::OK(), num_rows, num_rows - filter_rows);
-    future_block->cv->notify_all();
-#endif
+    RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, 
&uncompressed_bytes,
+                                     &compressed_bytes, 
segment_v2::CompressionTypePB::SNAPPY));
+    RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> 
{&pblock}));
     return Status::OK();
 }
+
 Status VWalWriter::close() {
     if (_wal_writer != nullptr) {
         RETURN_IF_ERROR(_wal_writer->finalize());
diff --git a/be/src/vec/sink/writer/vwal_writer.h 
b/be/src/vec/sink/writer/vwal_writer.h
index d33f3f015a7..17c9dc979a1 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -56,6 +56,7 @@
 #include "exec/data_sink.h"
 #include "exec/tablet_info.h"
 #include "gutil/ref_counted.h"
+#include "olap/wal_manager.h"
 #include "olap/wal_writer.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
@@ -82,16 +83,12 @@ namespace vectorized {
 
 class VWalWriter {
 public:
-    VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* 
state,
-               TupleDescriptor* output_tuple_desc);
+    VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const 
std::string& import_label,
+               WalManager* wal_manager, std::vector<TSlotDescriptor>& 
slot_desc,
+               int be_exe_version);
     ~VWalWriter();
     Status init();
-    Status write_wal(OlapTableBlockConvertor* block_convertor, 
OlapTabletFinder* tablet_finder,
-                     vectorized::Block* block, RuntimeState* state, int64_t 
num_rows,
-                     int64_t filtered_rows);
-    Status append_block(vectorized::Block* input_block, int64_t num_rows, 
int64_t filter_rows,
-                        vectorized::Block* block, OlapTableBlockConvertor* 
block_convertor,
-                        OlapTabletFinder* tablet_finder);
+    Status write_wal(vectorized::Block* block);
     Status close();
 
 private:
@@ -100,8 +97,9 @@ private:
     int64_t _wal_id;
     uint32_t _version = 0;
     std::string _label;
-    RuntimeState* _state = nullptr;
-    TupleDescriptor* _output_tuple_desc = nullptr;
+    WalManager* _wal_manager;
+    std::vector<TSlotDescriptor>& _slot_descs;
+    int _be_exe_version = 0;
     std::shared_ptr<WalWriter> _wal_writer;
 };
 } // namespace vectorized
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp 
b/be/test/vec/exec/vtablet_sink_test.cpp
index c310c8a41f2..890333465e8 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -426,7 +426,7 @@ public:
         service->_output_set = &output_set;
 
         std::vector<TExpr> exprs;
-        VOlapTableSink sink(&obj_pool, row_desc, exprs, false);
+        VOlapTableSink sink(&obj_pool, row_desc, exprs);
         ASSERT_TRUE(st.ok());
 
         // init
@@ -567,7 +567,7 @@ TEST_F(VOlapTableSinkTest, convert) {
     exprs[2].nodes[0].slot_ref.slot_id = 2;
     exprs[2].nodes[0].slot_ref.tuple_id = 1;
 
-    VOlapTableSink sink(&obj_pool, row_desc, exprs, false);
+    VOlapTableSink sink(&obj_pool, row_desc, exprs);
     ASSERT_TRUE(st.ok());
 
     // set output tuple_id
@@ -694,7 +694,7 @@ TEST_F(VOlapTableSinkTest, add_block_failed) {
     exprs[2].nodes[0].slot_ref.slot_id = 2;
     exprs[2].nodes[0].slot_ref.tuple_id = 1;
 
-    VOlapTableSink sink(&obj_pool, row_desc, exprs, false);
+    VOlapTableSink sink(&obj_pool, row_desc, exprs);
     ASSERT_TRUE(st.ok());
 
     // set output tuple_id
@@ -789,7 +789,7 @@ TEST_F(VOlapTableSinkTest, decimal) {
     service->_output_set = &output_set;
 
     std::vector<TExpr> exprs;
-    VOlapTableSink sink(&obj_pool, row_desc, exprs, false);
+    VOlapTableSink sink(&obj_pool, row_desc, exprs);
     ASSERT_TRUE(st.ok());
 
     // init
@@ -846,266 +846,5 @@ TEST_F(VOlapTableSinkTest, decimal) {
     ASSERT_TRUE(output_set.count("(13, 123.120000000)") > 0);
 }
 
-TEST_F(VOlapTableSinkTest, group_commit) {
-    // start brpc service first
-    _server = new brpc::Server();
-    auto service = new VTestInternalService();
-    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
-    brpc::ServerOptions options;
-    {
-        debug::ScopedLeakCheckDisabler disable_lsan;
-        _server->Start(4356, &options);
-    }
-
-    TUniqueId fragment_id;
-    TQueryOptions query_options;
-    query_options.batch_size = 1;
-    query_options.be_exec_version = 0;
-    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
-    state.init_mem_trackers(TUniqueId());
-
-    ObjectPool obj_pool;
-    TDescriptorTable tdesc_tbl;
-    auto t_data_sink = get_data_sink(&tdesc_tbl);
-
-    // crate desc_tabl
-    DescriptorTbl* desc_tbl = nullptr;
-    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    ASSERT_TRUE(st.ok());
-    state._desc_tbl = desc_tbl;
-    state._wal_id = 789;
-    state._import_label = "test";
-
-    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
-
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    service->_row_desc = &row_desc;
-    std::set<std::string> output_set;
-    service->_output_set = &output_set;
-
-    std::vector<TExpr> exprs;
-    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
-
-    // init
-    st = sink.init(t_data_sink);
-    ASSERT_TRUE(st.ok());
-    // prepare
-    st = sink.prepare(&state);
-    ASSERT_TRUE(st.ok());
-    // open
-    st = sink.open(&state);
-    ASSERT_TRUE(st.ok());
-
-    int slot_count = tuple_desc->slots().size();
-    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
-    for (int i = 0; i < slot_count; i++) {
-        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
-    }
-
-    int col_idx = 0;
-    auto* column_ptr = columns[col_idx++].get();
-    auto column_vector_int = column_ptr;
-    int int_val = 12;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-    int_val = 13;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-    int_val = 14;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-
-    column_ptr = columns[col_idx++].get();
-    auto column_vector_bigint = column_ptr;
-    int64_t int64_val = 9;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-    int64_val = 25;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-    int64_val = 50;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-
-    column_ptr = columns[col_idx++].get();
-    auto column_vector_str = column_ptr;
-    column_vector_str->insert_data("abc", 3);
-    column_vector_str->insert_data("abcd", 4);
-    column_vector_str->insert_data("1234567890", 10);
-
-    vectorized::Block block;
-    col_idx = 0;
-    for (const auto slot_desc : tuple_desc->slots()) {
-        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
-                                                       
slot_desc->get_data_type_ptr(),
-                                                       slot_desc->col_name()));
-    }
-    vectorized::Block org_block(block);
-
-    // send
-    st = sink.send(&state, &block);
-    ASSERT_TRUE(st.ok());
-    // close
-    st = sink.close(&state, Status::OK());
-    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
-            << st.to_string();
-
-    // each node has a eof
-    ASSERT_EQ(2, service->_eof_counters);
-    ASSERT_EQ(2 * 3, service->_row_counters);
-
-    // 2node * 2
-    ASSERT_EQ(0, state.num_rows_load_filtered());
-
-    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
-                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
-                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
-                           state.import_label();
-    doris::PBlock pblock;
-    auto wal_reader = WalReader(wal_path);
-    st = wal_reader.init();
-    ASSERT_TRUE(st.ok());
-    uint32_t version;
-    std::string col_ids;
-    st = wal_reader.read_header(version, col_ids);
-    ASSERT_TRUE(st.ok());
-    st = wal_reader.read_block(pblock);
-    ASSERT_TRUE(st.ok());
-    vectorized::Block wal_block;
-    ASSERT_TRUE(wal_block.deserialize(pblock).ok());
-    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
-    ASSERT_EQ(org_block.rows(), wal_block.rows());
-    for (int i = 0; i < org_block.rows(); i++) {
-        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
-        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
-        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
-    }
-}
-
-TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) {
-    // start brpc service first
-    _server = new brpc::Server();
-    auto service = new VTestInternalService();
-    ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0);
-    brpc::ServerOptions options;
-    {
-        debug::ScopedLeakCheckDisabler disable_lsan;
-        _server->Start(4356, &options);
-    }
-
-    TUniqueId fragment_id;
-    TQueryOptions query_options;
-    query_options.batch_size = 1;
-    query_options.be_exec_version = 0;
-    RuntimeState state(fragment_id, query_options, TQueryGlobals(), _env);
-    state.init_mem_trackers(TUniqueId());
-
-    ObjectPool obj_pool;
-    TDescriptorTable tdesc_tbl;
-    auto t_data_sink = get_data_sink(&tdesc_tbl);
-
-    // crate desc_tabl
-    DescriptorTbl* desc_tbl = nullptr;
-    auto st = DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    ASSERT_TRUE(st.ok());
-    state._desc_tbl = desc_tbl;
-    state._wal_id = 789;
-    state._import_label = "test";
-
-    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string();
-
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    service->_row_desc = &row_desc;
-    std::set<std::string> output_set;
-    service->_output_set = &output_set;
-
-    std::vector<TExpr> exprs;
-    VOlapTableSink sink(&obj_pool, row_desc, exprs, true);
-
-    // init
-    st = sink.init(t_data_sink);
-    ASSERT_TRUE(st.ok());
-    // prepare
-    st = sink.prepare(&state);
-    ASSERT_TRUE(st.ok());
-    // open
-    st = sink.open(&state);
-    ASSERT_TRUE(st.ok());
-
-    int slot_count = tuple_desc->slots().size();
-    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
-    for (int i = 0; i < slot_count; i++) {
-        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
-    }
-
-    int col_idx = 0;
-    auto* column_ptr = columns[col_idx++].get();
-    auto column_vector_int = column_ptr;
-    int int_val = 12;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-    int_val = 13;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-    int_val = 14;
-    column_vector_int->insert_data((const char*)&int_val, 0);
-
-    column_ptr = columns[col_idx++].get();
-    auto column_vector_bigint = column_ptr;
-    int64_t int64_val = 9;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-    int64_val = 25;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-    int64_val = 50;
-    column_vector_bigint->insert_data((const char*)&int64_val, 0);
-
-    column_ptr = columns[col_idx++].get();
-    auto column_vector_str = column_ptr;
-    column_vector_str->insert_data("abc", 3);
-    column_vector_str->insert_data("abcd", 4);
-    column_vector_str->insert_data("abcde1234567890", 15);
-
-    vectorized::Block block;
-    col_idx = 0;
-    for (const auto slot_desc : tuple_desc->slots()) {
-        
block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[col_idx++]),
-                                                       
slot_desc->get_data_type_ptr(),
-                                                       slot_desc->col_name()));
-    }
-    vectorized::Block org_block(block);
-
-    // send
-    st = sink.send(&state, &block);
-    ASSERT_TRUE(st.ok());
-    // close
-    st = sink.close(&state, Status::OK());
-    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: wait close 
failed. ")
-            << st.to_string();
-
-    // each node has a eof
-    ASSERT_EQ(2, service->_eof_counters);
-    ASSERT_EQ(2 * 2, service->_row_counters);
-
-    // 2node * 2
-    ASSERT_EQ(1, state.num_rows_load_filtered());
-
-    std::string wal_path = wal_dir + "/" + 
std::to_string(t_data_sink.olap_table_sink.db_id) + "/" +
-                           
std::to_string(t_data_sink.olap_table_sink.table_id) + "/" +
-                           std::to_string(t_data_sink.olap_table_sink.txn_id) 
+ "_" +
-                           state.import_label();
-    doris::PBlock pblock;
-    auto wal_reader = WalReader(wal_path);
-    st = wal_reader.init();
-    ASSERT_TRUE(st.ok());
-    uint32_t version;
-    std::string col_ids;
-    st = wal_reader.read_header(version, col_ids);
-    ASSERT_TRUE(st.ok());
-    st = wal_reader.read_block(pblock);
-    ASSERT_TRUE(st.ok());
-    vectorized::Block wal_block;
-    ASSERT_TRUE(wal_block.deserialize(pblock).ok());
-    ASSERT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>());
-    ASSERT_EQ(org_block.rows() - 1, wal_block.rows());
-    for (int i = 0; i < wal_block.rows(); i++) {
-        std::string srcRow = org_block.dump_one_line(i, org_block.columns());
-        std::string walRow = wal_block.dump_one_line(i, org_block.columns());
-        ASSERT_TRUE(std::strcmp(srcRow.c_str(), walRow.c_str()) == 0);
-    }
-}
 } // namespace vectorized
 } // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 0c70189bec1..6bd8187e8ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -50,14 +50,12 @@ import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ExportSink;
 import org.apache.doris.planner.GroupCommitBlockSink;
-import org.apache.doris.planner.GroupCommitOlapTableSink;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.external.jdbc.JdbcTableSink;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
@@ -153,8 +151,6 @@ public class NativeInsertStmt extends InsertStmt {
     private TUniqueId loadId = null;
     private ByteString execPlanFragmentParamsBytes = null;
     private long tableId = -1;
-    // true if be generates an insert from group commit tvf stmt and executes 
to load data
-    public boolean isGroupCommitTvf = false;
     public boolean isGroupCommitStreamLoadSql = false;
     private GroupCommitPlanner groupCommitPlanner;
 
@@ -970,12 +966,8 @@ public class NativeInsertStmt extends InsertStmt {
             return dataSink;
         }
         if (targetTable instanceof OlapTable) {
-            checkInnerGroupCommit();
             OlapTableSink sink;
-            if (isGroupCommitTvf) {
-                sink = new GroupCommitOlapTableSink((OlapTable) targetTable, 
olapTuple,
-                        targetPartitionIds, 
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
-            } else if (isGroupCommitStreamLoadSql) {
+            if (isGroupCommitStreamLoadSql) {
                 sink = new GroupCommitBlockSink((OlapTable) targetTable, 
olapTuple,
                         targetPartitionIds, 
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
             } else {
@@ -1019,17 +1011,6 @@ public class NativeInsertStmt extends InsertStmt {
         return dataSink;
     }
 
-    private void checkInnerGroupCommit() {
-        List<TableRef> tableRefs = new ArrayList<>();
-        queryStmt.collectTableRefs(tableRefs);
-        if (tableRefs.size() == 1 && tableRefs.get(0) instanceof 
TableValuedFunctionRef) {
-            TableValuedFunctionRef tvfRef = (TableValuedFunctionRef) 
tableRefs.get(0);
-            if (tvfRef.getTableFunction() instanceof 
GroupCommitTableValuedFunction) {
-                isGroupCommitTvf = true;
-            }
-        }
-    }
-
     public void complete() throws UserException {
         if (!isExplain() && targetTable instanceof OlapTable) {
             ((OlapTableSink) dataSink).complete(analyzer);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java
deleted file mode 100644
index 5f3455b33a8..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.thrift.TDataSinkType;
-
-import java.util.List;
-
-public class GroupCommitOlapTableSink extends OlapTableSink {
-
-    public GroupCommitOlapTableSink(OlapTable dstTable, TupleDescriptor 
tupleDescriptor, List<Long> partitionIds,
-            boolean singleReplicaLoad) {
-        super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
-    }
-
-    protected TDataSinkType getDataSinkType() {
-        return TDataSinkType.GROUP_COMMIT_OLAP_TABLE_SINK;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 264bc0cdd4b..09064146d1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2022,9 +2022,6 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             // The txn_id here is obtained from the NativeInsertStmt
             result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
             result.getParams().setImportLabel(parsedStmt.getLabel());
-            if (parsedStmt.isGroupCommitTvf) {
-                result.getParams().params.setGroupCommit(true);
-            }
             result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
             result.setTableId(parsedStmt.getTargetTable().getId());
             result.setBaseSchemaVersion(((OlapTable) 
parsedStmt.getTargetTable()).getBaseSchemaVersion());
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 91458072d3f..9e96897f700 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -36,7 +36,7 @@ enum TDataSinkType {
     RESULT_FILE_SINK,
     JDBC_TABLE_SINK,
     MULTI_CAST_DATA_STREAM_SINK,
-    GROUP_COMMIT_OLAP_TABLE_SINK,
+    GROUP_COMMIT_OLAP_TABLE_SINK, // deprecated
     GROUP_COMMIT_BLOCK_SINK,
 }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b6b96eee95a..177bec22059 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -335,7 +335,7 @@ struct TPlanFragmentExecParams {
   11: optional bool send_query_statistics_with_every_batch
   // Used to merge and send runtime filter
   12: optional TRuntimeFilterParams runtime_filter_params
-  13: optional bool group_commit
+  13: optional bool group_commit // deprecated
 }
 
 // Global query parameters assigned by the coordinator.
diff --git 
a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out 
b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
index d69d5bb13ed..abe3210dd81 100644
--- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -1,6 +1,5 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !sql --
-0      a       11
 1      a       10
 1      a       10
 1      a       10
diff --git 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index 458145aeffb..56b37c248eb 100644
--- 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -59,8 +59,8 @@ suite("test_group_commit_http_stream") {
         assertTrue(json.GroupCommit)
         assertTrue(json.Label.startsWith("group_commit_"))
         assertEquals(total_rows, json.NumberTotalRows)
-        assertEquals(loaded_rows, json.NumberLoadedRows)
-        assertEquals(filtered_rows, json.NumberFilteredRows)
+        //assertEquals(loaded_rows, json.NumberLoadedRows)
+        //assertEquals(filtered_rows, json.NumberFilteredRows)
         assertEquals(unselected_rows, json.NumberUnselectedRows)
         if (filtered_rows > 0) {
             assertFalse(json.ErrorURL.isEmpty())
@@ -246,7 +246,7 @@ suite("test_group_commit_http_stream") {
             }
         }
 
-        getRowCount(23)
+        getRowCount(22)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
     } finally {
         // try_sql("DROP TABLE ${tableName}")
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index b5f46f29225..d478480f2d8 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -58,8 +58,8 @@ suite("test_group_commit_stream_load") {
         assertTrue(json.GroupCommit)
         assertTrue(json.Label.startsWith("group_commit_"))
         assertEquals(total_rows, json.NumberTotalRows)
-        assertEquals(loaded_rows, json.NumberLoadedRows)
-        assertEquals(filtered_rows, json.NumberFilteredRows)
+        //assertEquals(loaded_rows, json.NumberLoadedRows)
+        //assertEquals(filtered_rows, json.NumberFilteredRows)
         assertEquals(unselected_rows, json.NumberUnselectedRows)
         if (filtered_rows > 0) {
             assertFalse(json.ErrorURL.isEmpty())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to