This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9581d2b4eb [refactor](load) split memtable writer out of delta writer 
(#21892)
9581d2b4eb is described below

commit 9581d2b4eb22f5983fffc5debcb711233edeebc7
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Aug 8 22:02:42 2023 +0800

    [refactor](load) split memtable writer out of delta writer (#21892)
---
 be/src/olap/delta_writer.cpp                       | 265 ++-------------
 be/src/olap/delta_writer.h                         |  48 +--
 be/src/olap/memtable_writer.cpp                    | 377 +++++++++++++++++++++
 be/src/olap/{delta_writer.h => memtable_writer.h}  |  97 ++----
 be/test/olap/delta_writer_test.cpp                 |  52 ++-
 .../olap/engine_storage_migration_task_test.cpp    |  14 +-
 be/test/olap/memtable_memory_limiter_test.cpp      |  13 +-
 be/test/olap/remote_rowset_gc_test.cpp             |  13 +-
 be/test/olap/tablet_cooldown_test.cpp              |  19 +-
 9 files changed, 505 insertions(+), 393 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c3fed74c39..947999a076 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -36,9 +36,7 @@
 #include "exec/tablet_info.h"
 #include "gutil/strings/numbers.h"
 #include "io/fs/file_writer.h" // IWYU pragma: keep
-#include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
-#include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/beta_rowset_writer.h"
@@ -52,7 +50,6 @@
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
 #include "runtime/exec_env.h"
-#include "runtime/memory/mem_tracker.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
 #include "util/mem_info.h"
@@ -76,6 +73,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* 
storage_engine, Runti
           _tablet(nullptr),
           _cur_rowset(nullptr),
           _rowset_writer(nullptr),
+          _memtable_writer(*req, profile),
           _tablet_schema(new TabletSchema),
           _delta_written_success(false),
           _storage_engine(storage_engine),
@@ -85,20 +83,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* 
storage_engine, Runti
 
 void DeltaWriter::_init_profile(RuntimeProfile* profile) {
     _profile = profile->create_child(fmt::format("DeltaWriter {}", 
_req.tablet_id), true, true);
-    _lock_timer = ADD_TIMER(_profile, "LockTime");
-    _sort_timer = ADD_TIMER(_profile, "MemTableSortTime");
-    _agg_timer = ADD_TIMER(_profile, "MemTableAggTime");
-    _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime");
-    _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
-    _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
-    _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
-    _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
     _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime");
-    _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
-    _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
-    _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
-    _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT);
-    _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT);
 }
 
 DeltaWriter::~DeltaWriter() {
@@ -110,15 +95,13 @@ DeltaWriter::~DeltaWriter() {
         return;
     }
 
-    if (_flush_token != nullptr) {
-        // cancel and wait all memtables in flush queue to be finished
-        _flush_token->cancel();
+    // cancel and wait all memtables in flush queue to be finished
+    _memtable_writer.cancel();
 
-        if (_tablet != nullptr) {
-            const FlushStatistic& stat = _flush_token->get_stats();
-            _tablet->flush_bytes->increment(stat.flush_size_bytes);
-            _tablet->flush_finish_count->increment(stat.flush_finish_count);
-        }
+    if (_tablet != nullptr) {
+        const FlushStatistic& stat = _memtable_writer.get_flush_token_stats();
+        _tablet->flush_bytes->increment(stat.flush_size_bytes);
+        _tablet->flush_finish_count->increment(stat.flush_finish_count);
     }
 
     if (_calc_delete_bitmap_token != nullptr) {
@@ -129,8 +112,6 @@ DeltaWriter::~DeltaWriter() {
         _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                 
_rowset_writer->rowset_id().to_string());
     }
-
-    _mem_table.reset();
 }
 
 void DeltaWriter::_garbage_collection() {
@@ -214,16 +195,11 @@ Status DeltaWriter::init() {
     context.write_type = DataWriteType::TYPE_DIRECT;
     context.mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
                                                        _delete_bitmap);
-    RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
-
-    _reset_mem_table();
-
-    // create flush handler
-    // by assigning segment_id to memtable before submiting to flush executor,
-    // we can make sure same keys sort in the same order in all replicas.
-    bool should_serial = false;
-    
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
-            _flush_token, _rowset_writer.get(), should_serial, 
_req.is_high_priority));
+    std::unique_ptr<RowsetWriter> rowset_writer;
+    RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer));
+    _rowset_writer = std::move(rowset_writer);
+    _memtable_writer.init(_rowset_writer, _tablet_schema,
+                          _tablet->enable_unique_key_merge_on_write());
     _calc_delete_bitmap_token = 
_storage_engine->calc_delete_bitmap_executor()->create_token();
 
     _is_init = true;
@@ -245,118 +221,15 @@ Status DeltaWriter::write(const vectorized::Block* 
block, const std::vector<int>
     if (!_is_init && !_is_cancelled) {
         RETURN_IF_ERROR(init());
     }
-
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    if (_is_closed) {
-        return Status::Error<ALREADY_CLOSED>(
-                "write block after closed tablet_id={}, load_id={}-{}, 
txn_id={}", _req.tablet_id,
-                _req.load_id.hi(), _req.load_id.lo(), _req.txn_id);
-    }
-
-    if (is_append) {
-        _total_received_rows += block->rows();
-    } else {
-        _total_received_rows += row_idxs.size();
-    }
-    _mem_table->insert(block, row_idxs, is_append);
-
-    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
-        _mem_table->shrink_memtable_by_agg();
-    }
-    if (UNLIKELY(_mem_table->need_flush())) {
-        auto s = _flush_memtable_async();
-        _reset_mem_table();
-        if (UNLIKELY(!s.ok())) {
-            return s;
-        }
-    }
-
-    return Status::OK();
-}
-
-Status DeltaWriter::_flush_memtable_async() {
-    return _flush_token->submit(std::move(_mem_table));
+    return _memtable_writer.write(block, row_idxs, is_append);
 }
 
 Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init) {
-        // This writer is not initialized before flushing. Do nothing
-        // But we return OK instead of Status::Error<ALREADY_CANCELLED>(),
-        // Because this method maybe called when trying to reduce mem 
consumption,
-        // and at that time, the writer may not be initialized yet and that is 
a normal case.
-        return Status::OK();
-    }
-
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
-                << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
-                << ", load id: " << print_id(_req.load_id);
-    auto s = _flush_memtable_async();
-    _reset_mem_table();
-    if (UNLIKELY(!s.ok())) {
-        return s;
-    }
-
-    if (need_wait) {
-        // wait all memtables in flush queue to be flushed.
-        SCOPED_TIMER(_wait_flush_timer);
-        RETURN_IF_ERROR(_flush_token->wait());
-    }
-    return Status::OK();
+    return _memtable_writer.flush_memtable_and_wait(need_wait);
 }
 
 Status DeltaWriter::wait_flush() {
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (!_is_init) {
-            // return OK instead of Status::Error<ALREADY_CANCELLED>() for 
same reason
-            // as described in flush_memtable_and_wait()
-            return Status::OK();
-        }
-        if (_is_cancelled) {
-            return _cancel_status;
-        }
-    }
-    SCOPED_TIMER(_wait_flush_timer);
-    RETURN_IF_ERROR(_flush_token->wait());
-    return Status::OK();
-}
-
-void DeltaWriter::_reset_mem_table() {
-#ifndef BE_TEST
-    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
-            
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
-                        std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()),
-            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
-    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
-            
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
-                        std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()),
-            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
-#else
-    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
-            
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
-                        std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()));
-    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
-            
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
-                        std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()));
-#endif
-    {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
-        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
-    }
-    _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), 
_req.slots, _req.tuple_desc,
-                                  _tablet->enable_unique_key_merge_on_write(),
-                                  mem_table_insert_tracker, 
mem_table_flush_tracker));
-
-    COUNTER_UPDATE(_segment_num, 1);
+    return _memtable_writer.wait_flush();
 }
 
 Status DeltaWriter::close() {
@@ -371,25 +244,7 @@ Status DeltaWriter::close() {
         // for this tablet when being closed.
         RETURN_IF_ERROR(init());
     }
-
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    if (_is_closed) {
-        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
-                     << " load_id=" << _req.load_id << " txn_id=" << 
_req.txn_id;
-        return Status::OK();
-    }
-
-    auto s = _flush_memtable_async();
-    _mem_table.reset();
-    _is_closed = true;
-    if (UNLIKELY(!s.ok())) {
-        return s;
-    } else {
-        return Status::OK();
-    }
+    return _memtable_writer.close();
 }
 
 Status DeltaWriter::build_rowset() {
@@ -397,31 +252,8 @@ Status DeltaWriter::build_rowset() {
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before 
build_rowset() being called";
 
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    Status st;
-    // return error if previous flush failed
-    {
-        SCOPED_TIMER(_wait_flush_timer);
-        st = _flush_token->wait();
-    }
-    if (UNLIKELY(!st.ok())) {
-        LOG(WARNING) << "previous flush failed tablet " << 
_tablet->tablet_id();
-        return st;
-    }
+    RETURN_IF_ERROR(_memtable_writer.close_wait());
 
-    _mem_table.reset();
-
-    if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows 
!=
-        _total_received_rows) {
-        LOG(WARNING) << "the rows number written doesn't match, rowset num 
rows written to file: "
-                     << _rowset_writer->num_rows()
-                     << ", merged_rows: " << 
_flush_token->memtable_stat().merged_rows
-                     << ", total received rows: " << _total_received_rows;
-        return Status::InternalError("rows number written by delta writer 
dosen't match");
-    }
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
     if (_cur_rowset == nullptr) {
@@ -505,31 +337,11 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& 
slave_tablet_nodes,
 
     _delta_written_success = true;
 
-    // const FlushStatistic& stat = _flush_token->get_stats();
-    // print slow log if wait more than 1s
-    /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) {
-        LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
-                  << ", load id: " << print_id(_req.load_id) << ", wait close 
for "
-                  << _wait_flush_timer->elapsed_time() << "(ns), stats: " << 
stat;
-    }*/
-
     if (write_single_replica) {
         for (auto node_info : slave_tablet_nodes.slave_nodes()) {
             _request_slave_tablet_pull_rowset(node_info);
         }
     }
-    COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
-    COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
-    COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
-    const auto& memtable_stat = _flush_token->memtable_stat();
-    COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
-    COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
-    COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
-    COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
-    COUNTER_SET(_sort_times, memtable_stat.sort_times);
-    COUNTER_SET(_agg_times, memtable_stat.agg_times);
-    COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
-    COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
     return Status::OK();
 }
 
@@ -558,60 +370,23 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
     if (_is_cancelled) {
         return Status::OK();
     }
+    RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st));
     if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) {
         _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore 
the return status */
     }
-    _mem_table.reset();
-    if (_flush_token != nullptr) {
-        // cancel and wait all memtables in flush queue to be finished
-        _flush_token->cancel();
-    }
     if (_calc_delete_bitmap_token != nullptr) {
         _calc_delete_bitmap_token->cancel();
     }
     _is_cancelled = true;
-    _cancel_status = st;
     return Status::OK();
 }
 
 int64_t DeltaWriter::mem_consumption(MemType mem) {
-    if (_flush_token == nullptr) {
-        // This method may be called before this writer is initialized.
-        // So _flush_token may be null.
-        return 0;
-    }
-    int64_t mem_usage = 0;
-    {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2
-            for (auto mem_table_tracker : _mem_table_insert_trackers) {
-                mem_usage += mem_table_tracker->consumption();
-            }
-        }
-        if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1
-            for (auto mem_table_tracker : _mem_table_flush_trackers) {
-                mem_usage += mem_table_tracker->consumption();
-            }
-        }
-    }
-    return mem_usage;
+    return _memtable_writer.mem_consumption(mem);
 }
 
 int64_t DeltaWriter::active_memtable_mem_consumption() {
-    if (_flush_token == nullptr) {
-        // This method may be called before this writer is initialized.
-        // So _flush_token may be null.
-        return 0;
-    }
-    int64_t mem_usage = 0;
-    {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        if (_mem_table_insert_trackers.size() > 0) {
-            mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption();
-            mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption();
-        }
-    }
-    return mem_usage;
+    return _memtable_writer.active_memtable_mem_consumption();
 }
 
 int64_t DeltaWriter::partition_id() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index e45a8752e4..4c452eef8a 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -30,7 +30,7 @@
 #include <vector>
 
 #include "common/status.h"
-#include "olap/memtable.h"
+#include "olap/memtable_writer.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
 #include "olap/tablet.h"
@@ -54,20 +54,12 @@ namespace vectorized {
 class Block;
 } // namespace vectorized
 
-enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
-
-struct WriteRequest {
-    int64_t tablet_id;
+struct WriteRequest : MemTableWriter::WriteRequest {
     int32_t schema_hash;
     int64_t txn_id;
     int64_t partition_id;
-    PUniqueId load_id;
-    TupleDescriptor* tuple_desc;
-    // slots are in order of tablet's schema
-    const std::vector<SlotDescriptor*>* slots;
-    bool is_high_priority = false;
-    OlapTableSchemaParam* table_schema_param;
     int64_t index_id = 0;
+    OlapTableSchemaParam* table_schema_param;
 };
 
 // Writer for a particular (load, index, tablet).
@@ -138,13 +130,8 @@ private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
                 const UniqueId& load_id);
 
-    // push a full memtable to flush executor
-    Status _flush_memtable_async();
-
     void _garbage_collection();
 
-    void _reset_mem_table();
-
     void _build_current_tablet_schema(int64_t index_id,
                                       const OlapTableSchemaParam* 
table_schema_param,
                                       const TabletSchema& ori_tablet_schema);
@@ -155,28 +142,16 @@ private:
 
     bool _is_init = false;
     bool _is_cancelled = false;
-    bool _is_closed = false;
-    Status _cancel_status;
     WriteRequest _req;
     TabletSharedPtr _tablet;
     RowsetSharedPtr _cur_rowset;
-    std::unique_ptr<RowsetWriter> _rowset_writer;
-    // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
-    std::unique_ptr<MemTable> _mem_table;
-    //const TabletSchema* _tablet_schema;
-    // tablet schema owned by delta writer, all write will use this tablet 
schema
-    // it's build from tablet_schema(stored when create tablet) and 
OlapTableSchema
-    // every request will have it's own tablet schema so simple schema change 
can work
+    std::shared_ptr<RowsetWriter> _rowset_writer;
+    MemTableWriter _memtable_writer;
     TabletSchemaSPtr _tablet_schema;
     bool _delta_written_success;
 
     StorageEngine* _storage_engine;
     UniqueId _load_id;
-    std::unique_ptr<FlushToken> _flush_token;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
-    SpinLock _mem_table_tracker_lock;
-    std::atomic<uint32_t> _mem_table_num = 1;
 
     std::mutex _lock;
 
@@ -195,20 +170,7 @@ private:
     int64_t _total_received_rows = 0;
 
     RuntimeProfile* _profile = nullptr;
-    RuntimeProfile::Counter* _lock_timer = nullptr;
-    RuntimeProfile::Counter* _sort_timer = nullptr;
-    RuntimeProfile::Counter* _agg_timer = nullptr;
-    RuntimeProfile::Counter* _wait_flush_timer = nullptr;
-    RuntimeProfile::Counter* _delete_bitmap_timer = nullptr;
-    RuntimeProfile::Counter* _segment_writer_timer = nullptr;
-    RuntimeProfile::Counter* _memtable_duration_timer = nullptr;
-    RuntimeProfile::Counter* _put_into_output_timer = nullptr;
-    RuntimeProfile::Counter* _sort_times = nullptr;
-    RuntimeProfile::Counter* _agg_times = nullptr;
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
-    RuntimeProfile::Counter* _segment_num = nullptr;
-    RuntimeProfile::Counter* _raw_rows_num = nullptr;
-    RuntimeProfile::Counter* _merged_rows_num = nullptr;
 
     MonotonicStopWatch _lock_watch;
 };
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
new file mode 100644
index 0000000000..8284001e47
--- /dev/null
+++ b/be/src/olap/memtable_writer.cpp
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memtable_writer.h"
+
+#include <fmt/format.h>
+
+#include <filesystem>
+#include <ostream>
+#include <string>
+#include <utility>
+
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "gutil/strings/numbers.h"
+#include "io/fs/file_writer.h" // IWYU pragma: keep
+#include "olap/memtable.h"
+#include "olap/memtable_flush_executor.h"
+#include "olap/memtable_memory_limiter.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/schema_change.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "service/backend_options.h"
+#include "util/mem_info.h"
+#include "util/stopwatch.hpp"
+#include "vec/core/block.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+MemTableWriter::MemTableWriter(const WriteRequest& req, RuntimeProfile* 
profile) : _req(req) {
+    _init_profile(profile);
+}
+
+void MemTableWriter::_init_profile(RuntimeProfile* profile) {
+    _profile = profile->create_child(fmt::format("MemTableWriter {}", 
_req.tablet_id), true, true);
+    _lock_timer = ADD_TIMER(_profile, "LockTime");
+    _sort_timer = ADD_TIMER(_profile, "MemTableSortTime");
+    _agg_timer = ADD_TIMER(_profile, "MemTableAggTime");
+    _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime");
+    _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
+    _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
+    _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
+    _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
+    _close_wait_timer = ADD_TIMER(_profile, "MemTableWriterCloseWaitTime");
+    _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
+    _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
+    _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
+    _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT);
+    _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT);
+}
+
+MemTableWriter::~MemTableWriter() {
+    if (!_is_init) {
+        return;
+    }
+    if (_flush_token != nullptr) {
+        // cancel and wait all memtables in flush queue to be finished
+        _flush_token->cancel();
+    }
+    _mem_table.reset();
+}
+
+Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
+                            TabletSchemaSPtr tablet_schema, bool 
unique_key_mow) {
+    _rowset_writer = rowset_writer;
+    _tablet_schema = tablet_schema;
+    _unique_key_mow = unique_key_mow;
+
+    _reset_mem_table();
+
+    // create the flush token that queued memtables are submitted to.
+    // by assigning segment_id to memtable before submitting to flush executor,
+    // we can make sure same keys sort in the same order in all replicas.
+    bool should_serial = false;
+    
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
+            _flush_token, _rowset_writer.get(), should_serial, 
_req.is_high_priority));
+
+    _is_init = true;
+    return Status::OK();
+}
+
+Status MemTableWriter::append(const vectorized::Block* block) {
+    return write(block, {}, true);
+}
+
+Status MemTableWriter::write(const vectorized::Block* block, const 
std::vector<int>& row_idxs,
+                             bool is_append) {
+    if (UNLIKELY(row_idxs.empty() && !is_append)) {
+        return Status::OK();
+    }
+    _lock_watch.start();
+    std::lock_guard<std::mutex> l(_lock);
+    _lock_watch.stop();
+    if (_is_cancelled) {
+        return _cancel_status;
+    }
+    if (!_is_init) {
+        return Status::Error<NOT_INITIALIZED>("delta segment writer has not 
been initialized");
+    }
+    if (_is_closed) {
+        return Status::Error<ALREADY_CLOSED>("write block after closed 
tablet_id={}, load_id={}-{}",
+                                             _req.tablet_id, 
_req.load_id.hi(), _req.load_id.lo());
+    }
+
+    if (is_append) {
+        _total_received_rows += block->rows();
+    } else {
+        _total_received_rows += row_idxs.size();
+    }
+    _mem_table->insert(block, row_idxs, is_append);
+
+    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
+        _mem_table->shrink_memtable_by_agg();
+    }
+    if (UNLIKELY(_mem_table->need_flush())) {
+        auto s = _flush_memtable_async();
+        _reset_mem_table();
+        if (UNLIKELY(!s.ok())) {
+            return s;
+        }
+    }
+
+    return Status::OK();
+}
+
+Status MemTableWriter::_flush_memtable_async() {
+    DCHECK(_flush_token != nullptr);
+    return _flush_token->submit(std::move(_mem_table));
+}
+
+Status MemTableWriter::flush_memtable_and_wait(bool need_wait) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (!_is_init) {
+        // This writer is not initialized before flushing. Do nothing
+        // But we return OK instead of Status::Error<ALREADY_CANCELLED>(),
+        // Because this method may be called when trying to reduce mem 
consumption,
+        // and at that time, the writer may not be initialized yet and that is 
a normal case.
+        return Status::OK();
+    }
+
+    if (_is_cancelled) {
+        return _cancel_status;
+    }
+
+    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
+                << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
+                << ", load id: " << print_id(_req.load_id);
+    auto s = _flush_memtable_async();
+    _reset_mem_table();
+    if (UNLIKELY(!s.ok())) {
+        return s;
+    }
+
+    if (need_wait) {
+        // wait all memtables in flush queue to be flushed.
+        SCOPED_TIMER(_wait_flush_timer);
+        RETURN_IF_ERROR(_flush_token->wait());
+    }
+    return Status::OK();
+}
+
+Status MemTableWriter::wait_flush() {
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (!_is_init) {
+            // return OK instead of Status::Error<ALREADY_CANCELLED>() for 
same reason
+            // as described in flush_memtable_and_wait()
+            return Status::OK();
+        }
+        if (_is_cancelled) {
+            return _cancel_status;
+        }
+    }
+    SCOPED_TIMER(_wait_flush_timer);
+    RETURN_IF_ERROR(_flush_token->wait());
+    return Status::OK();
+}
+
+void MemTableWriter::_reset_mem_table() {
+#ifndef BE_TEST
+    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
+            
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
+                        std::to_string(tablet_id()), _mem_table_num,
+                        UniqueId(_req.load_id).to_string()),
+            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
+    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
+            
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
+                        std::to_string(tablet_id()), _mem_table_num++,
+                        UniqueId(_req.load_id).to_string()),
+            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
+#else
+    auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
+            "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
+            std::to_string(tablet_id()), _mem_table_num, 
UniqueId(_req.load_id).to_string()));
+    auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format(
+            "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", 
std::to_string(tablet_id()),
+            _mem_table_num++, UniqueId(_req.load_id).to_string()));
+#endif
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
+        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
+    }
+    _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), 
_req.slots, _req.tuple_desc,
+                                  _unique_key_mow, mem_table_insert_tracker,
+                                  mem_table_flush_tracker));
+
+    COUNTER_UPDATE(_segment_num, 1);
+}
+
+Status MemTableWriter::close() {
+    _lock_watch.start();
+    std::lock_guard<std::mutex> l(_lock);
+    _lock_watch.stop();
+    if (_is_cancelled) {
+        return _cancel_status;
+    }
+    if (!_is_init) {
+        return Status::Error<NOT_INITIALIZED>("delta segment writer has not 
been initialized");
+    }
+    if (_is_closed) {
+        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
+                     << " load_id=" << _req.load_id;
+        return Status::OK();
+    }
+
+    auto s = _flush_memtable_async();
+    _mem_table.reset();
+    _is_closed = true;
+    if (UNLIKELY(!s.ok())) {
+        return s;
+    } else {
+        return Status::OK();
+    }
+}
+
+Status MemTableWriter::close_wait() {
+    SCOPED_TIMER(_close_wait_timer);
+    std::lock_guard<std::mutex> l(_lock);
+    DCHECK(_is_init)
+            << "delta writer is supposed be to initialized before close_wait() 
being called";
+
+    if (_is_cancelled) {
+        return _cancel_status;
+    }
+
+    Status st;
+    // return error if previous flush failed
+    {
+        SCOPED_TIMER(_wait_flush_timer);
+        st = _flush_token->wait();
+    }
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
+        return st;
+    }
+
+    _mem_table.reset();
+
+    if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows 
!=
+        _total_received_rows) {
+        LOG(WARNING) << "the rows number written doesn't match, rowset num 
rows written to file: "
+                     << _rowset_writer->num_rows()
+                     << ", merged_rows: " << 
_flush_token->memtable_stat().merged_rows
+                     << ", total received rows: " << _total_received_rows;
+        return Status::InternalError("rows number written by delta writer 
dosen't match");
+    }
+
+    // const FlushStatistic& stat = _flush_token->get_stats();
+    // print slow log if wait more than 1s
+    /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) {
+        LOG(INFO) << "close delta writer for tablet: " << req.tablet_id
+                  << ", load id: " << print_id(_req.load_id) << ", wait close 
for "
+                  << _wait_flush_timer->elapsed_time() << "(ns), stats: " << 
stat;
+    }*/
+
+    COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
+    COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
+    COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
+    const auto& memtable_stat = _flush_token->memtable_stat();
+    COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
+    COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
+    COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
+    COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
+    COUNTER_SET(_sort_times, memtable_stat.sort_times);
+    COUNTER_SET(_agg_times, memtable_stat.agg_times);
+    COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
+    COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
+    return Status::OK();
+}
+
+Status MemTableWriter::cancel() {
+    return cancel_with_status(Status::Cancelled("already cancelled"));
+}
+
+Status MemTableWriter::cancel_with_status(const Status& st) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (_is_cancelled) {
+        return Status::OK();
+    }
+    _mem_table.reset();
+    if (_flush_token != nullptr) {
+        // cancel and wait all memtables in flush queue to be finished
+        _flush_token->cancel();
+    }
+    _is_cancelled = true;
+    _cancel_status = st;
+    return Status::OK();
+}
+
+const FlushStatistic& MemTableWriter::get_flush_token_stats() {
+    return _flush_token->get_stats();
+}
+
+int64_t MemTableWriter::mem_consumption(MemType mem) {
+    if (_flush_token == nullptr) {
+        // This method may be called before this writer is initialized.
+        // So _flush_token may be null.
+        return 0;
+    }
+    int64_t mem_usage = 0;
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        if ((mem & MemType::WRITE) == MemType::WRITE) { // WRITE == 1: ALL (3) & 1 = 1
+            for (auto mem_table_tracker : _mem_table_insert_trackers) {
+                mem_usage += mem_table_tracker->consumption();
+            }
+        }
+        if ((mem & MemType::FLUSH) == MemType::FLUSH) { // FLUSH == 2: ALL (3) & 2 = 2
+            for (auto mem_table_tracker : _mem_table_flush_trackers) {
+                mem_usage += mem_table_tracker->consumption();
+            }
+        }
+    }
+    return mem_usage;
+}
+
+int64_t MemTableWriter::active_memtable_mem_consumption() {
+    if (_flush_token == nullptr) {
+        // This method may be called before this writer is initialized.
+        // So _flush_token may be null.
+        return 0;
+    }
+    int64_t mem_usage = 0;
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        if (_mem_table_insert_trackers.size() > 0) {
+            mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption();
+            mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption();
+        }
+    }
+    return mem_usage;
+}
+
+} // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/memtable_writer.h
similarity index 60%
copy from be/src/olap/delta_writer.h
copy to be/src/olap/memtable_writer.h
index e45a8752e4..92600e450e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -49,6 +49,7 @@ class TupleDescriptor;
 class SlotDescriptor;
 class OlapTableSchemaParam;
 class RowsetWriter;
+struct FlushStatistic;
 
 namespace vectorized {
 class Block;
@@ -56,50 +57,36 @@ class Block;
 
 enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
 
-struct WriteRequest {
-    int64_t tablet_id;
-    int32_t schema_hash;
-    int64_t txn_id;
-    int64_t partition_id;
-    PUniqueId load_id;
-    TupleDescriptor* tuple_desc;
-    // slots are in order of tablet's schema
-    const std::vector<SlotDescriptor*>* slots;
-    bool is_high_priority = false;
-    OlapTableSchemaParam* table_schema_param;
-    int64_t index_id = 0;
-};
-
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
-class DeltaWriter {
+class MemTableWriter {
 public:
-    static Status open(WriteRequest* req, DeltaWriter** writer, 
RuntimeProfile* profile,
-                       const UniqueId& load_id = TUniqueId());
+    struct WriteRequest { // parameters handed to the MemTableWriter constructor
+        int64_t tablet_id = 0; // value-initialized so a default-constructed request is never indeterminate
+        PUniqueId load_id;
+        TupleDescriptor* tuple_desc = nullptr; // null until the caller sets it
+        // slots are in order of tablet's schema
+        const std::vector<SlotDescriptor*>* slots = nullptr; // null until the caller sets it
+        bool is_high_priority = false;
+    };
 
-    ~DeltaWriter();
+    MemTableWriter(const WriteRequest& req, RuntimeProfile* profile);
 
-    Status init();
+    ~MemTableWriter();
+
+    Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr 
tablet_schema,
+                bool unique_key_mow = false);
 
     Status write(const vectorized::Block* block, const std::vector<int>& 
row_idxs,
                  bool is_append = false);
 
     Status append(const vectorized::Block* block);
 
-    // flush the last memtable to flush queue, must call it before 
build_rowset()
+    // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status build_rowset();
-    Status submit_calc_delete_bitmap_task();
-    Status wait_calc_delete_bitmap();
-    Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool 
write_single_replica);
-
-    bool check_slave_replicas_done(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
-                                           success_slave_tablet_node_ids);
-
-    void add_finished_slave_replicas(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
-                                             success_slave_tablet_node_ids);
+    Status close_wait();
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -113,44 +100,24 @@ public:
     // Otherwise, it will just put memtables to the flush queue and return.
     Status flush_memtable_and_wait(bool need_wait);
 
-    int64_t partition_id() const;
-
     int64_t mem_consumption(MemType mem);
     int64_t active_memtable_mem_consumption();
 
     // Wait all memtable in flush queue to be flushed
     Status wait_flush();
 
-    int64_t tablet_id() { return _tablet->tablet_id(); }
-
-    int64_t txn_id() { return _req.txn_id; }
-
-    void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+    int64_t tablet_id() const { return _req.tablet_id; } // read-only accessor for the tablet id stored in _req
 
     int64_t total_received_rows() const { return _total_received_rows; }
 
-    int64_t num_rows_filtered() const;
-
-    // For UT
-    DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+    const FlushStatistic& get_flush_token_stats();
 
 private:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
-                const UniqueId& load_id);
-
     // push a full memtable to flush executor
     Status _flush_memtable_async();
 
-    void _garbage_collection();
-
     void _reset_mem_table();
 
-    void _build_current_tablet_schema(int64_t index_id,
-                                      const OlapTableSchemaParam* 
table_schema_param,
-                                      const TabletSchema& ori_tablet_schema);
-
-    void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
-
     void _init_profile(RuntimeProfile* profile);
 
     bool _is_init = false;
@@ -158,20 +125,11 @@ private:
     bool _is_closed = false;
     Status _cancel_status;
     WriteRequest _req;
-    TabletSharedPtr _tablet;
-    RowsetSharedPtr _cur_rowset;
-    std::unique_ptr<RowsetWriter> _rowset_writer;
-    // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
+    std::shared_ptr<RowsetWriter> _rowset_writer;
     std::unique_ptr<MemTable> _mem_table;
-    //const TabletSchema* _tablet_schema;
-    // tablet schema owned by delta writer, all write will use this tablet 
schema
-    // it's build from tablet_schema(stored when create tablet) and 
OlapTableSchema
-    // every request will have it's own tablet schema so simple schema change 
can work
     TabletSchemaSPtr _tablet_schema;
-    bool _delta_written_success;
+    bool _unique_key_mow = false;
 
-    StorageEngine* _storage_engine;
-    UniqueId _load_id;
     std::unique_ptr<FlushToken> _flush_token;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
@@ -180,18 +138,7 @@ private:
 
     std::mutex _lock;
 
-    std::unordered_set<int64_t> _unfinished_slave_node;
-    PSuccessSlaveTabletNodeIds _success_slave_node_ids;
-    std::shared_mutex _slave_node_lock;
-
-    DeleteBitmapPtr _delete_bitmap = nullptr;
-    std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
-    // current rowset_ids, used to do diff in publish_version
-    RowsetIdUnorderedSet _rowset_ids;
-    // current max version, used to calculate delete bitmap
-    int64_t _cur_max_version;
-
-    // total rows num written by DeltaWriter
+    // total number of rows written by MemTableWriter
     int64_t _total_received_rows = 0;
 
     RuntimeProfile* _profile = nullptr;
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index 407d40d6e9..1ca33b1de7 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -486,9 +486,16 @@ TEST_F(TestDeltaWriter, open) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10003, 270068375, 20001, 30001, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            true,  &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10003;
+    write_req.schema_hash = 270068375;
+    write_req.txn_id = 20001;
+    write_req.partition_id = 30001;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = true;
+    write_req.table_schema_param = &param;
     DeltaWriter* delta_writer = nullptr;
 
     // test vec delta writer
@@ -525,9 +532,16 @@ TEST_F(TestDeltaWriter, vec_write) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10004, 270068376, 20002, 30002, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10004;
+    write_req.schema_hash = 270068376;
+    write_req.txn_id = 20002;
+    write_req.partition_id = 30002;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
     DeltaWriter* delta_writer = nullptr;
     std::unique_ptr<RuntimeProfile> profile;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
@@ -682,9 +696,16 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10005, 270068377, 20003, 30003, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10005;
+    write_req.schema_hash = 270068377;
+    write_req.txn_id = 20003;
+    write_req.partition_id = 30003;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
     DeltaWriter* delta_writer = nullptr;
     std::unique_ptr<RuntimeProfile> profile;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
@@ -792,9 +813,16 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) 
{
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10005, 270068377, 20003, 30003, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10005;
+    write_req.schema_hash = 270068377;
+    write_req.txn_id = 20003;
+    write_req.partition_id = 30003;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
     DeltaWriter* delta_writer1 = nullptr;
     DeltaWriter* delta_writer2 = nullptr;
     std::unique_ptr<RuntimeProfile> profile1;
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index 2226f03653..39cb847548 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -182,9 +182,17 @@ TEST_F(TestEngineStorageMigrationTask, 
write_and_migration) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10005, 270068377, 20003, 30003, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10005;
+    write_req.schema_hash = 270068377;
+    write_req.txn_id = 20003;
+    write_req.partition_id = 30003;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
+
     DeltaWriter* delta_writer = nullptr;
 
     std::unique_ptr<RuntimeProfile> profile;
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index 7b49b22b32..4dce31da0b 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -127,9 +127,16 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10000, 270068372, 20002, 30002, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10000;
+    write_req.schema_hash = 270068372;
+    write_req.txn_id = 20002;
+    write_req.partition_id = 30002;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
     DeltaWriter* delta_writer = nullptr;
     std::unique_ptr<RuntimeProfile> profile;
     profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
diff --git a/be/test/olap/remote_rowset_gc_test.cpp 
b/be/test/olap/remote_rowset_gc_test.cpp
index cf10c89722..675f0ba578 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -188,9 +188,16 @@ TEST_F(RemoteRowsetGcTest, normal) {
     PUniqueId load_id;
     load_id.set_hi(0);
     load_id.set_lo(0);
-    WriteRequest write_req = {
-            10005, 270068377, 20003, 30003, load_id, tuple_desc, 
&(tuple_desc->slots()),
-            false, &param};
+    WriteRequest write_req;
+    write_req.tablet_id = 10005;
+    write_req.schema_hash = 270068377;
+    write_req.txn_id = 20003;
+    write_req.partition_id = 30003;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
     std::unique_ptr<RuntimeProfile> profile;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     DeltaWriter* delta_writer = nullptr;
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 6cf0582c10..847c115a3b 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -356,15 +356,16 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* 
tablet, int64_t replic
     load_id.set_hi(0);
     load_id.set_lo(0);
 
-    WriteRequest write_req = {tablet_id,
-                              schema_hash,
-                              txn_id,
-                              partition_id,
-                              load_id,
-                              tuple_desc,
-                              &(tuple_desc->slots()),
-                              false,
-                              &param};
+    WriteRequest write_req;
+    write_req.tablet_id = tablet_id;
+    write_req.schema_hash = schema_hash;
+    write_req.txn_id = txn_id;
+    write_req.partition_id = partition_id;
+    write_req.load_id = load_id;
+    write_req.tuple_desc = tuple_desc;
+    write_req.slots = &(tuple_desc->slots());
+    write_req.is_high_priority = false;
+    write_req.table_schema_param = &param;
 
     DeltaWriter* delta_writer = nullptr;
     std::unique_ptr<RuntimeProfile> profile;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to