This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7840ae28459 [Performance](sink) SIMD the tablet sink validate data
function (#25770)
7840ae28459 is described below
commit 7840ae284599fbd5a13244ec03454e2b103ef57a
Author: HappenLee <[email protected]>
AuthorDate: Mon Oct 23 23:25:12 2023 +0800
[Performance](sink) SIMD the tablet sink validate data function (#25770)
---
be/src/vec/sink/vtablet_sink.cpp | 156 ++++++++++++++++++++++-----------------
be/src/vec/sink/vtablet_sink.h | 12 +--
2 files changed, 96 insertions(+), 72 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a112a1f3f21..975e053b5e2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1252,7 +1252,7 @@ Status
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
uint32_t tablet_index = 0;
bool stop_processing = false;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
continue;
}
bool is_continue = false;
@@ -1282,7 +1282,7 @@ Status
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
auto& selector = channel_to_payload[j][channel.get()].first;
auto& tablet_ids = channel_to_payload[j][channel.get()].second;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
continue;
}
selector->push_back(i);
@@ -1328,10 +1328,10 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
int filtered_rows = 0;
{
SCOPED_RAW_TIMER(&_validate_data_ns);
- _filter_bitmap.Reset(block.rows());
+ _filter_bitmap.resize(block.rows());
bool stop_processing = false;
RETURN_IF_ERROR(
- _validate_data(state, &block, &_filter_bitmap, &filtered_rows,
&stop_processing));
+ _validate_data(state, &block, _filter_bitmap, &filtered_rows,
&stop_processing));
_number_filtered_rows += filtered_rows;
if (stop_processing) {
// should be returned after updating "_number_filtered_rows", to
make sure that load job can be cancelled
@@ -1357,7 +1357,7 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
filtered_rows));
} else {
for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
@@ -1389,7 +1389,7 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
vectorized::IColumn::Filter& filter_col =
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
for (size_t i = 0; i < filter_col.size(); ++i) {
- filter_data[i] = !_filter_bitmap.Get(i);
+ filter_data[i] = !_filter_bitmap[i];
}
RETURN_IF_CATCH_EXCEPTION(
vectorized::Block::filter_block_internal(&block,
filter_col, block.columns()));
@@ -1702,13 +1702,13 @@ DecimalType
VOlapTableSink::_get_decimalv3_min_or_max(const TypeDescriptor& type
Status VOlapTableSink::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
bool is_nullable,
vectorized::ColumnPtr column,
- size_t slot_index, Bitmap*
filter_bitmap,
+ size_t slot_index, std::vector<char>&
filter_bitmap,
bool* stop_processing,
fmt::memory_buffer& error_prefix,
vectorized::IColumn::Permutation*
rows) {
DCHECK((rows == nullptr) || (rows->size() == column->size()));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
- filter_bitmap->Set(row, true);
+ filter_bitmap[row] = true;
auto ret = state->append_error_msg_to_file([]() -> std::string {
return ""; },
[&error_prefix,
&error_msg]() -> std::string {
return
fmt::to_string(error_prefix) +
@@ -1723,8 +1723,10 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
auto null_map = column_ptr == nullptr ? nullptr :
column_ptr->get_null_map_data().data();
auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
- return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j]
== 0);
+ return !filter_bitmap[row] && (null_map == nullptr || null_map[j] ==
0);
};
+ const auto row_count = column->size();
+ int invalid_count = 0;
ssize_t last_invalid_row = -1;
switch (type.type) {
@@ -1739,33 +1741,42 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
if (type.len > 0) {
limit = std::min(config::string_type_length_soft_limit_bytes,
type.len);
}
- for (size_t j = 0; j < column->size(); ++j) {
- auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (need_to_validate(j, row)) {
- auto str_val = column_string->get_data_at(j);
- bool invalid = str_val.size > limit;
- if (invalid) {
- last_invalid_row = row;
- if (str_val.size > type.len) {
- fmt::format_to(error_msg, "{}",
- "the length of input is too long than
schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
- } else if (str_val.size > limit) {
- fmt::format_to(error_msg, "{}",
- "the length of input string is too long
than vec schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "limit length: {}; ", limit);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+
+ auto* __restrict offsets = column_string->get_offsets().data();
+ for (int j = 0; j < row_count; ++j) {
+ invalid_count += (offsets[j] - offsets[j - 1]) > limit;
+ }
+
+ if (invalid_count) {
+ for (size_t j = 0; j < row_count; ++j) {
+ auto row = rows ? (*rows)[j] : j;
+ if (row == last_invalid_row) {
+ continue;
+ }
+ if (need_to_validate(j, row)) {
+ auto str_val = column_string->get_data_at(j);
+ bool invalid = str_val.size > limit;
+ if (invalid) {
+ last_invalid_row = row;
+ if (str_val.size > type.len) {
+ fmt::format_to(error_msg, "{}",
+ "the length of input is too long
than schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ } else if (str_val.size > limit) {
+ fmt::format_to(
+ error_msg, "{}",
+ "the length of input string is too long
than vec schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "limit length: {}; ",
limit);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ }
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
}
@@ -1775,7 +1786,7 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
const auto column_string =
assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
for (size_t j = 0; j < column->size(); ++j) {
- if (!filter_bitmap->Get(j)) {
+ if (!filter_bitmap[j]) {
if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
continue;
}
@@ -1835,32 +1846,41 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
break;
}
case TYPE_DECIMAL32: {
-#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)
\
- auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
- assert_cast<const
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
- real_column_ptr.get()));
\
- const auto& max_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \
- const auto& min_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \
- for (size_t j = 0; j < column->size(); ++j) {
\
- auto row = rows ? (*rows)[j] : j;
\
- if (row == last_invalid_row) {
\
- continue;
\
- }
\
- if (need_to_validate(j, row)) {
\
- auto dec_val = column_decimal->get_data()[j];
\
- bool invalid = false;
\
- if (dec_val > max_decimal || dec_val < min_decimal) {
\
- fmt::format_to(error_msg, "{}", "decimal value is not valid
for definition"); \
- fmt::format_to(error_msg, ", value={}", dec_val);
\
- fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, type.scale); \
- fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal,
max_decimal); \
- invalid = true;
\
- }
\
- if (invalid) {
\
- last_invalid_row = row;
\
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
- }
\
- }
\
+#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)
\
+ auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
+ assert_cast<const
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
+ real_column_ptr.get()));
\
+ const auto& max_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \
+ const auto& min_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \
+ const auto* __restrict datas = column_decimal->get_data().data();
\
+ int invalid_count = 0;
\
+ for (int j = 0; j < row_count; ++j) {
\
+ const auto dec_val = datas[j];
\
+ invalid_count += dec_val > max_decimal || dec_val < min_decimal;
\
+ }
\
+ if (invalid_count) {
\
+ for (size_t j = 0; j < row_count; ++j) {
\
+ auto row = rows ? (*rows)[j] : j;
\
+ if (row == last_invalid_row) {
\
+ continue;
\
+ }
\
+ if (need_to_validate(j, row)) {
\
+ auto dec_val = column_decimal->get_data()[j];
\
+ bool invalid = false;
\
+ if (dec_val > max_decimal || dec_val < min_decimal) {
\
+ fmt::format_to(error_msg, "{}", "decimal value is not
valid for definition"); \
+ fmt::format_to(error_msg, ", value={}", dec_val);
\
+ fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, \
+ type.scale);
\
+ fmt::format_to(error_msg, ", min={}, max={}; ",
min_decimal, max_decimal); \
+ invalid = true;
\
+ }
\
+ if (invalid) {
\
+ last_invalid_row = row;
\
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
+ }
\
+ }
\
+ }
\
}
CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
break;
@@ -1938,7 +1958,7 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
if (row == last_invalid_row) {
continue;
}
- if (null_map[j] && !filter_bitmap->Get(row)) {
+ if (null_map[j] && !filter_bitmap[row]) {
fmt::format_to(error_msg, "null value for not null column,
type={}",
type.debug_string());
last_invalid_row = row;
@@ -1951,7 +1971,7 @@ Status VOlapTableSink::_validate_column(RuntimeState*
state, const TypeDescripto
}
Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block*
block,
- Bitmap* filter_bitmap, int*
filtered_rows,
+ std::vector<char>& filter_bitmap, int*
filtered_rows,
bool* stop_processing) {
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
@@ -1965,10 +1985,12 @@ Status VOlapTableSink::_validate_data(RuntimeState*
state, vectorized::Block* bl
filter_bitmap, stop_processing,
error_prefix));
}
- *filtered_rows = 0;
- for (int i = 0; i < block->rows(); ++i) {
- *filtered_rows += filter_bitmap->Get(i);
+ const auto rows = block->rows();
+ auto count = 0;
+ for (int i = 0; i < rows; ++i) {
+ count += filter_bitmap[i];
}
+ *filtered_rows = count;
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index acde67e38e7..6b7e0187c49 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -529,8 +529,9 @@ private:
// return number of invalid/filtered rows.
// invalid row number is set in Bitmap
// set stop_processing if we want to stop the whole process now.
- Status _validate_data(RuntimeState* state, vectorized::Block* block,
Bitmap* filter_bitmap,
- int* filtered_rows, bool* stop_processing);
+ Status _validate_data(RuntimeState* state, vectorized::Block* block,
+ std::vector<char>& filter_bitmap, int* filtered_rows,
+ bool* stop_processing);
template <bool is_min>
DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
@@ -539,8 +540,9 @@ private:
DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
- vectorized::ColumnPtr column, size_t slot_index,
Bitmap* filter_bitmap,
- bool* stop_processing, fmt::memory_buffer&
error_prefix,
+ vectorized::ColumnPtr column, size_t slot_index,
+ std::vector<char>& filter_bitmap, bool*
stop_processing,
+ fmt::memory_buffer& error_prefix,
vectorized::IColumn::Permutation* rows = nullptr);
// some output column of output expr may have different nullable property
with dest slot desc
@@ -592,7 +594,7 @@ private:
// only used for partition with random distribution
std::map<int64_t, int64_t> _partition_to_tablet_map;
- Bitmap _filter_bitmap;
+ std::vector<char> _filter_bitmap;
// index_channel
std::vector<std::shared_ptr<IndexChannel>> _channels;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]