This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2d5cd744dfa [Refactor](Sink) Remove is_append mode in table sink (#34549)
2d5cd744dfa is described below
commit 2d5cd744dfa67bb0f69616556c96d79f27dd1219
Author: lihangyu <[email protected]>
AuthorDate: Wed May 8 20:31:57 2024 +0800
[Refactor](Sink) Remove is_append mode in table sink (#34549)
---
be/src/olap/delta_writer.cpp | 17 +++----------
be/src/olap/delta_writer.h | 5 +---
be/src/olap/memtable.cpp | 11 ++------
be/src/olap/memtable.h | 3 +--
be/src/runtime/tablets_channel.cpp | 18 +++-----------
be/src/vec/sink/vtablet_sink.cpp | 51 +++++---------------------------------
be/src/vec/sink/vtablet_sink.h | 2 +-
7 files changed, 19 insertions(+), 88 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 594bc7b630c..f33040de2cc 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -237,13 +237,8 @@ Status DeltaWriter::init() {
return Status::OK();
}
-Status DeltaWriter::append(const vectorized::Block* block) {
- return write(block, {}, true);
-}
-
-Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<int>& row_idxs,
- bool is_append) {
- if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<int>& row_idxs) {
+ if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
_lock_watch.start();
@@ -263,12 +258,8 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<int>
_req.load_id.hi(), _req.load_id.lo(), _req.txn_id);
}
- if (is_append) {
- _total_received_rows += block->rows();
- } else {
- _total_received_rows += row_idxs.size();
- }
- _mem_table->insert(block, row_idxs, is_append);
+ _total_received_rows += row_idxs.size();
+ _mem_table->insert(block, row_idxs);
if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
_mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index ede5ca1f03b..497881d585f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -85,10 +85,7 @@ public:
Status init();
- Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs,
- bool is_append = false);
-
- Status append(const vectorized::Block* block);
+ Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs);
// flush the last memtable to flush queue, must call it before
build_rowset()
Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5d272c1a754..8fe58c686be 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -185,8 +185,7 @@ int RowInBlockComparator::operator()(const RowInBlock*
left, const RowInBlock* r
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* input_block, const
std::vector<int>& row_idxs,
- bool is_append) {
+void MemTable::insert(const vectorized::Block* input_block, const
std::vector<int>& row_idxs) {
SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
vectorized::Block target_block = *input_block;
if (!_tablet_schema->is_dynamic_schema()) {
@@ -222,13 +221,7 @@ void MemTable::insert(const vectorized::Block*
input_block, const std::vector<in
auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
- if (is_append) {
- // Append the block, call insert range from
- _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
- num_rows = target_block.rows();
- } else {
- _input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
- }
+ _input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
size_t input_size = target_block.allocated_bytes() * num_rows /
target_block.rows();
_mem_usage += input_size;
_insert_mem_tracker->consume(input_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 7d74b8ce43f..016f11f61ac 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -188,8 +188,7 @@ public:
_flush_mem_tracker->consumption();
}
// insert tuple from (row_pos) to (row_pos+num_rows)
- void insert(const vectorized::Block* block, const std::vector<int>&
row_idxs,
- bool is_append = false);
+ void insert(const vectorized::Block* block, const std::vector<int>&
row_idxs);
void shrink_memtable_by_agg();
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 17c84956f69..5293200d532 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -448,9 +448,6 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index
*/> tablet_to_rowidxs;
for (int i = 0; i < request.tablet_ids_size(); ++i) {
- if (request.is_single_tablet_block()) {
- break;
- }
int64_t tablet_id = request.tablet_ids(i);
if (_is_broken_tablet(tablet_id)) {
// skip broken tablets
@@ -496,18 +493,11 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return Status::OK();
};
- if (request.is_single_tablet_block()) {
- SCOPED_TIMER(_write_block_timer);
- RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0),
[&](DeltaWriter* writer) {
- return writer->append(&send_data);
+ SCOPED_TIMER(_write_block_timer);
+ for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+ RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first,
[&](DeltaWriter* writer) {
+ return writer->write(&send_data, tablet_to_rowidxs_it.second);
}));
- } else {
- SCOPED_TIMER(_write_block_timer);
- for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
- RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first,
[&](DeltaWriter* writer) {
- return writer->write(&send_data, tablet_to_rowidxs_it.second);
- }));
- }
}
{
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 4fade8c7cdd..862c8dae7e2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -525,7 +525,7 @@ Status VNodeChannel::open_wait() {
return status;
}
-Status VNodeChannel::add_block(vectorized::Block* block, const Payload*
payload, bool is_append) {
+Status VNodeChannel::add_block(vectorized::Block* block, const Payload*
payload) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
if (payload->second.empty()) {
return Status::OK();
@@ -604,50 +604,12 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload,
}
SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
- if (is_append) {
- if (_cur_mutable_block && !_cur_mutable_block->empty()) {
- // When is-append is true, the previous block may not have been
sent out yet.
- // (e.x. The previous block is not load to single tablet, and its
row num was
- // 4064, which is smaller than the send batch size 8192).
- // If we clear the previous block directly here, it will cause
data loss.
- {
- SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
- std::lock_guard<std::mutex> l(_pending_batches_lock);
- _pending_batches_bytes +=
_cur_mutable_block->allocated_bytes();
- _pending_blocks.emplace(std::move(_cur_mutable_block),
_cur_add_block_request);
- _pending_batches_num++;
- VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:"
<< this
- << " pending_batches_bytes:" <<
_pending_batches_bytes
- << " jobid:" <<
std::to_string(_state->load_job_id())
- << " loadinfo:" << _load_info;
- }
- _cur_mutable_block =
vectorized::MutableBlock::create_unique(block->clone_empty());
- _cur_add_block_request.clear_tablet_ids();
- }
- // Do not split the data of the block by tablets but append it to a
single delta writer.
- // This is a faster way to send block than append_block_by_selector
- // TODO: we could write to local delta writer if single_replica_load
is true
- VLOG_DEBUG << "send whole block by append block";
- std::vector<int64_t> tablets(block->rows(), payload->second[0]);
- vectorized::MutableColumns& columns =
_cur_mutable_block->mutable_columns();
- columns.clear();
- columns.reserve(block->columns());
- // Hold the reference of block columns to avoid copying
- for (auto column : block->get_columns()) {
- columns.push_back(column->assume_mutable());
- }
- *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(),
tablets.end()};
- _cur_add_block_request.set_is_single_tablet_block(true);
- } else {
- block->append_block_by_selector(_cur_mutable_block.get(),
*(payload->first));
- for (auto tablet_id : payload->second) {
- _cur_add_block_request.add_tablet_ids(tablet_id);
- }
- // need to reset to false avoid load data to incorrect tablet.
- _cur_add_block_request.set_is_single_tablet_block(false);
+ block->append_block_by_selector(_cur_mutable_block.get(),
*(payload->first));
+ for (auto tablet_id : payload->second) {
+ _cur_add_block_request.add_tablet_ids(tablet_id);
}
- if (is_append || _cur_mutable_block->rows() >= _batch_size ||
+ if (_cur_mutable_block->rows() >= _batch_size ||
_cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
@@ -1454,9 +1416,8 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
for (const auto& entry : channel_to_payload[i]) {
// if this node channel is already failed, this add_row will be
skipped
auto st = entry.first->add_block(
- &block, &entry.second,
// if it is load single tablet, then append this whole
block
- load_block_to_single_tablet);
+ &block, &entry.second);
if (!st.ok()) {
_channels[i]->mark_as_failed(entry.first, st.to_string());
}
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 6b7e0187c49..abdf44a7e43 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -229,7 +229,7 @@ public:
Status open_wait();
- Status add_block(vectorized::Block* block, const Payload* payload, bool
is_append = false);
+ Status add_block(vectorized::Block* block, const Payload* payload);
int try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]