This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ee53be [fix][improvement](runtime-filter) fix string type length limit error && add runtime filter decimal support (#8282)
0ee53be is described below
commit 0ee53be8831bec1c4eb6f0f6d6ae437c7b59646b
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 3 22:44:49 2022 +0800
[fix][improvement](runtime-filter) fix string type length limit error && add runtime filter decimal support (#8282)
---
be/src/exprs/runtime_filter.cpp | 200 ++++++++++++++++++---------------
be/src/olap/olap_define.h | 2 +-
be/src/vec/core/block.cpp | 9 +-
be/src/vec/functions/function_case.h | 5 +-
be/src/vec/sink/vtablet_sink.cpp | 205 +++++++++++++++++++---------------
be/src/vec/utils/template_helpers.hpp | 8 +-
6 files changed, 247 insertions(+), 182 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 4b603ea..2ab2e17 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -34,9 +34,11 @@
#include "exprs/predicate.h"
#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/types.pb.h"
+#include "runtime/primitive_type.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/type_limit.h"
+#include "udf/udf.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "util/string_parser.hpp"
@@ -328,8 +330,13 @@ public:
_filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
- RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool,
RuntimeFilterType type, UniqueId fragment_instance_id, uint32_t filter_id)
- : _tracker(tracker), _pool(pool), _filter_type(type),
_fragment_instance_id(fragment_instance_id), _filter_id(filter_id) {}
+ RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool,
RuntimeFilterType type,
+ UniqueId fragment_instance_id, uint32_t filter_id)
+ : _tracker(tracker),
+ _pool(pool),
+ _filter_type(type),
+ _fragment_instance_id(fragment_instance_id),
+ _filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
Status init(const RuntimeFilterParams* params) {
@@ -410,8 +417,10 @@ public:
case TYPE_DATE:
case TYPE_DATETIME: {
// DateTime->DateTimeValue
- vectorized::DateTime date_time =*reinterpret_cast<const
vectorized::DateTime*>(value.data);
- vectorized::VecDateTimeValue vec_date_time_value =
binary_cast<vectorized::Int64, vectorized::VecDateTimeValue>(date_time);
+ vectorized::DateTime date_time =
+ *reinterpret_cast<const vectorized::DateTime*>(value.data);
+ vectorized::VecDateTimeValue vec_date_time_value =
+ binary_cast<vectorized::Int64,
vectorized::VecDateTimeValue>(date_time);
doris::DateTimeValue date_time_value;
vec_date_time_value.convert_vec_dt_to_dt(&date_time_value);
insert(reinterpret_cast<const void*>(&date_time_value));
@@ -438,9 +447,8 @@ public:
RuntimeFilterType get_real_type() {
auto real_filter_type = _filter_type;
if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- real_filter_type = _is_bloomfilter
- ? RuntimeFilterType::BLOOM_FILTER
- : RuntimeFilterType::IN_FILTER;
+ real_filter_type = _is_bloomfilter ?
RuntimeFilterType::BLOOM_FILTER
+ : RuntimeFilterType::IN_FILTER;
}
return real_filter_type;
}
@@ -511,18 +519,18 @@ public:
}
Status merge(const RuntimePredicateWrapper* wrapper) {
- bool can_not_merge_in_or_bloom
- = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
- (wrapper->_filter_type != RuntimeFilterType::IN_FILTER
- && wrapper->_filter_type !=
RuntimeFilterType::BLOOM_FILTER);
+ bool can_not_merge_in_or_bloom = _filter_type ==
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ (wrapper->_filter_type !=
RuntimeFilterType::IN_FILTER &&
+ wrapper->_filter_type !=
RuntimeFilterType::BLOOM_FILTER);
- bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER
- && _filter_type != wrapper->_filter_type;
+ bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ _filter_type != wrapper->_filter_type;
CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
<< "fragment instance " << _fragment_instance_id.to_string()
- << " can not merge runtime filter(id=" << _filter_id << "),
current is filter type is "
- << to_string(_filter_type) << ", other filter type is " <<
to_string(wrapper->_filter_type);
+ << " can not merge runtime filter(id=" << _filter_id
+ << "), current is filter type is " << to_string(_filter_type)
+ << ", other filter type is " <<
to_string(wrapper->_filter_type);
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
@@ -530,8 +538,8 @@ public:
break;
} else if (wrapper->_is_ignored_in_filter) {
VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " ignore merge runtime filter(in filter id "
- << _filter_id << ") because: " <<
*(wrapper->get_ignored_in_filter_msg());
+ << " ignore merge runtime filter(in filter id " <<
_filter_id
+ << ") because: " <<
*(wrapper->get_ignored_in_filter_msg());
_is_ignored_in_filter = true;
_ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
@@ -545,9 +553,9 @@ public:
#ifdef VLOG_DEBUG_IS_ON
std::stringstream msg;
msg << "fragment instance " <<
_fragment_instance_id.to_string()
- << " ignore merge runtime filter(in filter id "
- << _filter_id << ") because: in_num(" <<
_hybrid_set->size()
- << ") >= max_in_num(" << _max_in_num << ")";
+ << " ignore merge runtime filter(in filter id " <<
_filter_id
+ << ") because: in_num(" << _hybrid_set->size() << ") >=
max_in_num("
+ << _max_in_num << ")";
_ignored_in_filter_msg = _pool->add(new
std::string(msg.str()));
#else
_ignored_in_filter_msg = _pool->add(new
std::string("ignored"));
@@ -568,9 +576,8 @@ public:
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
- auto real_filter_type = _is_bloomfilter
- ? RuntimeFilterType::BLOOM_FILTER
- : RuntimeFilterType::IN_FILTER;
+ auto real_filter_type = _is_bloomfilter ?
RuntimeFilterType::BLOOM_FILTER
+ :
RuntimeFilterType::IN_FILTER;
if (real_filter_type == RuntimeFilterType::IN_FILTER) {
if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// in merge in
CHECK(!wrapper->_is_ignored_in_filter)
@@ -581,21 +588,22 @@ public:
_hybrid_set->insert(wrapper->_hybrid_set.get());
if (_max_in_num >= 0 && _hybrid_set->size() >=
_max_in_num) {
VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " change runtime filter to bloom filter(id=" <<
_filter_id
- << ") because: in_num(" << _hybrid_set->size()
- << ") >= max_in_num(" << _max_in_num << ")";
+ << " change runtime filter to bloom
filter(id=" << _filter_id
+ << ") because: in_num(" <<
_hybrid_set->size()
+ << ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
}
- // in merge bloom filter
+ // in merge bloom filter
} else {
VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " change runtime filter to bloom filter(id=" <<
_filter_id
- << ") because: already exist a bloom filter";
+ << " change runtime filter to bloom filter(id="
<< _filter_id
+ << ") because: already exist a bloom filter";
change_to_bloom_filter();
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
}
} else {
- if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// bloom filter merge in
+ if (wrapper->_filter_type ==
+ RuntimeFilterType::IN_FILTER) { // bloom filter merge in
CHECK(!wrapper->_is_ignored_in_filter)
<< "fragment instance " <<
_fragment_instance_id.to_string()
<< " can not ignore merge runtime filter(in filter
id "
@@ -607,7 +615,7 @@ public:
_bloomfilter_func->insert(value);
it->next();
}
- // bloom filter merge bloom filter
+ // bloom filter merge bloom filter
} else {
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
}
@@ -626,7 +634,8 @@ public:
PrimitiveType type = to_primitive_type(in_filter->column_type());
if (in_filter->has_ignored_msg()) {
- VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because:
" << in_filter->ignored_msg();
+ VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
+ << ") because: " << in_filter->ignored_msg();
_is_ignored_in_filter = true;
_ignored_in_filter_msg = _pool->add(new
std::string(in_filter->ignored_msg()));
return Status::OK();
@@ -634,60 +643,68 @@ public:
_hybrid_set.reset(create_set(type));
switch (type) {
case TYPE_BOOLEAN: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
bool bool_val = column.boolval();
set->insert(&bool_val);
});
break;
}
case TYPE_TINYINT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
int8_t int_val = static_cast<int8_t>(column.intval());
set->insert(&int_val);
});
break;
}
case TYPE_SMALLINT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
int16_t int_val = static_cast<int16_t>(column.intval());
set->insert(&int_val);
});
break;
}
case TYPE_INT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
int32_t int_val = column.intval();
set->insert(&int_val);
});
break;
}
case TYPE_BIGINT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
int64_t long_val = column.longval();
set->insert(&long_val);
});
break;
}
case TYPE_LARGEINT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
auto string_val = column.stringval();
StringParser::ParseResult result;
- int128_t int128_val =
StringParser::string_to_int<int128_t>(string_val.c_str(),
-
string_val.length(), &result);
+ int128_t int128_val = StringParser::string_to_int<int128_t>(
+ string_val.c_str(), string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
set->insert(&int128_val);
});
break;
}
case TYPE_FLOAT: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
float float_val = static_cast<float>(column.doubleval());
set->insert(&float_val);
});
break;
}
case TYPE_DOUBLE: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
double double_val = column.doubleval();
set->insert(&double_val);
});
@@ -695,19 +712,30 @@ public:
}
case TYPE_DATETIME:
case TYPE_DATE: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
- auto &string_val_ref = column.stringval();
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
+ auto& string_val_ref = column.stringval();
DateTimeValue datetime_val;
datetime_val.from_date_str(string_val_ref.c_str(),
string_val_ref.length());
set->insert(&datetime_val);
});
break;
}
+ case TYPE_DECIMALV2: {
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
+ auto& string_val_ref = column.stringval();
+ DecimalV2Value decimal_val(string_val_ref);
+ set->insert(&decimal_val);
+ });
+ break;
+ }
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
- batch_assign(in_filter, [](std::unique_ptr<HybridSetBase> &set,
PColumnValue &column, ObjectPool *pool) {
- auto &string_val_ref = column.stringval();
+ batch_assign(in_filter, [](std::unique_ptr<HybridSetBase>& set,
PColumnValue& column,
+ ObjectPool* pool) {
+ auto& string_val_ref = column.stringval();
auto val_ptr = pool->add(new std::string(string_val_ref));
StringValue string_val(const_cast<char*>(val_ptr->c_str()),
val_ptr->length());
set->insert(&string_val);
@@ -742,10 +770,8 @@ public:
_minmax_func.reset(create_minmax_filter(type));
switch (type) {
case TYPE_BOOLEAN: {
- bool min_val;
- bool max_val;
- min_val = minmax_filter->min_val().boolval();
- max_val = minmax_filter->max_val().boolval();
+ bool min_val = minmax_filter->min_val().boolval();
+ bool max_val = minmax_filter->max_val().boolval();
return _minmax_func->assign(&min_val, &max_val);
}
case TYPE_TINYINT: {
@@ -791,17 +817,13 @@ public:
return _minmax_func->assign(&min_val, &max_val);
}
case TYPE_FLOAT: {
- float min_val;
- float max_val;
- min_val = static_cast<float>(minmax_filter->min_val().doubleval());
- max_val = static_cast<float>(minmax_filter->max_val().doubleval());
+ float min_val =
static_cast<float>(minmax_filter->min_val().doubleval());
+ float max_val =
static_cast<float>(minmax_filter->max_val().doubleval());
return _minmax_func->assign(&min_val, &max_val);
}
case TYPE_DOUBLE: {
- double min_val;
- double max_val;
- min_val =
static_cast<double>(minmax_filter->min_val().doubleval());
- max_val =
static_cast<double>(minmax_filter->max_val().doubleval());
+ double min_val =
static_cast<double>(minmax_filter->min_val().doubleval());
+ double max_val =
static_cast<double>(minmax_filter->max_val().doubleval());
return _minmax_func->assign(&min_val, &max_val);
}
case TYPE_DATETIME:
@@ -814,6 +836,13 @@ public:
max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
return _minmax_func->assign(&min_val, &max_val);
}
+ case TYPE_DECIMALV2: {
+ auto& min_val_ref = minmax_filter->min_val().stringval();
+ auto& max_val_ref = minmax_filter->max_val().stringval();
+ DecimalV2Value min_val(min_val_ref);
+ DecimalV2Value max_val(max_val_ref);
+ return _minmax_func->assign(&min_val, &max_val);
+ }
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
@@ -869,20 +898,15 @@ public:
}
}
- bool is_bloomfilter() const {
- return _is_bloomfilter;
- }
+ bool is_bloomfilter() const { return _is_bloomfilter; }
- bool is_ignored_in_filter() const {
- return _is_ignored_in_filter;
- }
+ bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
- std::string* get_ignored_in_filter_msg() const {
- return _ignored_in_filter_msg;
- }
+ std::string* get_ignored_in_filter_msg() const { return
_ignored_in_filter_msg; }
void batch_assign(const PInFilter* filter,
- void (*assign_func) (std::unique_ptr<HybridSetBase>
&_hybrid_set, PColumnValue&, ObjectPool*)) {
+ void (*assign_func)(std::unique_ptr<HybridSetBase>&
_hybrid_set,
+ PColumnValue&, ObjectPool*)) {
for (int i = 0; i < filter->values_size(); ++i) {
PColumnValue column = filter->values(i);
assign_func(_hybrid_set, column, _pool);
@@ -1085,7 +1109,8 @@ Status IRuntimeFilter::_create_wrapper(const T* param,
MemTracker* tracker, Obje
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
wrapper->reset(new RuntimePredicateWrapper(tracker, pool,
get_type(filter_type),
- UniqueId(param->request->fragment_id()),
param->request->filter_id()));
+
UniqueId(param->request->fragment_id()),
+ param->request->filter_id()));
switch (filter_type) {
case PFilterType::IN_FILTER: {
@@ -1122,7 +1147,8 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
void IRuntimeFilter::update_runtime_filter_type_to_profile() {
if (_profile.get() != nullptr) {
- _profile->add_info_string("RealRuntimeFilterType",
::doris::to_string(_wrapper->get_real_type()));
+ _profile->add_info_string("RealRuntimeFilterType",
+
::doris::to_string(_wrapper->get_real_type()));
}
}
@@ -1157,7 +1183,7 @@ const RuntimePredicateWrapper*
IRuntimeFilter::get_wrapper() {
template <typename T>
void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it,
- void (*set_func) (PColumnValue*, const T*)) {
+ void (*set_func)(PColumnValue*, const T*)) {
while (it->has_next()) {
const void* void_value = it->get_value();
auto origin_value = reinterpret_cast<const T*>(void_value);
@@ -1170,9 +1196,8 @@ template <class T>
Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) {
auto real_runtime_filter_type = _runtime_filter_type;
if (real_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- real_runtime_filter_type = _wrapper->is_bloomfilter()
- ? RuntimeFilterType::BLOOM_FILTER
- : RuntimeFilterType::IN_FILTER;
+ real_runtime_filter_type = _wrapper->is_bloomfilter() ?
RuntimeFilterType::BLOOM_FILTER
+ :
RuntimeFilterType::IN_FILTER;
}
request->set_filter_type(get_type(real_runtime_filter_type));
@@ -1209,56 +1234,56 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
switch (column_type) {
case TYPE_BOOLEAN: {
- batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t
*value) {
+ batch_copy<int32_t>(filter, it, [](PColumnValue* column, const
int32_t* value) {
column->set_boolval(*value);
});
return;
}
case TYPE_TINYINT: {
- batch_copy<int8_t>(filter, it, [](PColumnValue *column, const int8_t
*value) {
+ batch_copy<int8_t>(filter, it, [](PColumnValue* column, const int8_t*
value) {
column->set_intval(*value);
});
return;
}
case TYPE_SMALLINT: {
- batch_copy<int16_t>(filter, it, [](PColumnValue *column, const int16_t
*value) {
+ batch_copy<int16_t>(filter, it, [](PColumnValue* column, const
int16_t* value) {
column->set_intval(*value);
});
return;
}
case TYPE_INT: {
- batch_copy<int32_t>(filter, it, [](PColumnValue *column, const int32_t
*value) {
+ batch_copy<int32_t>(filter, it, [](PColumnValue* column, const
int32_t* value) {
column->set_intval(*value);
});
return;
}
case TYPE_BIGINT: {
- batch_copy<int64_t>(filter, it, [](PColumnValue *column, const int64_t
*value) {
+ batch_copy<int64_t>(filter, it, [](PColumnValue* column, const
int64_t* value) {
column->set_longval(*value);
});
return;
}
case TYPE_LARGEINT: {
- batch_copy<int128_t>(filter, it, [](PColumnValue *column, const
int128_t *value) {
+ batch_copy<int128_t>(filter, it, [](PColumnValue* column, const
int128_t* value) {
column->set_stringval(LargeIntValue::to_string(*value));
});
return;
}
case TYPE_FLOAT: {
- batch_copy<float>(filter, it, [](PColumnValue *column, const float
*value) {
+ batch_copy<float>(filter, it, [](PColumnValue* column, const float*
value) {
column->set_doubleval(*value);
});
return;
}
case TYPE_DOUBLE: {
- batch_copy<double>(filter, it, [](PColumnValue *column, const double
*value) {
+ batch_copy<double>(filter, it, [](PColumnValue* column, const double*
value) {
column->set_doubleval(*value);
});
return;
}
case TYPE_DATE:
case TYPE_DATETIME: {
- batch_copy<DateTimeValue>(filter, it, [](PColumnValue *column, const
DateTimeValue *value) {
+ batch_copy<DateTimeValue>(filter, it, [](PColumnValue* column, const
DateTimeValue* value) {
char convert_buffer[30];
value->to_string(convert_buffer);
column->set_stringval(convert_buffer);
@@ -1266,15 +1291,16 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
return;
}
case TYPE_DECIMALV2: {
- batch_copy<DecimalV2Value>(filter, it, [](PColumnValue *column, const
DecimalV2Value *value) {
- column->set_stringval(value->to_string());
- });
+ batch_copy<DecimalV2Value>(filter, it,
+ [](PColumnValue* column, const
DecimalV2Value* value) {
+
column->set_stringval(value->to_string());
+ });
return;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
- batch_copy<StringValue>(filter, it, [](PColumnValue *column, const
StringValue *value) {
+ batch_copy<StringValue>(filter, it, [](PColumnValue* column, const
StringValue* value) {
column->set_stringval(std::string(value->ptr, value->len));
});
return;
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 19205e6..b16c614 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -57,7 +57,7 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535;
static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647;
// the max length supported for vec string type 1MB
-static constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024;
+static constexpr size_t MAX_SIZE_OF_VEC_STRING = 1024 * 1024;
// the max length supported for array
static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index f888ad1..1dda373 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -44,10 +44,10 @@
#include "vec/data_types/data_type_date.h"
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
-#include "vec/data_types/data_type_hll.h"
namespace doris::vectorized {
@@ -718,14 +718,12 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
}
// serialize data values
+ // when data type is HLL, content_uncompressed_size maybe larger than real
size.
allocated_buf->resize(content_uncompressed_size);
char* buf = allocated_buf->data();
- char* start_buf = buf;
for (const auto& c : *this) {
buf = c.type->serialize(*(c.column), buf);
}
- CHECK(content_uncompressed_size == (buf - start_buf))
- << content_uncompressed_size << " vs. " << (buf - start_buf);
*uncompressed_bytes = content_uncompressed_size;
// compress
@@ -792,7 +790,8 @@ doris::Tuple* Block::deep_copy_tuple(const
doris::TupleDescriptor& desc, MemPool
if (!slot_desc->type().is_string_type() &&
!slot_desc->type().is_date_type()) {
memcpy((void*)dst->get_slot(slot_desc->tuple_offset()),
data_ref.data, data_ref.size);
- } else if (slot_desc->type().is_string_type() && slot_desc->type() !=
TYPE_OBJECT && slot_desc->type() != TYPE_HLL) {
+ } else if (slot_desc->type().is_string_type() && slot_desc->type() !=
TYPE_OBJECT &&
+ slot_desc->type() != TYPE_HLL) {
memcpy((void*)dst->get_slot(slot_desc->tuple_offset()), (const
void*)(&data_ref),
sizeof(data_ref));
// Copy the content of string
diff --git a/be/src/vec/functions/function_case.h
b/be/src/vec/functions/function_case.h
index 1b728e9..7cea64a 100644
--- a/be/src/vec/functions/function_case.h
+++ b/be/src/vec/functions/function_case.h
@@ -17,6 +17,7 @@
#pragma once
+#include "vec/columns/column_complex.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/functions/function.h"
#include "vec/functions/function_helpers.h"
@@ -188,7 +189,9 @@ public:
uint8* then_idx, CaseWhenColumnHolder&
column_holder) {
auto result_column_ptr = data_type->create_column();
- if constexpr (std::is_same_v<ColumnType, ColumnString>) {
+ if constexpr (std::is_same_v<ColumnType, ColumnString> ||
+ std::is_same_v<ColumnType, ColumnBitmap> ||
+ std::is_same_v<ColumnType, ColumnHLL>) {
// result_column and all then_column is not nullable.
// can't simd when type is string.
update_result_normal(result_column_ptr, then_idx, column_holder);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a6af2ee..2fa6992 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -15,18 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-#include "util/doris_metrics.h"
+#include "vec/sink/vtablet_sink.h"
+#include "util/doris_metrics.h"
+#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/sink/vtablet_sink.h"
-#include "vec/core/block.h"
namespace doris {
namespace stream_load {
VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& texprs, Status* status)
+ const std::vector<TExpr>& texprs, Status*
status)
: OlapTableSink(pool, row_desc, texprs, status) {
// From the thrift expressions create the real exprs.
vectorized::VExpr::create_expr_trees(pool, texprs, &_output_vexpr_ctxs);
@@ -43,7 +43,8 @@ Status VOlapTableSink::init(const TDataSink& sink) {
Status VOlapTableSink::prepare(RuntimeState* state) {
// Prepare the exprs to run.
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_input_row_desc, _expr_mem_tracker));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_input_row_desc,
+ _expr_mem_tracker));
return OlapTableSink::prepare(state);
}
@@ -58,7 +59,9 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
auto rows = input_block->rows();
auto bytes = input_block->bytes();
- if (UNLIKELY(rows == 0)) { return status; }
+ if (UNLIKELY(rows == 0)) {
+ return status;
+ }
SCOPED_TIMER(_profile->total_time_counter());
_number_input_rows += rows;
@@ -73,8 +76,10 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
if (!_output_vexpr_ctxs.empty()) {
// Do vectorized expr here to speed up load
block = vectorized::VExprContext::get_output_block_after_execute_exprs(
- _output_vexpr_ctxs, *input_block, status);
- if (UNLIKELY(block.rows() == 0)) { return status; }
+ _output_vexpr_ctxs, *input_block, status);
+ if (UNLIKELY(block.rows() == 0)) {
+ return status;
+ }
}
auto num_rows = block.rows();
@@ -83,7 +88,8 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
SCOPED_RAW_TIMER(&_validate_data_ns);
_filter_bitmap.Reset(block.rows());
bool stop_processing = false;
- RETURN_IF_ERROR(_validate_data(state, &block, &_filter_bitmap,
&filtered_rows, &stop_processing));
+ RETURN_IF_ERROR(
+ _validate_data(state, &block, &_filter_bitmap, &filtered_rows,
&stop_processing));
_number_filtered_rows += filtered_rows;
if (stop_processing) {
// should be returned after updating "_number_filtered_rows", to
make sure that load job can be cancelled
@@ -107,12 +113,14 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
uint32_t tablet_index = 0;
block_row = {&block, i};
if (!_vpartition->find_partition(&block_row, &partition)) {
- RETURN_IF_ERROR(state->append_error_msg_to_file([]() ->
std::string { return ""; },
+ RETURN_IF_ERROR(state->append_error_msg_to_file(
+ []() -> std::string { return ""; },
[&]() -> std::string {
- fmt::memory_buffer buf;
- fmt::format_to(buf, "no partition for this tuple.
tuple=[]");
- return buf.data();
- }, &stop_processing));
+ fmt::memory_buffer buf;
+ fmt::format_to(buf, "no partition for this tuple.
tuple=[]");
+ return buf.data();
+ },
+ &stop_processing));
_number_filtered_rows++;
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop
processing");
@@ -150,24 +158,28 @@ Status VOlapTableSink::close(RuntimeState* state, Status
exec_status) {
return OlapTableSink::close(state, exec_status);
}
-Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block*
block, Bitmap* filter_bitmap, int* filtered_rows,
- bool* stop_processing) {
+Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block*
block,
+ Bitmap* filter_bitmap, int*
filtered_rows,
+ bool* stop_processing) {
const auto num_rows = block->rows();
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
- filter_bitmap->Set(row, true);
- return state->append_error_msg_to_file([]() -> std::string { return
""; },
- [&error_msg]() -> std::string { return error_msg.data(); },
stop_processing);
+ filter_bitmap->Set(row, true);
+ return state->append_error_msg_to_file(
+ []() -> std::string { return ""; },
+ [&error_msg]() -> std::string { return error_msg.data(); },
stop_processing);
};
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
- block->get_by_position(i).column =
block->get_by_position(i).column->convert_to_full_column_if_const();
+ block->get_by_position(i).column =
+
block->get_by_position(i).column->convert_to_full_column_if_const();
const auto& column = block->get_by_position(i).column;
if (desc->is_nullable() && desc->type() == TYPE_OBJECT) {
- const auto& null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)
- ->get_null_map_data();
+ const auto& null_map =
+
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)
+ ->get_null_map_data();
fmt::format_to(error_msg, "null is not allowed for bitmap column,
column_name: {}; ",
desc->col_name());
@@ -180,85 +192,103 @@ Status VOlapTableSink::_validate_data(RuntimeState*
state, vectorized::Block* bl
}
} else {
auto column_ptr =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
- auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
+ auto& real_column_ptr =
+ column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
switch (desc->type().type) {
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- const auto column_string = assert_cast<const
vectorized::ColumnString *>(real_column_ptr.get());
-
- for (int j = 0; j < num_rows; ++j) {
- if (!filter_bitmap->Get(j)) {
- auto str_val = column_string->get_data_at(j);
- bool invalid = str_val.size >
std::min(desc->type().len, (int)MAX_SIZE_OF_VEC_STRING);
-
- error_msg.clear();
- if (str_val.size > desc->type().len) {
- fmt::format_to(error_msg, "{}", "the length of
input is too long than schema. ");
- fmt::format_to(error_msg, "column_name: {}; ",
desc->col_name());
- fmt::format_to(error_msg, "input str: [{}] ",
str_val.to_string());
- fmt::format_to(error_msg, "schema length: {};
", desc->type().len);
- fmt::format_to(error_msg, "actual length: {};
", str_val.size);
- } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
- fmt::format_to(error_msg, "{}", "the length of
input string is too long than vec schema. ");
- fmt::format_to(error_msg, "column_name: {}; ",
desc->col_name());
- fmt::format_to(error_msg, "input str: [{}] ",
str_val.to_string());
- fmt::format_to(error_msg, "schema length: {};
", MAX_SIZE_OF_VEC_STRING);
- fmt::format_to(error_msg, "actual length: {};
", str_val.size);
- }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ const auto column_string =
+ assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
+
+ size_t limit = MAX_SIZE_OF_VEC_STRING;
+ if (desc->type().type != TYPE_STRING) {
+ DCHECK(desc->type().len >= 0);
+ limit = std::min(limit, (size_t)desc->type().len);
+ }
- if (invalid) {
-
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
- }
+ for (int j = 0; j < num_rows; ++j) {
+ if (!filter_bitmap->Get(j)) {
+ auto str_val = column_string->get_data_at(j);
+ bool invalid = str_val.size > limit;
+
+ error_msg.clear();
+ if (str_val.size > desc->type().len) {
+ fmt::format_to(error_msg, "{}",
+ "the length of input is too long
than schema. ");
+ fmt::format_to(error_msg, "column_name: {}; ",
desc->col_name());
+ fmt::format_to(error_msg, "input str: [{}] ",
str_val.to_string());
+ fmt::format_to(error_msg, "schema length: {}; ",
desc->type().len);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ } else if (str_val.size > MAX_SIZE_OF_VEC_STRING) {
+ fmt::format_to(
+ error_msg, "{}",
+ "the length of input string is too long
than vec schema. ");
+ fmt::format_to(error_msg, "column_name: {}; ",
desc->col_name());
+ fmt::format_to(error_msg, "input str: [{}] ",
str_val.to_string());
+ fmt::format_to(error_msg, "schema length: {}; ",
+ MAX_SIZE_OF_VEC_STRING);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ }
+
+ if (invalid) {
+
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
- break;
}
- case TYPE_DECIMALV2: {
- auto column_decimal = const_cast<vectorized::ColumnDecimal
- <vectorized::Decimal128> *>(assert_cast<const
vectorized::ColumnDecimal
- <vectorized::Decimal128>
*>(real_column_ptr.get()));
-
- for (int j = 0; j < num_rows; ++j) {
- if (!filter_bitmap->Get(j)) {
- auto dec_val = binary_cast<vectorized::Int128,
DecimalV2Value>(
- column_decimal->get_data()[j]);
- error_msg.clear();
- bool invalid = false;
-
- if
(dec_val.greater_than_scale(desc->type().scale)) {
- auto code = dec_val.round(&dec_val,
desc->type().scale, HALF_UP);
- column_decimal->get_data()[j] =
binary_cast<DecimalV2Value, vectorized::Int128>(
- dec_val);
-
- if (code != E_DEC_OK) {
- fmt::format_to(error_msg, "round one
decimal failed.value={}; ", dec_val.to_string());
- invalid = true;
- }
- }
- if (dec_val > _max_decimalv2_val[i] || dec_val <
_min_decimalv2_val[i]) {
- fmt::format_to(error_msg, "decimal value is
not valid for definition, column={}", desc->col_name());
- fmt::format_to(error_msg, ", value={}",
dec_val.to_string());
- fmt::format_to(error_msg, ", precision={},
scale={}; ", desc->type().precision, desc->type().scale);
+ break;
+ }
+ case TYPE_DECIMALV2: {
+ auto column_decimal = const_cast<
+ vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+ assert_cast<const
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+ real_column_ptr.get()));
+
+ for (int j = 0; j < num_rows; ++j) {
+ if (!filter_bitmap->Get(j)) {
+ auto dec_val = binary_cast<vectorized::Int128,
DecimalV2Value>(
+ column_decimal->get_data()[j]);
+ error_msg.clear();
+ bool invalid = false;
+
+ if (dec_val.greater_than_scale(desc->type().scale)) {
+ auto code = dec_val.round(&dec_val,
desc->type().scale, HALF_UP);
+ column_decimal->get_data()[j] =
+ binary_cast<DecimalV2Value,
vectorized::Int128>(dec_val);
+
+ if (code != E_DEC_OK) {
+ fmt::format_to(error_msg, "round one decimal
failed.value={}; ",
+ dec_val.to_string());
invalid = true;
}
+ }
+ if (dec_val > _max_decimalv2_val[i] || dec_val <
_min_decimalv2_val[i]) {
+ fmt::format_to(error_msg,
+ "decimal value is not valid for
definition, column={}",
+ desc->col_name());
+ fmt::format_to(error_msg, ", value={}",
dec_val.to_string());
+ fmt::format_to(error_msg, ", precision={},
scale={}; ",
+ desc->type().precision,
desc->type().scale);
+ invalid = true;
+ }
- if (invalid) {
-
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
- }
+ if (invalid) {
+
RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
}
}
- break;
}
- default:
- break;
+ break;
+ }
+ default:
+ break;
}
// Dispose the nullable column not match problem here, convert to
nullable column
if (desc->is_nullable() && !column_ptr) {
block->get_by_position(i).column =
vectorized::make_nullable(column);
- block->get_by_position(i).type =
vectorized::make_nullable(block->get_by_position(i).type);
+ block->get_by_position(i).type =
+
vectorized::make_nullable(block->get_by_position(i).type);
}
// Dispose the nullable column not match problem here, convert to
not nullable column
@@ -272,8 +302,10 @@ Status VOlapTableSink::_validate_data(RuntimeState* state,
vectorized::Block* bl
}
}
block->get_by_position(i).column =
column_ptr->get_nested_column_ptr();
- block->get_by_position(i).type = (reinterpret_cast<const
vectorized::DataTypeNullable*>(
-
block->get_by_position(i).type.get()))->get_nested_type();
+ block->get_by_position(i).type =
+ (reinterpret_cast<const vectorized::DataTypeNullable*>(
+ block->get_by_position(i).type.get()))
+ ->get_nested_type();
}
}
}
@@ -287,4 +319,3 @@ Status VOlapTableSink::_validate_data(RuntimeState* state,
vectorized::Block* bl
} // namespace stream_load
} // namespace doris
-
diff --git a/be/src/vec/utils/template_helpers.hpp
b/be/src/vec/utils/template_helpers.hpp
index 10aac23..7fe681a 100644
--- a/be/src/vec/utils/template_helpers.hpp
+++ b/be/src/vec/utils/template_helpers.hpp
@@ -19,6 +19,7 @@
#include "http/http_status.h"
#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_complex.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type.h"
#include "vec/functions/function.h"
@@ -44,11 +45,16 @@
M(Date, ColumnInt64) \
M(DateTime, ColumnInt64)
+#define COMPLEX_TYPE_TO_COLUMN_TYPE(M) \
+ M(BitMap, ColumnBitmap) \
+ M(HLL, ColumnHLL)
+
#define TYPE_TO_COLUMN_TYPE(M) \
NUMERIC_TYPE_TO_COLUMN_TYPE(M) \
DECIMAL_TYPE_TO_COLUMN_TYPE(M) \
STRING_TYPE_TO_COLUMN_TYPE(M) \
- TIME_TYPE_TO_COLUMN_TYPE(M)
+ TIME_TYPE_TO_COLUMN_TYPE(M) \
+ COMPLEX_TYPE_TO_COLUMN_TYPE(M)
namespace doris::vectorized {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]