This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7840ae28459 [Performance](sink) SIMD the tablet sink validate data 
function (#25770)
7840ae28459 is described below

commit 7840ae284599fbd5a13244ec03454e2b103ef57a
Author: HappenLee <[email protected]>
AuthorDate: Mon Oct 23 23:25:12 2023 +0800

    [Performance](sink) SIMD the tablet sink validate data function (#25770)
---
 be/src/vec/sink/vtablet_sink.cpp | 156 ++++++++++++++++++++++-----------------
 be/src/vec/sink/vtablet_sink.h   |  12 +--
 2 files changed, 96 insertions(+), 72 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a112a1f3f21..975e053b5e2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1252,7 +1252,7 @@ Status 
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
     uint32_t tablet_index = 0;
     bool stop_processing = false;
     for (int32_t i = 0; i < num_rows; ++i) {
-        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
             continue;
         }
         bool is_continue = false;
@@ -1282,7 +1282,7 @@ Status 
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
             auto& selector = channel_to_payload[j][channel.get()].first;
             auto& tablet_ids = channel_to_payload[j][channel.get()].second;
             for (int32_t i = 0; i < num_rows; ++i) {
-                if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+                if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
                     continue;
                 }
                 selector->push_back(i);
@@ -1328,10 +1328,10 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
     int filtered_rows = 0;
     {
         SCOPED_RAW_TIMER(&_validate_data_ns);
-        _filter_bitmap.Reset(block.rows());
+        _filter_bitmap.resize(block.rows());
         bool stop_processing = false;
         RETURN_IF_ERROR(
-                _validate_data(state, &block, &_filter_bitmap, &filtered_rows, 
&stop_processing));
+                _validate_data(state, &block, _filter_bitmap, &filtered_rows, 
&stop_processing));
         _number_filtered_rows += filtered_rows;
         if (stop_processing) {
             // should be returned after updating "_number_filtered_rows", to 
make sure that load job can be cancelled
@@ -1357,7 +1357,7 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
                                                    filtered_rows));
     } else {
         for (int i = 0; i < num_rows; ++i) {
-            if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+            if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap[i]) {
                 continue;
             }
             const VOlapTablePartition* partition = nullptr;
@@ -1389,7 +1389,7 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
             vectorized::IColumn::Filter& filter_col =
                     
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
             for (size_t i = 0; i < filter_col.size(); ++i) {
-                filter_data[i] = !_filter_bitmap.Get(i);
+                filter_data[i] = !_filter_bitmap[i];
             }
             RETURN_IF_CATCH_EXCEPTION(
                     vectorized::Block::filter_block_internal(&block, 
filter_col, block.columns()));
@@ -1702,13 +1702,13 @@ DecimalType 
VOlapTableSink::_get_decimalv3_min_or_max(const TypeDescriptor& type
 
 Status VOlapTableSink::_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
                                         bool is_nullable, 
vectorized::ColumnPtr column,
-                                        size_t slot_index, Bitmap* 
filter_bitmap,
+                                        size_t slot_index, std::vector<char>& 
filter_bitmap,
                                         bool* stop_processing, 
fmt::memory_buffer& error_prefix,
                                         vectorized::IColumn::Permutation* 
rows) {
     DCHECK((rows == nullptr) || (rows->size() == column->size()));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
-        filter_bitmap->Set(row, true);
+        filter_bitmap[row] = true;
         auto ret = state->append_error_msg_to_file([]() -> std::string { 
return ""; },
                                                    [&error_prefix, 
&error_msg]() -> std::string {
                                                        return 
fmt::to_string(error_prefix) +
@@ -1723,8 +1723,10 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
     auto& real_column_ptr = column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
     auto null_map = column_ptr == nullptr ? nullptr : 
column_ptr->get_null_map_data().data();
     auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
-        return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] 
== 0);
+        return !filter_bitmap[row] && (null_map == nullptr || null_map[j] == 
0);
     };
+    const auto row_count = column->size();
+    int invalid_count = 0;
 
     ssize_t last_invalid_row = -1;
     switch (type.type) {
@@ -1739,33 +1741,42 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
         if (type.len > 0) {
             limit = std::min(config::string_type_length_soft_limit_bytes, 
type.len);
         }
-        for (size_t j = 0; j < column->size(); ++j) {
-            auto row = rows ? (*rows)[j] : j;
-            if (row == last_invalid_row) {
-                continue;
-            }
-            if (need_to_validate(j, row)) {
-                auto str_val = column_string->get_data_at(j);
-                bool invalid = str_val.size > limit;
-                if (invalid) {
-                    last_invalid_row = row;
-                    if (str_val.size > type.len) {
-                        fmt::format_to(error_msg, "{}",
-                                       "the length of input is too long than 
schema. ");
-                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
-                                       str_val.to_prefix(32));
-                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
-                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
-                    } else if (str_val.size > limit) {
-                        fmt::format_to(error_msg, "{}",
-                                       "the length of input string is too long 
than vec schema. ");
-                        fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
-                                       str_val.to_prefix(32));
-                        fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
-                        fmt::format_to(error_msg, "limit length: {}; ", limit);
-                        fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+
+        auto* __restrict offsets = column_string->get_offsets().data();
+        for (int j = 0; j < row_count; ++j) {
+            invalid_count += (offsets[j] - offsets[j - 1]) > limit;
+        }
+
+        if (invalid_count) {
+            for (size_t j = 0; j < row_count; ++j) {
+                auto row = rows ? (*rows)[j] : j;
+                if (row == last_invalid_row) {
+                    continue;
+                }
+                if (need_to_validate(j, row)) {
+                    auto str_val = column_string->get_data_at(j);
+                    bool invalid = str_val.size > limit;
+                    if (invalid) {
+                        last_invalid_row = row;
+                        if (str_val.size > type.len) {
+                            fmt::format_to(error_msg, "{}",
+                                           "the length of input is too long 
than schema. ");
+                            fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
+                                           str_val.to_prefix(32));
+                            fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
+                            fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                        } else if (str_val.size > limit) {
+                            fmt::format_to(
+                                    error_msg, "{}",
+                                    "the length of input string is too long 
than vec schema. ");
+                            fmt::format_to(error_msg, "first 32 bytes of input 
str: [{}] ",
+                                           str_val.to_prefix(32));
+                            fmt::format_to(error_msg, "schema length: {}; ", 
type.len);
+                            fmt::format_to(error_msg, "limit length: {}; ", 
limit);
+                            fmt::format_to(error_msg, "actual length: {}; ", 
str_val.size);
+                        }
+                        RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                     }
-                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                 }
             }
         }
@@ -1775,7 +1786,7 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
         const auto column_string =
                 assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
         for (size_t j = 0; j < column->size(); ++j) {
-            if (!filter_bitmap->Get(j)) {
+            if (!filter_bitmap[j]) {
                 if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
                     continue;
                 }
@@ -1835,32 +1846,41 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
         break;
     }
     case TYPE_DECIMAL32: {
-#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)         
                    \
-    auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(   \
-            assert_cast<const 
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(          \
-                    real_column_ptr.get()));                                   
                    \
-    const auto& max_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type);     \
-    const auto& min_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type);      \
-    for (size_t j = 0; j < column->size(); ++j) {                              
                    \
-        auto row = rows ? (*rows)[j] : j;                                      
                    \
-        if (row == last_invalid_row) {                                         
                    \
-            continue;                                                          
                    \
-        }                                                                      
                    \
-        if (need_to_validate(j, row)) {                                        
                    \
-            auto dec_val = column_decimal->get_data()[j];                      
                    \
-            bool invalid = false;                                              
                    \
-            if (dec_val > max_decimal || dec_val < min_decimal) {              
                    \
-                fmt::format_to(error_msg, "{}", "decimal value is not valid 
for definition");      \
-                fmt::format_to(error_msg, ", value={}", dec_val);              
                    \
-                fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision, type.scale); \
-                fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, 
max_decimal);         \
-                invalid = true;                                                
                    \
-            }                                                                  
                    \
-            if (invalid) {                                                     
                    \
-                last_invalid_row = row;                                        
                    \
-                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));        
                    \
-            }                                                                  
                    \
-        }                                                                      
                    \
+#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)         
                   \
+    auto column_decimal = 
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(  \
+            assert_cast<const 
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>(         \
+                    real_column_ptr.get()));                                   
                   \
+    const auto& max_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type);    \
+    const auto& min_decimal = 
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type);     \
+    const auto* __restrict datas = column_decimal->get_data().data();          
                   \
+    int invalid_count = 0;                                                     
                   \
+    for (int j = 0; j < row_count; ++j) {                                      
                   \
+        const auto dec_val = datas[j];                                         
                   \
+        invalid_count += dec_val > max_decimal || dec_val < min_decimal;       
                   \
+    }                                                                          
                   \
+    if (invalid_count) {                                                       
                   \
+        for (size_t j = 0; j < row_count; ++j) {                               
                   \
+            auto row = rows ? (*rows)[j] : j;                                  
                   \
+            if (row == last_invalid_row) {                                     
                   \
+                continue;                                                      
                   \
+            }                                                                  
                   \
+            if (need_to_validate(j, row)) {                                    
                   \
+                auto dec_val = column_decimal->get_data()[j];                  
                   \
+                bool invalid = false;                                          
                   \
+                if (dec_val > max_decimal || dec_val < min_decimal) {          
                   \
+                    fmt::format_to(error_msg, "{}", "decimal value is not 
valid for definition"); \
+                    fmt::format_to(error_msg, ", value={}", dec_val);          
                   \
+                    fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision,         \
+                                   type.scale);                                
                   \
+                    fmt::format_to(error_msg, ", min={}, max={}; ", 
min_decimal, max_decimal);    \
+                    invalid = true;                                            
                   \
+                }                                                              
                   \
+                if (invalid) {                                                 
                   \
+                    last_invalid_row = row;                                    
                   \
+                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));    
                   \
+                }                                                              
                   \
+            }                                                                  
                   \
+        }                                                                      
                   \
     }
         CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
         break;
@@ -1938,7 +1958,7 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
             if (row == last_invalid_row) {
                 continue;
             }
-            if (null_map[j] && !filter_bitmap->Get(row)) {
+            if (null_map[j] && !filter_bitmap[row]) {
                 fmt::format_to(error_msg, "null value for not null column, 
type={}",
                                type.debug_string());
                 last_invalid_row = row;
@@ -1951,7 +1971,7 @@ Status VOlapTableSink::_validate_column(RuntimeState* 
state, const TypeDescripto
 }
 
 Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* 
block,
-                                      Bitmap* filter_bitmap, int* 
filtered_rows,
+                                      std::vector<char>& filter_bitmap, int* 
filtered_rows,
                                       bool* stop_processing) {
     for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
         SlotDescriptor* desc = _output_tuple_desc->slots()[i];
@@ -1965,10 +1985,12 @@ Status VOlapTableSink::_validate_data(RuntimeState* 
state, vectorized::Block* bl
                                          filter_bitmap, stop_processing, 
error_prefix));
     }
 
-    *filtered_rows = 0;
-    for (int i = 0; i < block->rows(); ++i) {
-        *filtered_rows += filter_bitmap->Get(i);
+    const auto rows = block->rows();
+    auto count = 0;
+    for (int i = 0; i < rows; ++i) {
+        count += filter_bitmap[i];
     }
+    *filtered_rows = count;
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index acde67e38e7..6b7e0187c49 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -529,8 +529,9 @@ private:
     // return number of invalid/filtered rows.
     // invalid row number is set in Bitmap
     // set stop_processing if we want to stop the whole process now.
-    Status _validate_data(RuntimeState* state, vectorized::Block* block, 
Bitmap* filter_bitmap,
-                          int* filtered_rows, bool* stop_processing);
+    Status _validate_data(RuntimeState* state, vectorized::Block* block,
+                          std::vector<char>& filter_bitmap, int* filtered_rows,
+                          bool* stop_processing);
 
     template <bool is_min>
     DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
@@ -539,8 +540,9 @@ private:
     DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
 
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, 
bool is_nullable,
-                            vectorized::ColumnPtr column, size_t slot_index, 
Bitmap* filter_bitmap,
-                            bool* stop_processing, fmt::memory_buffer& 
error_prefix,
+                            vectorized::ColumnPtr column, size_t slot_index,
+                            std::vector<char>& filter_bitmap, bool* 
stop_processing,
+                            fmt::memory_buffer& error_prefix,
                             vectorized::IColumn::Permutation* rows = nullptr);
 
     // some output column of output expr may have different nullable property 
with dest slot desc
@@ -592,7 +594,7 @@ private:
     // only used for partition with random distribution
     std::map<int64_t, int64_t> _partition_to_tablet_map;
 
-    Bitmap _filter_bitmap;
+    std::vector<char> _filter_bitmap;
 
     // index_channel
     std::vector<std::shared_ptr<IndexChannel>> _channels;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to