This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dac2b638c6 [refactor](load) move memtable flush logic to flush token and rowset writer (#21547)
dac2b638c6 is described below

commit dac2b638c6924cac8902606662fc7744470e9458
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 6 17:04:30 2023 +0800

    [refactor](load) move memtable flush logic to flush token and rowset writer (#21547)
---
 be/src/olap/delta_writer.cpp              |  35 ++++----
 be/src/olap/delta_writer.h                |   2 -
 be/src/olap/memtable.cpp                  | 129 +-----------------------------
 be/src/olap/memtable.h                    |  39 ++-------
 be/src/olap/memtable_flush_executor.cpp   |  59 ++++++++++----
 be/src/olap/memtable_flush_executor.h     |  17 +++-
 be/src/olap/rowset/beta_rowset_writer.cpp |  98 ++++++++++++++++++++++-
 be/src/olap/rowset/beta_rowset_writer.h   |  23 ++++--
 be/src/olap/rowset/rowset_writer.h        |  15 ++--
 be/src/olap/tablet.cpp                    |   2 +-
 10 files changed, 205 insertions(+), 214 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ab615d7e7f..0fc42cc219 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -209,7 +209,7 @@ Status DeltaWriter::init() {
     // we can make sure same keys sort in the same order in all replicas.
     bool should_serial = false;
     
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
-            &_flush_token, _rowset_writer->type(), should_serial, 
_req.is_high_priority));
+            _flush_token, _rowset_writer.get(), should_serial, 
_req.is_high_priority));
 
     _is_init = true;
     return Status::OK();
@@ -263,10 +263,6 @@ Status DeltaWriter::write(const vectorized::Block* block, 
const std::vector<int>
 }
 
 Status DeltaWriter::_flush_memtable_async() {
-    if (_mem_table->empty()) {
-        return Status::OK();
-    }
-    _mem_table->assign_segment_id();
     return _flush_token->submit(std::move(_mem_table));
 }
 
@@ -342,22 +338,10 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
     _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), 
_req.slots, _req.tuple_desc,
-                                  _rowset_writer.get(), 
_tablet->enable_unique_key_merge_on_write(),
+                                  _tablet->enable_unique_key_merge_on_write(),
                                   mem_table_insert_tracker, 
mem_table_flush_tracker));
 
     COUNTER_UPDATE(_segment_num, 1);
-    _mem_table->set_callback([this](MemTableStat& stat) {
-        _memtable_stat += stat;
-        COUNTER_SET(_sort_timer, _memtable_stat.sort_ns);
-        COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
-        COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
-        COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
-        COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
-        COUNTER_SET(_sort_times, _memtable_stat.sort_times);
-        COUNTER_SET(_agg_times, _memtable_stat.agg_times);
-        COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows);
-        COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows);
-    });
 }
 
 Status DeltaWriter::close() {
@@ -417,10 +401,11 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& 
slave_tablet_nodes,
 
     _mem_table.reset();
 
-    if (_rowset_writer->num_rows() + _memtable_stat.merged_rows != 
_total_received_rows) {
+    if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows 
!=
+        _total_received_rows) {
         LOG(WARNING) << "the rows number written doesn't match, rowset num 
rows written to file: "
                      << _rowset_writer->num_rows()
-                     << ", merged_rows: " << _memtable_stat.merged_rows
+                     << ", merged_rows: " << 
_flush_token->memtable_stat().merged_rows
                      << ", total received rows: " << _total_received_rows;
         return Status::InternalError("rows number written by delta writer 
dosen't match");
     }
@@ -495,6 +480,16 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& 
slave_tablet_nodes,
     }
     COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
     COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
+    COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
+    const auto& memtable_stat = _flush_token->memtable_stat();
+    COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
+    COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
+    COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
+    COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
+    COUNTER_SET(_sort_times, memtable_stat.sort_times);
+    COUNTER_SET(_agg_times, memtable_stat.agg_times);
+    COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
+    COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
     return Status::OK();
 }
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 98202dac35..c12a53c4fa 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -200,8 +200,6 @@ private:
     RuntimeProfile::Counter* _merged_rows_num = nullptr;
 
     MonotonicStopWatch _lock_watch;
-
-    MemTableStat _memtable_stat;
 };
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ae0165b9fc..c7d970968a 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -24,37 +24,26 @@
 #include <algorithm>
 #include <limits>
 #include <string>
-#include <utility>
 #include <vector>
 
 #include "common/config.h"
-#include "common/consts.h"
-#include "common/logging.h"
 #include "olap/olap_define.h"
-#include "olap/rowset/rowset_writer.h"
 #include "olap/tablet_schema.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_channel_mgr.h"
-#include "runtime/thread_context.h"
-#include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/columns/column.h"
-#include "vec/columns/column_object.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/schema_util.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 using namespace ErrorCode;
 
 MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
-                   RowsetWriter* rowset_writer, bool enable_unique_key_mow,
+                   bool enable_unique_key_mow,
                    const std::shared_ptr<MemTracker>& insert_mem_tracker,
                    const std::shared_ptr<MemTracker>& flush_mem_tracker)
         : _tablet_id(tablet_id),
@@ -63,7 +52,6 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* 
tablet_schema,
           _tablet_schema(tablet_schema),
           _insert_mem_tracker(insert_mem_tracker),
           _flush_mem_tracker(flush_mem_tracker),
-          _rowset_writer(rowset_writer),
           _is_first_insertion(true),
           _agg_functions(tablet_schema->num_columns()),
           _offsets_of_aggregate_states(tablet_schema->num_columns()),
@@ -435,27 +423,7 @@ bool MemTable::need_agg() const {
     return false;
 }
 
-Status MemTable::flush() {
-    VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
-                  << ", memsize: " << memory_usage() << ", rows: " << 
_stat.raw_rows;
-    // For merge_on_write table, it must get all segments in this flush.
-    // The id of new segment is set by the _num_segment of beta_rowset_writer,
-    // and new segment ids is between [atomic_num_segments_before_flush, 
atomic_num_segments_after_flush),
-    // and use the ids to load segment data file for calc delete bitmap.
-    int64_t duration_ns;
-    SCOPED_RAW_TIMER(&duration_ns);
-    SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush()));
-    _delta_writer_callback(_stat);
-    DorisMetrics::instance()->memtable_flush_total->increment(1);
-    
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 
1000);
-    VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id()
-                  << ", flushsize: " << _flush_size;
-
-    return Status::OK();
-}
-
-Status MemTable::_do_flush() {
-    SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
+std::unique_ptr<vectorized::Block> MemTable::to_block() {
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
         if (_keys_type == KeysType::DUP_KEYS && 
_tablet_schema->num_key_columns() == 0) {
@@ -467,98 +435,7 @@ Status MemTable::_do_flush() {
     } else {
         _aggregate<true>();
     }
-    vectorized::Block block = _output_mutable_block.to_block();
-    FlushContext ctx;
-    ctx.block = &block;
-    if (_tablet_schema->is_dynamic_schema()) {
-        // Unfold variant column
-        RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
-    }
-    ctx.segment_id = _segment_id;
-    SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
-    RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, 
&_flush_size, &ctx));
-    return Status::OK();
-}
-
-void MemTable::assign_segment_id() {
-    _segment_id = std::optional<int32_t> 
{_rowset_writer->allocate_segment_id()};
-}
-
-Status MemTable::close() {
-    return flush();
-}
-
-Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* 
ctx) {
-    if (block.rows() == 0) {
-        return Status::OK();
-    }
-
-    // Sanitize block to match exactly from the same type of frontend meta
-    vectorized::schema_util::FullBaseSchemaView schema_view;
-    schema_view.table_id = _tablet_schema->table_id();
-    vectorized::ColumnWithTypeAndName* variant_column =
-            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
-    if (!variant_column) {
-        return Status::OK();
-    }
-    auto base_column = variant_column->column;
-    vectorized::ColumnObject& object_column =
-            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
-    if (object_column.empty()) {
-        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
-        return Status::OK();
-    }
-    object_column.finalize();
-    // Has extended columns
-    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
-    // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
-    //  static   dynamic
-    // | ----- | ------- |
-    // The static ones are original _tablet_schame columns
-    TabletSchemaSPtr flush_schema = 
std::make_shared<TabletSchema>(*_tablet_schema);
-    vectorized::Block flush_block(std::move(block));
-    // The dynamic ones are auto generated and extended, append them the the 
orig_block
-    for (auto& entry : object_column.get_subcolumns()) {
-        const std::string& column_name = entry->path.get_path();
-        auto column_iter = schema_view.column_name_to_column.find(column_name);
-        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
-            // Column maybe dropped by light weight schema change DDL
-            continue;
-        }
-        TabletColumn column(column_iter->second);
-        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
-                column, column.is_nullable());
-        // Dynamic generated columns does not appear in original tablet schema
-        if (_tablet_schema->field_index(column.name()) < 0) {
-            flush_schema->append_column(column);
-            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
-        }
-    }
-
-    // Ensure column are all present at this schema version.Otherwise there 
will be some senario:
-    //  Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added 
columns and schema version became 10
-    //  Load2 -> version(10) with schema [a, b, c] and has no extended columns 
and fetched the schema at version 10
-    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta 
with [a, b, c, d, e]
-    // So we should make sure that rowset at the same schema version alawys 
contain the same size of columns.
-    // so that all columns at schema_version is in either _tablet_schema or 
schema_change_recorder
-    for (const auto& [name, column] : schema_view.column_name_to_column) {
-        if (_tablet_schema->field_index(name) == -1) {
-            const auto& tcolumn = schema_view.column_name_to_column[name];
-            TabletColumn new_column(tcolumn);
-            
_rowset_writer->mutable_schema_change_recorder()->add_extended_columns(
-                    column, schema_view.schema_version);
-        }
-    }
-
-    // Last schema alignment before flush to disk, due to the schema maybe 
variant before this procedure
-    // Eg. add columnA(INT) -> drop ColumnA -> add ColumnA(Double), then 
columnA could be type of `Double`,
-    // unfold will cast to Double type
-    RETURN_IF_ERROR(vectorized::schema_util::unfold_object(
-            flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME), 
flush_block, true));
-    flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
-    ctx->flush_schema = flush_schema;
-    block.swap(flush_block);
-    return Status::OK();
+    return vectorized::Block::create_unique(_output_mutable_block.to_block());
 }
 
 } // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 49efd77143..aac071f8a4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -38,13 +38,11 @@
 
 namespace doris {
 
-class RowsetWriter;
 class Schema;
 class SlotDescriptor;
 class TabletSchema;
 class TupleDescriptor;
 enum KeysType : int;
-struct FlushContext;
 
 // row pos in _input_mutable_block
 struct RowInBlock {
@@ -142,13 +140,12 @@ private:
 
 class MemTableStat {
 public:
-    MemTableStat& operator+=(MemTableStat& stat) {
+    MemTableStat& operator+=(const MemTableStat& stat) {
         raw_rows += stat.raw_rows;
         merged_rows += stat.merged_rows;
         sort_ns += stat.sort_ns;
         agg_ns += stat.agg_ns;
         put_into_output_ns += stat.put_into_output_ns;
-        segment_writer_ns += stat.segment_writer_ns;
         duration_ns += stat.duration_ns;
         sort_times += stat.sort_times;
         agg_times += stat.agg_times;
@@ -161,7 +158,6 @@ public:
     int64_t sort_ns = 0;
     int64_t agg_ns = 0;
     int64_t put_into_output_ns = 0;
-    int64_t segment_writer_ns = 0;
     int64_t duration_ns = 0;
     int64_t sort_times = 0;
     int64_t agg_times = 0;
@@ -171,8 +167,7 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* 
tuple_desc,
-             RowsetWriter* rowset_writer, bool enable_unique_key_mow,
-             const std::shared_ptr<MemTracker>& insert_mem_tracker,
+             bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& 
insert_mem_tracker,
              const std::shared_ptr<MemTracker>& flush_mem_tracker);
     ~MemTable();
 
@@ -191,34 +186,19 @@ public:
 
     bool need_agg() const;
 
-    /// Flush
-    Status flush();
-    Status close();
-
-    int64_t flush_size() const { return _flush_size; }
-
-    void set_callback(std::function<void(MemTableStat&)> callback) {
-        _delta_writer_callback = callback;
-    }
+    std::unique_ptr<vectorized::Block> to_block();
 
     bool empty() const { return _input_mutable_block.rows() == 0; }
-    void assign_segment_id();
 
-private:
-    Status _do_flush();
+    const MemTableStat& stat() { return _stat; }
+
+    std::shared_ptr<MemTracker> flush_mem_tracker() { return 
_flush_mem_tracker; }
 
 private:
     // for vectorized
     void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, 
RowInBlock* new_row,
                                      RowInBlock* row_in_skiplist);
 
-    // Unfold variant column to Block
-    // Eg. [A | B | C | (D, E, F)]
-    // After unfold block structure changed to -> [A | B | C | D | E | F]
-    // The expanded D, E, F is dynamic part of the block
-    // The flushed Block columns should match exactly from the same type of 
frontend meta
-    Status unfold_variant_column(vectorized::Block& block, FlushContext* ctx);
-
 private:
     int64_t _tablet_id;
     bool _enable_unique_key_mow = false;
@@ -248,10 +228,6 @@ private:
                                             const TupleDescriptor* tuple_desc);
     std::vector<int> _column_offset;
 
-    RowsetWriter* _rowset_writer;
-
-    // the data size flushed on disk of this memtable
-    int64_t _flush_size = 0;
     // Number of rows inserted to this memtable.
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggregate key model.
@@ -273,8 +249,6 @@ private:
     void _aggregate();
     void _put_into_output(vectorized::Block& in_block);
     bool _is_first_insertion;
-    std::function<void(MemTableStat&)> _delta_writer_callback;
-    std::optional<int32_t> _segment_id = std::nullopt;
 
     void _init_agg_functions(const vectorized::Block* block);
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
@@ -284,7 +258,6 @@ private:
     // Memory usage without _arena.
     size_t _mem_usage;
 
-    std::shared_ptr<MowContext> _mow_context;
     size_t _num_columns;
 }; // class MemTable
 
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index 9c5b92ed10..9f823c347d 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -26,6 +26,8 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "olap/memtable.h"
+#include "olap/rowset/rowset_writer.h"
+#include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
 #include "util/time.h"
 
@@ -35,21 +37,23 @@ using namespace ErrorCode;
 class MemtableFlushTask final : public Runnable {
 public:
     MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> 
memtable,
-                      int64_t submit_task_time)
+                      int32_t segment_id, int64_t submit_task_time)
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
+              _segment_id(segment_id),
               _submit_task_time(submit_task_time) {}
 
     ~MemtableFlushTask() override = default;
 
     void run() override {
-        _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
+        _flush_token->_flush_memtable(_memtable.get(), _segment_id, 
_submit_task_time);
         _memtable.reset();
     }
 
 private:
     FlushToken* _flush_token;
     std::unique_ptr<MemTable> _memtable;
+    int32_t _segment_id;
     int64_t _submit_task_time;
 };
 
@@ -68,8 +72,12 @@ Status FlushToken::submit(std::unique_ptr<MemTable> 
mem_table) {
     if (s != OK) {
         return Status::Error(s);
     }
+    if (mem_table->empty()) {
+        return Status::OK();
+    }
     int64_t submit_task_time = MonotonicNanos();
-    auto task = std::make_shared<MemtableFlushTask>(this, 
std::move(mem_table), submit_task_time);
+    auto task = std::make_shared<MemtableFlushTask>(
+            this, std::move(mem_table), _rowset_writer->allocate_segment_id(), 
submit_task_time);
     _stats.flush_running_count++;
     return _flush_token->submit(std::move(task));
 }
@@ -84,7 +92,24 @@ Status FlushToken::wait() {
     return s == OK ? Status::OK() : Status::Error(s);
 }
 
-void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) 
{
+Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, 
int64_t* flush_size) {
+    VLOG_CRITICAL << "begin to flush memtable for tablet: " << 
memtable->tablet_id()
+                  << ", memsize: " << memtable->memory_usage()
+                  << ", rows: " << memtable->stat().raw_rows;
+    int64_t duration_ns;
+    SCOPED_RAW_TIMER(&duration_ns);
+    std::unique_ptr<vectorized::Block> block = memtable->to_block();
+    
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->unfold_variant_column_and_flush_block(
+            block.get(), segment_id, memtable->flush_mem_tracker(), 
flush_size)));
+    _memtable_stat += memtable->stat();
+    DorisMetrics::instance()->memtable_flush_total->increment(1);
+    
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 
1000);
+    VLOG_CRITICAL << "after flush memtable for tablet: " << 
memtable->tablet_id()
+                  << ", flushsize: " << *flush_size;
+    return Status::OK();
+}
+
+void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, 
int64_t submit_task_time) {
     uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
     _stats.flush_wait_time_ns += flush_wait_time_ns;
     // If previous flush has failed, return directly
@@ -95,7 +120,10 @@ void FlushToken::_flush_memtable(MemTable* memtable, 
int64_t submit_task_time) {
     MonotonicStopWatch timer;
     timer.start();
     size_t memory_usage = memtable->memory_usage();
-    Status s = memtable->flush();
+
+    int64_t flush_size;
+    Status s = _do_flush_memtable(memtable, segment_id, &flush_size);
+
     if (!s) {
         LOG(WARNING) << "Flush memtable failed with res = " << s;
         // If s is not ok, ignore the code, just use other code is ok
@@ -109,12 +137,12 @@ void FlushToken::_flush_memtable(MemTable* memtable, 
int64_t submit_task_time) {
                   << "(ns), flush memtable cost: " << timer.elapsed_time()
                   << "(ns), running count: " << _stats.flush_running_count
                   << ", finish count: " << _stats.flush_finish_count
-                  << ", mem size: " << memory_usage << ", disk size: " << 
memtable->flush_size();
+                  << ", mem size: " << memory_usage << ", disk size: " << 
flush_size;
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_finish_count++;
     _stats.flush_running_count--;
     _stats.flush_size_bytes += memtable->memory_usage();
-    _stats.flush_disk_size_bytes += memtable->flush_size();
+    _stats.flush_disk_size_bytes += flush_size;
 }
 
 void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
@@ -135,30 +163,31 @@ void MemTableFlushExecutor::init(const 
std::vector<DataDir*>& data_dirs) {
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* 
flush_token,
-                                                 RowsetTypePB rowset_type, 
bool should_serial,
+Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& 
flush_token,
+                                                 RowsetWriter* rowset_writer, 
bool should_serial,
                                                  bool is_high_priority) {
     if (!is_high_priority) {
-        if (rowset_type == BETA_ROWSET && !should_serial) {
+        if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
             // beta rowset can be flush in CONCURRENT, because each memtable 
using a new segment writer.
-            flush_token->reset(
+            flush_token.reset(
                     new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
         } else {
             // alpha rowset do not support flush in CONCURRENT.
-            flush_token->reset(
+            flush_token.reset(
                     new 
FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
         }
     } else {
-        if (rowset_type == BETA_ROWSET && !should_serial) {
+        if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
             // beta rowset can be flush in CONCURRENT, because each memtable 
using a new segment writer.
-            flush_token->reset(new FlushToken(
+            flush_token.reset(new FlushToken(
                     
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
         } else {
             // alpha rowset do not support flush in CONCURRENT.
-            flush_token->reset(new FlushToken(
+            flush_token.reset(new FlushToken(
                     
_high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
         }
     }
+    flush_token->set_rowset_writer(rowset_writer);
     return Status::OK();
 }
 
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 7608fa4d19..61cc31ab39 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -26,13 +26,14 @@
 #include <vector>
 
 #include "common/status.h"
+#include "olap/memtable.h"
 #include "util/threadpool.h"
 
 namespace doris {
 
 class DataDir;
 class MemTable;
-enum RowsetTypePB : int;
+class RowsetWriter;
 
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
@@ -71,10 +72,16 @@ public:
     // get flush operations' statistics
     const FlushStatistic& get_stats() const { return _stats; }
 
+    void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = 
rowset_writer; }
+
+    const MemTableStat& memtable_stat() { return _memtable_stat; }
+
 private:
     friend class MemtableFlushTask;
 
-    void _flush_memtable(MemTable* mem_table, int64_t submit_task_time);
+    void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t 
submit_task_time);
+
+    Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* 
flush_size);
 
     std::unique_ptr<ThreadPoolToken> _flush_token;
 
@@ -83,6 +90,10 @@ private:
     std::atomic<int> _flush_status;
 
     FlushStatistic _stats;
+
+    RowsetWriter* _rowset_writer;
+
+    MemTableStat _memtable_stat;
 };
 
 // MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -106,7 +117,7 @@ public:
     // because it needs path hash of each data dir.
     void init(const std::vector<DataDir*>& data_dirs);
 
-    Status create_flush_token(std::unique_ptr<FlushToken>* flush_token, 
RowsetTypePB rowset_type,
+    Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, 
RowsetWriter* rowset_writer,
                               bool should_serial, bool is_high_priority);
 
 private:
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index e547522317..d2b314ff3a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -46,10 +46,14 @@
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
+#include "runtime/thread_context.h"
 #include "util/slice.h"
 #include "util/time.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_object.h"
 #include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
 #include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 using namespace ErrorCode;
@@ -514,8 +518,25 @@ Status BetaRowsetWriter::flush() {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, 
int64* flush_size,
-                                               const FlushContext* ctx) {
+Status BetaRowsetWriter::unfold_variant_column_and_flush_block(
+        vectorized::Block* block, int32_t segment_id,
+        const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) {
+    SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker);
+
+    FlushContext ctx;
+    ctx.block = block;
+    if (_context.tablet_schema->is_dynamic_schema()) {
+        // Unfold variant column
+        RETURN_IF_ERROR(_unfold_variant_column(*block, ctx.flush_schema));
+    }
+    ctx.segment_id = std::optional<int32_t> {segment_id};
+    SCOPED_RAW_TIMER(&_segment_writer_ns);
+    RETURN_IF_ERROR(flush_single_block(block, flush_size, &ctx));
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block, 
int64* flush_size,
+                                            const FlushContext* ctx) {
     if (block->rows() == 0) {
         return Status::OK();
     }
@@ -926,4 +947,77 @@ Status 
BetaRowsetWriter::flush_segment_writer_for_segcompaction(
     return Status::OK();
 }
 
+Status BetaRowsetWriter::_unfold_variant_column(vectorized::Block& block,
+                                                TabletSchemaSPtr& 
flush_schema) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    // Sanitize block to match exactly from the same type of frontend meta
+    vectorized::schema_util::FullBaseSchemaView schema_view;
+    schema_view.table_id = _context.tablet_schema->table_id();
+    vectorized::ColumnWithTypeAndName* variant_column =
+            block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME);
+    if (!variant_column) {
+        return Status::OK();
+    }
+    auto base_column = variant_column->column;
+    vectorized::ColumnObject& object_column =
+            
assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref());
+    if (object_column.empty()) {
+        block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+        return Status::OK();
+    }
+    object_column.finalize();
+    // Has extended columns
+    
RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view));
+    // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
+    //  static   dynamic
+    // | ----- | ------- |
+    // The static ones are original _tablet_schame columns
+    flush_schema = std::make_shared<TabletSchema>(*_context.tablet_schema);
+    vectorized::Block flush_block(std::move(block));
+    // The dynamic ones are auto generated and extended, append them the the 
orig_block
+    for (auto& entry : object_column.get_subcolumns()) {
+        const std::string& column_name = entry->path.get_path();
+        auto column_iter = schema_view.column_name_to_column.find(column_name);
+        if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) {
+            // Column maybe dropped by light weight schema change DDL
+            continue;
+        }
+        TabletColumn column(column_iter->second);
+        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                column, column.is_nullable());
+        // Dynamic generated columns does not appear in original tablet schema
+        if (_context.tablet_schema->field_index(column.name()) < 0) {
+            flush_schema->append_column(column);
+            flush_block.insert({data_type->create_column(), data_type, 
column.name()});
+        }
+    }
+
+    // Ensure column are all present at this schema version.Otherwise there 
will be some senario:
+    //  Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added 
columns and schema version became 10
+    //  Load2 -> version(10) with schema [a, b, c] and has no extended columns 
and fetched the schema at version 10
+    //  Load2 will persist meta with [a, b, c] but Load1 will persist meta 
with [a, b, c, d, e]
+    // So we should make sure that rowset at the same schema version alawys 
contain the same size of columns.
+    // so that all columns at schema_version is in either 
_context.tablet_schema or schema_change_recorder
+    for (const auto& [name, column] : schema_view.column_name_to_column) {
+        if (_context.tablet_schema->field_index(name) == -1) {
+            const auto& tcolumn = schema_view.column_name_to_column[name];
+            TabletColumn new_column(tcolumn);
+            _context.schema_change_recorder->add_extended_columns(column,
+                                                                  
schema_view.schema_version);
+        }
+    }
+
+    // Last schema alignment before flush to disk, due to the schema maybe 
variant before this procedure
+    // Eg. add columnA(INT) -> drop ColumnA -> add ColumnA(Double), then 
columnA could be type of `Double`,
+    // unfold will cast to Double type
+    RETURN_IF_ERROR(vectorized::schema_util::unfold_object(
+            flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME), 
flush_block, true));
+    flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME);
+    block.swap(flush_block);
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 32115d6635..e88efdc000 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -90,10 +90,14 @@ public:
 
     Status flush() override;
 
+    Status unfold_variant_column_and_flush_block(
+            vectorized::Block* block, int32_t segment_id,
+            const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) override;
+
     // Return the file size flushed to disk in "flush_size"
     // This method is thread-safe.
-    Status flush_single_memtable(const vectorized::Block* block, int64_t* 
flush_size,
-                                 const FlushContext* ctx = nullptr) override;
+    Status flush_single_block(const vectorized::Block* block, int64_t* 
flush_size,
+                              const FlushContext* ctx = nullptr) override;
 
     RowsetSharedPtr build() override;
 
@@ -115,11 +119,6 @@ public:
 
     int32_t allocate_segment_id() override { return 
_next_segment_id.fetch_add(1); };
 
-    // Maybe modified by local schema change
-    vectorized::schema_util::LocalSchemaChangeRecorder* 
mutable_schema_change_recorder() override {
-        return _context.schema_change_recorder.get();
-    }
-
     SegcompactionWorker& get_segcompaction_worker() { return 
_segcompaction_worker; }
 
     Status flush_segment_writer_for_segcompaction(
@@ -134,6 +133,8 @@ public:
 
     int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
 
+    int64_t segment_writer_ns() override { return _segment_writer_ns; }
+
 private:
     Status _do_add_block(const vectorized::Block* block,
                          std::unique_ptr<segment_v2::SegmentWriter>* 
segment_writer,
@@ -171,6 +172,13 @@ private:
     Status _rename_compacted_segment_plain(uint64_t seg_id);
     Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t 
seg_id);
 
+    // Unfold the variant column into the Block
+    // E.g. [A | B | C | (D, E, F)]
+    // After unfolding, the block structure changes to -> [A | B | C | D | E | F]
+    // The expanded D, E, F are the dynamic part of the block
+    // The flushed Block's columns should exactly match the column types in the frontend meta
+    Status _unfold_variant_column(vectorized::Block& block, TabletSchemaSPtr& 
flush_schema);
+
     // build a tmp rowset for load segment to calc delete_bitmap
     // for this segment
     RowsetSharedPtr _build_tmp();
@@ -231,6 +239,7 @@ protected:
     std::shared_ptr<MowContext> _mow_context;
 
     int64_t _delete_bitmap_ns = 0;
+    int64_t _segment_writer_ns = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_writer.h 
b/be/src/olap/rowset/rowset_writer.h
index e8d214e00d..d5655a89b9 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -72,8 +72,14 @@ public:
     }
     virtual Status final_flush() { return 
Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }
 
-    virtual Status flush_single_memtable(const vectorized::Block* block, 
int64_t* flush_size,
-                                         const FlushContext* ctx = nullptr) {
+    virtual Status unfold_variant_column_and_flush_block(
+            vectorized::Block* block, int32_t segment_id,
+            const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* 
flush_size) {
+        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
+    }
+
+    virtual Status flush_single_block(const vectorized::Block* block, int64_t* 
flush_size,
+                                      const FlushContext* ctx = nullptr) {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
     }
 
@@ -104,11 +110,10 @@ public:
 
     virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not 
supported!"; }
 
-    virtual vectorized::schema_util::LocalSchemaChangeRecorder*
-    mutable_schema_change_recorder() = 0;
-
     virtual int64_t delete_bitmap_ns() { return 0; }
 
+    virtual int64_t segment_writer_ns() { return 0; }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
 };
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a47728941d..55353f210c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2955,7 +2955,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr 
rowset,
                 rowset_schema, read_plan_ori, read_plan_update, 
rsid_to_rowset, &block));
         sort_block(block, ordered_block);
         int64_t size;
-        RETURN_IF_ERROR(rowset_writer->flush_single_memtable(&ordered_block, 
&size));
+        RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block, 
&size));
     }
     LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " 
rowset: " << rowset_id
               << " seg_id: " << seg->id() << " dummy_version: " << end_version 
+ 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to