This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1c961f2272 [refactor](load) move generate_delete_bitmap from memtable
to beta rowset writer (#21329)
1c961f2272 is described below
commit 1c961f2272183e4ea8909a7c3c5e36d83a5cc420
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Jul 1 17:22:45 2023 +0800
[refactor](load) move generate_delete_bitmap from memtable to beta rowset
writer (#21329)
---
be/src/olap/delta_writer.cpp | 15 +---
be/src/olap/delta_writer.h | 2 -
be/src/olap/memtable.cpp | 144 +++++++-----------------------
be/src/olap/memtable.h | 35 +++-----
be/src/olap/rowset/beta_rowset_writer.cpp | 47 ++++++++--
be/src/olap/rowset/beta_rowset_writer.h | 15 ++--
be/src/olap/rowset/rowset_writer.h | 8 +-
be/src/olap/tablet.cpp | 3 +-
8 files changed, 95 insertions(+), 174 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3c84e67103..ab615d7e7f 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -34,10 +34,8 @@
#include "common/logging.h"
#include "common/status.h"
#include "exec/tablet_info.h"
-#include "gutil/integral_types.h"
#include "gutil/strings/numbers.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
-#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/olap_define.h"
@@ -47,8 +45,6 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -95,7 +91,7 @@ void DeltaWriter::_init_profile(RuntimeProfile* profile) {
_segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
_wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
_put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
- _delete_bitmap_timer = ADD_TIMER(_profile, "MemTableDeleteBitmapTime");
+ _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
_close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime");
_sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
_agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
@@ -206,7 +202,6 @@ Status DeltaWriter::init() {
_delete_bitmap);
RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
- _schema.reset(new Schema(_tablet_schema));
_reset_mem_table();
// create flush handler
@@ -346,10 +341,8 @@ void DeltaWriter::_reset_mem_table() {
_mem_table_insert_trackers.push_back(mem_table_insert_tracker);
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
- auto mow_context = std::make_shared<MowContext>(_cur_max_version,
_req.txn_id, _rowset_ids,
- _delete_bitmap);
- _mem_table.reset(new MemTable(_tablet, _schema.get(),
_tablet_schema.get(), _req.slots,
- _req.tuple_desc, _rowset_writer.get(),
mow_context,
+ _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(),
_req.slots, _req.tuple_desc,
+ _rowset_writer.get(),
_tablet->enable_unique_key_merge_on_write(),
mem_table_insert_tracker,
mem_table_flush_tracker));
COUNTER_UPDATE(_segment_num, 1);
@@ -359,7 +352,6 @@ void DeltaWriter::_reset_mem_table() {
COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
- COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
COUNTER_SET(_sort_times, _memtable_stat.sort_times);
COUNTER_SET(_agg_times, _memtable_stat.agg_times);
@@ -502,6 +494,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes&
slave_tablet_nodes,
}
}
COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
+ COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
return Status::OK();
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 52b407876f..5289068c6d 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -44,7 +44,6 @@ namespace doris {
class FlushToken;
class MemTable;
class MemTracker;
-class Schema;
class StorageEngine;
class TupleDescriptor;
class SlotDescriptor;
@@ -158,7 +157,6 @@ private:
std::unique_ptr<RowsetWriter> _rowset_writer;
// TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
std::unique_ptr<MemTable> _mem_table;
- std::unique_ptr<Schema> _schema;
//const TabletSchema* _tablet_schema;
// tablet schema owned by delta writer, all write will use this tablet
schema
// it's build from tablet_schema(stored when create tablet) and
OlapTableSchema
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ace9f4f689..6d8975e84c 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -22,9 +22,7 @@
#include <pdqsort.h>
#include <algorithm>
-#include <cstddef>
#include <limits>
-#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>
@@ -33,11 +31,7 @@
#include "common/consts.h"
#include "common/logging.h"
#include "olap/olap_define.h"
-#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_writer.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/schema.h"
-#include "olap/schema_change.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -46,52 +40,45 @@
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
-#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/column.h"
#include "vec/columns/column_object.h"
-#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
-#include "vec/json/path_in_data.h"
-#include "vec/jsonb/serialize.h"
namespace doris {
using namespace ErrorCode;
-MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
+MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
- RowsetWriter* rowset_writer, std::shared_ptr<MowContext>
mow_context,
+ RowsetWriter* rowset_writer, bool enable_unique_key_mow,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
- : _tablet(std::move(tablet)),
- _keys_type(_tablet->keys_type()),
- _schema(schema),
+ : _tablet_id(tablet_id),
+ _enable_unique_key_mow(enable_unique_key_mow),
+ _keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
_flush_mem_tracker(flush_mem_tracker),
- _schema_size(_schema->schema_size()),
_rowset_writer(rowset_writer),
_is_first_insertion(true),
- _agg_functions(schema->num_columns()),
- _offsets_of_aggregate_states(schema->num_columns()),
+ _agg_functions(tablet_schema->num_columns()),
+ _offsets_of_aggregate_states(tablet_schema->num_columns()),
_total_size_of_aggregate_states(0),
- _mem_usage(0),
- _mow_context(mow_context) {
+ _mem_usage(0) {
#ifndef BE_TEST
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
- fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id())),
+ fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id)),
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
#else
_insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
- fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id())));
+ fmt::format("MemTableHookInsert:TabletId={}",
std::to_string(tablet_id)));
#endif
_arena = std::make_unique<vectorized::Arena>();
- _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
+ _vec_row_comparator =
std::make_shared<RowInBlockComparator>(_tablet_schema);
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
@@ -113,9 +100,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const
std::vector<SlotDescript
}
void MemTable::_init_agg_functions(const vectorized::Block* block) {
- for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid)
{
+ for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns;
++cid) {
vectorized::AggregateFunctionPtr function;
- if (_keys_type == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
+ if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) {
// In such table, non-key column's aggregation type is NONE, so we
need to construct
// the aggregate function manually.
function =
vectorized::AggregateFunctionSimpleFactory::instance().get(
@@ -130,7 +117,7 @@ void MemTable::_init_agg_functions(const vectorized::Block*
block) {
_agg_functions[cid] = function;
}
- for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid)
{
+ for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns;
++cid) {
_offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states;
_total_size_of_aggregate_states += _agg_functions[cid]->size_of_data();
@@ -155,7 +142,7 @@ MemTable::~MemTable() {
}
// We should release agg_places here, because they are not
released when a
// load is canceled.
- for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i)
{
+ for (size_t i = _tablet_schema->num_key_columns(); i <
_num_columns; ++i) {
auto function = _agg_functions[i];
DCHECK(function != nullptr);
function->destroy((*it)->agg_places(i));
@@ -172,7 +159,7 @@ MemTable::~MemTable() {
}
int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock*
right) const {
- return _pblock->compare_at(left->_row_pos, right->_row_pos,
_schema->num_key_columns(),
+ return _pblock->compare_at(left->_row_pos, right->_row_pos,
_tablet_schema->num_key_columns(),
*_pblock, -1);
}
@@ -217,26 +204,26 @@ void MemTable::insert(const vectorized::Block*
input_block, const std::vector<in
}
void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock&
mutable_block,
- RowInBlock* new_row, RowInBlock*
row_in_skiplist) {
+ RowInBlock* src_row, RowInBlock*
dst_row) {
if (_tablet_schema->has_sequence_col()) {
auto sequence_idx = _tablet_schema->sequence_col_idx();
DCHECK_LT(sequence_idx, mutable_block.columns());
auto col_ptr = mutable_block.mutable_columns()[sequence_idx].get();
- auto res = col_ptr->compare_at(row_in_skiplist->_row_pos,
new_row->_row_pos, *col_ptr, -1);
+ auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos,
*col_ptr, -1);
// dst sequence column larger than src, don't need to update
if (res > 0) {
return;
}
- // need to update the row pos in skiplist to the new row pos when has
+ // need to update the row pos in dst row to the src row pos when has
// sequence column
- row_in_skiplist->_row_pos = new_row->_row_pos;
+ dst_row->_row_pos = src_row->_row_pos;
}
// dst is non-sequence row, or dst sequence is smaller
- for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid)
{
+ for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns;
++cid) {
auto col_ptr = mutable_block.mutable_columns()[cid].get();
- _agg_functions[cid]->add(row_in_skiplist->agg_places(cid),
+ _agg_functions[cid]->add(dst_row->agg_places(cid),
const_cast<const
doris::vectorized::IColumn**>(&col_ptr),
- new_row->_row_pos, _arena.get());
+ src_row->_row_pos, _arena.get());
}
}
void MemTable::_put_into_output(vectorized::Block& in_block) {
@@ -257,7 +244,7 @@ size_t MemTable::_sort() {
size_t same_keys_num = 0;
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
- for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+ for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos,
rhs->_row_pos, i, -1);
};
@@ -311,13 +298,13 @@ void MemTable::_finalize_one_row(RowInBlock* row,
const vectorized::ColumnsWithTypeAndName&
block_data,
int row_pos) {
// move key columns
- for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
+ for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) {
_output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(),
row->_row_pos);
}
if (row->has_init_agg()) {
// get value columns from agg_places
- for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
+ for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns;
++i) {
auto function = _agg_functions[i];
auto agg_place = row->agg_places(i);
auto col_ptr =
_output_mutable_block.get_column_by_position(i).get();
@@ -335,7 +322,7 @@ void MemTable::_finalize_one_row(RowInBlock* row,
}
} else {
// move columns for rows do not need agg
- for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
+ for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns;
++i) {
_output_mutable_block.get_column_by_position(i)->insert_from(
*block_data[i].column.get(), row->_row_pos);
}
@@ -366,7 +353,8 @@ void MemTable::_aggregate() {
prev_row->init_agg_places(
_arena->aligned_alloc(_total_size_of_aggregate_states,
16),
_offsets_of_aggregate_states.data());
- for (auto cid = _schema->num_key_columns(); cid <
_schema->num_columns(); cid++) {
+ for (auto cid = _tablet_schema->num_key_columns();
+ cid < _tablet_schema->num_columns(); cid++) {
auto col_ptr = mutable_block.mutable_columns()[cid].get();
auto data = prev_row->agg_places(cid);
_agg_functions[cid]->create(data);
@@ -443,41 +431,6 @@ bool MemTable::need_agg() const {
return false;
}
-Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
- SCOPED_RAW_TIMER(&_stat.delete_bitmap_ns);
- // generate delete bitmap, build a tmp rowset and load recent segment
- if (!_tablet->enable_unique_key_merge_on_write()) {
- return Status::OK();
- }
- auto rowset = _rowset_writer->build_tmp();
- auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
- std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1,
&segments));
- std::vector<RowsetSharedPtr> specified_rowsets;
- {
- std::shared_lock meta_rlock(_tablet->get_header_lock());
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- return Status::OK();
- }
- specified_rowsets =
_tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
- }
- OlapStopWatch watch;
- RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments,
specified_rowsets,
- _mow_context->delete_bitmap,
- _mow_context->max_version));
- size_t total_rows = std::accumulate(
- segments.begin(), segments.end(), 0,
- [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
- LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " <<
tablet_id()
- << ", rowset_ids: " << _mow_context->rowset_ids.size()
- << ", cur max_version: " << _mow_context->max_version
- << ", transaction_id: " << _mow_context->txn_id
- << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
- return Status::OK();
-}
-
Status MemTable::flush() {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
<< ", memsize: " << memory_usage() << ", rows: " <<
_stat.raw_rows;
@@ -501,7 +454,7 @@ Status MemTable::_do_flush() {
SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
- if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() ==
0) {
+ if (_keys_type == KeysType::DUP_KEYS &&
_tablet_schema->num_key_columns() == 0) {
_output_mutable_block.swap(_input_mutable_block);
} else {
vectorized::Block in_block = _input_mutable_block.to_block();
@@ -517,11 +470,6 @@ Status MemTable::_do_flush() {
// Unfold variant column
RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
}
- if (!_tablet_schema->is_partial_update()) {
- ctx.generate_delete_bitmap = [this](size_t segment_id) {
- return _generate_delete_bitmap(segment_id);
- };
- }
ctx.segment_id = _segment_id;
SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block,
&_flush_size, &ctx));
@@ -609,36 +557,4 @@ Status MemTable::unfold_variant_column(vectorized::Block&
block, FlushContext* c
return Status::OK();
}
-void MemTable::serialize_block_to_row_column(vectorized::Block& block) {
- if (block.rows() == 0) {
- return;
- }
- MonotonicStopWatch watch;
- watch.start();
- // find row column id
- int row_column_id = 0;
- for (int i = 0; i < _num_columns; ++i) {
- if (_tablet_schema->column(i).is_row_store_column()) {
- row_column_id = i;
- break;
- }
- }
- if (row_column_id == 0) {
- return;
- }
- vectorized::ColumnString* row_store_column =
-
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
-
.column->assume_mutable_ref()
- .assume_mutable()
- .get());
- row_store_column->clear();
- vectorized::DataTypeSerDeSPtrs serdes =
- vectorized::create_data_type_serdes(block.get_data_types());
- vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block,
*row_store_column,
-
_tablet_schema->num_columns(), serdes);
- VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ",
row_column_id:" << row_column_id
- << ", total_byte_size:" << block.allocated_bytes() << ",
serialize_cost(us)"
- << watch.elapsed_time() / 1000;
-}
-
} // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 912f2f4211..49efd77143 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -31,8 +31,6 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
-#include "olap/tablet.h"
-#include "olap/tablet_meta.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
@@ -75,8 +73,8 @@ public:
class Iter {
public:
Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}
- size_t left() { return _left; }
- size_t right() { return _right; }
+ size_t left() const { return _left; }
+ size_t right() const { return _right; }
// return false means no more ranges
bool next() {
@@ -130,7 +128,7 @@ private:
class RowInBlockComparator {
public:
- RowInBlockComparator(const Schema* schema) : _schema(schema) {}
+ RowInBlockComparator(const TabletSchema* tablet_schema) :
_tablet_schema(tablet_schema) {}
// call set_block before operator().
// only first time insert block to create _input_mutable_block,
// so can not Comparator of construct to set pblock
@@ -138,8 +136,8 @@ public:
int operator()(const RowInBlock* left, const RowInBlock* right) const;
private:
- const Schema* _schema;
- vectorized::MutableBlock* _pblock; // 对应Memtable::_input_mutable_block
+ const TabletSchema* _tablet_schema;
+ vectorized::MutableBlock* _pblock; // corresponds to
Memtable::_input_mutable_block
};
class MemTableStat {
@@ -150,7 +148,6 @@ public:
sort_ns += stat.sort_ns;
agg_ns += stat.agg_ns;
put_into_output_ns += stat.put_into_output_ns;
- delete_bitmap_ns += stat.delete_bitmap_ns;
segment_writer_ns += stat.segment_writer_ns;
duration_ns += stat.duration_ns;
sort_times += stat.sort_times;
@@ -164,7 +161,6 @@ public:
int64_t sort_ns = 0;
int64_t agg_ns = 0;
int64_t put_into_output_ns = 0;
- int64_t delete_bitmap_ns = 0;
int64_t segment_writer_ns = 0;
int64_t duration_ns = 0;
int64_t sort_times = 0;
@@ -173,14 +169,14 @@ public:
class MemTable {
public:
- MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema*
tablet_schema,
+ MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor*
tuple_desc,
- RowsetWriter* rowset_writer, std::shared_ptr<MowContext>
mow_context,
+ RowsetWriter* rowset_writer, bool enable_unique_key_mow,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();
- int64_t tablet_id() const { return _tablet->tablet_id(); }
+ int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const {
return _insert_mem_tracker->consumption() + _arena->used_size() +
_flush_mem_tracker->consumption();
@@ -216,12 +212,6 @@ private:
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
RowInBlock* new_row,
RowInBlock* row_in_skiplist);
- Status _generate_delete_bitmap(int32_t segment_id);
-
- // serialize block to row store format and append serialized data into row
store column
- // in block
- void serialize_block_to_row_column(vectorized::Block& block);
-
// Unfold variant column to Block
// Eg. [A | B | C | (D, E, F)]
// After unfold block structure changed to -> [A | B | C | D | E | F]
@@ -230,9 +220,9 @@ private:
Status unfold_variant_column(vectorized::Block& block, FlushContext* ctx);
private:
- TabletSharedPtr _tablet;
+ int64_t _tablet_id;
+ bool _enable_unique_key_mow = false;
const KeysType _keys_type;
- Schema* _schema;
const TabletSchema* _tablet_schema;
std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
@@ -253,11 +243,6 @@ private:
std::unique_ptr<vectorized::Arena> _arena;
// The object buffer pool for convert tuple to row
ObjectPool _agg_buffer_pool;
- // Only the rows will be inserted into SkipList can acquire the owner ship
from
- // `_agg_buffer_pool`
- ObjectPool _agg_object_pool;
-
- size_t _schema_size;
void _init_columns_offset_by_slot_descs(const
std::vector<SlotDescriptor*>* slot_descs,
const TupleDescriptor* tuple_desc);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 9e79f9287b..3af5bb1c46 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -24,7 +24,6 @@
#include <ctime> // time
#include <filesystem>
-#include <memory>
#include <sstream>
#include <utility>
@@ -32,12 +31,10 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
-#include "gutil/integral_types.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
-#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_factory.h"
@@ -46,10 +43,9 @@
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
+#include "olap/schema_change.h"
#include "olap/storage_engine.h"
-#include "olap/tablet.h"
#include "olap/tablet_schema.h"
-#include "segcompaction.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
@@ -141,6 +137,41 @@ Status BetaRowsetWriter::add_block(const
vectorized::Block* block) {
return _add_block(block, &_segment_writer);
}
+Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
+ SCOPED_RAW_TIMER(&_delete_bitmap_ns);
+ if (!_context.tablet->enable_unique_key_merge_on_write() ||
+ _context.tablet_schema->is_partial_update()) {
+ return Status::OK();
+ }
+ auto rowset = _build_tmp();
+ auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+ std::vector<segment_v2::SegmentSharedPtr> segments;
+ RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1,
&segments));
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock meta_rlock(_context.tablet->get_header_lock());
+ // tablet is under alter process. The delete bitmap will be calculated
after conversion.
+ if (_context.tablet->tablet_state() == TABLET_NOTREADY &&
+
SchemaChangeHandler::tablet_in_converting(_context.tablet->tablet_id())) {
+ return Status::OK();
+ }
+ specified_rowsets =
_context.tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
+ }
+ OlapStopWatch watch;
+ RETURN_IF_ERROR(_context.tablet->calc_delete_bitmap(rowset, segments,
specified_rowsets,
+
_context.mow_context->delete_bitmap,
+
_context.mow_context->max_version));
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0,
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " <<
_context.tablet->tablet_id()
+ << ", rowset_ids: " << _context.mow_context->rowset_ids.size()
+ << ", cur max_version: " << _context.mow_context->max_version
+ << ", transaction_id: " << _context.mow_context->txn_id
+ << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
+ return Status::OK();
+}
+
Status BetaRowsetWriter::_load_noncompacted_segments(
std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
auto fs = _rowset_meta->fs();
@@ -501,9 +532,7 @@ Status BetaRowsetWriter::flush_single_memtable(const
vectorized::Block* block, i
DCHECK_EQ(writer.get(), raw_writer);
}
RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
- if (ctx != nullptr && ctx->generate_delete_bitmap) {
- RETURN_IF_ERROR(ctx->generate_delete_bitmap(segment_id));
- }
+ RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
RETURN_IF_ERROR(_segcompaction_if_necessary());
return Status::OK();
}
@@ -684,7 +713,7 @@ void
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
}
}
-RowsetSharedPtr BetaRowsetWriter::build_tmp() {
+RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
*rowset_meta_ = *_rowset_meta;
_build_rowset_meta(rowset_meta_);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index b646f2a681..f0057c9dec 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -86,10 +86,6 @@ public:
RowsetSharedPtr build() override;
- // build a tmp rowset for load segment to calc delete_bitmap
- // for this segment
- RowsetSharedPtr build_tmp() override;
-
RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta)
override;
Version version() override { return _context.version; }
@@ -123,6 +119,10 @@ public:
Status wait_flying_segcompaction() override;
+ void set_segment_start_id(int32_t start_id) override { _segment_start_id =
start_id; }
+
+ int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
+
private:
Status _do_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>*
segment_writer,
@@ -138,6 +138,7 @@ private:
const FlushContext* ctx = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer,
int64_t* flush_size = nullptr);
+ Status _generate_delete_bitmap(int32_t segment_id);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
Status _segcompaction_ramaining_if_necessary();
@@ -157,7 +158,9 @@ private:
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t
seg_id);
- void set_segment_start_id(int32_t start_id) override { _segment_start_id =
start_id; }
+ // build a tmp rowset for load segment to calc delete_bitmap
+ // for this segment
+ RowsetSharedPtr _build_tmp();
protected:
RowsetWriterContext _context;
@@ -219,6 +222,8 @@ protected:
fmt::memory_buffer vlog_buffer;
std::shared_ptr<MowContext> _mow_context;
+
+ int64_t _delete_bitmap_ns = 0;
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index aea4a05bcc..e8d214e00d 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -41,7 +41,6 @@ struct FlushContext {
TabletSchemaSPtr flush_schema = nullptr;
const vectorized::Block* block = nullptr;
std::optional<int32_t> segment_id = std::nullopt;
- std::function<Status(int32_t)> generate_delete_bitmap;
};
class RowsetWriter {
@@ -82,11 +81,6 @@ public:
// return nullptr when failed
virtual RowsetSharedPtr build() = 0;
- // we have to load segment data to build delete_bitmap for current segment,
- // so we build a tmp rowset ptr to load segment data.
- // real build will be called in DeltaWriter close_wait.
- virtual RowsetSharedPtr build_tmp() = 0;
-
// For ordered rowset compaction, manual build rowset
virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr&
rowset_meta) = 0;
@@ -113,6 +107,8 @@ public:
virtual vectorized::schema_util::LocalSchemaChangeRecorder*
mutable_schema_change_recorder() = 0;
+ virtual int64_t delete_bitmap_ns() { return 0; }
+
private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4eb124a135..a47728941d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2812,9 +2812,8 @@ void Tablet::sort_block(vectorized::Block& in_block,
vectorized::Block& output_b
std::vector<RowInBlock*> _row_in_blocks;
_row_in_blocks.reserve(in_block.rows());
- std::unique_ptr<Schema> schema(new Schema(_schema));
std::shared_ptr<RowInBlockComparator> vec_row_comparator =
- std::make_shared<RowInBlockComparator>(schema.get());
+ std::make_shared<RowInBlockComparator>(_schema.get());
vec_row_comparator->set_block(&mutable_input_block);
std::vector<RowInBlock*> row_in_blocks;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]