This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0a79c547ffb [Refactor](Sink) Remove is_append mode in table sink
(#34684)
0a79c547ffb is described below
commit 0a79c547ffba56b8a4a0696c617d76a7d4f95624
Author: lihangyu <[email protected]>
AuthorDate: Sat May 11 11:20:10 2024 +0800
[Refactor](Sink) Remove is_append mode in table sink (#34684)
Remove the is_append mode from the sink component due to the following
reasons:
1. The performance improvement from this mode is relatively minor,
approximately 10%, as demonstrated in previous benchmarks.
2. The mode complicates maintenance. It requires a separate data writing
path to avoid copying, which increases complexity and poses a risk of potential
data loss.
I've already tested the compatibility with the previous version.
---
be/src/olap/delta_writer.cpp | 11 ++--
be/src/olap/delta_writer.h | 5 +-
be/src/olap/delta_writer_v2.cpp | 11 ++--
be/src/olap/delta_writer_v2.h | 5 +-
be/src/olap/memtable.cpp | 11 +---
be/src/olap/memtable.h | 3 +-
be/src/olap/memtable_writer.cpp | 18 ++-----
be/src/olap/memtable_writer.h | 5 +-
be/src/runtime/tablets_channel.cpp | 16 ++----
be/src/vec/sink/writer/vtablet_writer.cpp | 81 +++-------------------------
be/src/vec/sink/writer/vtablet_writer.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +-
12 files changed, 30 insertions(+), 140 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 53d68b9d3d3..1ec4dd17313 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -112,13 +112,8 @@ Status BaseDeltaWriter::init() {
return Status::OK();
}
-Status BaseDeltaWriter::append(const vectorized::Block* block) {
- return write(block, {}, true);
-}
-
-Status BaseDeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
- bool is_append) {
- if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status BaseDeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs) {
+ if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
_lock_watch.start();
@@ -134,7 +129,7 @@ Status BaseDeltaWriter::write(const vectorized::Block*
block, const std::vector<
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
- return _memtable_writer->write(block, row_idxs, is_append);
+ return _memtable_writer->write(block, row_idxs);
}
Status BaseDeltaWriter::wait_flush() {
return _memtable_writer->wait_flush();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index d2419d2d811..9dbcc0a62d7 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -65,10 +65,7 @@ public:
Status init();
- Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
- bool is_append = false);
-
- Status append(const vectorized::Block* block);
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
// flush the last memtable to flush queue, must call it before
build_rowset()
Status close();
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index cae6ab88012..51cef7e9f58 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -139,13 +139,8 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
-Status DeltaWriterV2::append(const vectorized::Block* block) {
- return write(block, {}, true);
-}
-
-Status DeltaWriterV2::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
- bool is_append) {
- if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status DeltaWriterV2::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs) {
+ if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
_lock_watch.start();
@@ -167,7 +162,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block,
const std::vector<ui
}
}
SCOPED_RAW_TIMER(&_write_memtable_time);
- return _memtable_writer->write(block, row_idxs, is_append);
+ return _memtable_writer->write(block, row_idxs);
}
Status DeltaWriterV2::close() {
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index e9e051608d2..31b364e1038 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -71,10 +71,7 @@ public:
Status init();
- Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
- bool is_append = false);
-
- Status append(const vectorized::Block* block);
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
// flush the last memtable to flush queue, must call it before close_wait()
Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 87ae20237e5..123eb7d8264 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -169,8 +169,7 @@ int RowInBlockComparator::operator()(const RowInBlock*
left, const RowInBlock* r
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* input_block, const
std::vector<uint32_t>& row_idxs,
- bool is_append) {
+void MemTable::insert(const vectorized::Block* input_block, const
std::vector<uint32_t>& row_idxs) {
vectorized::Block target_block = *input_block;
target_block = input_block->copy_block(_column_offset);
if (_is_first_insertion) {
@@ -201,13 +200,7 @@ void MemTable::insert(const vectorized::Block*
input_block, const std::vector<ui
auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
auto block_size0 = _input_mutable_block.allocated_bytes();
- if (is_append) {
- // Append the block, call insert range from
- _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
- num_rows = target_block.rows();
- } else {
- _input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
- }
+ _input_mutable_block.add_rows(&target_block, row_idxs.data(),
row_idxs.data() + num_rows);
auto block_size1 = _input_mutable_block.allocated_bytes();
g_memtable_input_block_allocated_size << block_size1 - block_size0;
auto input_size = size_t(target_block.bytes() * num_rows /
target_block.rows() *
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 4ee245af359..c95e38fb05a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,8 +181,7 @@ public:
_flush_mem_tracker->consumption();
}
// insert tuple from (row_pos) to (row_pos+num_rows)
- void insert(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
- bool is_append = false);
+ void insert(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
void shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 940359641ba..c7ad1590a89 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -89,13 +89,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter>
rowset_writer,
return Status::OK();
}
-Status MemTableWriter::append(const vectorized::Block* block) {
- return write(block, {}, true);
-}
-
-Status MemTableWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
- bool is_append) {
- if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status MemTableWriter::write(const vectorized::Block* block,
+ const std::vector<uint32_t>& row_idxs) {
+ if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
_lock_watch.start();
@@ -112,12 +108,8 @@ Status MemTableWriter::write(const vectorized::Block*
block, const std::vector<u
_req.tablet_id,
_req.load_id.hi(), _req.load_id.lo());
}
- if (is_append) {
- _total_received_rows += block->rows();
- } else {
- _total_received_rows += row_idxs.size();
- }
- _mem_table->insert(block, row_idxs, is_append);
+ _total_received_rows += row_idxs.size();
+ _mem_table->insert(block, row_idxs);
if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
_mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 6d935e75f3d..b34fe0baee4 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -71,10 +71,7 @@ public:
std::shared_ptr<PartialUpdateInfo> partial_update_info,
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
- Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
- bool is_append = false);
-
- Status append(const vectorized::Block* block);
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
// flush the last memtable to flush queue, must call it before close_wait()
Status close();
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 7ed0a9a8e05..526c979968d 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -558,19 +558,11 @@ Status BaseTabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request
return Status::OK();
};
- if (request.is_single_tablet_block()) {
- SCOPED_TIMER(_write_block_timer);
- RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0),
[&](BaseDeltaWriter* writer) {
- return writer->append(&send_data);
+ SCOPED_TIMER(_write_block_timer);
+ for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+ RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first,
[&](BaseDeltaWriter* writer) {
+ return writer->write(&send_data, tablet_to_rowidxs_it.second);
}));
- } else {
- SCOPED_TIMER(_write_block_timer);
- for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
- RETURN_IF_ERROR(
- write_tablet_data(tablet_to_rowidxs_it.first,
[&](BaseDeltaWriter* writer) {
- return writer->write(&send_data,
tablet_to_rowidxs_it.second);
- }));
- }
}
{
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 8d3a4c21168..c2a029e7870 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -484,7 +484,7 @@ Status VNodeChannel::open_wait() {
return status;
}
-Status VNodeChannel::add_block(vectorized::Block* block, const Payload*
payload, bool is_append) {
+Status VNodeChannel::add_block(vectorized::Block* block, const Payload*
payload) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
if (payload->second.empty()) {
return Status::OK();
@@ -517,56 +517,12 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload,
}
SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
- if (is_append) {
- if (_cur_mutable_block && !_cur_mutable_block->empty()) {
- // When is-append is true, the previous block may not have been
sent out yet.
- // (e.x. The previous block is not load to single tablet, and its
row num was
- // 4064, which is smaller than the send batch size 8192).
- // If we clear the previous block directly here, it will cause
data loss.
- {
- SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
- std::lock_guard<std::mutex> l(_pending_batches_lock);
- // To simplify the add_row logic, postpone adding block into
req until the time of sending req
- _pending_batches_bytes +=
_cur_mutable_block->allocated_bytes();
- _cur_add_block_request->set_eos(
- false); // for multi-add, only when marking close we
set it eos.
- // Copy the request to tmp request to add to pend block queue
- auto tmp_add_block_request =
std::make_shared<PTabletWriterAddBlockRequest>();
- *tmp_add_block_request = *_cur_add_block_request;
- _pending_blocks.emplace(std::move(_cur_mutable_block),
tmp_add_block_request);
- _pending_batches_num++;
- VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:"
<< this
- << " pending_batches_bytes:" <<
_pending_batches_bytes
- << " jobid:" <<
std::to_string(_state->load_job_id())
- << " loadinfo:" << _load_info;
- }
- _cur_mutable_block =
vectorized::MutableBlock::create_unique(block->clone_empty());
- _cur_add_block_request->clear_tablet_ids();
- }
- // Do not split the data of the block by tablets but append it to a
single delta writer.
- // This is a faster way to send block than append_to_block_by_selector
- // TODO: we could write to local delta writer if single_replica_load
is true
- VLOG_DEBUG << "send whole block by append block";
- std::vector<int64_t> tablets(block->rows(), payload->second[0]);
- vectorized::MutableColumns& columns =
_cur_mutable_block->mutable_columns();
- columns.clear();
- columns.reserve(block->columns());
- // Hold the reference of block columns to avoid copying
- for (auto column : block->get_columns()) {
- columns.push_back(std::move(*column).mutate());
- }
- *_cur_add_block_request->mutable_tablet_ids() = {tablets.begin(),
tablets.end()};
- _cur_add_block_request->set_is_single_tablet_block(true);
- } else {
- block->append_to_block_by_selector(_cur_mutable_block.get(),
*(payload->first));
- for (auto tablet_id : payload->second) {
- _cur_add_block_request->add_tablet_ids(tablet_id);
- }
- // need to reset to false avoid load data to incorrect tablet.
- _cur_add_block_request->set_is_single_tablet_block(false);
+ block->append_to_block_by_selector(_cur_mutable_block.get(),
*(payload->first));
+ for (auto tablet_id : payload->second) {
+ _cur_add_block_request->add_tablet_ids(tablet_id);
}
- if (is_append || _cur_mutable_block->rows() >= _batch_size ||
+ if (_cur_mutable_block->rows() >= _batch_size ||
_cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
@@ -1680,35 +1636,12 @@ Status VTabletWriter::write(doris::vectorized::Block&
input_block) {
_generate_index_channels_payloads(_row_part_tablet_ids,
channel_to_payload);
_row_distribution_watch.stop();
- // Random distribution and the block belongs to a single tablet, we could
optimize to append the whole
- // block into node channel.
- bool load_block_to_single_tablet =
- !_vpartition->is_auto_partition() &&
_tablet_finder->is_single_tablet();
- if (load_block_to_single_tablet) {
- SCOPED_RAW_TIMER(&_filter_ns);
- // Filter block
- if (has_filtered_rows) {
- auto filter = vectorized::ColumnUInt8::create(block->rows(), 0);
- vectorized::UInt8* filter_data =
-
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data().data();
- vectorized::IColumn::Filter& filter_col =
-
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
- for (size_t i = 0; i < filter_col.size(); ++i) {
- filter_data[i] = !_block_convertor->filter_map()[i];
- }
- RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal(
- block.get(), filter_col, block->columns()));
- }
- }
-
// Add block to node channel
for (size_t i = 0; i < _channels.size(); i++) {
for (const auto& entry : channel_to_payload[i]) {
// if this node channel is already failed, this add_row will be
skipped
- auto st = entry.first->add_block(
- block.get(), &entry.second, // entry.second is a [row ->
tablet] mapping
- // if it is load single tablet, then append this whole
block
- load_block_to_single_tablet);
+ // entry.second is a [row -> tablet] mapping
+ auto st = entry.first->add_block(block.get(), &entry.second);
if (!st.ok()) {
_channels[i]->mark_as_failed(entry.first, st.to_string());
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 32539712e3b..bcc5228457a 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -244,7 +244,7 @@ public:
// this function will called multi times. NON_REENTRANT
Status open_wait();
- Status add_block(vectorized::Block* block, const Payload* payload, bool
is_append = false);
+ Status add_block(vectorized::Block* block, const Payload* payload);
// @return: 1 if running, 0 if finished.
// @caller: VOlapTabletSink::_send_batch_process. it's a continual
asynchronous process.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index ea7fed96d6d..b883d8e87c9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -456,7 +456,7 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
}
SCOPED_TIMER(_write_memtable_timer);
- auto st = delta_writer->write(block.get(), rows.row_idxes, false);
+ auto st = delta_writer->write(block.get(), rows.row_idxes);
return st;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]