This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dac2b638c6 [refactor](load) move memtable flush logic to flush token
and rowset writer (#21547)
dac2b638c6 is described below
commit dac2b638c6924cac8902606662fc7744470e9458
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 6 17:04:30 2023 +0800
[refactor](load) move memtable flush logic to flush token and rowset writer
(#21547)
---
be/src/olap/delta_writer.cpp | 35 ++++----
be/src/olap/delta_writer.h | 2 -
be/src/olap/memtable.cpp | 129 +-----------------------------
be/src/olap/memtable.h | 39 ++-------
be/src/olap/memtable_flush_executor.cpp | 59 ++++++++++----
be/src/olap/memtable_flush_executor.h | 17 +++-
be/src/olap/rowset/beta_rowset_writer.cpp | 98 ++++++++++++++++++++++-
be/src/olap/rowset/beta_rowset_writer.h | 23 ++++--
be/src/olap/rowset/rowset_writer.h | 15 ++--
be/src/olap/tablet.cpp | 2 +-
10 files changed, 205 insertions(+), 214 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ab615d7e7f..0fc42cc219 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -209,7 +209,7 @@ Status DeltaWriter::init() {
// we can make sure same keys sort in the same order in all replicas.
bool should_serial = false;
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
- &_flush_token, _rowset_writer->type(), should_serial,
_req.is_high_priority));
+ _flush_token, _rowset_writer.get(), should_serial,
_req.is_high_priority));
_is_init = true;
return Status::OK();
@@ -263,10 +263,6 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<int>
}
Status DeltaWriter::_flush_memtable_async() {
- if (_mem_table->empty()) {
- return Status::OK();
- }
- _mem_table->assign_segment_id();
return _flush_token->submit(std::move(_mem_table));
}
@@ -342,22 +338,10 @@ void DeltaWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(),
_req.slots, _req.tuple_desc,
- _rowset_writer.get(),
_tablet->enable_unique_key_merge_on_write(),
+ _tablet->enable_unique_key_merge_on_write(),
mem_table_insert_tracker,
mem_table_flush_tracker));
COUNTER_UPDATE(_segment_num, 1);
- _mem_table->set_callback([this](MemTableStat& stat) {
- _memtable_stat += stat;
- COUNTER_SET(_sort_timer, _memtable_stat.sort_ns);
- COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
- COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
- COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
- COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
- COUNTER_SET(_sort_times, _memtable_stat.sort_times);
- COUNTER_SET(_agg_times, _memtable_stat.agg_times);
- COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows);
- COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows);
- });
}
Status DeltaWriter::close() {
@@ -417,10 +401,11 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes&
slave_tablet_nodes,
_mem_table.reset();
- if (_rowset_writer->num_rows() + _memtable_stat.merged_rows !=
_total_received_rows) {
+ if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows
!=
+ _total_received_rows) {
LOG(WARNING) << "the rows number written doesn't match, rowset num
rows written to file: "
<< _rowset_writer->num_rows()
- << ", merged_rows: " << _memtable_stat.merged_rows
+ << ", merged_rows: " <<
_flush_token->memtable_stat().merged_rows
<< ", total received rows: " << _total_received_rows;
return Status::InternalError("rows number written by delta writer
dosen't match");
}
@@ -495,6 +480,16 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes&
slave_tablet_nodes,
}
COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
+ COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
+ const auto& memtable_stat = _flush_token->memtable_stat();
+ COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
+ COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
+ COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
+ COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
+ COUNTER_SET(_sort_times, memtable_stat.sort_times);
+ COUNTER_SET(_agg_times, memtable_stat.agg_times);
+ COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
+ COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
return Status::OK();
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 98202dac35..c12a53c4fa 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -200,8 +200,6 @@ private:
RuntimeProfile::Counter* _merged_rows_num = nullptr;
MonotonicStopWatch _lock_watch;
-
- MemTableStat _memtable_stat;
};
} // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ae0165b9fc..c7d970968a 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -24,37 +24,26 @@
#include <algorithm>
#include <limits>
#include <string>
-#include <utility>
#include <vector>
#include "common/config.h"
-#include "common/consts.h"
-#include "common/logging.h"
#include "olap/olap_define.h"
-#include "olap/rowset/rowset_writer.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel_mgr.h"
-#include "runtime/thread_context.h"
-#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/column.h"
-#include "vec/columns/column_object.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/schema_util.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type_factory.hpp"
namespace doris {
using namespace ErrorCode;
MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
- RowsetWriter* rowset_writer, bool enable_unique_key_mow,
+ bool enable_unique_key_mow,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
@@ -63,7 +52,6 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema*
tablet_schema,
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
_flush_mem_tracker(flush_mem_tracker),
- _rowset_writer(rowset_writer),
_is_first_insertion(true),
_agg_functions(tablet_schema->num_columns()),
_offsets_of_aggregate_states(tablet_schema->num_columns()),
@@ -435,27 +423,7 @@ bool MemTable::need_agg() const {
return false;
}
-Status MemTable::flush() {
- VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
- << ", memsize: " << memory_usage() << ", rows: " <<
_stat.raw_rows;
- // For merge_on_write table, it must get all segments in this flush.
- // The id of new segment is set by the _num_segment of beta_rowset_writer,
- // and new segment ids is between [atomic_num_segments_before_flush,
atomic_num_segments_after_flush),
- // and use the ids to load segment data file for calc delete bitmap.
- int64_t duration_ns;
- SCOPED_RAW_TIMER(&duration_ns);
- SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush()));
- _delta_writer_callback(_stat);
- DorisMetrics::instance()->memtable_flush_total->increment(1);
-
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns /
1000);
- VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id()
- << ", flushsize: " << _flush_size;
-
- return Status::OK();
-}
-
-Status MemTable::_do_flush() {
- SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
+std::unique_ptr<vectorized::Block> MemTable::to_block() {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS &&
_tablet_schema->num_key_columns() == 0) {
@@ -467,98 +435,7 @@ Status MemTable::_do_flush() {
} else {
_aggregate<true>();
}
- vectorized::Block block = _output_mutable_block.to_block();
- FlushContext ctx;
- ctx.block = █
- if (_tablet_schema->is_dynamic_schema()) {
- // Unfold variant column
- RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
- }
- ctx.segment_id = _segment_id;
- SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
- RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block,
&_flush_size, &ctx));
- return Status::OK();
-}
-
-void MemTable::assign_segment_id() {
- _segment_id = std::optional<int32_t>
{_rowset_writer->allocate_segment_id()};
-}
-
-Status MemTable::close() {
- return flush();
-}
-
-Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext*
ctx) {
- if (block.rows() == 0) {
- return Status::OK();
- }
-
- // Sanitize block to match exactly from the same type of frontend meta
- vectorized::schema_util::FullBaseSchemaView schema_view;
- schema_view.table_id = _tablet_schema->table_id();
- vectorized::ColumnWithTypeAndName* variant_column =
- block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
- if (!variant_column) {
- return Status::OK();
- }
- auto base_column = variant_column->column;
- vectorized::ColumnObject& object_column =
-
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
- if (object_column.empty()) {
- block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
- return Status::OK();
- }
- object_column.finalize();
- // Has extended columns
-
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
- // Dynamic Block consists of two parts, dynamic part of columns and static
part of columns
- // static dynamic
- // | ----- | ------- |
- // The static ones are original _tablet_schame columns
- TabletSchemaSPtr flush_schema =
std::make_shared<TabletSchema>(*_tablet_schema);
- vectorized::Block flush_block(std::move(block));
- // The dynamic ones are auto generated and extended, append them the the
orig_block
- for (auto& entry : object_column.get_subcolumns()) {
- const std::string& column_name = entry->path.get_path();
- auto column_iter = schema_view.column_name_to_column.find(column_name);
- if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
- // Column maybe dropped by light weight schema change DDL
- continue;
- }
- TabletColumn column(column_iter->second);
- auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(
- column, column.is_nullable());
- // Dynamic generated columns does not appear in original tablet schema
- if (_tablet_schema->field_index(column.name()) < 0) {
- flush_schema->append_column(column);
- flush_block.insert({data_type->create_column(), data_type,
column.name()});
- }
- }
-
- // Ensure column are all present at this schema version.Otherwise there
will be some senario:
- // Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added
columns and schema version became 10
- // Load2 -> version(10) with schema [a, b, c] and has no extended columns
and fetched the schema at version 10
- // Load2 will persist meta with [a, b, c] but Load1 will persist meta
with [a, b, c, d, e]
- // So we should make sure that rowset at the same schema version alawys
contain the same size of columns.
- // so that all columns at schema_version is in either _tablet_schema or
schema_change_recorder
- for (const auto& [name, column] : schema_view.column_name_to_column) {
- if (_tablet_schema->field_index(name) == -1) {
- const auto& tcolumn = schema_view.column_name_to_column[name];
- TabletColumn new_column(tcolumn);
-
_rowset_writer->mutable_schema_change_recorder()->add_extended_columns(
- column, schema_view.schema_version);
- }
- }
-
- // Last schema alignment before flush to disk, due to the schema maybe
variant before this procedure
- // Eg. add columnA(INT) -> drop ColumnA -> add ColumnA(Double), then
columnA could be type of `Double`,
- // unfold will cast to Double type
- RETURN_IF_ERROR(vectorized::schema_util::unfold_object(
- flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME),
flush_block, true));
- flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
- ctx->flush_schema = flush_schema;
- block.swap(flush_block);
- return Status::OK();
+ return vectorized::Block::create_unique(_output_mutable_block.to_block());
}
} // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 49efd77143..aac071f8a4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -38,13 +38,11 @@
namespace doris {
-class RowsetWriter;
class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
enum KeysType : int;
-struct FlushContext;
// row pos in _input_mutable_block
struct RowInBlock {
@@ -142,13 +140,12 @@ private:
class MemTableStat {
public:
- MemTableStat& operator+=(MemTableStat& stat) {
+ MemTableStat& operator+=(const MemTableStat& stat) {
raw_rows += stat.raw_rows;
merged_rows += stat.merged_rows;
sort_ns += stat.sort_ns;
agg_ns += stat.agg_ns;
put_into_output_ns += stat.put_into_output_ns;
- segment_writer_ns += stat.segment_writer_ns;
duration_ns += stat.duration_ns;
sort_times += stat.sort_times;
agg_times += stat.agg_times;
@@ -161,7 +158,6 @@ public:
int64_t sort_ns = 0;
int64_t agg_ns = 0;
int64_t put_into_output_ns = 0;
- int64_t segment_writer_ns = 0;
int64_t duration_ns = 0;
int64_t sort_times = 0;
int64_t agg_times = 0;
@@ -171,8 +167,7 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor*
tuple_desc,
- RowsetWriter* rowset_writer, bool enable_unique_key_mow,
- const std::shared_ptr<MemTracker>& insert_mem_tracker,
+ bool enable_unique_key_mow, const std::shared_ptr<MemTracker>&
insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();
@@ -191,34 +186,19 @@ public:
bool need_agg() const;
- /// Flush
- Status flush();
- Status close();
-
- int64_t flush_size() const { return _flush_size; }
-
- void set_callback(std::function<void(MemTableStat&)> callback) {
- _delta_writer_callback = callback;
- }
+ std::unique_ptr<vectorized::Block> to_block();
bool empty() const { return _input_mutable_block.rows() == 0; }
- void assign_segment_id();
-private:
- Status _do_flush();
+    const MemTableStat& stat() const { return _stat; } // read-only view of this memtable's stats
+
+    std::shared_ptr<MemTracker> flush_mem_tracker() const { return _flush_mem_tracker; }
private:
// for vectorized
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
RowInBlock* new_row,
RowInBlock* row_in_skiplist);
- // Unfold variant column to Block
- // Eg. [A | B | C | (D, E, F)]
- // After unfold block structure changed to -> [A | B | C | D | E | F]
- // The expanded D, E, F is dynamic part of the block
- // The flushed Block columns should match exactly from the same type of
frontend meta
- Status unfold_variant_column(vectorized::Block& block, FlushContext* ctx);
-
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
@@ -248,10 +228,6 @@ private:
const TupleDescriptor* tuple_desc);
std::vector<int> _column_offset;
- RowsetWriter* _rowset_writer;
-
- // the data size flushed on disk of this memtable
- int64_t _flush_size = 0;
// Number of rows inserted to this memtable.
// This is not the rows in this memtable, because rows may be merged
// in unique or aggregate key model.
@@ -273,8 +249,6 @@ private:
void _aggregate();
void _put_into_output(vectorized::Block& in_block);
bool _is_first_insertion;
- std::function<void(MemTableStat&)> _delta_writer_callback;
- std::optional<int32_t> _segment_id = std::nullopt;
void _init_agg_functions(const vectorized::Block* block);
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
@@ -284,7 +258,6 @@ private:
// Memory usage without _arena.
size_t _mem_usage;
- std::shared_ptr<MowContext> _mow_context;
size_t _num_columns;
}; // class MemTable
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 9c5b92ed10..9f823c347d 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -26,6 +26,8 @@
#include "common/config.h"
#include "common/logging.h"
#include "olap/memtable.h"
+#include "olap/rowset/rowset_writer.h"
+#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
@@ -35,21 +37,23 @@ using namespace ErrorCode;
class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable>
memtable,
- int64_t submit_task_time)
+ int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
+ _segment_id(segment_id),
_submit_task_time(submit_task_time) {}
~MemtableFlushTask() override = default;
void run() override {
- _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
+ _flush_token->_flush_memtable(_memtable.get(), _segment_id,
_submit_task_time);
_memtable.reset();
}
private:
FlushToken* _flush_token;
std::unique_ptr<MemTable> _memtable;
+ int32_t _segment_id;
int64_t _submit_task_time;
};
@@ -68,8 +72,12 @@ Status FlushToken::submit(std::unique_ptr<MemTable>
mem_table) {
if (s != OK) {
return Status::Error(s);
}
+ if (mem_table->empty()) {
+ return Status::OK();
+ }
int64_t submit_task_time = MonotonicNanos();
- auto task = std::make_shared<MemtableFlushTask>(this,
std::move(mem_table), submit_task_time);
+ auto task = std::make_shared<MemtableFlushTask>(
+ this, std::move(mem_table), _rowset_writer->allocate_segment_id(),
submit_task_time);
_stats.flush_running_count++;
return _flush_token->submit(std::move(task));
}
@@ -84,7 +92,24 @@ Status FlushToken::wait() {
return s == OK ? Status::OK() : Status::Error(s);
}
-void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time)
{
+Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
+    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
+                  << ", memsize: " << memtable->memory_usage()
+                  << ", rows: " << memtable->stat().raw_rows;
+    *flush_size = 0; // flush_single_block() returns early for an empty block without setting it
+    const int64_t start_ns = MonotonicNanos(); // a scoped raw timer would only publish at scope exit
+    std::unique_ptr<vectorized::Block> block = memtable->to_block();
+    // Sort/aggregate the memtable into a block, then let the rowset writer unfold and write it.
+    SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->unfold_variant_column_and_flush_block(
+            block.get(), segment_id, memtable->flush_mem_tracker(), flush_size)));
+    _memtable_stat += memtable->stat(); // NOTE(review): unsynchronized; CONCURRENT tokens may race
+    DorisMetrics::instance()->memtable_flush_total->increment(1);
+    DorisMetrics::instance()->memtable_flush_duration_us->increment((MonotonicNanos() - start_ns) / 1000);
+    VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
+                  << ", flushsize: " << *flush_size;
+    return Status::OK();
+}
+
+void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id,
int64_t submit_task_time) {
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
// If previous flush has failed, return directly
@@ -95,7 +120,10 @@ void FlushToken::_flush_memtable(MemTable* memtable,
int64_t submit_task_time) {
MonotonicStopWatch timer;
timer.start();
size_t memory_usage = memtable->memory_usage();
- Status s = memtable->flush();
+
+ int64_t flush_size;
+ Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
+
if (!s) {
LOG(WARNING) << "Flush memtable failed with res = " << s;
// If s is not ok, ignore the code, just use other code is ok
@@ -109,12 +137,12 @@ void FlushToken::_flush_memtable(MemTable* memtable,
int64_t submit_task_time) {
<< "(ns), flush memtable cost: " << timer.elapsed_time()
<< "(ns), running count: " << _stats.flush_running_count
<< ", finish count: " << _stats.flush_finish_count
- << ", mem size: " << memory_usage << ", disk size: " <<
memtable->flush_size();
+ << ", mem size: " << memory_usage << ", disk size: " <<
flush_size;
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_finish_count++;
_stats.flush_running_count--;
_stats.flush_size_bytes += memtable->memory_usage();
- _stats.flush_disk_size_bytes += memtable->flush_size();
+ _stats.flush_disk_size_bytes += flush_size;
}
void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
@@ -135,30 +163,31 @@ void MemTableFlushExecutor::init(const
std::vector<DataDir*>& data_dirs) {
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>*
flush_token,
- RowsetTypePB rowset_type,
bool should_serial,
+Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>&
flush_token,
+ RowsetWriter* rowset_writer,
bool should_serial,
bool is_high_priority) {
if (!is_high_priority) {
- if (rowset_type == BETA_ROWSET && !should_serial) {
+ if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable
using a new segment writer.
- flush_token->reset(
+ flush_token.reset(
new
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
} else {
// alpha rowset do not support flush in CONCURRENT.
- flush_token->reset(
+ flush_token.reset(
new
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
}
} else {
- if (rowset_type == BETA_ROWSET && !should_serial) {
+ if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
// beta rowset can be flush in CONCURRENT, because each memtable
using a new segment writer.
- flush_token->reset(new FlushToken(
+ flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
} else {
// alpha rowset do not support flush in CONCURRENT.
- flush_token->reset(new FlushToken(
+ flush_token.reset(new FlushToken(
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
}
}
+ flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 7608fa4d19..61cc31ab39 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -26,13 +26,14 @@
#include <vector>
#include "common/status.h"
+#include "olap/memtable.h"
#include "util/threadpool.h"
namespace doris {
class DataDir;
class MemTable;
-enum RowsetTypePB : int;
+class RowsetWriter;
// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
@@ -71,10 +72,16 @@ public:
// get flush operations' statistics
const FlushStatistic& get_stats() const { return _stats; }
+    void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = rowset_writer; }
+
+    const MemTableStat& memtable_stat() const { return _memtable_stat; } // summed over flushed memtables
+
private:
friend class MemtableFlushTask;
- void _flush_memtable(MemTable* mem_table, int64_t submit_task_time);
+ void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t
submit_task_time);
+
+ Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t*
flush_size);
std::unique_ptr<ThreadPoolToken> _flush_token;
@@ -83,6 +90,10 @@ private:
std::atomic<int> _flush_status;
FlushStatistic _stats;
+
+    RowsetWriter* _rowset_writer = nullptr; // not owned; assigned by set_rowset_writer()
+
+    MemTableStat _memtable_stat; // accumulated stats of the memtables flushed by this token
};
// MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -106,7 +117,7 @@ public:
// because it needs path hash of each data dir.
void init(const std::vector<DataDir*>& data_dirs);
- Status create_flush_token(std::unique_ptr<FlushToken>* flush_token,
RowsetTypePB rowset_type,
+ Status create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
bool should_serial, bool is_high_priority);
private:
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index e547522317..d2b314ff3a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -46,10 +46,14 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
+#include "runtime/thread_context.h"
#include "util/slice.h"
#include "util/time.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_object.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
namespace doris {
using namespace ErrorCode;
@@ -514,8 +518,25 @@ Status BetaRowsetWriter::flush() {
return Status::OK();
}
-Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block,
int64* flush_size,
- const FlushContext* ctx) {
+Status BetaRowsetWriter::unfold_variant_column_and_flush_block(
+        vectorized::Block* block, int32_t segment_id,
+        const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* flush_size) {
+    SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker); // account this scope's allocations to the flush tracker
+
+    FlushContext ctx;
+    ctx.block = block;
+    ctx.segment_id = segment_id;
+    if (_context.tablet_schema->is_dynamic_schema()) {
+        // Expand the variant column into concrete sub-columns; may set ctx.flush_schema
+        RETURN_IF_ERROR(_unfold_variant_column(*block, ctx.flush_schema));
+    }
+    SCOPED_RAW_TIMER(&_segment_writer_ns); // only the segment write itself is timed
+    RETURN_IF_ERROR(flush_single_block(block, flush_size, &ctx));
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block,
int64* flush_size,
+ const FlushContext* ctx) {
if (block->rows() == 0) {
return Status::OK();
}
@@ -926,4 +947,77 @@ Status
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
return Status::OK();
}
+Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
+                                                TabletSchemaSPtr& flush_schema) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    // Sanitize the block so its columns exactly match the types in the frontend meta
+    vectorized::schema_util::FullBaseSchemaView schema_view;
+    schema_view.table_id = _context.tablet_schema->table_id();
+    vectorized::ColumnWithTypeAndName* variant_column =
+            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
+    if (!variant_column) {
+        return Status::OK();
+    }
+    auto base_column = variant_column->column;
+    vectorized::ColumnObject& object_column =
+            assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
+    if (object_column.empty()) {
+        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+        return Status::OK();
+    }
+    object_column.finalize();
+    // Has extended columns
+    RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+    // A dynamic block consists of two parts: the static columns and the dynamic columns
+    //  static | dynamic
+    // | ----- | ------- |
+    // The static ones are the original _context.tablet_schema columns
+    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    vectorized::Block flush_block(std::move(block));
+    // The dynamic ones are auto-generated and extended; append them to flush_block
+    for (auto& entry : object_column.get_subcolumns()) {
+        const std::string& column_name = entry->path.get_path();
+        auto column_iter = schema_view.column_name_to_column.find(column_name);
+        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
+            // Column may have been dropped by a light-weight schema change DDL
+            continue;
+        }
+        TabletColumn column(column_iter->second);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
+                column, column.is_nullable());
+        // Dynamically generated columns do not appear in the original tablet schema
+        if (_context.tablet_schema->field_index(column.name()) < 0) {
+            flush_schema->append_column(column);
+            flush_block.insert({data_type->create_column(), data_type, column.name()});
+        }
+    }
+
+    // Ensure all columns are present at this schema version. Otherwise this can happen:
+    // Load1 -> version(10) with schema [a, b, c, d, e], d & e are newly added columns
+    // Load2 -> version(10) with schema [a, b, c], no extended columns, fetched schema version 10
+    // Load2 will persist meta with [a, b, c] but Load1 will persist meta with [a, b, c, d, e]
+    // So rowsets at the same schema version must always contain the same set of columns,
+    // i.e. every column at schema_version is in _context.tablet_schema or schema_change_recorder
+    for (const auto& [name, column] : schema_view.column_name_to_column) {
+        if (_context.tablet_schema->field_index(name) == -1) {
+            // Record the map's column as an extended column directly (no extra lookup/copy)
+            // so every rowset written at this schema version carries the same column set.
+            _context.schema_change_recorder->add_extended_columns(column,
+                                                                  schema_view.schema_version);
+        }
+    }
+
+    // Last schema alignment before flushing to disk, since the schema may have changed meanwhile.
+    // E.g. add columnA(INT) -> drop columnA -> add columnA(Double): columnA is now `Double`,
+    // so unfold will cast it to Double
+    RETURN_IF_ERROR(vectorized::schema_util::unfold_object(
+            flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME), flush_block, true));
+    flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+    block.swap(flush_block);
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 32115d6635..e88efdc000 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -90,10 +90,14 @@ public:
Status flush() override;
+ Status unfold_variant_column_and_flush_block(
+ vectorized::Block* block, int32_t segment_id,
+ const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t*
flush_size) override;
+
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
- Status flush_single_memtable(const vectorized::Block* block, int64_t*
flush_size,
- const FlushContext* ctx = nullptr) override;
+ Status flush_single_block(const vectorized::Block* block, int64_t*
flush_size,
+ const FlushContext* ctx = nullptr) override;
RowsetSharedPtr build() override;
@@ -115,11 +119,6 @@ public:
int32_t allocate_segment_id() override { return
_next_segment_id.fetch_add(1); };
- // Maybe modified by local schema change
- vectorized::schema_util::LocalSchemaChangeRecorder*
mutable_schema_change_recorder() override {
- return _context.schema_change_recorder.get();
- }
-
SegcompactionWorker& get_segcompaction_worker() { return
_segcompaction_worker; }
Status flush_segment_writer_for_segcompaction(
@@ -134,6 +133,8 @@ public:
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
+ int64_t segment_writer_ns() override { return _segment_writer_ns; }
+
private:
Status _do_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>*
segment_writer,
@@ -171,6 +172,13 @@ private:
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t
seg_id);
+ // Unfold variant column to Block
+ // Eg. [A | B | C | (D, E, F)]
+ // After unfold block structure changed to -> [A | B | C | D | E | F]
+ // The expanded D, E, F is dynamic part of the block
+ // The flushed Block columns should match exactly from the same type of
frontend meta
+ Status _unfold_variant_column(vectorized::Block& block, TabletSchemaSPtr&
flush_schema);
+
// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
RowsetSharedPtr _build_tmp();
@@ -231,6 +239,7 @@ protected:
std::shared_ptr<MowContext> _mow_context;
int64_t _delete_bitmap_ns = 0;
+ int64_t _segment_writer_ns = 0;
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index e8d214e00d..d5655a89b9 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -72,8 +72,14 @@ public:
}
virtual Status final_flush() { return
Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }
- virtual Status flush_single_memtable(const vectorized::Block* block,
int64_t* flush_size,
- const FlushContext* ctx = nullptr) {
+ virtual Status unfold_variant_column_and_flush_block(
+ vectorized::Block* block, int32_t segment_id,
+ const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t*
flush_size) {
+ return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
+ }
+
+ virtual Status flush_single_block(const vectorized::Block* block, int64_t*
flush_size,
+ const FlushContext* ctx = nullptr) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
}
@@ -104,11 +110,10 @@ public:
virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not
supported!"; }
- virtual vectorized::schema_util::LocalSchemaChangeRecorder*
- mutable_schema_change_recorder() = 0;
-
virtual int64_t delete_bitmap_ns() { return 0; }
+ virtual int64_t segment_writer_ns() { return 0; }
+
private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a47728941d..55353f210c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2955,7 +2955,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr
rowset,
rowset_schema, read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
sort_block(block, ordered_block);
int64_t size;
- RETURN_IF_ERROR(rowset_writer->flush_single_memtable(&ordered_block,
&size));
+ RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block,
&size));
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << "
rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version
+ 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]