This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0a79c547ffb [Refactor](Sink) Remove is_append mode in table sink (#34684)
0a79c547ffb is described below

commit 0a79c547ffba56b8a4a0696c617d76a7d4f95624
Author: lihangyu <[email protected]>
AuthorDate: Sat May 11 11:20:10 2024 +0800

    [Refactor](Sink) Remove is_append mode in table sink (#34684)
    
    Remove the is_append mode from the sink component for the following reasons:
    1. The performance improvement from this mode is relatively minor, approximately 10%, as demonstrated in previous benchmarks.
    2. The mode complicates maintenance. It requires a separate data writing path to avoid copying, which increases complexity and poses a risk of potential data loss.
    
    I've already tested the compatibility with previous versions.
---
 be/src/olap/delta_writer.cpp                 | 11 ++--
 be/src/olap/delta_writer.h                   |  5 +-
 be/src/olap/delta_writer_v2.cpp              | 11 ++--
 be/src/olap/delta_writer_v2.h                |  5 +-
 be/src/olap/memtable.cpp                     | 11 +---
 be/src/olap/memtable.h                       |  3 +-
 be/src/olap/memtable_writer.cpp              | 18 ++-----
 be/src/olap/memtable_writer.h                |  5 +-
 be/src/runtime/tablets_channel.cpp           | 16 ++----
 be/src/vec/sink/writer/vtablet_writer.cpp    | 81 +++-------------------------
 be/src/vec/sink/writer/vtablet_writer.h      |  2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  2 +-
 12 files changed, 30 insertions(+), 140 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 53d68b9d3d3..1ec4dd17313 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -112,13 +112,8 @@ Status BaseDeltaWriter::init() {
     return Status::OK();
 }
 
-Status BaseDeltaWriter::append(const vectorized::Block* block) {
-    return write(block, {}, true);
-}
-
-Status BaseDeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
-                              bool is_append) {
-    if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status BaseDeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) {
+    if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
     _lock_watch.start();
@@ -134,7 +129,7 @@ Status BaseDeltaWriter::write(const vectorized::Block* 
block, const std::vector<
             std::this_thread::sleep_for(std::chrono::milliseconds(10));
         }
     }
-    return _memtable_writer->write(block, row_idxs, is_append);
+    return _memtable_writer->write(block, row_idxs);
 }
 Status BaseDeltaWriter::wait_flush() {
     return _memtable_writer->wait_flush();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index d2419d2d811..9dbcc0a62d7 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -65,10 +65,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
-                 bool is_append = false);
-
-    Status append(const vectorized::Block* block);
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before 
build_rowset()
     Status close();
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index cae6ab88012..51cef7e9f58 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -139,13 +139,8 @@ Status DeltaWriterV2::init() {
     return Status::OK();
 }
 
-Status DeltaWriterV2::append(const vectorized::Block* block) {
-    return write(block, {}, true);
-}
-
-Status DeltaWriterV2::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
-                            bool is_append) {
-    if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status DeltaWriterV2::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) {
+    if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
     _lock_watch.start();
@@ -167,7 +162,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block, 
const std::vector<ui
         }
     }
     SCOPED_RAW_TIMER(&_write_memtable_time);
-    return _memtable_writer->write(block, row_idxs, is_append);
+    return _memtable_writer->write(block, row_idxs);
 }
 
 Status DeltaWriterV2::close() {
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index e9e051608d2..31b364e1038 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -71,10 +71,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
-                 bool is_append = false);
-
-    Status append(const vectorized::Block* block);
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 87ae20237e5..123eb7d8264 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -169,8 +169,7 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<uint32_t>& row_idxs,
-                      bool is_append) {
+void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<uint32_t>& row_idxs) {
     vectorized::Block target_block = *input_block;
     target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
@@ -201,13 +200,7 @@ void MemTable::insert(const vectorized::Block* 
input_block, const std::vector<ui
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
     auto block_size0 = _input_mutable_block.allocated_bytes();
-    if (is_append) {
-        // Append the block, call insert range from
-        _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
-        num_rows = target_block.rows();
-    } else {
-        _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
-    }
+    _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
     auto block_size1 = _input_mutable_block.allocated_bytes();
     g_memtable_input_block_allocated_size << block_size1 - block_size0;
     auto input_size = size_t(target_block.bytes() * num_rows / 
target_block.rows() *
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 4ee245af359..c95e38fb05a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,8 +181,7 @@ public:
                _flush_mem_tracker->consumption();
     }
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
-                bool is_append = false);
+    void insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
     void shrink_memtable_by_agg();
 
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 940359641ba..c7ad1590a89 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -89,13 +89,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> 
rowset_writer,
     return Status::OK();
 }
 
-Status MemTableWriter::append(const vectorized::Block* block) {
-    return write(block, {}, true);
-}
-
-Status MemTableWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
-                             bool is_append) {
-    if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status MemTableWriter::write(const vectorized::Block* block,
+                             const std::vector<uint32_t>& row_idxs) {
+    if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
     _lock_watch.start();
@@ -112,12 +108,8 @@ Status MemTableWriter::write(const vectorized::Block* 
block, const std::vector<u
                                              _req.tablet_id, 
_req.load_id.hi(), _req.load_id.lo());
     }
 
-    if (is_append) {
-        _total_received_rows += block->rows();
-    } else {
-        _total_received_rows += row_idxs.size();
-    }
-    _mem_table->insert(block, row_idxs, is_append);
+    _total_received_rows += row_idxs.size();
+    _mem_table->insert(block, row_idxs);
 
     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
         _mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 6d935e75f3d..b34fe0baee4 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -71,10 +71,7 @@ public:
                 std::shared_ptr<PartialUpdateInfo> partial_update_info,
                 ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
 
-    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs,
-                 bool is_append = false);
-
-    Status append(const vectorized::Block* block);
+    Status write(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 7ed0a9a8e05..526c979968d 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -558,19 +558,11 @@ Status BaseTabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request
         return Status::OK();
     };
 
-    if (request.is_single_tablet_block()) {
-        SCOPED_TIMER(_write_block_timer);
-        RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), 
[&](BaseDeltaWriter* writer) {
-            return writer->append(&send_data);
+    SCOPED_TIMER(_write_block_timer);
+    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+        RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, 
[&](BaseDeltaWriter* writer) {
+            return writer->write(&send_data, tablet_to_rowidxs_it.second);
         }));
-    } else {
-        SCOPED_TIMER(_write_block_timer);
-        for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-            RETURN_IF_ERROR(
-                    write_tablet_data(tablet_to_rowidxs_it.first, 
[&](BaseDeltaWriter* writer) {
-                        return writer->write(&send_data, 
tablet_to_rowidxs_it.second);
-                    }));
-        }
     }
 
     {
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 8d3a4c21168..c2a029e7870 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -484,7 +484,7 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
-Status VNodeChannel::add_block(vectorized::Block* block, const Payload* 
payload, bool is_append) {
+Status VNodeChannel::add_block(vectorized::Block* block, const Payload* 
payload) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
         return Status::OK();
@@ -517,56 +517,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
     }
 
     SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
-    if (is_append) {
-        if (_cur_mutable_block && !_cur_mutable_block->empty()) {
-            // When is-append is true, the previous block may not have been 
sent out yet.
-            // (e.x. The previous block is not load to single tablet, and its 
row num was
-            // 4064, which is smaller than the send batch size 8192).
-            // If we clear the previous block directly here, it will cause 
data loss.
-            {
-                SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
-                std::lock_guard<std::mutex> l(_pending_batches_lock);
-                // To simplify the add_row logic, postpone adding block into 
req until the time of sending req
-                _pending_batches_bytes += 
_cur_mutable_block->allocated_bytes();
-                _cur_add_block_request->set_eos(
-                        false); // for multi-add, only when marking close we 
set it eos.
-                // Copy the request to tmp request to add to pend block queue
-                auto tmp_add_block_request = 
std::make_shared<PTabletWriterAddBlockRequest>();
-                *tmp_add_block_request = *_cur_add_block_request;
-                _pending_blocks.emplace(std::move(_cur_mutable_block), 
tmp_add_block_request);
-                _pending_batches_num++;
-                VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" 
<< this
-                           << " pending_batches_bytes:" << 
_pending_batches_bytes
-                           << " jobid:" << 
std::to_string(_state->load_job_id())
-                           << " loadinfo:" << _load_info;
-            }
-            _cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
-            _cur_add_block_request->clear_tablet_ids();
-        }
-        // Do not split the data of the block by tablets but append it to a 
single delta writer.
-        // This is a faster way to send block than append_to_block_by_selector
-        // TODO: we could write to local delta writer if single_replica_load 
is true
-        VLOG_DEBUG << "send whole block by append block";
-        std::vector<int64_t> tablets(block->rows(), payload->second[0]);
-        vectorized::MutableColumns& columns = 
_cur_mutable_block->mutable_columns();
-        columns.clear();
-        columns.reserve(block->columns());
-        // Hold the reference of block columns to avoid copying
-        for (auto column : block->get_columns()) {
-            columns.push_back(std::move(*column).mutate());
-        }
-        *_cur_add_block_request->mutable_tablet_ids() = {tablets.begin(), 
tablets.end()};
-        _cur_add_block_request->set_is_single_tablet_block(true);
-    } else {
-        block->append_to_block_by_selector(_cur_mutable_block.get(), 
*(payload->first));
-        for (auto tablet_id : payload->second) {
-            _cur_add_block_request->add_tablet_ids(tablet_id);
-        }
-        // need to reset to false avoid load data to incorrect tablet.
-        _cur_add_block_request->set_is_single_tablet_block(false);
+    block->append_to_block_by_selector(_cur_mutable_block.get(), 
*(payload->first));
+    for (auto tablet_id : payload->second) {
+        _cur_add_block_request->add_tablet_ids(tablet_id);
     }
 
-    if (is_append || _cur_mutable_block->rows() >= _batch_size ||
+    if (_cur_mutable_block->rows() >= _batch_size ||
         _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
@@ -1680,35 +1636,12 @@ Status VTabletWriter::write(doris::vectorized::Block& 
input_block) {
     _generate_index_channels_payloads(_row_part_tablet_ids, 
channel_to_payload);
     _row_distribution_watch.stop();
 
-    // Random distribution and the block belongs to a single tablet, we could 
optimize to append the whole
-    // block into node channel.
-    bool load_block_to_single_tablet =
-            !_vpartition->is_auto_partition() && 
_tablet_finder->is_single_tablet();
-    if (load_block_to_single_tablet) {
-        SCOPED_RAW_TIMER(&_filter_ns);
-        // Filter block
-        if (has_filtered_rows) {
-            auto filter = vectorized::ColumnUInt8::create(block->rows(), 0);
-            vectorized::UInt8* filter_data =
-                    
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data().data();
-            vectorized::IColumn::Filter& filter_col =
-                    
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
-            for (size_t i = 0; i < filter_col.size(); ++i) {
-                filter_data[i] = !_block_convertor->filter_map()[i];
-            }
-            RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal(
-                    block.get(), filter_col, block->columns()));
-        }
-    }
-
     // Add block to node channel
     for (size_t i = 0; i < _channels.size(); i++) {
         for (const auto& entry : channel_to_payload[i]) {
             // if this node channel is already failed, this add_row will be 
skipped
-            auto st = entry.first->add_block(
-                    block.get(), &entry.second, // entry.second is a [row -> 
tablet] mapping
-                    // if it is load single tablet, then append this whole 
block
-                    load_block_to_single_tablet);
+            // entry.second is a [row -> tablet] mapping
+            auto st = entry.first->add_block(block.get(), &entry.second);
             if (!st.ok()) {
                 _channels[i]->mark_as_failed(entry.first, st.to_string());
             }
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 32539712e3b..bcc5228457a 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -244,7 +244,7 @@ public:
     // this function will called multi times. NON_REENTRANT
     Status open_wait();
 
-    Status add_block(vectorized::Block* block, const Payload* payload, bool 
is_append = false);
+    Status add_block(vectorized::Block* block, const Payload* payload);
 
     // @return: 1 if running, 0 if finished.
     // @caller: VOlapTabletSink::_send_batch_process. it's a continual 
asynchronous process.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index ea7fed96d6d..b883d8e87c9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -456,7 +456,7 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
         
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
     }
     SCOPED_TIMER(_write_memtable_timer);
-    auto st = delta_writer->write(block.get(), rows.row_idxes, false);
+    auto st = delta_writer->write(block.get(), rows.row_idxes);
     return st;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to