This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c961f2272 [refactor](load) move generate_delete_bitmap from memtable to beta rowset writer (#21329)
1c961f2272 is described below

commit 1c961f2272183e4ea8909a7c3c5e36d83a5cc420
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Jul 1 17:22:45 2023 +0800

    [refactor](load) move generate_delete_bitmap from memtable to beta rowset writer (#21329)
---
 be/src/olap/delta_writer.cpp              |  15 +---
 be/src/olap/delta_writer.h                |   2 -
 be/src/olap/memtable.cpp                  | 144 +++++++-----------------------
 be/src/olap/memtable.h                    |  35 +++-----
 be/src/olap/rowset/beta_rowset_writer.cpp |  47 ++++++++--
 be/src/olap/rowset/beta_rowset_writer.h   |  15 ++--
 be/src/olap/rowset/rowset_writer.h        |   8 +-
 be/src/olap/tablet.cpp                    |   3 +-
 8 files changed, 95 insertions(+), 174 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3c84e67103..ab615d7e7f 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -34,10 +34,8 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "exec/tablet_info.h"
-#include "gutil/integral_types.h"
 #include "gutil/strings/numbers.h"
 #include "io/fs/file_writer.h" // IWYU pragma: keep
-#include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/olap_define.h"
@@ -47,8 +45,6 @@
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/schema.h"
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -95,7 +91,7 @@ void DeltaWriter::_init_profile(RuntimeProfile* profile) {
     _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
     _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
     _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
-    _delete_bitmap_timer = ADD_TIMER(_profile, "MemTableDeleteBitmapTime");
+    _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
     _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime");
     _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
     _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
@@ -206,7 +202,6 @@ Status DeltaWriter::init() {
                                                        _delete_bitmap);
     RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
 
-    _schema.reset(new Schema(_tablet_schema));
     _reset_mem_table();
 
     // create flush handler
@@ -346,10 +341,8 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
-    auto mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
-                                                    _delete_bitmap);
-    _mem_table.reset(new MemTable(_tablet, _schema.get(), 
_tablet_schema.get(), _req.slots,
-                                  _req.tuple_desc, _rowset_writer.get(), 
mow_context,
+    _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), 
_req.slots, _req.tuple_desc,
+                                  _rowset_writer.get(), 
_tablet->enable_unique_key_merge_on_write(),
                                   mem_table_insert_tracker, 
mem_table_flush_tracker));
 
     COUNTER_UPDATE(_segment_num, 1);
@@ -359,7 +352,6 @@ void DeltaWriter::_reset_mem_table() {
         COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
         COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
         COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
-        COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
         COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
         COUNTER_SET(_sort_times, _memtable_stat.sort_times);
         COUNTER_SET(_agg_times, _memtable_stat.agg_times);
@@ -502,6 +494,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& 
slave_tablet_nodes,
         }
     }
     COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
+    COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
     return Status::OK();
 }
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 52b407876f..5289068c6d 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -44,7 +44,6 @@ namespace doris {
 class FlushToken;
 class MemTable;
 class MemTracker;
-class Schema;
 class StorageEngine;
 class TupleDescriptor;
 class SlotDescriptor;
@@ -158,7 +157,6 @@ private:
     std::unique_ptr<RowsetWriter> _rowset_writer;
     // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
     std::unique_ptr<MemTable> _mem_table;
-    std::unique_ptr<Schema> _schema;
     //const TabletSchema* _tablet_schema;
     // tablet schema owned by delta writer, all write will use this tablet 
schema
     // it's build from tablet_schema(stored when create tablet) and 
OlapTableSchema
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ace9f4f689..6d8975e84c 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -22,9 +22,7 @@
 #include <pdqsort.h>
 
 #include <algorithm>
-#include <cstddef>
 #include <limits>
-#include <shared_mutex>
 #include <string>
 #include <utility>
 #include <vector>
@@ -33,11 +31,7 @@
 #include "common/consts.h"
 #include "common/logging.h"
 #include "olap/olap_define.h"
-#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_writer.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/schema.h"
-#include "olap/schema_change.h"
 #include "olap/tablet_schema.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -46,52 +40,45 @@
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
-#include "util/string_util.h"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_object.h"
-#include "vec/columns/column_string.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h"
 #include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_factory.hpp"
-#include "vec/json/path_in_data.h"
-#include "vec/jsonb/serialize.h"
 
 namespace doris {
 using namespace ErrorCode;
 
-MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* 
tablet_schema,
+MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
-                   RowsetWriter* rowset_writer, std::shared_ptr<MowContext> 
mow_context,
+                   RowsetWriter* rowset_writer, bool enable_unique_key_mow,
                    const std::shared_ptr<MemTracker>& insert_mem_tracker,
                    const std::shared_ptr<MemTracker>& flush_mem_tracker)
-        : _tablet(std::move(tablet)),
-          _keys_type(_tablet->keys_type()),
-          _schema(schema),
+        : _tablet_id(tablet_id),
+          _enable_unique_key_mow(enable_unique_key_mow),
+          _keys_type(tablet_schema->keys_type()),
           _tablet_schema(tablet_schema),
           _insert_mem_tracker(insert_mem_tracker),
           _flush_mem_tracker(flush_mem_tracker),
-          _schema_size(_schema->schema_size()),
           _rowset_writer(rowset_writer),
           _is_first_insertion(true),
-          _agg_functions(schema->num_columns()),
-          _offsets_of_aggregate_states(schema->num_columns()),
+          _agg_functions(tablet_schema->num_columns()),
+          _offsets_of_aggregate_states(tablet_schema->num_columns()),
           _total_size_of_aggregate_states(0),
-          _mem_usage(0),
-          _mow_context(mow_context) {
+          _mem_usage(0) {
 #ifndef BE_TEST
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
-            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())),
+            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id)),
             ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
-            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())));
+            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id)));
 #endif
     _arena = std::make_unique<vectorized::Arena>();
-    _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
+    _vec_row_comparator = 
std::make_shared<RowInBlockComparator>(_tablet_schema);
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _num_columns = _tablet_schema->num_columns();
@@ -113,9 +100,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescript
 }
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
-    for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) 
{
+    for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; 
++cid) {
         vectorized::AggregateFunctionPtr function;
-        if (_keys_type == KeysType::UNIQUE_KEYS && 
_tablet->enable_unique_key_merge_on_write()) {
+        if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) {
             // In such table, non-key column's aggregation type is NONE, so we 
need to construct
             // the aggregate function manually.
             function = 
vectorized::AggregateFunctionSimpleFactory::instance().get(
@@ -130,7 +117,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* 
block) {
         _agg_functions[cid] = function;
     }
 
-    for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) 
{
+    for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; 
++cid) {
         _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states;
         _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data();
 
@@ -155,7 +142,7 @@ MemTable::~MemTable() {
             }
             // We should release agg_places here, because they are not 
released when a
             // load is canceled.
-            for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) 
{
+            for (size_t i = _tablet_schema->num_key_columns(); i < 
_num_columns; ++i) {
                 auto function = _agg_functions[i];
                 DCHECK(function != nullptr);
                 function->destroy((*it)->agg_places(i));
@@ -172,7 +159,7 @@ MemTable::~MemTable() {
 }
 
 int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* 
right) const {
-    return _pblock->compare_at(left->_row_pos, right->_row_pos, 
_schema->num_key_columns(),
+    return _pblock->compare_at(left->_row_pos, right->_row_pos, 
_tablet_schema->num_key_columns(),
                                *_pblock, -1);
 }
 
@@ -217,26 +204,26 @@ void MemTable::insert(const vectorized::Block* 
input_block, const std::vector<in
 }
 
 void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& 
mutable_block,
-                                           RowInBlock* new_row, RowInBlock* 
row_in_skiplist) {
+                                           RowInBlock* src_row, RowInBlock* 
dst_row) {
     if (_tablet_schema->has_sequence_col()) {
         auto sequence_idx = _tablet_schema->sequence_col_idx();
         DCHECK_LT(sequence_idx, mutable_block.columns());
         auto col_ptr = mutable_block.mutable_columns()[sequence_idx].get();
-        auto res = col_ptr->compare_at(row_in_skiplist->_row_pos, 
new_row->_row_pos, *col_ptr, -1);
+        auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, 
*col_ptr, -1);
         // dst sequence column larger than src, don't need to update
         if (res > 0) {
             return;
         }
-        // need to update the row pos in skiplist to the new row pos when has
+        // need to update the row pos in dst row to the src row pos when has
         // sequence column
-        row_in_skiplist->_row_pos = new_row->_row_pos;
+        dst_row->_row_pos = src_row->_row_pos;
     }
     // dst is non-sequence row, or dst sequence is smaller
-    for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) 
{
+    for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; 
++cid) {
         auto col_ptr = mutable_block.mutable_columns()[cid].get();
-        _agg_functions[cid]->add(row_in_skiplist->agg_places(cid),
+        _agg_functions[cid]->add(dst_row->agg_places(cid),
                                  const_cast<const 
doris::vectorized::IColumn**>(&col_ptr),
-                                 new_row->_row_pos, _arena.get());
+                                 src_row->_row_pos, _arena.get());
     }
 }
 void MemTable::_put_into_output(vectorized::Block& in_block) {
@@ -257,7 +244,7 @@ size_t MemTable::_sort() {
     size_t same_keys_num = 0;
     // sort new rows
     Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
-    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+    for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
         auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
             return _input_mutable_block.compare_one_column(lhs->_row_pos, 
rhs->_row_pos, i, -1);
         };
@@ -311,13 +298,13 @@ void MemTable::_finalize_one_row(RowInBlock* row,
                                  const vectorized::ColumnsWithTypeAndName& 
block_data,
                                  int row_pos) {
     // move key columns
-    for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
+    for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) {
         
_output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(),
                                                                      
row->_row_pos);
     }
     if (row->has_init_agg()) {
         // get value columns from agg_places
-        for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
+        for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; 
++i) {
             auto function = _agg_functions[i];
             auto agg_place = row->agg_places(i);
             auto col_ptr = 
_output_mutable_block.get_column_by_position(i).get();
@@ -335,7 +322,7 @@ void MemTable::_finalize_one_row(RowInBlock* row,
         }
     } else {
         // move columns for rows do not need agg
-        for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
+        for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; 
++i) {
             _output_mutable_block.get_column_by_position(i)->insert_from(
                     *block_data[i].column.get(), row->_row_pos);
         }
@@ -366,7 +353,8 @@ void MemTable::_aggregate() {
                 prev_row->init_agg_places(
                         _arena->aligned_alloc(_total_size_of_aggregate_states, 
16),
                         _offsets_of_aggregate_states.data());
-                for (auto cid = _schema->num_key_columns(); cid < 
_schema->num_columns(); cid++) {
+                for (auto cid = _tablet_schema->num_key_columns();
+                     cid < _tablet_schema->num_columns(); cid++) {
                     auto col_ptr = mutable_block.mutable_columns()[cid].get();
                     auto data = prev_row->agg_places(cid);
                     _agg_functions[cid]->create(data);
@@ -443,41 +431,6 @@ bool MemTable::need_agg() const {
     return false;
 }
 
-Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
-    SCOPED_RAW_TIMER(&_stat.delete_bitmap_ns);
-    // generate delete bitmap, build a tmp rowset and load recent segment
-    if (!_tablet->enable_unique_key_merge_on_write()) {
-        return Status::OK();
-    }
-    auto rowset = _rowset_writer->build_tmp();
-    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-    std::vector<segment_v2::SegmentSharedPtr> segments;
-    RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, 
&segments));
-    std::vector<RowsetSharedPtr> specified_rowsets;
-    {
-        std::shared_lock meta_rlock(_tablet->get_header_lock());
-        // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (_tablet->tablet_state() == TABLET_NOTREADY &&
-            SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
-            return Status::OK();
-        }
-        specified_rowsets = 
_tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
-    }
-    OlapStopWatch watch;
-    RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, 
specified_rowsets,
-                                                _mow_context->delete_bitmap,
-                                                _mow_context->max_version));
-    size_t total_rows = std::accumulate(
-            segments.begin(), segments.end(), 0,
-            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
-    LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << 
tablet_id()
-              << ", rowset_ids: " << _mow_context->rowset_ids.size()
-              << ", cur max_version: " << _mow_context->max_version
-              << ", transaction_id: " << _mow_context->txn_id
-              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
-    return Status::OK();
-}
-
 Status MemTable::flush() {
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                   << ", memsize: " << memory_usage() << ", rows: " << 
_stat.raw_rows;
@@ -501,7 +454,7 @@ Status MemTable::_do_flush() {
     SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
-        if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() == 
0) {
+        if (_keys_type == KeysType::DUP_KEYS && 
_tablet_schema->num_key_columns() == 0) {
             _output_mutable_block.swap(_input_mutable_block);
         } else {
             vectorized::Block in_block = _input_mutable_block.to_block();
@@ -517,11 +470,6 @@ Status MemTable::_do_flush() {
         // Unfold variant column
         RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
     }
-    if (!_tablet_schema->is_partial_update()) {
-        ctx.generate_delete_bitmap = [this](size_t segment_id) {
-            return _generate_delete_bitmap(segment_id);
-        };
-    }
     ctx.segment_id = _segment_id;
     SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
     RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, 
&_flush_size, &ctx));
@@ -609,36 +557,4 @@ Status MemTable::unfold_variant_column(vectorized::Block& 
block, FlushContext* c
     return Status::OK();
 }
 
-void MemTable::serialize_block_to_row_column(vectorized::Block& block) {
-    if (block.rows() == 0) {
-        return;
-    }
-    MonotonicStopWatch watch;
-    watch.start();
-    // find row column id
-    int row_column_id = 0;
-    for (int i = 0; i < _num_columns; ++i) {
-        if (_tablet_schema->column(i).is_row_store_column()) {
-            row_column_id = i;
-            break;
-        }
-    }
-    if (row_column_id == 0) {
-        return;
-    }
-    vectorized::ColumnString* row_store_column =
-            
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
-                                                           
.column->assume_mutable_ref()
-                                                           .assume_mutable()
-                                                           .get());
-    row_store_column->clear();
-    vectorized::DataTypeSerDeSPtrs serdes =
-            vectorized::create_data_type_serdes(block.get_data_types());
-    vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, 
*row_store_column,
-                                                   
_tablet_schema->num_columns(), serdes);
-    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", 
row_column_id:" << row_column_id
-               << ", total_byte_size:" << block.allocated_bytes() << ", 
serialize_cost(us)"
-               << watch.elapsed_time() / 1000;
-}
-
 } // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 912f2f4211..49efd77143 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -31,8 +31,6 @@
 #include "common/status.h"
 #include "gutil/integral_types.h"
 #include "olap/olap_common.h"
-#include "olap/tablet.h"
-#include "olap/tablet_meta.h"
 #include "runtime/memory/mem_tracker.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/common/arena.h"
@@ -75,8 +73,8 @@ public:
     class Iter {
     public:
         Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}
-        size_t left() { return _left; }
-        size_t right() { return _right; }
+        size_t left() const { return _left; }
+        size_t right() const { return _right; }
 
         // return false means no more ranges
         bool next() {
@@ -130,7 +128,7 @@ private:
 
 class RowInBlockComparator {
 public:
-    RowInBlockComparator(const Schema* schema) : _schema(schema) {}
+    RowInBlockComparator(const TabletSchema* tablet_schema) : 
_tablet_schema(tablet_schema) {}
     // call set_block before operator().
     // only first time insert block to create _input_mutable_block,
     // so can not Comparator of construct to set pblock
@@ -138,8 +136,8 @@ public:
     int operator()(const RowInBlock* left, const RowInBlock* right) const;
 
 private:
-    const Schema* _schema;
-    vectorized::MutableBlock* _pblock; // 对应Memtable::_input_mutable_block
+    const TabletSchema* _tablet_schema;
+    vectorized::MutableBlock* _pblock; //  corresponds to 
Memtable::_input_mutable_block
 };
 
 class MemTableStat {
@@ -150,7 +148,6 @@ public:
         sort_ns += stat.sort_ns;
         agg_ns += stat.agg_ns;
         put_into_output_ns += stat.put_into_output_ns;
-        delete_bitmap_ns += stat.delete_bitmap_ns;
         segment_writer_ns += stat.segment_writer_ns;
         duration_ns += stat.duration_ns;
         sort_times += stat.sort_times;
@@ -164,7 +161,6 @@ public:
     int64_t sort_ns = 0;
     int64_t agg_ns = 0;
     int64_t put_into_output_ns = 0;
-    int64_t delete_bitmap_ns = 0;
     int64_t segment_writer_ns = 0;
     int64_t duration_ns = 0;
     int64_t sort_times = 0;
@@ -173,14 +169,14 @@ public:
 
 class MemTable {
 public:
-    MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* 
tablet_schema,
+    MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* 
tuple_desc,
-             RowsetWriter* rowset_writer, std::shared_ptr<MowContext> 
mow_context,
+             RowsetWriter* rowset_writer, bool enable_unique_key_mow,
              const std::shared_ptr<MemTracker>& insert_mem_tracker,
              const std::shared_ptr<MemTracker>& flush_mem_tracker);
     ~MemTable();
 
-    int64_t tablet_id() const { return _tablet->tablet_id(); }
+    int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const {
         return _insert_mem_tracker->consumption() + _arena->used_size() +
                _flush_mem_tracker->consumption();
@@ -216,12 +212,6 @@ private:
     void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, 
RowInBlock* new_row,
                                      RowInBlock* row_in_skiplist);
 
-    Status _generate_delete_bitmap(int32_t segment_id);
-
-    // serialize block to row store format and append serialized data into row 
store column
-    // in block
-    void serialize_block_to_row_column(vectorized::Block& block);
-
     // Unfold variant column to Block
     // Eg. [A | B | C | (D, E, F)]
     // After unfold block structure changed to -> [A | B | C | D | E | F]
@@ -230,9 +220,9 @@ private:
     Status unfold_variant_column(vectorized::Block& block, FlushContext* ctx);
 
 private:
-    TabletSharedPtr _tablet;
+    int64_t _tablet_id;
+    bool _enable_unique_key_mow = false;
     const KeysType _keys_type;
-    Schema* _schema;
     const TabletSchema* _tablet_schema;
 
     std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
@@ -253,11 +243,6 @@ private:
     std::unique_ptr<vectorized::Arena> _arena;
     // The object buffer pool for convert tuple to row
     ObjectPool _agg_buffer_pool;
-    // Only the rows will be inserted into SkipList can acquire the owner ship 
from
-    // `_agg_buffer_pool`
-    ObjectPool _agg_object_pool;
-
-    size_t _schema_size;
 
     void _init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
                                             const TupleDescriptor* tuple_desc);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 9e79f9287b..3af5bb1c46 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -24,7 +24,6 @@
 
 #include <ctime> // time
 #include <filesystem>
-#include <memory>
 #include <sstream>
 #include <utility>
 
@@ -32,12 +31,10 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h"
-#include "gutil/integral_types.h"
 #include "gutil/strings/substitute.h"
 #include "io/fs/file_reader_options.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
-#include "olap/data_dir.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_factory.h"
@@ -46,10 +43,9 @@
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/rowset/segment_v2/segment_writer.h"
+#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
-#include "olap/tablet.h"
 #include "olap/tablet_schema.h"
-#include "segcompaction.h"
 #include "util/slice.h"
 #include "util/time.h"
 #include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
@@ -141,6 +137,41 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
+    SCOPED_RAW_TIMER(&_delete_bitmap_ns); // exposed via delete_bitmap_ns()
+    if (!_context.tablet->enable_unique_key_merge_on_write() ||
+        _context.tablet_schema->is_partial_update()) {
+        return Status::OK();
+    }
+    auto rowset = _build_tmp(); // tmp rowset, only used to load the just-flushed segment
+    auto beta_rowset = static_cast<BetaRowset*>(rowset.get());
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
+    std::vector<RowsetSharedPtr> specified_rowsets;
+    {
+        std::shared_lock meta_rlock(_context.tablet->get_header_lock());
+        // tablet is under alter process. The delete bitmap will be calculated after conversion.
+        if (_context.tablet->tablet_state() == TABLET_NOTREADY &&
+            SchemaChangeHandler::tablet_in_converting(_context.tablet->tablet_id())) {
+            return Status::OK();
+        }
+        specified_rowsets = _context.tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+    }
+    OlapStopWatch watch;
+    RETURN_IF_ERROR(_context.tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
+                                                        _context.mow_context->delete_bitmap,
+                                                        _context.mow_context->max_version));
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), static_cast<size_t>(0), // size_t seed, not int
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });
+    LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id()
+              << ", rowset_ids: " << _context.mow_context->rowset_ids.size()
+              << ", cur max_version: " << _context.mow_context->max_version
+              << ", transaction_id: " << _context.mow_context->txn_id
+              << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
+    return Status::OK();
+}
+
 Status BetaRowsetWriter::_load_noncompacted_segments(
         std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
     auto fs = _rowset_meta->fs();
@@ -501,9 +532,7 @@ Status BetaRowsetWriter::flush_single_memtable(const 
vectorized::Block* block, i
         DCHECK_EQ(writer.get(), raw_writer);
     }
     RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
-    if (ctx != nullptr && ctx->generate_delete_bitmap) {
-        RETURN_IF_ERROR(ctx->generate_delete_bitmap(segment_id));
-    }
+    RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
     RETURN_IF_ERROR(_segcompaction_if_necessary());
     return Status::OK();
 }
@@ -684,7 +713,7 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
     }
 }
 
-RowsetSharedPtr BetaRowsetWriter::build_tmp() {
+RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
     std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
     *rowset_meta_ = *_rowset_meta;
     _build_rowset_meta(rowset_meta_);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index b646f2a681..f0057c9dec 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -86,10 +86,6 @@ public:
 
     RowsetSharedPtr build() override;
 
-    // build a tmp rowset for load segment to calc delete_bitmap
-    // for this segment
-    RowsetSharedPtr build_tmp() override;
-
     RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) 
override;
 
     Version version() override { return _context.version; }
@@ -123,6 +119,10 @@ public:
 
     Status wait_flying_segcompaction() override;
 
+    void set_segment_start_id(int32_t start_id) override { _segment_start_id = 
start_id; }
+
+    int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
+
 private:
     Status _do_add_block(const vectorized::Block* block,
                          std::unique_ptr<segment_v2::SegmentWriter>* 
segment_writer,
@@ -138,6 +138,7 @@ private:
                                   const FlushContext* ctx = nullptr);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* 
writer,
                                  int64_t* flush_size = nullptr);
+    Status _generate_delete_bitmap(int32_t segment_id);
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
     Status _segcompaction_if_necessary();
     Status _segcompaction_ramaining_if_necessary();
@@ -157,7 +158,9 @@ private:
     Status _rename_compacted_segment_plain(uint64_t seg_id);
     Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t 
seg_id);
 
-    void set_segment_start_id(int32_t start_id) override { _segment_start_id = 
start_id; }
+    // build a tmp rowset for load segment to calc delete_bitmap
+    // for this segment
+    RowsetSharedPtr _build_tmp();
 
 protected:
     RowsetWriterContext _context;
@@ -219,6 +222,8 @@ protected:
     fmt::memory_buffer vlog_buffer;
 
     std::shared_ptr<MowContext> _mow_context;
+
+    int64_t _delete_bitmap_ns = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_writer.h 
b/be/src/olap/rowset/rowset_writer.h
index aea4a05bcc..e8d214e00d 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -41,7 +41,6 @@ struct FlushContext {
     TabletSchemaSPtr flush_schema = nullptr;
     const vectorized::Block* block = nullptr;
     std::optional<int32_t> segment_id = std::nullopt;
-    std::function<Status(int32_t)> generate_delete_bitmap;
 };
 
 class RowsetWriter {
@@ -82,11 +81,6 @@ public:
     // return nullptr when failed
     virtual RowsetSharedPtr build() = 0;
 
-    // we have to load segment data to build delete_bitmap for current segment,
-    // so we  build a tmp rowset ptr to load segment data.
-    // real build will be called in DeltaWriter close_wait.
-    virtual RowsetSharedPtr build_tmp() = 0;
-
     // For ordered rowset compaction, manual build rowset
     virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& 
rowset_meta) = 0;
 
@@ -113,6 +107,8 @@ public:
     virtual vectorized::schema_util::LocalSchemaChangeRecorder*
     mutable_schema_change_recorder() = 0;
 
+    virtual int64_t delete_bitmap_ns() { return 0; }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
 };
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4eb124a135..a47728941d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2812,9 +2812,8 @@ void Tablet::sort_block(vectorized::Block& in_block, 
vectorized::Block& output_b
     std::vector<RowInBlock*> _row_in_blocks;
     _row_in_blocks.reserve(in_block.rows());
 
-    std::unique_ptr<Schema> schema(new Schema(_schema));
     std::shared_ptr<RowInBlockComparator> vec_row_comparator =
-            std::make_shared<RowInBlockComparator>(schema.get());
+            std::make_shared<RowInBlockComparator>(_schema.get());
     vec_row_comparator->set_block(&mutable_input_block);
 
     std::vector<RowInBlock*> row_in_blocks;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to