This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 688a1bc059 [refactor](load) expand OlapTableValidator to VOlapTableBlockConvertor (#21476)
688a1bc059 is described below
commit 688a1bc059ad99b51392658c9eb8e32345ade2b8
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 6 10:11:53 2023 +0800
[refactor](load) expand OlapTableValidator to VOlapTableBlockConvertor (#21476)
---
...t_validator.cpp => vtablet_block_convertor.cpp} | 85 +++++++++++++++-------
...ablet_validator.h => vtablet_block_convertor.h} | 45 ++++++++----
be/src/vec/sink/vtablet_sink.cpp | 73 ++++++++-----------
be/src/vec/sink/vtablet_sink.h | 10 +--
4 files changed, 121 insertions(+), 92 deletions(-)
diff --git a/be/src/vec/sink/vtablet_validator.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp
similarity index 84%
rename from be/src/vec/sink/vtablet_validator.cpp
rename to be/src/vec/sink/vtablet_block_convertor.cpp
index 39cc92f0f4..42e1b48d15 100644
--- a/be/src/vec/sink/vtablet_validator.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/sink/vtablet_validator.h"
+#include "vec/sink/vtablet_block_convertor.h"
#include <fmt/format.h>
#include <google/protobuf/stubs/common.h>
@@ -54,8 +54,40 @@
namespace doris {
namespace stream_load {
+Status OlapTableBlockConvertor::validate_and_convert_block(
+ RuntimeState* state, vectorized::Block* input_block,
+ std::shared_ptr<vectorized::Block>& block,
vectorized::VExprContextSPtrs output_vexpr_ctxs,
+ bool& has_filtered_rows) {
+ DCHECK(input_block->rows() > 0);
+
+ block =
vectorized::Block::create_shared(input_block->get_columns_with_type_and_name());
+ if (!output_vexpr_ctxs.empty()) {
+ // Do vectorized expr here to speed up load
+
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+ output_vexpr_ctxs, *input_block, block.get()));
+ }
+
+ int64_t filtered_rows = 0;
+ {
+ SCOPED_RAW_TIMER(&_validate_data_ns);
+ _filter_bitmap.Reset(block->rows());
+ bool stop_processing = false;
+ RETURN_IF_ERROR(_validate_data(state, block.get(), filtered_rows,
&stop_processing));
+ _num_filtered_rows += filtered_rows;
+ has_filtered_rows = filtered_rows > 0;
+ if (stop_processing) {
+ // should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled
+ // because of "data unqualified"
+ return Status::EndOfFile("Encountered unqualified data, stop
processing");
+ }
+ _convert_to_dest_desc_block(block.get());
+ }
+
+ return Status::OK();
+}
+
template <bool is_min>
-DecimalV2Value OlapTableValidator::_get_decimalv2_min_or_max(const
TypeDescriptor& type) {
+DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const
TypeDescriptor& type) {
std::map<std::pair<int, int>, DecimalV2Value>* pmap;
if constexpr (is_min) {
pmap = &_min_decimalv2_val;
@@ -81,7 +113,7 @@ DecimalV2Value
OlapTableValidator::_get_decimalv2_min_or_max(const TypeDescripto
}
template <typename DecimalType, bool IsMin>
-DecimalType OlapTableValidator::_get_decimalv3_min_or_max(const
TypeDescriptor& type) {
+DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const
TypeDescriptor& type) {
std::map<int, typename DecimalType::NativeType>* pmap;
if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
@@ -107,15 +139,15 @@ DecimalType
OlapTableValidator::_get_decimalv3_min_or_max(const TypeDescriptor&
return value;
}
-Status OlapTableValidator::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
- bool is_nullable,
vectorized::ColumnPtr column,
- size_t slot_index, Bitmap*
filter_bitmap,
- bool* stop_processing,
fmt::memory_buffer& error_prefix,
- vectorized::IColumn::Permutation*
rows) {
+Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
+ bool is_nullable,
vectorized::ColumnPtr column,
+ size_t slot_index, bool*
stop_processing,
+ fmt::memory_buffer&
error_prefix,
+
vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == column->size()));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
- filter_bitmap->Set(row, true);
+ _filter_bitmap.Set(row, true);
auto ret = state->append_error_msg_to_file([]() -> std::string {
return ""; },
[&error_prefix,
&error_msg]() -> std::string {
return
fmt::to_string(error_prefix) +
@@ -129,8 +161,8 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
auto column_ptr =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
auto null_map = column_ptr == nullptr ? nullptr :
column_ptr->get_null_map_data().data();
- auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
- return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j]
== 0);
+ auto need_to_validate = [&null_map, this](size_t j, size_t row) {
+ return !_filter_bitmap.Get(row) && (null_map == nullptr || null_map[j]
== 0);
};
ssize_t last_invalid_row = -1;
@@ -182,7 +214,7 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
const auto column_string =
assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
for (size_t j = 0; j < column->size(); ++j) {
- if (!filter_bitmap->Get(j)) {
+ if (!_filter_bitmap.Get(j)) {
if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
continue;
}
@@ -294,8 +326,8 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
}
fmt::format_to(error_prefix, "ARRAY type failed: ");
RETURN_IF_ERROR(_validate_column(state, nested_type,
type.contains_nulls[0],
- column_array->get_data_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
+ column_array->get_data_ptr(),
slot_index, stop_processing,
+ error_prefix, &permutation));
break;
}
case TYPE_MAP: {
@@ -312,11 +344,11 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
}
fmt::format_to(error_prefix, "MAP type failed: ");
RETURN_IF_ERROR(_validate_column(state, key_type,
type.contains_nulls[0],
- column_map->get_keys_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
+ column_map->get_keys_ptr(),
slot_index, stop_processing,
+ error_prefix, &permutation));
RETURN_IF_ERROR(_validate_column(state, val_type,
type.contains_nulls[1],
- column_map->get_values_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
+ column_map->get_values_ptr(),
slot_index, stop_processing,
+ error_prefix, &permutation));
break;
}
case TYPE_STRUCT: {
@@ -327,7 +359,7 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
RETURN_IF_ERROR(_validate_column(state, type.children[sc],
type.contains_nulls[sc],
column_struct->get_column_ptr(sc), slot_index,
- filter_bitmap, stop_processing,
error_prefix));
+ stop_processing, error_prefix));
}
break;
}
@@ -345,7 +377,7 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
if (row == last_invalid_row) {
continue;
}
- if (null_map[j] && !filter_bitmap->Get(row)) {
+ if (null_map[j] && !_filter_bitmap.Get(row)) {
fmt::format_to(error_msg, "null value for not null column,
type={}",
type.debug_string());
last_invalid_row = row;
@@ -357,9 +389,8 @@ Status OlapTableValidator::_validate_column(RuntimeState*
state, const TypeDescr
return Status::OK();
}
-Status OlapTableValidator::validate_data(RuntimeState* state,
vectorized::Block* block,
- Bitmap* filter_bitmap, int*
filtered_rows,
- bool* stop_processing) {
+Status OlapTableBlockConvertor::_validate_data(RuntimeState* state,
vectorized::Block* block,
+ int64_t& filtered_rows, bool*
stop_processing) {
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
block->get_by_position(i).column =
@@ -369,17 +400,17 @@ Status OlapTableValidator::validate_data(RuntimeState*
state, vectorized::Block*
fmt::memory_buffer error_prefix;
fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
RETURN_IF_ERROR(_validate_column(state, desc->type(),
desc->is_nullable(), column, i,
- filter_bitmap, stop_processing,
error_prefix));
+ stop_processing, error_prefix));
}
- *filtered_rows = 0;
+ filtered_rows = 0;
for (int i = 0; i < block->rows(); ++i) {
- *filtered_rows += filter_bitmap->Get(i);
+ filtered_rows += _filter_bitmap.Get(i);
}
return Status::OK();
}
-void OlapTableValidator::convert_to_dest_desc_block(doris::vectorized::Block*
block) {
+void
OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::vectorized::Block*
block) {
for (int i = 0; i < _output_tuple_desc->slots().size() && i <
block->columns(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
if (desc->is_nullable() !=
block->get_by_position(i).type->is_nullable()) {
diff --git a/be/src/vec/sink/vtablet_validator.h b/be/src/vec/sink/vtablet_block_convertor.h
similarity index 68%
rename from be/src/vec/sink/vtablet_validator.h
rename to be/src/vec/sink/vtablet_block_convertor.h
index d610f7680d..751c969396 100644
--- a/be/src/vec/sink/vtablet_validator.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -29,25 +29,26 @@
#include "util/bitmap.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
+#include "vec/exprs/vexpr_fwd.h"
namespace doris {
namespace stream_load {
-class OlapTableValidator {
+class OlapTableBlockConvertor {
public:
- OlapTableValidator(TupleDescriptor* output_tuple_desc)
- : _output_tuple_desc(output_tuple_desc) {}
+ OlapTableBlockConvertor(TupleDescriptor* output_tuple_desc)
+ : _output_tuple_desc(output_tuple_desc), _filter_bitmap(1024) {}
- // make input data valid for OLAP table
- // return number of invalid/filtered rows.
- // invalid row number is set in Bitmap
- // set stop_processing if we want to stop the whole process now.
- Status validate_data(RuntimeState* state, vectorized::Block* block,
Bitmap* filter_bitmap,
- int* filtered_rows, bool* stop_processing);
+ Status validate_and_convert_block(RuntimeState* state, vectorized::Block*
input_block,
+ std::shared_ptr<vectorized::Block>&
block,
+ vectorized::VExprContextSPtrs
output_vexpr_ctxs,
+ bool& has_filtered_rows);
- // some output column of output expr may have different nullable property
with dest slot desc
- // so here need to do the convert operation
- void convert_to_dest_desc_block(vectorized::Block* block);
+ const Bitmap& filter_bitmap() { return _filter_bitmap; }
+
+ int64_t validate_data_ns() const { return _validate_data_ns; }
+
+ int64_t num_filtered_rows() const { return _num_filtered_rows; }
private:
template <bool is_min>
@@ -57,10 +58,21 @@ private:
DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
- vectorized::ColumnPtr column, size_t slot_index,
Bitmap* filter_bitmap,
- bool* stop_processing, fmt::memory_buffer&
error_prefix,
+ vectorized::ColumnPtr column, size_t slot_index,
bool* stop_processing,
+ fmt::memory_buffer& error_prefix,
vectorized::IColumn::Permutation* rows = nullptr);
+ // make input data valid for OLAP table
+ // return number of invalid/filtered rows.
+ // invalid row number is set in Bitmap
+ // set stop_processing if we want to stop the whole process now.
+ Status _validate_data(RuntimeState* state, vectorized::Block* block,
int64_t& filtered_rows,
+ bool* stop_processing);
+
+ // some output column of output expr may have different nullable property
with dest slot desc
+ // so here need to do the convert operation
+ void _convert_to_dest_desc_block(vectorized::Block* block);
+
TupleDescriptor* _output_tuple_desc = nullptr;
std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
@@ -72,6 +84,11 @@ private:
std::map<int, int64_t> _min_decimal64_val;
std::map<int, int128_t> _max_decimal128_val;
std::map<int, int128_t> _min_decimal128_val;
+
+ Bitmap _filter_bitmap;
+
+ int64_t _validate_data_ns = 0;
+ int64_t _num_filtered_rows = 0;
};
} // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e1870e48e8..568c67ce1f 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -89,7 +89,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/sink/vtablet_validator.h"
+#include "vec/sink/vtablet_block_convertor.h"
namespace doris {
class TExpr;
@@ -983,7 +983,7 @@ void VNodeChannel::mark_close() {
VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status*
status)
- : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) {
+ : _pool(pool), _input_row_desc(row_desc) {
// From the thrift expressions create the real exprs.
*status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
_name = "VOlapTableSink";
@@ -1066,7 +1066,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
return Status::InternalError("unknown destination tuple descriptor");
}
- _validator = std::make_unique<OlapTableValidator>(_output_tuple_desc);
+ _block_convertor =
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
_output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc,
false));
// add all counter
@@ -1296,12 +1296,12 @@ void VOlapTableSink::_generate_row_distribution_payload(
Status VOlapTableSink::_single_partition_generate(RuntimeState* state,
vectorized::Block* block,
ChannelDistributionPayload&
channel_to_payload,
- size_t num_rows, int32_t
filtered_rows) {
+ size_t num_rows, bool
has_filtered_rows) {
const VOlapTablePartition* partition = nullptr;
uint32_t tablet_index = 0;
bool stop_processing = false;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
continue;
}
bool is_continue = false;
@@ -1331,7 +1331,7 @@ Status
VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
auto& selector = channel_to_payload[j][channel.get()].first;
auto& tablet_ids = channel_to_payload[j][channel.get()].second;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
continue;
}
selector->push_back(i);
@@ -1362,29 +1362,13 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
DorisMetrics::instance()->load_rows->increment(rows);
DorisMetrics::instance()->load_bytes->increment(bytes);
- vectorized::Block block(input_block->get_columns_with_type_and_name());
- if (!_output_vexpr_ctxs.empty()) {
- // Do vectorized expr here to speed up load
-
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
- _output_vexpr_ctxs, *input_block, &block));
- }
+ std::shared_ptr<vectorized::Block> block;
+ bool has_filtered_rows = false;
+ RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+ state, input_block, block, _output_vexpr_ctxs, has_filtered_rows));
- auto num_rows = block.rows();
- int filtered_rows = 0;
- {
- SCOPED_RAW_TIMER(&_validate_data_ns);
- _filter_bitmap.Reset(block.rows());
- bool stop_processing = false;
- RETURN_IF_ERROR(_validator->validate_data(state, &block,
&_filter_bitmap, &filtered_rows,
- &stop_processing));
- _number_filtered_rows += filtered_rows;
- if (stop_processing) {
- // should be returned after updating "_number_filtered_rows", to
make sure that load job can be cancelled
- // because of "data unqualified"
- return Status::EndOfFile("Encountered unqualified data, stop
processing");
- }
- _validator->convert_to_dest_desc_block(&block);
- }
+ // clear and release the references of columns
+ input_block->clear();
SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
@@ -1396,20 +1380,21 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
_partition_to_tablet_map.clear();
}
_row_distribution_watch.start();
+ auto num_rows = block->rows();
size_t partition_num = _vpartition->get_partitions().size();
if (partition_num == 1 && findTabletMode ==
FindTabletMode::FIND_TABLET_EVERY_SINK) {
- RETURN_IF_ERROR(_single_partition_generate(state, &block,
channel_to_payload, num_rows,
- filtered_rows));
+ RETURN_IF_ERROR(_single_partition_generate(state, block.get(),
channel_to_payload, num_rows,
+ has_filtered_rows));
} else {
for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
continue;
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
- RETURN_IF_ERROR(find_tablet(state, &block, i, &partition,
tablet_index, stop_processing,
- is_continue));
+ RETURN_IF_ERROR(find_tablet(state, block.get(), i, &partition,
tablet_index,
+ stop_processing, is_continue));
if (is_continue) {
continue;
}
@@ -1429,20 +1414,18 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
!_schema->is_dynamic_schema() && _partition_to_tablet_map.size()
== 1;
if (load_block_to_single_tablet) {
SCOPED_RAW_TIMER(&_filter_ns);
- // clear and release the references of columns
- input_block->clear();
// Filter block
- if (filtered_rows > 0) {
- auto filter = vectorized::ColumnUInt8::create(block.rows(), 0);
+ if (has_filtered_rows) {
+ auto filter = vectorized::ColumnUInt8::create(block->rows(), 0);
vectorized::UInt8* filter_data =
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data().data();
vectorized::IColumn::Filter& filter_col =
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
for (size_t i = 0; i < filter_col.size(); ++i) {
- filter_data[i] = !_filter_bitmap.Get(i);
+ filter_data[i] = !_block_convertor->filter_bitmap().Get(i);
}
- RETURN_IF_CATCH_EXCEPTION(
- vectorized::Block::filter_block_internal(&block,
filter_col, block.columns()));
+ RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal(
+ block.get(), filter_col, block->columns()));
}
}
// Add block to node channel
@@ -1450,7 +1433,7 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
for (const auto& entry : channel_to_payload[i]) {
// if this node channel is already failed, this add_row will be
skipped
auto st = entry.first->add_block(
- &block, &entry.second,
+ block.get(), &entry.second,
// if it is load single tablet, then append this whole
block
load_block_to_single_tablet);
if (!st.ok()) {
@@ -1623,14 +1606,15 @@ Status VOlapTableSink::close(RuntimeState* state,
Status exec_status) {
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
- COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+ COUNTER_SET(_filtered_rows_counter,
+ _block_convertor->num_filtered_rows() +
_number_filtered_rows);
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_filter_timer, _filter_ns);
COUNTER_SET(_append_node_channel_timer,
channel_stat.append_node_channel_ns);
COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
COUNTER_SET(_wait_mem_limit_timer,
channel_stat.mem_exceeded_block_ns);
- COUNTER_SET(_validate_data_timer, _validate_data_ns);
+ COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
COUNTER_SET(_total_add_batch_exec_timer,
total_add_batch_exec_time_ns);
@@ -1643,7 +1627,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status
exec_status) {
int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
state->num_rows_load_unselected();
state->set_num_rows_load_total(num_rows_load_total);
- state->update_num_rows_load_filtered(_number_filtered_rows);
+
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+ _number_filtered_rows);
state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
// print log of add batch time of all node, for tracing load
performance easily
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 7f9dc38f5f..7551b35571 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -57,7 +57,6 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
-#include "util/bitmap.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
@@ -82,7 +81,7 @@ class RefCountClosure;
namespace stream_load {
-class OlapTableValidator;
+class OlapTableBlockConvertor;
class OpenPartitionClosure;
// The counter of add_batch rpc of a single node
@@ -516,7 +515,7 @@ private:
uint32_t tablet_index, int
row_idx, size_t row_cnt);
Status _single_partition_generate(RuntimeState* state, vectorized::Block*
block,
ChannelDistributionPayload&
channel_to_payload,
- size_t num_rows, int32_t filtered_rows);
+ size_t num_rows, bool has_filtered_rows);
Status find_tablet(RuntimeState* state, vectorized::Block* block, int
row_index,
const VOlapTablePartition** partition, uint32_t&
tablet_index,
@@ -565,17 +564,14 @@ private:
// only used for partition with random distribution
std::map<int64_t, int64_t> _partition_to_tablet_map;
- Bitmap _filter_bitmap;
-
// index_channel
std::vector<std::shared_ptr<IndexChannel>> _channels;
bthread_t _sender_thread = 0;
std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
- std::unique_ptr<OlapTableValidator> _validator;
+ std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
// Stats for this
- int64_t _validate_data_ns = 0;
int64_t _send_data_ns = 0;
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]