This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1c16b96d6 [refactor](load) move validator out of VOlapTableSink (#21460)
b1c16b96d6 is described below
commit b1c16b96d63c96f1f6db1a21fbc4321d99897dec
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Jul 4 10:16:56 2023 +0800
[refactor](load) move validator out of VOlapTableSink (#21460)
---
be/src/vec/sink/vtablet_sink.cpp | 354 +----------------------------
be/src/vec/sink/vtablet_sink.h | 34 +--
be/src/vec/sink/vtablet_validator.cpp | 404 ++++++++++++++++++++++++++++++++++
be/src/vec/sink/vtablet_validator.h | 78 +++++++
4 files changed, 489 insertions(+), 381 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 39f3475563..f95c2b13c5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -89,6 +89,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/vtablet_validator.h"
namespace doris {
class TExpr;
@@ -1051,6 +1052,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
return Status::InternalError("unknown destination tuple descriptor");
}
+ _validator = std::make_unique<OlapTableValidator>(_output_tuple_desc);
_output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc,
false));
// add all counter
@@ -1359,15 +1361,15 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block,
SCOPED_RAW_TIMER(&_validate_data_ns);
_filter_bitmap.Reset(block.rows());
bool stop_processing = false;
- RETURN_IF_ERROR(
- _validate_data(state, &block, &_filter_bitmap, &filtered_rows,
&stop_processing));
+ RETURN_IF_ERROR(_validator->validate_data(state, &block,
&_filter_bitmap, &filtered_rows,
+ &stop_processing));
_number_filtered_rows += filtered_rows;
if (stop_processing) {
// should be returned after updating "_number_filtered_rows", to make sure that load job can be cancelled
// because of "data unqualified"
return Status::EndOfFile("Encountered unqualified data, stop
processing");
}
- _convert_to_dest_desc_block(&block);
+ _validator->convert_to_dest_desc_block(&block);
}
SCOPED_RAW_TIMER(&_send_data_ns);
@@ -1595,351 +1597,5 @@ Status VOlapTableSink::close(RuntimeState* state,
Status exec_status) {
return status;
}
-template <bool is_min>
-DecimalV2Value VOlapTableSink::_get_decimalv2_min_or_max(const TypeDescriptor&
type) {
- std::map<std::pair<int, int>, DecimalV2Value>* pmap = nullptr;
- if constexpr (is_min) {
- pmap = &_min_decimalv2_val;
- } else {
- pmap = &_max_decimalv2_val;
- }
-
- // found
- auto iter = pmap->find({type.precision, type.scale});
- if (iter != pmap->end()) {
- return iter->second;
- }
-
- // save min or max DecimalV2Value for next time
- DecimalV2Value value;
- if constexpr (is_min) {
- value.to_min_decimal(type.precision, type.scale);
- } else {
- value.to_max_decimal(type.precision, type.scale);
- }
- pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value);
- return value;
-}
-
-template <typename DecimalType, bool IsMin>
-DecimalType VOlapTableSink::_get_decimalv3_min_or_max(const TypeDescriptor&
type) {
- std::map<int, typename DecimalType::NativeType>* pmap = nullptr;
- if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
- pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
- } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) {
- pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
- } else {
- pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
- }
-
- // found
- auto iter = pmap->find(type.precision);
- if (iter != pmap->end()) {
- return iter->second;
- }
-
- typename DecimalType::NativeType value;
- if constexpr (IsMin) {
- value = vectorized::min_decimal_value<DecimalType>(type.precision);
- } else {
- value = vectorized::max_decimal_value<DecimalType>(type.precision);
- }
- pmap->emplace(type.precision, value);
- return value;
-}
-
-Status VOlapTableSink::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
- bool is_nullable,
vectorized::ColumnPtr column,
- size_t slot_index, Bitmap*
filter_bitmap,
- bool* stop_processing,
fmt::memory_buffer& error_prefix,
- vectorized::IColumn::Permutation*
rows) {
- DCHECK((rows == nullptr) || (rows->size() == column->size()));
- fmt::memory_buffer error_msg;
- auto set_invalid_and_append_error_msg = [&](int row) {
- filter_bitmap->Set(row, true);
- auto ret = state->append_error_msg_to_file([]() -> std::string {
return ""; },
- [&error_prefix,
&error_msg]() -> std::string {
- return
fmt::to_string(error_prefix) +
-
fmt::to_string(error_msg);
- },
- stop_processing);
- error_msg.clear();
- return ret;
- };
-
- auto column_ptr =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
- auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
- auto null_map = column_ptr == nullptr ? nullptr :
column_ptr->get_null_map_data().data();
- auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
- return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j]
== 0);
- };
-
- ssize_t last_invalid_row = -1;
- switch (type.type) {
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- const auto column_string =
- assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
-
- size_t limit = config::string_type_length_soft_limit_bytes;
- // when type.len is negative, std::min will return overflow value, so we need to check it
- if (type.len > 0) {
- limit = std::min(config::string_type_length_soft_limit_bytes,
type.len);
- }
- for (size_t j = 0; j < column->size(); ++j) {
- auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (need_to_validate(j, row)) {
- auto str_val = column_string->get_data_at(j);
- bool invalid = str_val.size > limit;
- if (invalid) {
- last_invalid_row = row;
- if (str_val.size > type.len) {
- fmt::format_to(error_msg, "{}",
- "the length of input is too long than
schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
- } else if (str_val.size > limit) {
- fmt::format_to(error_msg, "{}",
- "the length of input string is too long
than vec schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "limit length: {}; ", limit);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
- }
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
- }
- }
- }
- break;
- }
- case TYPE_JSONB: {
- const auto column_string =
- assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
- for (size_t j = 0; j < column->size(); ++j) {
- if (!filter_bitmap->Get(j)) {
- if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
- continue;
- }
- auto str_val = column_string->get_data_at(j);
- bool invalid = str_val.size == 0;
- if (invalid) {
- error_msg.clear();
- fmt::format_to(error_msg, "{}", "jsonb with size 0 is
invalid");
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
- }
- }
- }
- break;
- }
- case TYPE_DECIMALV2: {
- auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
- assert_cast<const
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
- real_column_ptr.get()));
- const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
- const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
- for (size_t j = 0; j < column->size(); ++j) {
- auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (need_to_validate(j, row)) {
- auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
- column_decimal->get_data()[j]);
- bool invalid = false;
-
- if (dec_val.greater_than_scale(type.scale)) {
- auto code = dec_val.round(&dec_val, type.scale, HALF_UP);
- column_decimal->get_data()[j] = dec_val.value();
-
- if (code != E_DEC_OK) {
- fmt::format_to(error_msg, "round one decimal
failed.value={}; ",
- dec_val.to_string());
- invalid = true;
- }
- }
- if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
- fmt::format_to(error_msg, "{}", "decimal value is not
valid for definition");
- fmt::format_to(error_msg, ", value={}",
dec_val.to_string());
- fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision,
- type.scale);
- fmt::format_to(error_msg, ", min={}, max={}; ",
min_decimalv2.to_string(),
- max_decimalv2.to_string());
- invalid = true;
- }
-
- if (invalid) {
- last_invalid_row = row;
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
- }
- }
- }
- break;
- }
- case TYPE_DECIMAL32: {
-#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)
\
- auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
- assert_cast<const
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
- real_column_ptr.get()));
\
- const auto& max_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \
- const auto& min_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \
- for (size_t j = 0; j < column->size(); ++j) {
\
- auto row = rows ? (*rows)[j] : j;
\
- if (row == last_invalid_row) {
\
- continue;
\
- }
\
- if (need_to_validate(j, row)) {
\
- auto dec_val = column_decimal->get_data()[j];
\
- bool invalid = false;
\
- if (dec_val > max_decimal || dec_val < min_decimal) {
\
- fmt::format_to(error_msg, "{}", "decimal value is not valid
for definition"); \
- fmt::format_to(error_msg, ", value={}", dec_val);
\
- fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, type.scale); \
- fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal,
max_decimal); \
- invalid = true;
\
- }
\
- if (invalid) {
\
- last_invalid_row = row;
\
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
- }
\
- }
\
- }
- CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
- break;
- }
- case TYPE_DECIMAL64: {
- CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64);
- break;
- }
- case TYPE_DECIMAL128I: {
- CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128);
- break;
- }
- case TYPE_ARRAY: {
- const auto column_array =
- assert_cast<const
vectorized::ColumnArray*>(real_column_ptr.get());
- DCHECK(type.children.size() == 1);
- auto nested_type = type.children[0];
- const auto& offsets = column_array->get_offsets();
- vectorized::IColumn::Permutation permutation(offsets.back());
- for (size_t r = 0; r < offsets.size(); ++r) {
- for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
- permutation[c] = rows ? (*rows)[r] : r;
- }
- }
- fmt::format_to(error_prefix, "ARRAY type failed: ");
- RETURN_IF_ERROR(_validate_column(state, nested_type,
type.contains_nulls[0],
- column_array->get_data_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
- break;
- }
- case TYPE_MAP: {
- const auto column_map = assert_cast<const
vectorized::ColumnMap*>(real_column_ptr.get());
- DCHECK(type.children.size() == 2);
- auto key_type = type.children[0];
- auto val_type = type.children[1];
- const auto& offsets = column_map->get_offsets();
- vectorized::IColumn::Permutation permutation(offsets.back());
- for (size_t r = 0; r < offsets.size(); ++r) {
- for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
- permutation[c] = rows ? (*rows)[r] : r;
- }
- }
- fmt::format_to(error_prefix, "MAP type failed: ");
- RETURN_IF_ERROR(_validate_column(state, key_type,
type.contains_nulls[0],
- column_map->get_keys_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
- RETURN_IF_ERROR(_validate_column(state, val_type,
type.contains_nulls[1],
- column_map->get_values_ptr(),
slot_index, filter_bitmap,
- stop_processing, error_prefix,
&permutation));
- break;
- }
- case TYPE_STRUCT: {
- const auto column_struct =
- assert_cast<const
vectorized::ColumnStruct*>(real_column_ptr.get());
- DCHECK(type.children.size() == column_struct->tuple_size());
- fmt::format_to(error_prefix, "STRUCT type failed: ");
- for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
- RETURN_IF_ERROR(_validate_column(state, type.children[sc],
type.contains_nulls[sc],
-
column_struct->get_column_ptr(sc), slot_index,
- filter_bitmap, stop_processing,
error_prefix));
- }
- break;
- }
- default:
- break;
- }
-
- // Dispose the column should do not contain the NULL value
- // Only two case:
- // 1. column is nullable but the desc is not nullable
- // 2. desc->type is BITMAP
- if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
- for (int j = 0; j < column->size(); ++j) {
- auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (null_map[j] && !filter_bitmap->Get(row)) {
- fmt::format_to(error_msg, "null value for not null column,
type={}",
- type.debug_string());
- last_invalid_row = row;
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
- }
- }
- }
-
- return Status::OK();
-}
-
-Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block*
block,
- Bitmap* filter_bitmap, int*
filtered_rows,
- bool* stop_processing) {
- for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
- SlotDescriptor* desc = _output_tuple_desc->slots()[i];
- block->get_by_position(i).column =
-
block->get_by_position(i).column->convert_to_full_column_if_const();
- const auto& column = block->get_by_position(i).column;
-
- fmt::memory_buffer error_prefix;
- fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
- RETURN_IF_ERROR(_validate_column(state, desc->type(),
desc->is_nullable(), column, i,
- filter_bitmap, stop_processing,
error_prefix));
- }
-
- *filtered_rows = 0;
- for (int i = 0; i < block->rows(); ++i) {
- *filtered_rows += filter_bitmap->Get(i);
- }
- return Status::OK();
-}
-
-void VOlapTableSink::_convert_to_dest_desc_block(doris::vectorized::Block*
block) {
- for (int i = 0; i < _output_tuple_desc->slots().size() && i <
block->columns(); ++i) {
- SlotDescriptor* desc = _output_tuple_desc->slots()[i];
- if (desc->is_nullable() !=
block->get_by_position(i).type->is_nullable()) {
- if (desc->is_nullable()) {
- block->get_by_position(i).type =
-
vectorized::make_nullable(block->get_by_position(i).type);
- block->get_by_position(i).column =
-
vectorized::make_nullable(block->get_by_position(i).column);
- } else {
- block->get_by_position(i).type = assert_cast<const
vectorized::DataTypeNullable&>(
-
*block->get_by_position(i).type)
- .get_nested_type();
- block->get_by_position(i).column = assert_cast<const
vectorized::ColumnNullable&>(
-
*block->get_by_position(i).column)
-
.get_nested_column_ptr();
- }
- }
- }
-}
-
} // namespace stream_load
} // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index ebd7c40a81..9c608cda43 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -82,6 +82,7 @@ class RefCountClosure;
namespace stream_load {
+class OlapTableValidator;
class OpenPartitionClosure;
// The counter of add_batch rpc of a single node
@@ -494,28 +495,6 @@ private:
ChannelDistributionPayload&
channel_to_payload,
size_t num_rows, int32_t filtered_rows);
- // make input data valid for OLAP table
- // return number of invalid/filtered rows.
- // invalid row number is set in Bitmap
- // set stop_processing if we want to stop the whole process now.
- Status _validate_data(RuntimeState* state, vectorized::Block* block,
Bitmap* filter_bitmap,
- int* filtered_rows, bool* stop_processing);
-
- template <bool is_min>
- DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
-
- template <typename DecimalType, bool IsMin>
- DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
-
- Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
- vectorized::ColumnPtr column, size_t slot_index,
Bitmap* filter_bitmap,
- bool* stop_processing, fmt::memory_buffer&
error_prefix,
- vectorized::IColumn::Permutation* rows = nullptr);
-
- // some output column of output expr may have different nullable property with dest slot desc
- // so here need to do the convert operation
- void _convert_to_dest_desc_block(vectorized::Block* block);
-
Status find_tablet(RuntimeState* state, vectorized::Block* block, int
row_index,
const VOlapTablePartition** partition, uint32_t&
tablet_index,
bool& stop_processing, bool& is_continue);
@@ -565,16 +544,7 @@ private:
bthread_t _sender_thread = 0;
std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
- std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
- std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val;
-
- std::map<int, int32_t> _max_decimal32_val;
- std::map<int, int32_t> _min_decimal32_val;
- std::map<int, int64_t> _max_decimal64_val;
- std::map<int, int64_t> _min_decimal64_val;
- std::map<int, int128_t> _max_decimal128_val;
- std::map<int, int128_t> _min_decimal128_val;
-
+ std::unique_ptr<OlapTableValidator> _validator;
// Stats for this
int64_t _validate_data_ns = 0;
int64_t _send_data_ns = 0;
diff --git a/be/src/vec/sink/vtablet_validator.cpp
b/be/src/vec/sink/vtablet_validator.cpp
new file mode 100644
index 0000000000..39cc92f0f4
--- /dev/null
+++ b/be/src/vec/sink/vtablet_validator.cpp
@@ -0,0 +1,404 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vtablet_validator.h"
+
+#include <fmt/format.h>
+#include <google/protobuf/stubs/common.h>
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "service/brpc.h"
+#include "util/binary_cast.hpp"
+#include "util/brpc_client_cache.h"
+#include "util/thread.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_struct.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+namespace stream_load {
+
+template <bool is_min>
+DecimalV2Value OlapTableValidator::_get_decimalv2_min_or_max(const
TypeDescriptor& type) {
+ std::map<std::pair<int, int>, DecimalV2Value>* pmap;
+ if constexpr (is_min) {
+ pmap = &_min_decimalv2_val;
+ } else {
+ pmap = &_max_decimalv2_val;
+ }
+
+ // found
+ auto iter = pmap->find({type.precision, type.scale});
+ if (iter != pmap->end()) {
+ return iter->second;
+ }
+
+ // save min or max DecimalV2Value for next time
+ DecimalV2Value value;
+ if constexpr (is_min) {
+ value.to_min_decimal(type.precision, type.scale);
+ } else {
+ value.to_max_decimal(type.precision, type.scale);
+ }
+ pmap->emplace(std::pair<int, int> {type.precision, type.scale}, value);
+ return value;
+}
+
+template <typename DecimalType, bool IsMin>
+DecimalType OlapTableValidator::_get_decimalv3_min_or_max(const
TypeDescriptor& type) {
+ std::map<int, typename DecimalType::NativeType>* pmap;
+ if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
+ pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
+ } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) {
+ pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
+ } else {
+ pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
+ }
+
+ // found
+ auto iter = pmap->find(type.precision);
+ if (iter != pmap->end()) {
+ return iter->second;
+ }
+
+ typename DecimalType::NativeType value;
+ if constexpr (IsMin) {
+ value = vectorized::min_decimal_value<DecimalType>(type.precision);
+ } else {
+ value = vectorized::max_decimal_value<DecimalType>(type.precision);
+ }
+ pmap->emplace(type.precision, value);
+ return value;
+}
+
+Status OlapTableValidator::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
+ bool is_nullable,
vectorized::ColumnPtr column,
+ size_t slot_index, Bitmap*
filter_bitmap,
+ bool* stop_processing,
fmt::memory_buffer& error_prefix,
+ vectorized::IColumn::Permutation*
rows) {
+ DCHECK((rows == nullptr) || (rows->size() == column->size()));
+ fmt::memory_buffer error_msg;
+ auto set_invalid_and_append_error_msg = [&](int row) {
+ filter_bitmap->Set(row, true);
+ auto ret = state->append_error_msg_to_file([]() -> std::string {
return ""; },
+ [&error_prefix,
&error_msg]() -> std::string {
+ return
fmt::to_string(error_prefix) +
+
fmt::to_string(error_msg);
+ },
+ stop_processing);
+ error_msg.clear();
+ return ret;
+ };
+
+ auto column_ptr =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
+ auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
+ auto null_map = column_ptr == nullptr ? nullptr :
column_ptr->get_null_map_data().data();
+ auto need_to_validate = [&null_map, &filter_bitmap](size_t j, size_t row) {
+ return !filter_bitmap->Get(row) && (null_map == nullptr || null_map[j]
== 0);
+ };
+
+ ssize_t last_invalid_row = -1;
+ switch (type.type) {
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ const auto column_string =
+ assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
+
+ size_t limit = config::string_type_length_soft_limit_bytes;
+ // when type.len is negative, std::min will return overflow value, so we need to check it
+ if (type.len > 0) {
+ limit = std::min(config::string_type_length_soft_limit_bytes,
type.len);
+ }
+ for (size_t j = 0; j < column->size(); ++j) {
+ auto row = rows ? (*rows)[j] : j;
+ if (row == last_invalid_row) {
+ continue;
+ }
+ if (need_to_validate(j, row)) {
+ auto str_val = column_string->get_data_at(j);
+ bool invalid = str_val.size > limit;
+ if (invalid) {
+ last_invalid_row = row;
+ if (str_val.size > type.len) {
+ fmt::format_to(error_msg, "{}",
+ "the length of input is too long than
schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ } else if (str_val.size > limit) {
+ fmt::format_to(error_msg, "{}",
+ "the length of input string is too long
than vec schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "limit length: {}; ", limit);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ }
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+ }
+ }
+ }
+ break;
+ }
+ case TYPE_JSONB: {
+ const auto column_string =
+ assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
+ for (size_t j = 0; j < column->size(); ++j) {
+ if (!filter_bitmap->Get(j)) {
+ if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
+ continue;
+ }
+ auto str_val = column_string->get_data_at(j);
+ bool invalid = str_val.size == 0;
+ if (invalid) {
+ error_msg.clear();
+ fmt::format_to(error_msg, "{}", "jsonb with size 0 is
invalid");
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
+ }
+ }
+ }
+ break;
+ }
+ case TYPE_DECIMALV2: {
+ auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+ assert_cast<const
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+ real_column_ptr.get()));
+ const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
+ const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
+ for (size_t j = 0; j < column->size(); ++j) {
+ auto row = rows ? (*rows)[j] : j;
+ if (row == last_invalid_row) {
+ continue;
+ }
+ if (need_to_validate(j, row)) {
+ auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
+ column_decimal->get_data()[j]);
+ bool invalid = false;
+
+ if (dec_val.greater_than_scale(type.scale)) {
+ auto code = dec_val.round(&dec_val, type.scale, HALF_UP);
+ column_decimal->get_data()[j] = dec_val.value();
+
+ if (code != E_DEC_OK) {
+ fmt::format_to(error_msg, "round one decimal
failed.value={}; ",
+ dec_val.to_string());
+ invalid = true;
+ }
+ }
+ if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
+ fmt::format_to(error_msg, "{}", "decimal value is not
valid for definition");
+ fmt::format_to(error_msg, ", value={}",
dec_val.to_string());
+ fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision,
+ type.scale);
+ fmt::format_to(error_msg, ", min={}, max={}; ",
min_decimalv2.to_string(),
+ max_decimalv2.to_string());
+ invalid = true;
+ }
+
+ if (invalid) {
+ last_invalid_row = row;
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+ }
+ }
+ }
+ break;
+ }
+ case TYPE_DECIMAL32: {
+#define CHECK_VALIDATION_FOR_DECIMALV3(ColumnDecimalType, DecimalType)
\
+ auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
+ assert_cast<const
vectorized::ColumnDecimal<vectorized::ColumnDecimalType>*>( \
+ real_column_ptr.get()));
\
+ const auto& max_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, false>(type); \
+ const auto& min_decimal =
_get_decimalv3_min_or_max<vectorized::DecimalType, true>(type); \
+ for (size_t j = 0; j < column->size(); ++j) {
\
+ auto row = rows ? (*rows)[j] : j;
\
+ if (row == last_invalid_row) {
\
+ continue;
\
+ }
\
+ if (need_to_validate(j, row)) {
\
+ auto dec_val = column_decimal->get_data()[j];
\
+ bool invalid = false;
\
+ if (dec_val > max_decimal || dec_val < min_decimal) {
\
+ fmt::format_to(error_msg, "{}", "decimal value is not valid
for definition"); \
+ fmt::format_to(error_msg, ", value={}", dec_val);
\
+ fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, type.scale); \
+ fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal,
max_decimal); \
+ invalid = true;
\
+ }
\
+ if (invalid) {
\
+ last_invalid_row = row;
\
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
+ }
\
+ }
\
+ }
+ CHECK_VALIDATION_FOR_DECIMALV3(Decimal32, Decimal32);
+ break;
+ }
+ case TYPE_DECIMAL64: {
+ CHECK_VALIDATION_FOR_DECIMALV3(Decimal64, Decimal64);
+ break;
+ }
+ case TYPE_DECIMAL128I: {
+ CHECK_VALIDATION_FOR_DECIMALV3(Decimal128I, Decimal128);
+ break;
+ }
+ case TYPE_ARRAY: {
+ const auto column_array =
+ assert_cast<const
vectorized::ColumnArray*>(real_column_ptr.get());
+ DCHECK(type.children.size() == 1);
+ auto nested_type = type.children[0];
+ const auto& offsets = column_array->get_offsets();
+ vectorized::IColumn::Permutation permutation(offsets.back());
+ for (size_t r = 0; r < offsets.size(); ++r) {
+ for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
+ permutation[c] = rows ? (*rows)[r] : r;
+ }
+ }
+ fmt::format_to(error_prefix, "ARRAY type failed: ");
+ RETURN_IF_ERROR(_validate_column(state, nested_type,
type.contains_nulls[0],
+ column_array->get_data_ptr(),
slot_index, filter_bitmap,
+ stop_processing, error_prefix,
&permutation));
+ break;
+ }
+ case TYPE_MAP: {
+ const auto column_map = assert_cast<const
vectorized::ColumnMap*>(real_column_ptr.get());
+ DCHECK(type.children.size() == 2);
+ auto key_type = type.children[0];
+ auto val_type = type.children[1];
+ const auto& offsets = column_map->get_offsets();
+ vectorized::IColumn::Permutation permutation(offsets.back());
+ for (size_t r = 0; r < offsets.size(); ++r) {
+ for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
+ permutation[c] = rows ? (*rows)[r] : r;
+ }
+ }
+ fmt::format_to(error_prefix, "MAP type failed: ");
+ RETURN_IF_ERROR(_validate_column(state, key_type,
type.contains_nulls[0],
+ column_map->get_keys_ptr(),
slot_index, filter_bitmap,
+ stop_processing, error_prefix,
&permutation));
+ RETURN_IF_ERROR(_validate_column(state, val_type,
type.contains_nulls[1],
+ column_map->get_values_ptr(),
slot_index, filter_bitmap,
+ stop_processing, error_prefix,
&permutation));
+ break;
+ }
+ case TYPE_STRUCT: {
+ const auto column_struct =
+ assert_cast<const
vectorized::ColumnStruct*>(real_column_ptr.get());
+ DCHECK(type.children.size() == column_struct->tuple_size());
+ fmt::format_to(error_prefix, "STRUCT type failed: ");
+ for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
+ RETURN_IF_ERROR(_validate_column(state, type.children[sc],
type.contains_nulls[sc],
+
column_struct->get_column_ptr(sc), slot_index,
+ filter_bitmap, stop_processing,
error_prefix));
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ // Dispose the column should do not contain the NULL value
+ // Only two case:
+ // 1. column is nullable but the desc is not nullable
+ // 2. desc->type is BITMAP
+ if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
+ for (int j = 0; j < column->size(); ++j) {
+ auto row = rows ? (*rows)[j] : j;
+ if (row == last_invalid_row) {
+ continue;
+ }
+ if (null_map[j] && !filter_bitmap->Get(row)) {
+ fmt::format_to(error_msg, "null value for not null column,
type={}",
+ type.debug_string());
+ last_invalid_row = row;
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+Status OlapTableValidator::validate_data(RuntimeState* state,
vectorized::Block* block,
+ Bitmap* filter_bitmap, int*
filtered_rows,
+ bool* stop_processing) {
+ for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
+ SlotDescriptor* desc = _output_tuple_desc->slots()[i];
+ block->get_by_position(i).column =
+
block->get_by_position(i).column->convert_to_full_column_if_const();
+ const auto& column = block->get_by_position(i).column;
+
+ fmt::memory_buffer error_prefix;
+ fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
+ RETURN_IF_ERROR(_validate_column(state, desc->type(),
desc->is_nullable(), column, i,
+ filter_bitmap, stop_processing,
error_prefix));
+ }
+
+ *filtered_rows = 0;
+ for (int i = 0; i < block->rows(); ++i) {
+ *filtered_rows += filter_bitmap->Get(i);
+ }
+ return Status::OK();
+}
+
+void OlapTableValidator::convert_to_dest_desc_block(doris::vectorized::Block*
block) {
+ for (int i = 0; i < _output_tuple_desc->slots().size() && i <
block->columns(); ++i) {
+ SlotDescriptor* desc = _output_tuple_desc->slots()[i];
+ if (desc->is_nullable() !=
block->get_by_position(i).type->is_nullable()) {
+ if (desc->is_nullable()) {
+ block->get_by_position(i).type =
+
vectorized::make_nullable(block->get_by_position(i).type);
+ block->get_by_position(i).column =
+
vectorized::make_nullable(block->get_by_position(i).column);
+ } else {
+ block->get_by_position(i).type = assert_cast<const
vectorized::DataTypeNullable&>(
+
*block->get_by_position(i).type)
+ .get_nested_type();
+ block->get_by_position(i).column = assert_cast<const
vectorized::ColumnNullable&>(
+
*block->get_by_position(i).column)
+
.get_nested_column_ptr();
+ }
+ }
+ }
+}
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_validator.h
b/be/src/vec/sink/vtablet_validator.h
new file mode 100644
index 0000000000..d610f7680d
--- /dev/null
+++ b/be/src/vec/sink/vtablet_validator.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <stddef.h>
+#include <stdint.h>
+
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <map>
+
+#include "common/status.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/types.h"
+#include "util/bitmap.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace stream_load {
+
+class OlapTableValidator {
+public:
+ OlapTableValidator(TupleDescriptor* output_tuple_desc)
+ : _output_tuple_desc(output_tuple_desc) {}
+
+ // make input data valid for OLAP table
+ // return number of invalid/filtered rows.
+ // invalid row number is set in Bitmap
+ // set stop_processing if we want to stop the whole process now.
+ Status validate_data(RuntimeState* state, vectorized::Block* block,
Bitmap* filter_bitmap,
+ int* filtered_rows, bool* stop_processing);
+
+ // some output column of output expr may have different nullable property with dest slot desc
+ // so here need to do the convert operation
+ void convert_to_dest_desc_block(vectorized::Block* block);
+
+private:
+ template <bool is_min>
+ DecimalV2Value _get_decimalv2_min_or_max(const TypeDescriptor& type);
+
+ template <typename DecimalType, bool IsMin>
+ DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
+
+ Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
+ vectorized::ColumnPtr column, size_t slot_index,
Bitmap* filter_bitmap,
+ bool* stop_processing, fmt::memory_buffer&
error_prefix,
+ vectorized::IColumn::Permutation* rows = nullptr);
+
+ TupleDescriptor* _output_tuple_desc = nullptr;
+
+ std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
+ std::map<std::pair<int, int>, DecimalV2Value> _min_decimalv2_val;
+
+ std::map<int, int32_t> _max_decimal32_val;
+ std::map<int, int32_t> _min_decimal32_val;
+ std::map<int, int64_t> _max_decimal64_val;
+ std::map<int, int64_t> _min_decimal64_val;
+ std::map<int, int128_t> _max_decimal128_val;
+ std::map<int, int128_t> _min_decimal128_val;
+};
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]