This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16644eff7f8 [opt](load) optimize the performance of row distribution
(#25546)
16644eff7f8 is described below
commit 16644eff7f8c882ad330ed88e25b23cdcdcce7ed
Author: zclllyybb <[email protected]>
AuthorDate: Tue Nov 7 10:04:59 2023 +0800
[opt](load) optimize the performance of row distribution (#25546)
For non-pipeline non-sinkv2:
before: 14s
now: 6s-
For pipeline + sinkv2:
before: 230ms *48 instances
now: 38ms *48 instances
---
be/src/exec/tablet_info.cpp | 64 +----------
be/src/exec/tablet_info.h | 78 ++++++++++++-
be/src/vec/sink/vtablet_finder.cpp | 76 +++++++------
be/src/vec/sink/vtablet_finder.h | 13 ++-
be/src/vec/sink/vtablet_sink_v2.cpp | 64 ++++++-----
be/src/vec/sink/vtablet_sink_v2.h | 11 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 179 +++++++++++++++++-------------
be/src/vec/sink/writer/vtablet_writer.h | 11 +-
8 files changed, 285 insertions(+), 211 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 00ec0c2e581..90d43462581 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -17,7 +17,6 @@
#include "exec/tablet_info.h"
-#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Types_types.h>
@@ -26,6 +25,7 @@
#include <stddef.h>
#include <algorithm>
+#include <memory>
#include <ostream>
#include <tuple>
@@ -324,49 +324,20 @@ Status VOlapTablePartitionParam::init() {
}
}
- _partitions_map.reset(
- new std::map<BlockRowWithIndicator, VOlapTablePartition*,
VOlapTablePartKeyComparator>(
- VOlapTablePartKeyComparator(_partition_slot_locs,
_transformed_slot_locs)));
+ _partitions_map = std::make_unique<
+ std::map<BlockRowWithIndicator, VOlapTablePartition*,
VOlapTablePartKeyComparator>>(
+ VOlapTablePartKeyComparator(_partition_slot_locs,
_transformed_slot_locs));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs,
"distributed"));
}
}
- if (_distributed_slot_locs.empty()) {
- _compute_tablet_index = [](BlockRow* key,
- const VOlapTablePartition& partition) ->
uint32_t {
- if (partition.load_tablet_idx == -1) {
- // load_to_single_tablet = false, just do random
- return butil::fast_rand() % partition.num_buckets;
- }
- // load_to_single_tablet = ture, do round-robin
- return partition.load_tablet_idx % partition.num_buckets;
- };
- } else {
- _compute_tablet_index = [this](BlockRow* key,
- const VOlapTablePartition& partition)
-> uint32_t {
- uint32_t hash_val = 0;
- for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
- auto slot_desc = _slots[_distributed_slot_locs[i]];
- auto& column =
key->first->get_by_position(_distributed_slot_locs[i]).column;
- auto val = column->get_data_at(key->second);
- if (val.data != nullptr) {
- hash_val = RawValue::zlib_crc32(val.data, val.size,
slot_desc->type().type,
- hash_val);
- } else {
- hash_val = HashUtil::zlib_crc_hash_null(hash_val);
- }
- }
- return hash_val % partition.num_buckets;
- };
- }
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
// initial partitions
- for (int i = 0; i < _t_param.partitions.size(); ++i) {
- const TOlapTablePartition& t_part = _t_param.partitions[i];
+ for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
@@ -385,26 +356,6 @@ Status VOlapTablePartitionParam::init() {
return Status::OK();
}
-bool VOlapTablePartitionParam::find_partition(BlockRow* block_row,
- const VOlapTablePartition**
partition) const {
- // block_row is gave by inserting process. So try to use transformed index.
- auto it =
- _is_in_partition
- ? _partitions_map->find(std::tuple {block_row->first,
block_row->second, true})
- : _partitions_map->upper_bound(
- std::tuple {block_row->first, block_row->second,
true});
- // for list partition it might result in default partition
- if (_is_in_partition) {
- *partition = (it != _partitions_map->end()) ? it->second :
_default_partition;
- it = _partitions_map->end();
- }
- if (it != _partitions_map->end() &&
- _part_contains(it->second, std::tuple {block_row->first,
block_row->second, true})) {
- *partition = it->second;
- }
- return (*partition != nullptr);
-}
-
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
BlockRowWithIndicator key) const
{
// start_key.second == -1 means only single partition
@@ -413,11 +364,6 @@ bool
VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
!comparator(key, std::tuple {part->start_key.first,
part->start_key.second, false});
}
-uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
- const VOlapTablePartition&
partition) const {
- return _compute_tablet_index(block_row, partition);
-}
-
Status VOlapTablePartitionParam::_create_partition_keys(const
std::vector<TExprNode>& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index ec12dcbfcd3..bb9fbd8bc60 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -17,6 +17,7 @@
#pragma once
+#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
@@ -33,6 +34,8 @@
#include "common/object_pool.h"
#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
@@ -162,9 +165,78 @@ public:
int64_t version() const { return _t_param.version; }
// return true if we found this block_row in partition
- bool find_partition(BlockRow* block_row, const VOlapTablePartition**
partition) const;
+ //TODO: use virtual function to refactor it
+ ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
+ VOlapTablePartition*& partition) const {
+ auto it = _is_in_partition ? _partitions_map->find(std::tuple {block,
row, true})
+ : _partitions_map->upper_bound(std::tuple
{block, row, true});
+ // for list partition it might result in default partition
+ if (_is_in_partition) {
+ partition = (it != _partitions_map->end()) ? it->second :
_default_partition;
+ it = _partitions_map->end();
+ }
+ if (it != _partitions_map->end() &&
+ _part_contains(it->second, std::tuple {block, row, true})) {
+ partition = it->second;
+ }
+ return (partition != nullptr);
+ }
+
+ ALWAYS_INLINE void find_tablets(
+ vectorized::Block* block, const std::vector<uint32_t>& indexes,
+ const std::vector<VOlapTablePartition*>& partitions,
+ std::vector<uint32_t>& tablet_indexes /*result*/,
+ /*TODO: check if flat hash map will be better*/
+ std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr)
const {
+ std::function<uint32_t(vectorized::Block*, uint32_t, const
VOlapTablePartition&)>
+ compute_function;
+ if (!_distributed_slot_locs.empty()) {
+ //TODO: refactor by saving the hash values. then we can calculate
in columnwise.
+ compute_function = [this](vectorized::Block* block, uint32_t row,
+ const VOlapTablePartition& partition) ->
uint32_t {
+ uint32_t hash_val = 0;
+ for (unsigned short _distributed_slot_loc :
_distributed_slot_locs) {
+ auto* slot_desc = _slots[_distributed_slot_loc];
+ auto& column =
block->get_by_position(_distributed_slot_loc).column;
+ auto val = column->get_data_at(row);
+ if (val.data != nullptr) {
+ hash_val = RawValue::zlib_crc32(val.data, val.size,
slot_desc->type().type,
+ hash_val);
+ } else {
+ hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+ }
+ }
+ return hash_val % partition.num_buckets;
+ };
+ } else { // random distribution
+ compute_function = [](vectorized::Block* block, uint32_t row,
+ const VOlapTablePartition& partition) ->
uint32_t {
+ if (partition.load_tablet_idx == -1) {
+ // load_to_single_tablet = false, just do random
+ return butil::fast_rand() % partition.num_buckets;
+ }
+ // load_to_single_tablet = ture, do round-robin
+ return partition.load_tablet_idx % partition.num_buckets;
+ };
+ }
- uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition&
partition) const;
+ if (partition_tablets_buffer == nullptr) {
+ for (auto index : indexes) {
+ tablet_indexes[index] = compute_function(block, index,
*partitions[index]);
+ }
+ } else { // use buffer
+ for (auto index : indexes) {
+ auto& partition_id = partitions[index]->id;
+ if (auto it = partition_tablets_buffer->find(partition_id);
+ it != partition_tablets_buffer->end()) {
+ tablet_indexes[index] = it->second; // tablet
+ }
+ // compute and save in buffer
+ (*partition_tablets_buffer)[partition_id] =
tablet_indexes[index] =
+ compute_function(block, index, *partitions[index]);
+ }
+ }
+ }
const std::vector<VOlapTablePartition*>& get_partitions() const { return
_partitions; }
@@ -193,8 +265,6 @@ private:
Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key,
uint16_t pos);
- std::function<uint32_t(BlockRow*, const VOlapTablePartition&)>
_compute_tablet_index;
-
// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key)
const;
diff --git a/be/src/vec/sink/vtablet_finder.cpp
b/be/src/vec/sink/vtablet_finder.cpp
index 421b3ebb11c..f01add4b22e 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -41,21 +41,24 @@
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
+Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int
rows,
+ std::vector<VOlapTablePartition*>&
partitions,
+ std::vector<uint32_t>& tablet_index,
bool& stop_processing,
+ std::vector<bool>& skip,
std::vector<int64_t>* miss_rows) {
+ for (int index = 0; index < rows; index++) {
+ _vpartition->find_partition(block, index, partitions[index]);
+ }
+
+ std::vector<uint32_t> qualified_rows;
+ qualified_rows.reserve(rows);
-Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int
row_index,
- const VOlapTablePartition** partition,
uint32_t& tablet_index,
- bool& stop_processing, bool& is_continue,
- bool* missing_partition) {
- Status status = Status::OK();
- *partition = nullptr;
- tablet_index = 0;
- BlockRow block_row;
- block_row = {block, row_index};
- if (!_vpartition->find_partition(&block_row, partition)) {
- if (missing_partition != nullptr) { // auto partition table
- *missing_partition = true;
- return status;
- } else {
+ for (int row_index = 0; row_index < rows; row_index++) {
+ if (partitions[row_index] == nullptr) [[unlikely]] {
+ if (miss_rows != nullptr) { // auto partition table
+ miss_rows->push_back(row_index); // already reserve memory
outside
+ skip[row_index] = true;
+ continue;
+ }
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
@@ -70,33 +73,34 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state,
Block* block, int row_
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop
processing");
}
- is_continue = true;
- return status;
+ skip[row_index] = true;
+ continue;
}
- }
- if (!(*partition)->is_mutable) {
- _num_immutable_partition_filtered_rows++;
- is_continue = true;
- return status;
- }
- if ((*partition)->num_buckets <= 0) {
- std::stringstream ss;
- ss << "num_buckets must be greater than 0, num_buckets=" <<
(*partition)->num_buckets;
- return Status::InternalError(ss.str());
- }
- _partition_ids.emplace((*partition)->id);
- if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
- if (_partition_to_tablet_map.find((*partition)->id) ==
_partition_to_tablet_map.end()) {
- tablet_index = _vpartition->find_tablet(&block_row, **partition);
- _partition_to_tablet_map.emplace((*partition)->id, tablet_index);
- } else {
- tablet_index = _partition_to_tablet_map[(*partition)->id];
+ if (!partitions[row_index]->is_mutable) [[unlikely]] {
+ _num_immutable_partition_filtered_rows++;
+ skip[row_index] = true;
+ continue;
}
+ if (partitions[row_index]->num_buckets <= 0) [[unlikely]] {
+ std::stringstream ss;
+ ss << "num_buckets must be greater than 0, num_buckets="
+ << partitions[row_index]->num_buckets;
+ return Status::InternalError(ss.str());
+ }
+
+ _partition_ids.emplace(partitions[row_index]->id);
+
+ qualified_rows.push_back(row_index);
+ }
+
+ if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
+ _vpartition->find_tablets(block, qualified_rows, partitions,
tablet_index);
} else {
- tablet_index = _vpartition->find_tablet(&block_row, **partition);
+ _vpartition->find_tablets(block, qualified_rows, partitions,
tablet_index,
+ &_partition_to_tablet_map);
}
- return status;
+ return Status::OK();
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 28d71c6a1e7..3426f7cb67d 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -18,10 +18,12 @@
#pragma once
#include <map>
+#include <unordered_set>
#include "common/status.h"
#include "exec/tablet_info.h"
#include "util/bitmap.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
namespace doris::vectorized {
@@ -39,9 +41,10 @@ public:
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
: _vpartition(vpartition), _find_tablet_mode(mode),
_filter_bitmap(1024) {};
- Status find_tablet(RuntimeState* state, vectorized::Block* block, int
row_index,
- const VOlapTablePartition** partition, uint32_t&
tablet_index,
- bool& filtered, bool& is_continue, bool*
missing_partition = nullptr);
+ Status find_tablets(RuntimeState* state, vectorized::Block* block, int
rows,
+ std::vector<VOlapTablePartition*>& partitions,
+ std::vector<uint32_t>& tablet_index, bool& filtered,
+ std::vector<bool>& is_continue, std::vector<int64_t>*
miss_rows = nullptr);
bool is_find_tablet_every_sink() {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
@@ -55,7 +58,7 @@ public:
bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
- const std::set<int64_t>& partition_ids() { return _partition_ids; }
+ const vectorized::flat_hash_set<int64_t>& partition_ids() { return
_partition_ids; }
int64_t num_filtered_rows() const { return _num_filtered_rows; }
@@ -69,7 +72,7 @@ private:
VOlapTablePartitionParam* _vpartition;
FindTabletMode _find_tablet_mode;
std::map<int64_t, int64_t> _partition_to_tablet_map;
- std::set<int64_t> _partition_ids;
+ vectorized::flat_hash_set<int64_t> _partition_ids;
int64_t _num_filtered_rows = 0;
int64_t _num_immutable_partition_filtered_rows = 0;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 6ca96ef118b..f38785f53f4 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -221,20 +221,32 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
}
}
-void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet&
rows_for_tablet,
- const VOlapTablePartition*
partition,
- uint32_t tablet_index, int
row_idx) {
- // Generate channel payload for sinking data to each tablet
- for (const auto& index : partition->indexes) {
- auto tablet_id = index.tablets[tablet_index];
- if (rows_for_tablet.count(tablet_id) == 0) {
- Rows rows;
- rows.partition_id = partition->id;
- rows.index_id = index.index_id;
- rows_for_tablet.insert({tablet_id, rows});
+void VOlapTableSinkV2::_generate_rows_for_tablet(
+ RowsForTablet& rows_for_tablet, const
std::vector<VOlapTablePartition*>& partitions,
+ const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>&
skip,
+ size_t row_cnt) {
+ for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
+ if (skip[row_idx]) {
+ continue;
+ }
+
+ auto& partition = partitions[row_idx];
+ auto& tablet_index = tablet_indexes[row_idx];
+
+ for (const auto& index : partition->indexes) {
+ auto tablet_id = index.tablets[tablet_index];
+ auto it = rows_for_tablet.find(tablet_id);
+ if (it == rows_for_tablet.end()) {
+ Rows rows;
+ rows.partition_id = partition->id;
+ rows.index_id = index.index_id;
+ rows.row_idxes.reserve(row_cnt);
+ auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
+ it = tmp_it;
+ }
+ it->second.row_idxes.push_back(row_idx);
+ _number_output_rows++;
}
- rows_for_tablet[tablet_id].row_idxes.push_back(row_idx);
- _number_output_rows++;
}
}
@@ -288,20 +300,22 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
_row_distribution_watch.start();
const auto num_rows = input_rows;
const auto* __restrict filter_map = _block_convertor->filter_map();
- for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) && filter_map[i]) {
- continue;
- }
- const VOlapTablePartition* partition = nullptr;
- bool is_continue = false;
- uint32_t tablet_index = 0;
- RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i,
&partition, tablet_index,
- stop_processing,
is_continue));
- if (is_continue) {
- continue;
+
+ //reuse vars
+ _partitions.assign(num_rows, nullptr);
+ _skip.assign(num_rows, false);
+ _tablet_indexes.assign(num_rows, 0);
+
+ RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(),
num_rows, _partitions,
+ _tablet_indexes,
stop_processing, _skip));
+
+ if (has_filtered_rows) {
+ for (int i = 0; i < num_rows; i++) {
+ _skip[i] = _skip[i] || filter_map[i];
}
- _generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i);
}
+ _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes,
_skip, num_rows);
+
_row_distribution_watch.stop();
// For each tablet, send its input_rows from block to delta writer
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index f70f74b9da6..42103fa03b1 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -60,6 +60,7 @@
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -137,8 +138,9 @@ private:
void _build_tablet_node_mapping();
void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
- const VOlapTablePartition* partition,
uint32_t tablet_index,
- int row_idx);
+ const std::vector<VOlapTablePartition*>&
partitions,
+ const std::vector<uint32_t>& tablet_indexes,
+ const std::vector<bool>& skip, size_t
row_cnt);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t
tablet_id,
const Rows& rows, const Streams& streams);
@@ -184,6 +186,11 @@ private:
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
+ // reuse for find_tablet
+ std::vector<VOlapTablePartition*> _partitions;
+ std::vector<bool> _skip;
+ std::vector<uint32_t> _tablet_indexes;
+
MonotonicStopWatch _row_distribution_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 22005a9ac1c..d0720255695 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -44,6 +44,7 @@
#include <string>
#include <unordered_map>
#include <utility>
+#include <vector>
#include "olap/wal_manager.h"
#include "util/runtime_profile.h"
@@ -421,7 +422,6 @@ Status VNodeChannel::open_wait() {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
open_closure->cntl.remote_side());
}
-
_cancelled = true;
auto error_code = open_closure->cntl.ErrorCode();
auto error_text = open_closure->cntl.ErrorText();
@@ -1337,50 +1337,80 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
+// Generate channel payload for sinking data to differenct node channel
+// Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
std::vector<int64_t>>;
+// first = row_id, second = vector<tablet_id>
void VTabletWriter::_generate_row_distribution_payload(
- ChannelDistributionPayload& channel_to_payload, const
VOlapTablePartition* partition,
- uint32_t tablet_index, int row_idx, size_t row_cnt) {
- // Generate channel payload for sinking data to differenct node channel
- for (int j = 0; j < partition->indexes.size(); ++j) {
- auto tid = partition->indexes[j].tablets[tablet_index];
- auto it = _channels[j]->_channels_by_tablet.find(tid);
- DCHECK(it != _channels[j]->_channels_by_tablet.end())
- << "unknown tablet, tablet_id=" << tablet_index;
- for (const auto& channel : it->second) {
- if (channel_to_payload[j].count(channel.get()) < 1) {
- channel_to_payload[j].insert(
- {channel.get(), Payload
{std::unique_ptr<vectorized::IColumn::Selector>(
- new
vectorized::IColumn::Selector()),
- std::vector<int64_t>()}});
+ ChannelDistributionPayload& channel_to_payload,
+ const std::vector<VOlapTablePartition*>& partitions,
+ const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>&
skip,
+ size_t row_cnt) {
+ for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
+ if (skip[row_idx]) {
+ continue;
+ }
+ const auto& partition = partitions[row_idx];
+ const auto& tablet_index = tablet_indexes[row_idx];
+
+ for (int index_num = 0; index_num < partition->indexes.size();
+ ++index_num) { // partition->indexes = [index, tablets...]
+
+ auto tablet_id =
partition->indexes[index_num].tablets[tablet_index];
+ auto it = _channels[index_num]->_channels_by_tablet.find(
+ tablet_id); // (tablet_id, VNodeChannel) where this tablet
locate
+
+ DCHECK(it != _channels[index_num]->_channels_by_tablet.end())
+ << "unknown tablet, tablet_id=" << tablet_index;
+
+ std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations =
it->second;
+ std::unordered_map<VNodeChannel*, Payload>& payloads_this_index =
+ channel_to_payload[index_num]; // payloads of this index
in every node
+
+ for (const auto& locate_node : tablet_locations) {
+ auto payload_it =
+ payloads_this_index.find(locate_node.get()); //
<VNodeChannel*, Payload>
+ if (payload_it == payloads_this_index.end()) {
+ auto [tmp_it, _] = payloads_this_index.emplace(
+ locate_node.get(),
+ Payload
{std::make_unique<vectorized::IColumn::Selector>(),
+ std::vector<int64_t>()});
+ payload_it = tmp_it;
+ payload_it->second.first->reserve(row_cnt);
+ payload_it->second.second.reserve(row_cnt);
+ }
+ payload_it->second.first->push_back(row_idx);
+ payload_it->second.second.push_back(tablet_id);
}
- channel_to_payload[j][channel.get()].first->push_back(row_idx);
- channel_to_payload[j][channel.get()].second.push_back(tid);
+ _number_output_rows++;
}
- _number_output_rows += row_cnt;
}
}
Status VTabletWriter::_single_partition_generate(RuntimeState* state,
vectorized::Block* block,
ChannelDistributionPayload&
channel_to_payload,
size_t num_rows, bool
has_filtered_rows) {
+ // only need to calculate one value for single partition.
+ std::vector<VOlapTablePartition*> partitions(1, nullptr);
+ std::vector<bool> skip(1, false);
+ std::vector<uint32_t> tablet_indexes(1, 0);
+ bool stop_processing = false;
+
+ RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions,
tablet_indexes,
+ stop_processing, skip));
+
const VOlapTablePartition* partition = nullptr;
uint32_t tablet_index = 0;
- bool stop_processing = false;
- for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
- continue;
- }
- bool is_continue = false;
- RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i,
&partition, tablet_index,
- stop_processing,
is_continue));
- if (is_continue) {
- continue;
+ for (size_t i = 0; i < num_rows; i++) {
+ if (!skip[i]) {
+ partition = partitions[i];
+ tablet_index = tablet_indexes[i];
+ break;
}
- break;
}
if (partition == nullptr) {
return Status::OK();
}
+
for (int j = 0; j < partition->indexes.size(); ++j) {
auto tid = partition->indexes[j].tablets[tablet_index];
auto it = _channels[j]->_channels_by_tablet.find(tid);
@@ -1388,10 +1418,9 @@ Status
VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized
<< "unknown tablet, tablet_id=" << tablet_index;
int64_t row_cnt = 0;
for (const auto& channel : it->second) {
- if (channel_to_payload[j].count(channel.get()) < 1) {
+ if (!channel_to_payload[j].contains(channel.get())) {
channel_to_payload[j].insert(
- {channel.get(), Payload
{std::unique_ptr<vectorized::IColumn::Selector>(
- new
vectorized::IColumn::Selector()),
+ {channel.get(), Payload
{std::make_unique<vectorized::IColumn::Selector>(),
std::vector<int64_t>()}});
}
auto& selector = channel_to_payload[j][channel.get()].first;
@@ -1535,10 +1564,15 @@ Status VTabletWriter::close(Status exec_status) {
auto status = Status::OK();
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
- int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0,
actual_consume_ns = 0,
- total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns =
0,
- total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0,
total_add_batch_num = 0,
- num_node_channels = 0;
+ int64_t serialize_batch_ns = 0;
+ int64_t queue_push_lock_ns = 0;
+ int64_t actual_consume_ns = 0;
+ int64_t total_add_batch_exec_time_ns = 0;
+ int64_t max_add_batch_exec_time_ns = 0;
+ int64_t total_wait_exec_time_ns = 0;
+ int64_t max_wait_exec_time_ns = 0;
+ int64_t total_add_batch_num = 0;
+ int64_t num_node_channels = 0;
VNodeChannelStat channel_stat;
for (const auto& index_channel : _channels) {
@@ -1665,7 +1699,7 @@ Status VTabletWriter::close(Status exec_status) {
[](const std::shared_ptr<VNodeChannel>& ch) {
ch->clear_all_blocks(); });
}
- if (_wal_writer.get() != nullptr) {
+ if (_wal_writer != nullptr) {
static_cast<void>(_wal_writer->finalize());
}
return _close_status;
@@ -1703,7 +1737,7 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
bool stop_processing = false;
- std::vector<std::unordered_map<VNodeChannel*, Payload>> channel_to_payload;
+ ChannelDistributionPayload channel_to_payload;
channel_to_payload.resize(_channels.size());
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
@@ -1737,34 +1771,30 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
missing_map.reserve(partition_col.column->size());
// try to find tablet and save missing value
- for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_map()[i]) {
- continue;
- }
- const VOlapTablePartition* partition = nullptr;
- bool is_continue = false;
- uint32_t tablet_index = 0;
- bool missing_this = false;
- RETURN_IF_ERROR(_tablet_finder->find_tablet(_state,
block.get(), i, &partition,
- tablet_index,
stop_processing,
- is_continue,
&missing_this));
- if (missing_this) {
- missing_map.push_back(i);
- } else {
- _generate_row_distribution_payload(channel_to_payload,
partition, tablet_index,
- i, 1);
+ std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
+ std::vector<bool> skip(num_rows, false);
+ std::vector<uint32_t> tablet_indexes(num_rows, 0);
+
+ //TODO: we could use the buffer to save tablets we found so that
no need to find them again when we created partitions and try to append block
next time.
+ RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(),
num_rows, partitions,
+ tablet_indexes,
stop_processing, skip,
+ &missing_map));
+
+ if (missing_map.empty()) {
+ // we don't calculate it distribution when have missing values
+ if (has_filtered_rows) {
+ for (int i = 0; i < num_rows; i++) {
+ skip[i] = skip[i] || _block_convertor->filter_map()[i];
+ }
}
- }
- missing_map.shrink_to_fit();
-
- // for missing partition keys, calc the missing partition and save
in _partitions_need_create
- auto type = partition_col.type;
- if (missing_map.size() > 0) {
+ _generate_row_distribution_payload(channel_to_payload,
partitions, tablet_indexes,
+ skip, num_rows);
+ } else { // for missing partition keys, calc the missing partition
and save in _partitions_need_create
auto return_type = part_func->data_type();
// expose the data column
vectorized::ColumnPtr range_left_col =
block->get_by_position(result_idx).column;
- if (auto* nullable =
+ if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
range_left_col = nullable->get_nested_column_ptr();
return_type =
@@ -1786,23 +1816,20 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
return Status::NeedSendAgain("");
} // creating done
} else { // not auto partition
- for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_map()[i]) {
- continue;
- }
- const VOlapTablePartition* partition = nullptr;
- bool is_continue = false;
- uint32_t tablet_index = 0;
- RETURN_IF_ERROR(_tablet_finder->find_tablet(_state,
block.get(), i, &partition,
- tablet_index,
stop_processing,
- is_continue));
- if (is_continue) {
- continue;
+ std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
+ std::vector<bool> skip(num_rows, false);
+ std::vector<uint32_t> tablet_indexes(num_rows, 0);
+
+ RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(),
num_rows, partitions,
+ tablet_indexes,
stop_processing, skip));
+
+ if (has_filtered_rows) {
+ for (int i = 0; i < num_rows; i++) {
+ skip[i] = skip[i] || _block_convertor->filter_map()[i];
}
- // each row
- _generate_row_distribution_payload(channel_to_payload,
partition, tablet_index, i,
- 1);
}
+ _generate_row_distribution_payload(channel_to_payload, partitions,
tablet_indexes, skip,
+ num_rows);
}
}
_row_distribution_watch.stop();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 4e95b444d79..c8d5d1c2ce9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -553,10 +553,13 @@ private:
using ChannelDistributionPayload =
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
Status _init(RuntimeState* state, RuntimeProfile* profile);
- // payload for each row
- void _generate_row_distribution_payload(ChannelDistributionPayload&
payload,
- const VOlapTablePartition*
partition,
- uint32_t tablet_index, int
row_idx, size_t row_cnt);
+
+ // payload for every row
+ void _generate_row_distribution_payload(ChannelDistributionPayload&
channel_to_payload,
+ const
std::vector<VOlapTablePartition*>& partitions,
+ const std::vector<uint32_t>&
tablet_indexes,
+ const std::vector<bool>& skip,
size_t row_cnt);
+
Status _single_partition_generate(RuntimeState* state, vectorized::Block*
block,
ChannelDistributionPayload&
channel_to_payload,
size_t num_rows, bool has_filtered_rows);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org