This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 688a1bc059 [refactor](load) expand OlapTableValidator to 
VOlapTableBlockConvertor (#21476)
688a1bc059 is described below

commit 688a1bc059ad99b51392658c9eb8e32345ade2b8
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 6 10:11:53 2023 +0800

    [refactor](load) expand OlapTableValidator to VOlapTableBlockConvertor 
(#21476)
---
 ...t_validator.cpp => vtablet_block_convertor.cpp} | 85 +++++++++++++++-------
 ...ablet_validator.h => vtablet_block_convertor.h} | 45 ++++++++----
 be/src/vec/sink/vtablet_sink.cpp                   | 73 ++++++++-----------
 be/src/vec/sink/vtablet_sink.h                     | 10 +--
 4 files changed, 121 insertions(+), 92 deletions(-)

diff --git a/be/src/vec/sink/vtablet_validator.cpp 
b/be/src/vec/sink/vtablet_block_convertor.cpp
similarity index 84%
rename from be/src/vec/sink/vtablet_validator.cpp
rename to be/src/vec/sink/vtablet_block_convertor.cpp
index 39cc92f0f4..42e1b48d15 100644
--- a/be/src/vec/sink/vtablet_validator.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/sink/vtablet_validator.h"
+#include "vec/sink/vtablet_block_convertor.h"
 
 #include <fmt/format.h>
 #include <google/protobuf/stubs/common.h>
@@ -54,8 +54,40 @@
 namespace doris {
 namespace stream_load {
 
+Status OlapTableBlockConvertor::validate_and_convert_block(
+        RuntimeState* state, vectorized::Block* input_block,
+        std::shared_ptr<vectorized::Block>& block, 
vectorized::VExprContextSPtrs output_vexpr_ctxs,
+        bool& has_filtered_rows) {
+    DCHECK(input_block->rows() > 0);
+
+    block = 
vectorized::Block::create_shared(input_block->get_columns_with_type_and_name());
+    if (!output_vexpr_ctxs.empty()) {
+        // Do vectorized expr here to speed up load
+        
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+                output_vexpr_ctxs, *input_block, block.get()));
+    }
+
+    int64_t filtered_rows = 0;
+    {
+        SCOPED_RAW_TIMER(&_validate_data_ns);
+        _filter_bitmap.Reset(block->rows());
+        bool stop_processing = false;
+        RETURN_IF_ERROR(_validate_data(state, block.get(), filtered_rows, 
&stop_processing));
+        _num_filtered_rows += filtered_rows;
+        has_filtered_rows = filtered_rows > 0;
+        if (stop_processing) {
+            // should be returned after updating "_num_filtered_rows", to 
make sure that load job can be cancelled
+            // because of "data unqualified"
+            return Status::EndOfFile("Encountered unqualified data, stop 
processing");
+        }
+        _convert_to_dest_desc_block(block.get());
+    }
+
+    return Status::OK();
+}
+
 template <bool is_min>
-DecimalV2Value OlapTableValidator::_get_decimalv2_min_or_max(const 
TypeDescriptor& type) {
+DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const 
TypeDescriptor& type) {
     std::map<std::pair<int, int>, DecimalV2Value>* pmap;
     if constexpr (is_min) {
         pmap = &_min_decimalv2_val;
@@ -81,7 +113,7 @@ DecimalV2Value 
OlapTableValidator::_get_decimalv2_min_or_max(const TypeDescripto
 }
 
 template <typename DecimalType, bool IsMin>
-DecimalType OlapTableValidator::_get_decimalv3_min_or_max(const 
TypeDescriptor& type) {
+DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const 
TypeDescriptor& type) {
     std::map<int, typename DecimalType::NativeType>* pmap;
     if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
         pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
@@ -107,15 +139,15 @@ DecimalType 
OlapTableValidator::_get_decimalv3_min_or_max(const TypeDescriptor&
     return value;
 }
 
-Status OlapTableValidator::_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
-                                            bool is_nullable, 
vectorized::ColumnPtr column,
-                                            size_t slot_index, Bitmap* 
filter_bitmap,
-                                            bool* stop_processing, 
fmt::memory_buffer& error_prefix,
-                                            vectorized::IColumn::Permutation* 
rows) {
+Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const 
TypeDescriptor& type,
+                                                 bool is_nullable, 
vectorized::ColumnPtr column,
+                                                 size_t slot_index, bool* 
stop_processing,
+                                                 fmt::memory_buffer& 
error_prefix,
+                                                 
vectorized::IColumn::Permutation* rows) {
     DCHECK((rows == nullptr) || (rows->size() == column->size()));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
-        filter_bitmap->Set(row, true);
+        _filter_bitmap.Set(row, true);
         auto ret = state->append_error_msg_to_file([]() -> std::string { 
return ""; },
                                                    [&error_prefix, 
&error_msg]() -> std::string {
                                                        return 
fmt::to_string(error_prefix) +
@@ -129,8 +161,8 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
     auto column_ptr = 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
     auto& real_column_ptr = column_ptr == nullptr ? column : 
(column_ptr->get_nested_column_ptr());
     auto null_map = column_ptr == nullptr ? nullptr : 
column_ptr->get_null_map_data().data();
-    auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
-        return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j] 
== 0);
+    auto need_to_validate = [&null_map, this](size_t j, size_t row) {
+        return !_filter_bitmap.Get(row) && (null_map == nullptr || null_map[j] 
== 0);
     };
 
     ssize_t last_invalid_row = -1;
@@ -182,7 +214,7 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
         const auto column_string =
                 assert_cast<const 
vectorized::ColumnString*>(real_column_ptr.get());
         for (size_t j = 0; j < column->size(); ++j) {
-            if (!filter_bitmap->Get(j)) {
+            if (!_filter_bitmap.Get(j)) {
                 if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
                     continue;
                 }
@@ -294,8 +326,8 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
         }
         fmt::format_to(error_prefix, "ARRAY type failed: ");
         RETURN_IF_ERROR(_validate_column(state, nested_type, 
type.contains_nulls[0],
-                                         column_array->get_data_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
+                                         column_array->get_data_ptr(), 
slot_index, stop_processing,
+                                         error_prefix, &permutation));
         break;
     }
     case TYPE_MAP: {
@@ -312,11 +344,11 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
         }
         fmt::format_to(error_prefix, "MAP type failed: ");
         RETURN_IF_ERROR(_validate_column(state, key_type, 
type.contains_nulls[0],
-                                         column_map->get_keys_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
+                                         column_map->get_keys_ptr(), 
slot_index, stop_processing,
+                                         error_prefix, &permutation));
         RETURN_IF_ERROR(_validate_column(state, val_type, 
type.contains_nulls[1],
-                                         column_map->get_values_ptr(), 
slot_index, filter_bitmap,
-                                         stop_processing, error_prefix, 
&permutation));
+                                         column_map->get_values_ptr(), 
slot_index, stop_processing,
+                                         error_prefix, &permutation));
         break;
     }
     case TYPE_STRUCT: {
@@ -327,7 +359,7 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
         for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
             RETURN_IF_ERROR(_validate_column(state, type.children[sc], 
type.contains_nulls[sc],
                                              
column_struct->get_column_ptr(sc), slot_index,
-                                             filter_bitmap, stop_processing, 
error_prefix));
+                                             stop_processing, error_prefix));
         }
         break;
     }
@@ -345,7 +377,7 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
             if (row == last_invalid_row) {
                 continue;
             }
-            if (null_map[j] && !filter_bitmap->Get(row)) {
+            if (null_map[j] && !_filter_bitmap.Get(row)) {
                 fmt::format_to(error_msg, "null value for not null column, 
type={}",
                                type.debug_string());
                 last_invalid_row = row;
@@ -357,9 +389,8 @@ Status OlapTableValidator::_validate_column(RuntimeState* 
state, const TypeDescr
     return Status::OK();
 }
 
-Status OlapTableValidator::validate_data(RuntimeState* state, 
vectorized::Block* block,
-                                         Bitmap* filter_bitmap, int* 
filtered_rows,
-                                         bool* stop_processing) {
+Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, 
vectorized::Block* block,
+                                               int64_t& filtered_rows, bool* 
stop_processing) {
     for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
         SlotDescriptor* desc = _output_tuple_desc->slots()[i];
         block->get_by_position(i).column =
@@ -369,17 +400,17 @@ Status OlapTableValidator::validate_data(RuntimeState* 
state, vectorized::Block*
         fmt::memory_buffer error_prefix;
         fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
         RETURN_IF_ERROR(_validate_column(state, desc->type(), 
desc->is_nullable(), column, i,
-                                         filter_bitmap, stop_processing, 
error_prefix));
+                                         stop_processing, error_prefix));
     }
 
-    *filtered_rows = 0;
+    filtered_rows = 0;
     for (int i = 0; i < block->rows(); ++i) {
-        *filtered_rows += filter_bitmap->Get(i);
+        filtered_rows += _filter_bitmap.Get(i);
     }
     return Status::OK();
 }
 
-void OlapTableValidator::convert_to_dest_desc_block(doris::vectorized::Block* 
block) {
+void 
OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::vectorized::Block* 
block) {
     for (int i = 0; i < _output_tuple_desc->slots().size() && i < 
block->columns(); ++i) {
         SlotDescriptor* desc = _output_tuple_desc->slots()[i];
         if (desc->is_nullable() != 
block->get_by_position(i).type->is_nullable()) {
diff --git a/be/src/vec/sink/vtablet_validator.h 
b/be/src/vec/sink/vtablet_block_convertor.h
similarity index 68%
rename from be/src/vec/sink/vtablet_validator.h
rename to be/src/vec/sink/vtablet_block_convertor.h
index d610f7680d..751c969396 100644
--- a/be/src/vec/sink/vtablet_validator.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -29,25 +29,26 @@
 #include "util/bitmap.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
+#include "vec/exprs/vexpr_fwd.h"
 
 namespace doris {
 namespace stream_load {
 
-class OlapTableValidator {
+class OlapTableBlockConvertor {
 public:
-    OlapTableValidator(TupleDescriptor* output_tuple_desc)
-            : _output_tuple_desc(output_tuple_desc) {}
+    OlapTableBlockConvertor(TupleDescriptor* output_tuple_desc)
+            : _output_tuple_desc(output_tuple_desc), _filter_bitmap(1024) {}
 
-    // make input data valid for OLAP table
-    // return number of invalid/filtered rows.
-    // invalid row number is set in Bitmap
-    // set stop_processing if we want to stop the whole process now.
-    Status validate_data(RuntimeState* state, vectorized::Block* block, 
Bitmap* filter_bitmap,
-                         int* filtered_rows, bool* stop_processing);
+    Status validate_and_convert_block(RuntimeState* state, vectorized::Block* 
input_block,
+                                      std::shared_ptr<vectorized::Block>& 
block,
+                                      vectorized::VExprContextSPtrs 
output_vexpr_ctxs,
+                                      bool& has_filtered_rows);
 
-    // some output column of output expr may have different nullable property 
with dest slot desc
-    // so here need to do the convert operation
-    void convert_to_dest_desc_block(vectorized::Block* block);
+    const Bitmap& filter_bitmap() { return _filter_bitmap; }
+
+    int64_t validate_data_ns() const { return _validate_data_ns; }
+
+    int64_t num_filtered_rows() const { return _num_filtered_rows; }
 
 private:
     template <bool is_min>
@@ -57,10 +58,21 @@ private:
     DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
 
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, 
bool is_nullable,
-                            vectorized::ColumnPtr column, size_t slot_index, 
Bitmap* filter_bitmap,
-                            bool* stop_processing, fmt::memory_buffer& 
error_prefix,
+                            vectorized::ColumnPtr column, size_t slot_index, 
bool* stop_processing,
+                            fmt::memory_buffer& error_prefix,
                             vectorized::IColumn::Permutation* rows = nullptr);
 
+    // make input data valid for OLAP table
+    // return number of invalid/filtered rows.
+    // invalid row number is set in Bitmap
+    // set stop_processing if we want to stop the whole process now.
+    Status _validate_data(RuntimeState* state, vectorized::Block* block, 
int64_t& filtered_rows,
+                          bool* stop_processing);
+
+    // some output column of output expr may have different nullable property 
with dest slot desc
+    // so here need to do the convert operation
+    void _convert_to_dest_desc_block(vectorized::Block* block);
+
     TupleDescriptor* _output_tuple_desc = nullptr;
 
     std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
@@ -72,6 +84,11 @@ private:
     std::map<int, int64_t> _min_decimal64_val;
     std::map<int, int128_t> _max_decimal128_val;
     std::map<int, int128_t> _min_decimal128_val;
+
+    Bitmap _filter_bitmap;
+
+    int64_t _validate_data_ns = 0;
+    int64_t _num_filtered_rows = 0;
 };
 
 } // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e1870e48e8..568c67ce1f 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -89,7 +89,7 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/sink/vtablet_validator.h"
+#include "vec/sink/vtablet_block_convertor.h"
 
 namespace doris {
 class TExpr;
@@ -983,7 +983,7 @@ void VNodeChannel::mark_close() {
 
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs, Status* 
status)
-        : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) {
+        : _pool(pool), _input_row_desc(row_desc) {
     // From the thrift expressions create the real exprs.
     *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
     _name = "VOlapTableSink";
@@ -1066,7 +1066,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
         return Status::InternalError("unknown destination tuple descriptor");
     }
 
-    _validator = std::make_unique<OlapTableValidator>(_output_tuple_desc);
+    _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
@@ -1296,12 +1296,12 @@ void VOlapTableSink::_generate_row_distribution_payload(
 
 Status VOlapTableSink::_single_partition_generate(RuntimeState* state, 
vectorized::Block* block,
                                                   ChannelDistributionPayload& 
channel_to_payload,
-                                                  size_t num_rows, int32_t 
filtered_rows) {
+                                                  size_t num_rows, bool 
has_filtered_rows) {
     const VOlapTablePartition* partition = nullptr;
     uint32_t tablet_index = 0;
     bool stop_processing = false;
     for (int32_t i = 0; i < num_rows; ++i) {
-        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+        if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_bitmap().Get(i)) {
             continue;
         }
         bool is_continue = false;
@@ -1331,7 +1331,7 @@ Status 
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
             auto& selector = channel_to_payload[j][channel.get()].first;
             auto& tablet_ids = channel_to_payload[j][channel.get()].second;
             for (int32_t i = 0; i < num_rows; ++i) {
-                if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+                if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_bitmap().Get(i)) {
                     continue;
                 }
                 selector->push_back(i);
@@ -1362,29 +1362,13 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
     DorisMetrics::instance()->load_rows->increment(rows);
     DorisMetrics::instance()->load_bytes->increment(bytes);
 
-    vectorized::Block block(input_block->get_columns_with_type_and_name());
-    if (!_output_vexpr_ctxs.empty()) {
-        // Do vectorized expr here to speed up load
-        
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-                _output_vexpr_ctxs, *input_block, &block));
-    }
+    std::shared_ptr<vectorized::Block> block;
+    bool has_filtered_rows = false;
+    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+            state, input_block, block, _output_vexpr_ctxs, has_filtered_rows));
 
-    auto num_rows = block.rows();
-    int filtered_rows = 0;
-    {
-        SCOPED_RAW_TIMER(&_validate_data_ns);
-        _filter_bitmap.Reset(block.rows());
-        bool stop_processing = false;
-        RETURN_IF_ERROR(_validator->validate_data(state, &block, 
&_filter_bitmap, &filtered_rows,
-                                                  &stop_processing));
-        _number_filtered_rows += filtered_rows;
-        if (stop_processing) {
-            // should be returned after updating "_number_filtered_rows", to 
make sure that load job can be cancelled
-            // because of "data unqualified"
-            return Status::EndOfFile("Encountered unqualified data, stop 
processing");
-        }
-        _validator->convert_to_dest_desc_block(&block);
-    }
+    // clear and release the references of columns
+    input_block->clear();
 
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
@@ -1396,20 +1380,21 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         _partition_to_tablet_map.clear();
     }
     _row_distribution_watch.start();
+    auto num_rows = block->rows();
     size_t partition_num = _vpartition->get_partitions().size();
     if (partition_num == 1 && findTabletMode == 
FindTabletMode::FIND_TABLET_EVERY_SINK) {
-        RETURN_IF_ERROR(_single_partition_generate(state, &block, 
channel_to_payload, num_rows,
-                                                   filtered_rows));
+        RETURN_IF_ERROR(_single_partition_generate(state, block.get(), 
channel_to_payload, num_rows,
+                                                   has_filtered_rows));
     } else {
         for (int i = 0; i < num_rows; ++i) {
-            if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+            if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_bitmap().Get(i)) {
                 continue;
             }
             const VOlapTablePartition* partition = nullptr;
             bool is_continue = false;
             uint32_t tablet_index = 0;
-            RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, 
tablet_index, stop_processing,
-                                        is_continue));
+            RETURN_IF_ERROR(find_tablet(state, block.get(), i, &partition, 
tablet_index,
+                                        stop_processing, is_continue));
             if (is_continue) {
                 continue;
             }
@@ -1429,20 +1414,18 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
             !_schema->is_dynamic_schema() && _partition_to_tablet_map.size() 
== 1;
     if (load_block_to_single_tablet) {
         SCOPED_RAW_TIMER(&_filter_ns);
-        // clear and release the references of columns
-        input_block->clear();
         // Filter block
-        if (filtered_rows > 0) {
-            auto filter = vectorized::ColumnUInt8::create(block.rows(), 0);
+        if (has_filtered_rows) {
+            auto filter = vectorized::ColumnUInt8::create(block->rows(), 0);
             vectorized::UInt8* filter_data =
                     
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data().data();
             vectorized::IColumn::Filter& filter_col =
                     
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
             for (size_t i = 0; i < filter_col.size(); ++i) {
-                filter_data[i] = !_filter_bitmap.Get(i);
+                filter_data[i] = !_block_convertor->filter_bitmap().Get(i);
             }
-            RETURN_IF_CATCH_EXCEPTION(
-                    vectorized::Block::filter_block_internal(&block, 
filter_col, block.columns()));
+            RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal(
+                    block.get(), filter_col, block->columns()));
         }
     }
     // Add block to node channel
@@ -1450,7 +1433,7 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         for (const auto& entry : channel_to_payload[i]) {
             // if this node channel is already failed, this add_row will be 
skipped
             auto st = entry.first->add_block(
-                    &block, &entry.second,
+                    block.get(), &entry.second,
                     // if it is load single tablet, then append this whole 
block
                     load_block_to_single_tablet);
             if (!st.ok()) {
@@ -1623,14 +1606,15 @@ Status VOlapTableSink::close(RuntimeState* state, 
Status exec_status) {
 
             COUNTER_SET(_input_rows_counter, _number_input_rows);
             COUNTER_SET(_output_rows_counter, _number_output_rows);
-            COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+            COUNTER_SET(_filtered_rows_counter,
+                        _block_convertor->num_filtered_rows() + 
_number_filtered_rows);
             COUNTER_SET(_send_data_timer, _send_data_ns);
             COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
             COUNTER_SET(_filter_timer, _filter_ns);
             COUNTER_SET(_append_node_channel_timer, 
channel_stat.append_node_channel_ns);
             COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
             COUNTER_SET(_wait_mem_limit_timer, 
channel_stat.mem_exceeded_block_ns);
-            COUNTER_SET(_validate_data_timer, _validate_data_ns);
+            COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
             COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
             COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
             COUNTER_SET(_total_add_batch_exec_timer, 
total_add_batch_exec_time_ns);
@@ -1643,7 +1627,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status 
exec_status) {
             int64_t num_rows_load_total = _number_input_rows + 
state->num_rows_load_filtered() +
                                           state->num_rows_load_unselected();
             state->set_num_rows_load_total(num_rows_load_total);
-            state->update_num_rows_load_filtered(_number_filtered_rows);
+            
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+                                                 _number_filtered_rows);
             
state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
 
             // print log of add batch time of all node, for tracing load 
performance easily
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 7f9dc38f5f..7551b35571 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -57,7 +57,6 @@
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
-#include "util/bitmap.h"
 #include "util/countdown_latch.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
@@ -82,7 +81,7 @@ class RefCountClosure;
 
 namespace stream_load {
 
-class OlapTableValidator;
+class OlapTableBlockConvertor;
 class OpenPartitionClosure;
 
 // The counter of add_batch rpc of a single node
@@ -516,7 +515,7 @@ private:
                                             uint32_t tablet_index, int 
row_idx, size_t row_cnt);
     Status _single_partition_generate(RuntimeState* state, vectorized::Block* 
block,
                                       ChannelDistributionPayload& 
channel_to_payload,
-                                      size_t num_rows, int32_t filtered_rows);
+                                      size_t num_rows, bool has_filtered_rows);
 
     Status find_tablet(RuntimeState* state, vectorized::Block* block, int 
row_index,
                        const VOlapTablePartition** partition, uint32_t& 
tablet_index,
@@ -565,17 +564,14 @@ private:
     // only used for partition with random distribution
     std::map<int64_t, int64_t> _partition_to_tablet_map;
 
-    Bitmap _filter_bitmap;
-
     // index_channel
     std::vector<std::shared_ptr<IndexChannel>> _channels;
 
     bthread_t _sender_thread = 0;
     std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
 
-    std::unique_ptr<OlapTableValidator> _validator;
+    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
     // Stats for this
-    int64_t _validate_data_ns = 0;
     int64_t _send_data_ns = 0;
     int64_t _number_input_rows = 0;
     int64_t _number_output_rows = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to