This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 90fb3b7783 [Improvement](load) accelerate tablet sink (#12174)
90fb3b7783 is described below
commit 90fb3b77830d75093f11208a0419f79dd42d042e
Author: Gabriel <[email protected]>
AuthorDate: Thu Sep 1 10:08:09 2022 +0800
[Improvement](load) accelerate tablet sink (#12174)
---
be/src/exec/tablet_sink.h | 7 ++-
be/src/vec/columns/column.h | 5 +++
be/src/vec/columns/column_array.h | 5 +++
be/src/vec/columns/column_complex.h | 5 +++
be/src/vec/columns/column_const.h | 5 +++
be/src/vec/columns/column_decimal.h | 5 +++
be/src/vec/columns/column_dictionary.h | 5 +++
be/src/vec/columns/column_dummy.h | 14 ++++++
be/src/vec/columns/column_fixed_length_object.h | 5 +++
be/src/vec/columns/column_impl.h | 15 +++++++
be/src/vec/columns/column_nullable.h | 5 +++
be/src/vec/columns/column_string.h | 5 +++
be/src/vec/columns/column_vector.h | 5 +++
be/src/vec/columns/predicate_column.h | 5 +++
be/src/vec/core/block.cpp | 8 ++++
be/src/vec/core/block.h | 2 +
be/src/vec/sink/vtablet_sink.cpp | 60 ++++++++++++++++++-------
be/src/vec/sink/vtablet_sink.h | 4 +-
18 files changed, 146 insertions(+), 19 deletions(-)
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 6b9b2f1c32..5f6ad148eb 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -183,8 +183,11 @@ public:
virtual Status open_wait();
Status add_row(Tuple* tuple, int64_t tablet_id);
- virtual Status add_row(const BlockRow& block_row, int64_t tablet_id) {
- LOG(FATAL) << "add block row to NodeChannel not supported";
+
+ virtual Status add_block(vectorized::Block* block,
+ const
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+ std::vector<int64_t>>& payload) {
+ LOG(FATAL) << "add block to NodeChannel not supported";
return Status::OK();
}
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 002fc8eea3..4307153739 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -361,6 +361,8 @@ public:
virtual std::vector<MutablePtr> scatter(ColumnIndex num_columns,
const Selector& selector) const =
0;
+ virtual void append_data_by_selector(MutablePtr& res, const Selector&
selector) const = 0;
+
/// Insert data from several other columns according to source mask (used
in vertical merge).
/// For now it is a helper to de-virtualize calls to insert*() functions
inside gather loop
/// (descendants should call gatherer_stream.gather(*this) to implement
this function.)
@@ -530,6 +532,9 @@ protected:
/// In derived classes (that use final keyword), implement scatter method
as call to scatter_impl.
template <typename Derived>
std::vector<MutablePtr> scatter_impl(ColumnIndex num_columns, const
Selector& selector) const;
+
+ template <typename Derived>
+ void append_data_by_selector_impl(MutablePtr& res, const Selector&
selector) const;
};
using ColumnPtr = IColumn::Ptr;
diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h
index 182f7b185d..50f864fbb9 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -136,6 +136,11 @@ public:
return scatter_impl<ColumnArray>(num_columns, selector);
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ return append_data_by_selector_impl<ColumnArray>(res, selector);
+ }
+
void for_each_subcolumn(ColumnCallback callback) override {
callback(offsets);
callback(data);
diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h
index fe732ec8a9..e61bfa8e8b 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -230,6 +230,11 @@ public:
LOG(FATAL) << "scatter not implemented";
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ this->template append_data_by_selector_impl<ColumnComplexType<T>>(res,
selector);
+ }
+
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row =
0) override {
DCHECK(size() > self_row);
data[self_row] = static_cast<const Self&>(rhs).data[row];
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index 8a3646025f..615c936a5e 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -165,6 +165,11 @@ public:
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override;
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ LOG(FATAL) << "append_data_by_selector is not supported in
ColumnConst!";
+ }
+
void get_extremes(Field& min, Field& max) const override {
data->get_extremes(min, max); }
void for_each_subcolumn(ColumnCallback callback) override {
callback(data); }
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index e318271b13..70ff712a08 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -194,6 +194,11 @@ public:
return this->template scatter_impl<Self>(num_columns, selector);
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ this->template append_data_by_selector_impl<Self>(res, selector);
+ }
+
// void gather(ColumnGathererStream & gatherer_stream) override;
bool structure_equals(const IColumn& rhs) const override {
diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index 8a6cc0c905..d56265b757 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -185,6 +185,11 @@ public:
LOG(FATAL) << "scatter not supported in ColumnDictionary";
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ LOG(FATAL) << "append_data_by_selector is not supported in
ColumnDictionary!";
+ }
+
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) override {
auto* res_col = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
for (size_t i = 0; i < sel_size; i++) {
diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h
index 4937e1391b..e7fb657161 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -125,6 +125,20 @@ public:
return res;
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ size_t num_rows = size();
+
+ if (num_rows < selector.size()) {
+ LOG(FATAL) << fmt::format("Size of selector: {}, is larger than
size of column:{}",
+ selector.size(), num_rows);
+ }
+
+ res->reserve(num_rows);
+
+ for (size_t i = 0; i < selector.size(); ++i) res->insert_from(*this,
selector[i]);
+ }
+
void get_extremes(Field&, Field&) const override {}
void addSize(size_t delta) { s += delta; }
diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h
index 5ecb13ec61..97098d2e5d 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -165,6 +165,11 @@ public:
LOG(FATAL) << "scatter not supported";
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ LOG(FATAL) << "append_data_by_selector is not supported!";
+ }
+
void get_extremes(Field& min, Field& max) const override {
LOG(FATAL) << "get_extremes not supported";
}
diff --git a/be/src/vec/columns/column_impl.h b/be/src/vec/columns/column_impl.h
index 4e12f98d51..215eee711e 100644
--- a/be/src/vec/columns/column_impl.h
+++ b/be/src/vec/columns/column_impl.h
@@ -59,4 +59,19 @@ std::vector<IColumn::MutablePtr> IColumn::scatter_impl(ColumnIndex num_columns,
return columns;
}
+template <typename Derived>
+void IColumn::append_data_by_selector_impl(MutablePtr& res, const Selector&
selector) const {
+ size_t num_rows = size();
+
+ if (num_rows < selector.size()) {
+ LOG(FATAL) << fmt::format("Size of selector: {}, is larger than size
of column:{}",
+ selector.size(), num_rows);
+ }
+
+ res->reserve(num_rows);
+
+ for (size_t i = 0; i < selector.size(); ++i)
+ static_cast<Derived&>(*res).insert_from(*this, selector[i]);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index c9268118df..05807e10ec 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -165,6 +165,11 @@ public:
return scatter_impl<ColumnNullable>(num_columns, selector);
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ append_data_by_selector_impl<ColumnNullable>(res, selector);
+ }
+
// void gather(ColumnGathererStream & gatherer_stream) override;
void for_each_subcolumn(ColumnCallback callback) override {
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index bbe6993ae2..d8a704e362 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -302,6 +302,11 @@ public:
return scatter_impl<ColumnString>(num_columns, selector);
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ append_data_by_selector_impl<ColumnString>(res, selector);
+ }
+
// void gather(ColumnGathererStream & gatherer_stream) override;
void reserve(size_t n) override;
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index c673f0b7bd..f97dae0cc4 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -342,6 +342,11 @@ public:
return this->template scatter_impl<Self>(num_columns, selector);
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ this->template append_data_by_selector_impl<Self>(res, selector);
+ }
+
// void gather(ColumnGathererStream & gatherer_stream) override;
bool can_be_inside_nullable() const override { return true; }
diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h
index 1131f93d59..6662545b2f 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -397,6 +397,11 @@ public:
LOG(FATAL) << "scatter not supported in PredicateColumnType";
}
+ void append_data_by_selector(MutableColumnPtr& res,
+ const IColumn::Selector& selector) const
override {
+ LOG(FATAL) << "append_data_by_selector is not supported in
PredicateColumnType!";
+ }
+
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) override {
if constexpr (std::is_same_v<T, StringValue>) {
insert_string_to_res_column(sel, sel_size,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 18b63a0975..b9562addd0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -640,6 +640,14 @@ Block Block::copy_block(const std::vector<int>& column_offset) const {
return columns_with_type_and_name;
}
+void Block::append_block_by_selector(MutableColumns& columns,
+ const IColumn::Selector& selector) const {
+ DCHECK(data.size() == columns.size());
+ for (size_t i = 0; i < data.size(); i++) {
+ data[i].column->append_data_by_selector(columns[i], selector);
+ }
+}
+
Status Block::filter_block(Block* block, int filter_column_id, int
column_to_keep) {
ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 5fe90fdf6d..8895980e4d 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -257,6 +257,8 @@ public:
// copy a new block by the offset column
Block copy_block(const std::vector<int>& column_offset) const;
+ void append_block_by_selector(MutableColumns& columns, const
IColumn::Selector& selector) const;
+
static void filter_block_internal(Block* block, const IColumn::Filter&
filter,
uint32_t column_to_keep);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a9d091bf75..1c1ef24579 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -23,6 +23,7 @@
#include "util/doris_metrics.h"
#include "util/proto_util.h"
#include "util/time.h"
+#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
@@ -160,9 +161,11 @@ Status VNodeChannel::open_wait() {
return status;
}
-Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
+Status VNodeChannel::add_block(vectorized::Block* block,
+ const
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+ std::vector<int64_t>>& payload)
{
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- // If add_row() when _eos_is_produced==true, there must be sth wrong, we
can only mark this channel as failed.
+ // If add_block() when _eos_is_produced==true, there must be sth wrong, we
can only mark this channel as failed.
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
if (_cancelled) {
@@ -177,22 +180,24 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
// so in the ideal case, mem limit is a matter for _plan node.
// But there is still some unfinished things, we do mem limit here
temporarily.
// _cancelled may be set by rpc callback, and it's possible that
_cancelled might be set in any of the steps below.
- // It's fine to do a fake add_row() and return OK, because we will check
_cancelled in next add_row() or mark_close().
+ // It's fine to do a fake add_block() and return OK, because we will check
_cancelled in next add_block() or mark_close().
while (!_cancelled && _pending_batches_num > 0 &&
_pending_batches_bytes > _max_pending_batches_bytes) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- _cur_mutable_block->add_row(block_row.first, block_row.second);
- _cur_add_block_request.add_tablet_ids(tablet_id);
+ block->append_block_by_selector(_cur_mutable_block->mutable_columns(),
*(payload.first));
+ for (auto tablet_id : payload.second) {
+ _cur_add_block_request.add_tablet_ids(tablet_id);
+ }
- if (_cur_mutable_block->rows() == _batch_size ||
+ if (_cur_mutable_block->rows() >= _batch_size ||
_cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
- //To simplify the add_row logic, postpone adding block into req
until the time of sending req
+ // To simplify the add_row logic, postpone adding block into req
until the time of sending req
_pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_pending_blocks.emplace(std::move(_cur_mutable_block),
_cur_add_block_request);
_pending_batches_num++;
@@ -480,12 +485,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
_partition_to_tablet_map.clear();
}
- //if pending bytes is more than 500M, wait
- //constexpr size_t MAX_PENDING_BYTES = 500 * 1024 * 1024;
- //while (get_pending_bytes() > MAX_PENDING_BYTES) {
- // std::this_thread::sleep_for(std::chrono::microseconds(500));
- //}
-
+ std::vector<std::unordered_map<
+ NodeChannel*,
+ std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
std::vector<int64_t>>>>
+ channel_to_payload;
+ channel_to_payload.resize(_channels.size());
for (int i = 0; i < num_rows; ++i) {
if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
continue;
@@ -520,12 +524,36 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
tablet_index = _vpartition->find_tablet(&block_row, *partition);
}
for (int j = 0; j < partition->indexes.size(); ++j) {
- int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
- _channels[j]->add_row(block_row, tablet_id);
+ auto tid = partition->indexes[j].tablets[tablet_index];
+ auto it = _channels[j]->_channels_by_tablet.find(tid);
+ DCHECK(it != _channels[j]->_channels_by_tablet.end())
+ << "unknown tablet, tablet_id=" << tablet_index;
+ for (const auto& channel : it->second) {
+ if (channel_to_payload[j].count(channel.get()) < 1) {
+ channel_to_payload[j].insert(
+ {channel.get(),
+
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+ std::vector<int64_t>> {
+
std::unique_ptr<vectorized::IColumn::Selector>(
+ new
vectorized::IColumn::Selector()),
+ std::vector<int64_t>()}});
+ }
+ channel_to_payload[j][channel.get()].first->push_back(i);
+ channel_to_payload[j][channel.get()].second.push_back(tid);
+ }
_number_output_rows++;
}
}
-
+ for (size_t i = 0; i < _channels.size(); i++) {
+ for (const auto& entry : channel_to_payload[i]) {
+ // if this node channel is already failed, this add_row will be
skipped
+ auto st = entry.first->add_block(&block, entry.second);
+ if (!st.ok()) {
+ _channels[i]->mark_as_failed(entry.first->node_id(),
entry.first->host(),
+ st.get_error_msg());
+ }
+ }
+ }
// check intolerable failure
for (const auto& index_channel : _channels) {
RETURN_IF_ERROR(index_channel->check_intolerable_failure());
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 36943473a8..c841cab5c0 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -38,7 +38,9 @@ public:
Status open_wait() override;
- Status add_row(const BlockRow& block_row, int64_t tablet_id) override;
+ Status add_block(vectorized::Block* block,
+ const
std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+ std::vector<int64_t>>& payload) override;
int try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]