This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 190debaac9 [Improvement](load) single partition load optimize (#20876)
190debaac9 is described below
commit 190debaac92cade94e3101cb673bfb09f0c34472
Author: zzzxl <[email protected]>
AuthorDate: Tue Jun 20 20:29:39 2023 +0800
[Improvement](load) single partition load optimize (#20876)
1. When loading into a table with a single partition, the partition and
tablet are no longer looked up for each row of data
2. This optimization applies only to tables that use DISTRIBUTED BY RANDOM
---
be/src/vec/sink/vtablet_sink.cpp | 94 ++++++++++++++++++++++++++++++++--------
be/src/vec/sink/vtablet_sink.h | 3 ++
2 files changed, 79 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e8871bae40..25bb88dfb7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -36,9 +36,11 @@
#include <algorithm>
#include <iterator>
+#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
+#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -1276,6 +1278,56 @@ void VOlapTableSink::_generate_row_distribution_payload(
}
}
+Status VOlapTableSink::_single_partition_generate(RuntimeState* state,
vectorized::Block* block,
+ ChannelDistributionPayload&
channel_to_payload,
+ size_t num_rows, int32_t
filtered_rows) {
+ const VOlapTablePartition* partition = nullptr;
+ uint32_t tablet_index = 0;
+ bool stop_processing = false;
+ for (int32_t i = 0; i < num_rows; ++i) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ continue;
+ }
+ bool is_continue = false;
+ RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index,
stop_processing,
+ is_continue));
+ if (is_continue) {
+ continue;
+ }
+ if (config::enable_lazy_open_partition) {
+ _open_partition(partition);
+ }
+ break;
+ }
+ for (int j = 0; j < partition->indexes.size(); ++j) {
+ auto tid = partition->indexes[j].tablets[tablet_index];
+ auto it = _channels[j]->_channels_by_tablet.find(tid);
+ DCHECK(it != _channels[j]->_channels_by_tablet.end())
+ << "unknown tablet, tablet_id=" << tablet_index;
+ int64_t row_cnt = 0;
+ for (const auto& channel : it->second) {
+ if (channel_to_payload[j].count(channel.get()) < 1) {
+ channel_to_payload[j].insert(
+ {channel.get(), Payload
{std::unique_ptr<vectorized::IColumn::Selector>(
+ new
vectorized::IColumn::Selector()),
+ std::vector<int64_t>()}});
+ }
+ auto& selector = channel_to_payload[j][channel.get()].first;
+ auto& tablet_ids = channel_to_payload[j][channel.get()].second;
+ for (int32_t i = 0; i < num_rows; ++i) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ continue;
+ }
+ selector->push_back(i);
+ }
+ tablet_ids.resize(selector->size(), tid);
+ row_cnt = selector->size();
+ }
+ _number_output_rows += row_cnt;
+ }
+ return Status::OK();
+}
+
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block*
input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@@ -1328,24 +1380,30 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
_partition_to_tablet_map.clear();
}
_row_distribution_watch.start();
- for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
- continue;
- }
- const VOlapTablePartition* partition = nullptr;
- bool is_continue = false;
- uint32_t tablet_index = 0;
- RETURN_IF_ERROR(find_tablet(state, &block, i, &partition,
tablet_index, stop_processing,
- is_continue));
- if (is_continue) {
- continue;
- }
- // each row
- _generate_row_distribution_payload(channel_to_payload, partition,
tablet_index, i, 1);
- // open partition
- if (config::enable_lazy_open_partition) {
- // aysnc open operation,don't block send operation
- _open_partition(partition);
+ size_t partition_num = _vpartition->get_partitions().size();
+ if (partition_num == 1 && findTabletMode ==
FindTabletMode::FIND_TABLET_EVERY_SINK) {
+ RETURN_IF_ERROR(_single_partition_generate(state, &block,
channel_to_payload, num_rows,
+ filtered_rows));
+ } else {
+ for (int i = 0; i < num_rows; ++i) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ continue;
+ }
+ const VOlapTablePartition* partition = nullptr;
+ bool is_continue = false;
+ uint32_t tablet_index = 0;
+ RETURN_IF_ERROR(find_tablet(state, &block, i, &partition,
tablet_index, stop_processing,
+ is_continue));
+ if (is_continue) {
+ continue;
+ }
+ // each row
+ _generate_row_distribution_payload(channel_to_payload, partition,
tablet_index, i, 1);
+ // open partition
+ if (config::enable_lazy_open_partition) {
+ // aysnc open operation,don't block send operation
+ _open_partition(partition);
+ }
}
}
_row_distribution_watch.stop();
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 1e91f4247f..6a6d2f6038 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -490,6 +490,9 @@ private:
void _generate_row_distribution_payload(ChannelDistributionPayload&
payload,
const VOlapTablePartition*
partition,
uint32_t tablet_index, int
row_idx, size_t row_cnt);
+ Status _single_partition_generate(RuntimeState* state, vectorized::Block*
block,
+ ChannelDistributionPayload&
channel_to_payload,
+ size_t num_rows, int32_t filtered_rows);
// make input data valid for OLAP table
// return number of invalid/filtered rows.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]