This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 664fbffcba [Enchancement](table-function) optimization for vectorized
table function (#17973)
664fbffcba is described below
commit 664fbffcbaf861d8ce8b1e9211c5dfc8c01bf130
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 29 10:45:00 2023 +0800
[Enchancement](table-function) optimization for vectorized table function
(#17973)
---
be/src/vec/exec/vtable_function_node.cpp | 75 ++++++++++-----------
be/src/vec/exec/vtable_function_node.h | 31 ++++++---
be/src/vec/exprs/table_function/table_function.h | 58 ++++++++--------
.../exprs/table_function/table_function_factory.h | 3 +-
be/src/vec/exprs/table_function/vexplode.cpp | 53 +++------------
be/src/vec/exprs/table_function/vexplode.h | 14 ++--
.../vec/exprs/table_function/vexplode_bitmap.cpp | 78 ++++++++--------------
be/src/vec/exprs/table_function/vexplode_bitmap.h | 10 +--
.../exprs/table_function/vexplode_json_array.cpp | 40 +++--------
.../vec/exprs/table_function/vexplode_json_array.h | 35 ++++------
.../vec/exprs/table_function/vexplode_numbers.cpp | 74 +++++++++++---------
be/src/vec/exprs/table_function/vexplode_numbers.h | 34 ++++++++--
be/src/vec/exprs/table_function/vexplode_split.cpp | 40 +++--------
be/src/vec/exprs/table_function/vexplode_split.h | 6 +-
be/test/vec/function/function_test_util.cpp | 24 ++-----
.../test_varchar_schema_change.groovy | 12 ++--
16 files changed, 249 insertions(+), 338 deletions(-)
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index 32cfee7234..39ffb4fad7 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -43,8 +43,6 @@ Status VTableFunctionNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_fns.push_back(fn);
}
_fn_num = _fns.size();
- _fn_values.resize(_fn_num);
- _fn_value_lengths.resize(_fn_num);
// Prepare output slot ids
RETURN_IF_ERROR(_prepare_output_slot_ids(tnode));
@@ -104,6 +102,14 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
}
}
+ for (size_t i = 0; i < _child_slots.size(); i++) {
+ if (_slot_need_copy(i)) {
+ _output_slot_indexs.push_back(i);
+ } else {
+ _useless_slot_indexs.push_back(i);
+ }
+ }
+
_cur_child_offset = -1;
return Status::OK();
@@ -121,7 +127,7 @@ Status VTableFunctionNode::get_next(RuntimeState* state,
Block* block, bool* eos
RETURN_IF_ERROR_AND_CHECK_SPAN(
child(0)->get_next_after_projects(
state, &_child_block, &_child_eos,
- std::bind((Status(ExecNode::*)(RuntimeState*,
vectorized::Block*, bool*)) &
+ std::bind((Status(ExecNode::*)(RuntimeState*, Block*,
bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3)),
@@ -133,11 +139,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state,
Block* block, bool* eos
return pull(state, block, eos);
}
-Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block*
output_block, bool* eos) {
+Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block*
output_block,
+ bool* eos) {
size_t column_size = _output_slots.size();
bool mem_reuse = output_block->mem_reuse();
- std::vector<vectorized::MutableColumnPtr> columns(column_size);
+ std::vector<MutableColumnPtr> columns(column_size);
for (size_t i = 0; i < column_size; i++) {
if (mem_reuse) {
columns[i] =
std::move(*output_block->get_by_position(i).column).mutate();
@@ -146,6 +153,12 @@ Status
VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
}
}
+ for (int i = 0; i < _fn_num; i++) {
+ if (columns[i + _child_slots.size()]->is_nullable()) {
+ _fns[i]->set_nullable();
+ }
+ }
+
while (columns[_child_slots.size()]->size() < state->batch_size()) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while
getting next batch."));
@@ -158,6 +171,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState*
state, Block* output
while (columns[_child_slots.size()]->size() < state->batch_size()) {
int idx = _find_last_fn_eos_idx();
if (idx == 0 || skip_child_row) {
+ _copy_output_slots(columns);
// all table functions' results are exhausted, process next
child row.
RETURN_IF_ERROR(_process_next_child_row());
if (_cur_child_offset == -1) {
@@ -175,41 +189,25 @@ Status
VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output
if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
continue;
}
-
- // get slots from every table function.
- // notice that _fn_values[i] may be null if the table function has
empty result set.
- for (int i = 0; i < _fn_num; i++) {
- RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i]));
-
RETURN_IF_ERROR(_fns[i]->get_value_length(&_fn_value_lengths[i]));
- }
-
- // The tuples order in parent row batch should be
- // child1, child2, tf1, tf2, ...
-
- // 1. copy data from child_block.
- for (int i = 0; i < _child_slots.size(); i++) {
- if (!slot_need_copy(i)) {
- columns[i]->insert_default();
- continue;
+ if (_fn_num == 1) {
+ _current_row_insert_times += _fns[0]->get_value(
+ columns[_child_slots.size()],
+ state->batch_size() -
columns[_child_slots.size()]->size());
+ } else {
+ for (int i = 0; i < _fn_num; i++) {
+ _fns[i]->get_value(columns[i + _child_slots.size()]);
}
- auto src_column = _child_block.get_by_position(i).column;
- columns[i]->insert_from(*src_column, _cur_child_offset);
+ _current_row_insert_times++;
+ _fns[_fn_num - 1]->forward();
}
+ }
+ }
- // 2. copy function result
- for (int i = 0; i < _fns.size(); i++) {
- int output_slot_idx = i + _child_slots.size();
- if (_fn_values[i] == nullptr) {
- columns[output_slot_idx]->insert_default();
- } else {
-
columns[output_slot_idx]->insert_data(reinterpret_cast<char*>(_fn_values[i]),
-
_fn_value_lengths[i]);
- }
- }
+ _copy_output_slots(columns);
- bool tmp = false;
- _fns[_fn_num - 1]->forward(&tmp);
- }
+ size_t row_size = columns[_child_slots.size()]->size();
+ for (auto index : _useless_slot_indexs) {
+ columns[index]->insert_many_defaults(row_size -
columns[index]->size());
}
if (!columns.empty() && !columns[0]->empty()) {
@@ -292,11 +290,10 @@ int VTableFunctionNode::_find_last_fn_eos_idx() {
// If `last_eos_idx` is 1, which means f2 and f3 are eos.
// So we need to forward f1, and reset f2 and f3.
bool VTableFunctionNode::_roll_table_functions(int last_eos_idx) {
- bool fn_eos = false;
int i = last_eos_idx - 1;
for (; i >= 0; --i) {
- _fns[i]->forward(&fn_eos);
- if (!fn_eos) {
+ _fns[i]->forward();
+ if (!_fns[i]->eos()) {
break;
}
}
diff --git a/be/src/vec/exec/vtable_function_node.h
b/be/src/vec/exec/vtable_function_node.h
index 99d2394514..2aad138f4a 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -39,7 +39,7 @@ public:
bool need_more_input_data() const { return !_child_block.rows() &&
!_child_eos; }
void release_resource(doris::RuntimeState* state) override {
- vectorized::VExpr::close(_vfn_ctxs, state);
+ VExpr::close(_vfn_ctxs, state);
if (_num_rows_filtered_counter != nullptr) {
COUNTER_SET(_num_rows_filtered_counter,
static_cast<int64_t>(_num_rows_filtered));
@@ -47,7 +47,7 @@ public:
ExecNode::release_resource(state);
}
- Status push(RuntimeState*, vectorized::Block* input_block, bool eos)
override {
+ Status push(RuntimeState*, Block* input_block, bool eos) override {
_child_eos = eos;
if (input_block->rows() == 0) {
return Status::OK();
@@ -60,8 +60,8 @@ public:
return Status::OK();
}
- Status pull(RuntimeState* state, vectorized::Block* output_block, bool*
eos) override {
- RETURN_IF_ERROR(get_expanded_block(state, output_block, eos));
+ Status pull(RuntimeState* state, Block* output_block, bool* eos) override {
+ RETURN_IF_ERROR(_get_expanded_block(state, output_block, eos));
reached_limit(output_block, eos);
return Status::OK();
}
@@ -97,26 +97,39 @@ private:
1. FE: create a new output tuple based on the real output slots;
2. BE: refactor (V)TableFunctionNode output rows based on the new
tuple;
*/
- inline bool slot_need_copy(SlotId slot_id) const {
+ inline bool _slot_need_copy(SlotId slot_id) const {
auto id = _output_slots[slot_id]->id();
return (id < _output_slot_ids.size()) && (_output_slot_ids[id]);
}
- Status get_expanded_block(RuntimeState* state, Block* output_block, bool*
eos);
+ Status _get_expanded_block(RuntimeState* state, Block* output_block, bool*
eos);
+
+ void _copy_output_slots(std::vector<MutableColumnPtr>& columns) {
+ if (!_current_row_insert_times) {
+ return;
+ }
+ for (auto index : _output_slot_indexs) {
+ auto src_column = _child_block.get_by_position(index).column;
+ columns[index]->insert_many_from(*src_column, _cur_child_offset,
+ _current_row_insert_times);
+ }
+ _current_row_insert_times = 0;
+ }
+ int _current_row_insert_times = 0;
Block _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
- std::vector<vectorized::VExprContext*> _vfn_ctxs;
+ std::vector<VExprContext*> _vfn_ctxs;
std::vector<TableFunction*> _fns;
- std::vector<void*> _fn_values;
- std::vector<int64_t> _fn_value_lengths;
int _fn_num = 0;
std::vector<bool> _output_slot_ids;
+ std::vector<int> _output_slot_indexs;
+ std::vector<int> _useless_slot_indexs;
std::vector<int> _child_slot_sizes;
// indicate if child node reach the end
diff --git a/be/src/vec/exprs/table_function/table_function.h
b/be/src/vec/exprs/table_function/table_function.h
index 68e6829df5..7fff88899b 100644
--- a/be/src/vec/exprs/table_function/table_function.h
+++ b/be/src/vec/exprs/table_function/table_function.h
@@ -30,44 +30,49 @@ constexpr auto COMBINATOR_SUFFIX_OUTER = "_outer";
class TableFunction {
public:
- virtual ~TableFunction() {}
+ virtual ~TableFunction() = default;
virtual Status prepare() { return Status::OK(); }
virtual Status open() { return Status::OK(); }
- // only used for vectorized.
- virtual Status process_init(vectorized::Block* block) = 0;
+ virtual Status process_init(Block* block) = 0;
- // only used for vectorized.
- virtual Status process_row(size_t row_idx) = 0;
+ virtual Status process_row(size_t row_idx) {
+ _cur_size = 0;
+ return reset();
+ }
// only used for vectorized.
virtual Status process_close() = 0;
- virtual Status reset() = 0;
+ virtual Status reset() {
+ _eos = false;
+ _cur_offset = 0;
+ return Status::OK();
+ }
- virtual Status get_value(void** output) = 0;
+ virtual void get_value(MutableColumnPtr& column) = 0;
- // only used for vectorized.
- virtual Status get_value_length(int64_t* length) {
- *length = -1;
- return Status::OK();
+ virtual int get_value(MutableColumnPtr& column, int max_step) {
+ max_step = std::max(1, std::min(max_step, (int)(_cur_size -
_cur_offset)));
+ int i = 0;
+ for (; i < max_step && !eos(); i++) {
+ get_value(column);
+ forward();
+ }
+ return i;
}
virtual Status close() { return Status::OK(); }
- virtual Status forward(bool* eos) {
- if (_is_current_empty) {
- *eos = true;
+ virtual Status forward(int step = 1) {
+ if (current_empty()) {
_eos = true;
} else {
- ++_cur_offset;
- if (_cur_offset == _cur_size) {
- *eos = true;
+ _cur_offset += step;
+ if (_cur_offset >= _cur_size) {
_eos = true;
- } else {
- *eos = false;
}
}
return Status::OK();
@@ -76,9 +81,8 @@ public:
std::string name() const { return _fn_name; }
bool eos() const { return _eos; }
- void set_vexpr_context(vectorized::VExprContext* vexpr_context) {
- _vexpr_context = vexpr_context;
- }
+ void set_vexpr_context(VExprContext* vexpr_context) { _vexpr_context =
vexpr_context; }
+ void set_nullable() { _is_nullable = true; }
bool is_outer() const { return _is_outer; }
void set_outer() {
@@ -89,21 +93,21 @@ public:
_fn_name += COMBINATOR_SUFFIX_OUTER;
}
- bool current_empty() const { return _is_current_empty; }
+ bool current_empty() const { return _cur_size == 0; }
protected:
std::string _fn_name;
- vectorized::VExprContext* _vexpr_context = nullptr;
+ VExprContext* _vexpr_context = nullptr;
// true if there is no more data can be read from this function.
bool _eos = false;
- // true means the function result set from current row is empty(eg, source
value is null or empty).
- // so that when calling reset(), we can do nothing and keep eos as false.
- bool _is_current_empty = false;
// the position of current cursor
int64_t _cur_offset = 0;
// the size of current result
int64_t _cur_size = 0;
// set _is_outer to false for explode function, and should not return
tuple while array is null or empty
bool _is_outer = false;
+
+ bool _is_nullable = false;
+ bool _is_const = false;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/table_function_factory.h
b/be/src/vec/exprs/table_function/table_function_factory.h
index 456dba0ddc..d299af0a9c 100644
--- a/be/src/vec/exprs/table_function/table_function_factory.h
+++ b/be/src/vec/exprs/table_function/table_function_factory.h
@@ -36,8 +36,7 @@ namespace vectorized {
class TableFunction;
class TableFunctionFactory {
public:
- TableFunctionFactory() {}
- ~TableFunctionFactory() {}
+ TableFunctionFactory() = delete;
static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool,
TableFunction** fn);
const static std::unordered_map<std::string,
std::function<TableFunction*()>> _function_map;
diff --git a/be/src/vec/exprs/table_function/vexplode.cpp
b/be/src/vec/exprs/table_function/vexplode.cpp
index aa0249916a..b2c93196a8 100644
--- a/be/src/vec/exprs/table_function/vexplode.cpp
+++ b/be/src/vec/exprs/table_function/vexplode.cpp
@@ -26,7 +26,7 @@ VExplodeTableFunction::VExplodeTableFunction() {
_fn_name = "vexplode";
}
-Status VExplodeTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeTableFunction::process_init(Block* block) {
CHECK(_vexpr_context->root()->children().size() == 1)
<< "VExplodeTableFunction only support 1 child but has "
<< _vexpr_context->root()->children().size();
@@ -48,17 +48,12 @@ Status
VExplodeTableFunction::process_init(vectorized::Block* block) {
Status VExplodeTableFunction::process_row(size_t row_idx) {
DCHECK(row_idx < _array_column->size());
- _is_current_empty = false;
- _eos = false;
- _cur_offset = 0;
- _array_offset = (*_detail.offsets_ptr)[row_idx - 1];
- _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset;
+ RETURN_IF_ERROR(TableFunction::process_row(row_idx));
- // array is NULL, or array is empty
- if (_cur_size == 0 || (_detail.array_nullmap_data &&
_detail.array_nullmap_data[row_idx])) {
- _is_current_empty = true;
+ if (!_detail.array_nullmap_data || !_detail.array_nullmap_data[row_idx]) {
+ _array_offset = (*_detail.offsets_ptr)[row_idx - 1];
+ _cur_size = (*_detail.offsets_ptr)[row_idx] - _array_offset;
}
-
return Status::OK();
}
@@ -69,42 +64,14 @@ Status VExplodeTableFunction::process_close() {
return Status::OK();
}
-Status VExplodeTableFunction::reset() {
- _eos = false;
- _cur_offset = 0;
- return Status::OK();
-}
-
-Status VExplodeTableFunction::get_value(void** output) {
- if (_is_current_empty) {
- *output = nullptr;
- return Status::OK();
- }
-
+void VExplodeTableFunction::get_value(MutableColumnPtr& column) {
size_t pos = _array_offset + _cur_offset;
- if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) {
- *output = nullptr;
+ if (current_empty() || (_detail.nested_nullmap_data &&
_detail.nested_nullmap_data[pos])) {
+ column->insert_default();
} else {
- *output = const_cast<char*>(_detail.nested_col->get_data_at(pos).data);
+
column->insert_data(const_cast<char*>(_detail.nested_col->get_data_at(pos).data),
+ _detail.nested_col->get_data_at(pos).size);
}
-
- return Status::OK();
-}
-
-Status VExplodeTableFunction::get_value_length(int64_t* length) {
- if (_is_current_empty) {
- *length = -1;
- return Status::OK();
- }
-
- size_t pos = _array_offset + _cur_offset;
- if (_detail.nested_nullmap_data && _detail.nested_nullmap_data[pos]) {
- *length = 0;
- } else {
- *length = _detail.nested_col->get_data_at(pos).size;
- }
-
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode.h
b/be/src/vec/exprs/table_function/vexplode.h
index 3bc8ba9ef0..8911c8f225 100644
--- a/be/src/vec/exprs/table_function/vexplode.h
+++ b/be/src/vec/exprs/table_function/vexplode.h
@@ -30,14 +30,12 @@ class VExplodeTableFunction : public TableFunction {
public:
VExplodeTableFunction();
- virtual ~VExplodeTableFunction() = default;
-
- virtual Status process_init(vectorized::Block* block) override;
- virtual Status process_row(size_t row_idx) override;
- virtual Status process_close() override;
- virtual Status reset() override;
- virtual Status get_value(void** output) override;
- virtual Status get_value_length(int64_t* length) override;
+ ~VExplodeTableFunction() override = default;
+
+ Status process_init(Block* block) override;
+ Status process_row(size_t row_idx) override;
+ Status process_close() override;
+ void get_value(MutableColumnPtr& column) override;
private:
ColumnPtr _array_column;
diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
index f680922e7c..8dabbac8c3 100644
--- a/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_bitmap.cpp
@@ -19,6 +19,8 @@
#include "common/status.h"
#include "util/bitmap_value.h"
+#include "vec/columns/columns_number.h"
+#include "vec/exprs/table_function/table_function.h"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@@ -27,7 +29,7 @@ VExplodeBitmapTableFunction::VExplodeBitmapTableFunction() {
_fn_name = "vexplode_bitmap";
}
-Status VExplodeBitmapTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeBitmapTableFunction::process_init(Block* block) {
CHECK(_vexpr_context->root()->children().size() == 1)
<< "VExplodeNumbersTableFunction must be have 1 children but have "
<< _vexpr_context->root()->children().size();
@@ -42,68 +44,53 @@ Status
VExplodeBitmapTableFunction::process_init(vectorized::Block* block) {
Status VExplodeBitmapTableFunction::reset() {
_eos = false;
- if (!_is_current_empty) {
- _reset_iterator();
+ _cur_offset = 0;
+ if (!current_empty()) {
+ _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
}
return Status::OK();
}
-Status VExplodeBitmapTableFunction::forward(bool* eos) {
- if (_is_current_empty) {
- *eos = true;
- _eos = true;
- } else {
- ++(*_cur_iter);
- ++_cur_offset;
- if (_cur_offset == _cur_size) {
- *eos = true;
- _eos = true;
- } else {
- _cur_value = **_cur_iter;
- *eos = false;
+Status VExplodeBitmapTableFunction::forward(int step) {
+ if (!current_empty()) {
+ for (int i = 0; i < step; i++) {
+ ++(*_cur_iter);
}
}
- return Status::OK();
+ return TableFunction::forward(step);
}
-Status VExplodeBitmapTableFunction::get_value(void** output) {
- if (_is_current_empty) {
- *output = nullptr;
+void VExplodeBitmapTableFunction::get_value(MutableColumnPtr& column) {
+ if (current_empty()) {
+ column->insert_default();
} else {
- *output = &_cur_value;
+ if (_is_nullable) {
+ static_cast<ColumnInt64*>(
+
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+ ->insert_value(**_cur_iter);
+ static_cast<ColumnUInt8*>(
+
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+ ->insert_default();
+ } else {
+ static_cast<ColumnInt64*>(column.get())->insert_value(**_cur_iter);
+ }
}
- return Status::OK();
-}
-
-void VExplodeBitmapTableFunction::_reset_iterator() {
- DCHECK(_cur_bitmap->cardinality() > 0) << _cur_bitmap->cardinality();
- _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
- _cur_value = **_cur_iter;
- _cur_offset = 0;
}
Status VExplodeBitmapTableFunction::process_row(size_t row_idx) {
- _eos = false;
- _is_current_empty = false;
- _cur_size = 0;
- _cur_offset = 0;
+ RETURN_IF_ERROR(TableFunction::process_row(row_idx));
StringRef value = _value_column->get_data_at(row_idx);
- if (value.data == nullptr) {
- _is_current_empty = true;
- } else {
+ if (value.data) {
_cur_bitmap = reinterpret_cast<const BitmapValue*>(value.data);
_cur_size = _cur_bitmap->cardinality();
- if (_cur_size == 0) {
- _is_current_empty = true;
- } else {
- _reset_iterator();
+ if (!current_empty()) {
+ _cur_iter.reset(new BitmapValueIterator(*_cur_bitmap));
}
}
- _is_current_empty = (_cur_size == 0);
return Status::OK();
}
@@ -112,13 +99,4 @@ Status VExplodeBitmapTableFunction::process_close() {
return Status::OK();
}
-Status VExplodeBitmapTableFunction::get_value_length(int64_t* length) {
- if (_is_current_empty) {
- *length = -1;
- } else {
- *length = sizeof(uint64_t);
- }
- return Status::OK();
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_bitmap.h
b/be/src/vec/exprs/table_function/vexplode_bitmap.h
index 3d4b7c2ff0..dd47261c38 100644
--- a/be/src/vec/exprs/table_function/vexplode_bitmap.h
+++ b/be/src/vec/exprs/table_function/vexplode_bitmap.h
@@ -29,13 +29,12 @@ public:
~VExplodeBitmapTableFunction() override = default;
Status reset() override;
- Status get_value(void** output) override;
- Status forward(bool* eos) override;
+ void get_value(MutableColumnPtr& column) override;
+ Status forward(int step = 1) override;
- Status process_init(vectorized::Block* block) override;
+ Status process_init(Block* block) override;
Status process_row(size_t row_idx) override;
Status process_close() override;
- Status get_value_length(int64_t* length) override;
private:
void _reset_iterator();
@@ -43,9 +42,6 @@ private:
const BitmapValue* _cur_bitmap = nullptr;
// iterator of _cur_bitmap
std::unique_ptr<BitmapValueIterator> _cur_iter = nullptr;
- // current value read from bitmap, it will be referenced by
- // table function scan node.
- uint64_t _cur_value = 0;
ColumnPtr _value_column;
};
diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.cpp
b/be/src/vec/exprs/table_function/vexplode_json_array.cpp
index f12fa617bc..b67467ea98 100644
--- a/be/src/vec/exprs/table_function/vexplode_json_array.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_json_array.cpp
@@ -127,7 +127,7 @@
VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction(ExplodeJsonArrayT
_fn_name = "vexplode_json_array";
}
-Status VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeJsonArrayTableFunction::process_init(Block* block) {
CHECK(_vexpr_context->root()->children().size() == 1)
<< _vexpr_context->root()->children().size();
@@ -139,28 +139,15 @@ Status
VExplodeJsonArrayTableFunction::process_init(vectorized::Block* block) {
return Status::OK();
}
-Status VExplodeJsonArrayTableFunction::reset() {
- _eos = false;
- _cur_offset = 0;
- return Status::OK();
-}
-
Status VExplodeJsonArrayTableFunction::process_row(size_t row_idx) {
- _is_current_empty = false;
- _eos = false;
+ RETURN_IF_ERROR(TableFunction::process_row(row_idx));
StringRef text = _text_column->get_data_at(row_idx);
- if (text.data == nullptr) {
- _is_current_empty = true;
- } else {
+ if (text.data != nullptr) {
rapidjson::Document document;
document.Parse(text.data, text.size);
- if (UNLIKELY(document.HasParseError()) || !document.IsArray() ||
- document.GetArray().Size() == 0) {
- _is_current_empty = true;
- } else {
+ if (!document.HasParseError() && document.IsArray() &&
document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(_type, document);
- _cur_offset = 0;
}
}
return Status::OK();
@@ -171,22 +158,13 @@ Status VExplodeJsonArrayTableFunction::process_close() {
return Status::OK();
}
-Status VExplodeJsonArrayTableFunction::get_value_length(int64_t* length) {
- if (_is_current_empty) {
- *length = -1;
- } else {
- _parsed_data.get_value_length(_type, _cur_offset, length);
- }
- return Status::OK();
-}
-
-Status VExplodeJsonArrayTableFunction::get_value(void** output) {
- if (_is_current_empty) {
- *output = nullptr;
+void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) {
+ if (current_empty()) {
+ column->insert_default();
} else {
- _parsed_data.get_value(_type, _cur_offset, output, true);
+ column->insert_data((char*)_parsed_data.get_value(_type, _cur_offset,
true),
+ _parsed_data.get_value_length(_type, _cur_offset));
}
- return Status::OK();
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.h
b/be/src/vec/exprs/table_function/vexplode_json_array.h
index ab171c7cef..9d7de170ff 100644
--- a/be/src/vec/exprs/table_function/vexplode_json_array.h
+++ b/be/src/vec/exprs/table_function/vexplode_json_array.h
@@ -63,33 +63,25 @@ struct ParsedData {
}
}
- void get_value(ExplodeJsonArrayType type, int64_t offset, void** output,
bool real = false) {
+ void* get_value(ExplodeJsonArrayType type, int64_t offset, bool real =
false) {
switch (type) {
case ExplodeJsonArrayType::INT:
case ExplodeJsonArrayType::DOUBLE:
- *output = _data[offset];
- break;
+ return _data[offset];
case ExplodeJsonArrayType::STRING:
- *output = _string_nulls[offset] ? nullptr
- : real ?
reinterpret_cast<void*>(_backup_string[offset].data())
- : &_data_string[offset];
- break;
+ return _string_nulls[offset] ? nullptr
+ : real ?
reinterpret_cast<void*>(_backup_string[offset].data())
+ : &_data_string[offset];
default:
- CHECK(false) << type;
+ return nullptr;
}
}
- void get_value_length(ExplodeJsonArrayType type, int64_t offset, int64_t*
length) {
- switch (type) {
- case ExplodeJsonArrayType::INT:
- case ExplodeJsonArrayType::DOUBLE:
- break;
- case ExplodeJsonArrayType::STRING:
- *length = _string_nulls[offset] ? -1 :
_backup_string[offset].size();
- break;
- default:
- CHECK(false) << type;
+ int64 get_value_length(ExplodeJsonArrayType type, int64_t offset) {
+ if (type == ExplodeJsonArrayType::STRING && !_string_nulls[offset]) {
+ return _backup_string[offset].size();
}
+ return 0;
}
int set_output(ExplodeJsonArrayType type, rapidjson::Document& document);
@@ -100,13 +92,10 @@ public:
VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type);
~VExplodeJsonArrayTableFunction() override = default;
- Status process_init(vectorized::Block* block) override;
+ Status process_init(Block* block) override;
Status process_row(size_t row_idx) override;
Status process_close() override;
- Status get_value(void** output) override;
- Status get_value_length(int64_t* length) override;
-
- Status reset() override;
+ void get_value(MutableColumnPtr& column) override;
private:
ParsedData _parsed_data;
diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.cpp
b/be/src/vec/exprs/table_function/vexplode_numbers.cpp
index 1c01653ed2..d1a5eb07dc 100644
--- a/be/src/vec/exprs/table_function/vexplode_numbers.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_numbers.cpp
@@ -18,6 +18,9 @@
#include "vec/exprs/table_function/vexplode_numbers.h"
#include "common/status.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/columns_number.h"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@@ -26,7 +29,7 @@ VExplodeNumbersTableFunction::VExplodeNumbersTableFunction() {
_fn_name = "vexplode_numbers";
}
-Status VExplodeNumbersTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeNumbersTableFunction::process_init(Block* block) {
CHECK(_vexpr_context->root()->children().size() == 1)
<< "VExplodeSplitTableFunction must be have 1 children but have "
<< _vexpr_context->root()->children().size();
@@ -35,24 +38,38 @@ Status
VExplodeNumbersTableFunction::process_init(vectorized::Block* block) {
RETURN_IF_ERROR(_vexpr_context->root()->children()[0]->execute(_vexpr_context,
block,
&value_column_idx));
_value_column = block->get_by_position(value_column_idx).column;
+ if (is_column_const(*_value_column)) {
+ _cur_size = 0;
+ auto& column_nested = assert_cast<const
ColumnConst&>(*_value_column).get_data_column_ptr();
+ if (column_nested->is_nullable()) {
+ if (!column_nested->is_null_at(0)) {
+ _cur_size = static_cast<const
ColumnNullable*>(column_nested.get())
+ ->get_nested_column()
+ .get_int(0);
+ }
+ } else {
+ _cur_size = column_nested->get_int(0);
+ }
+ if (_cur_size && _cur_size <= block->rows()) { // avoid
elements_column too big or empty
+ _is_const = true; // use const optimize
+ for (int i = 0; i < _cur_size; i++) {
+ ((ColumnInt32*)_elements_column.get())->insert_value(i);
+ }
+ }
+ }
return Status::OK();
}
Status VExplodeNumbersTableFunction::process_row(size_t row_idx) {
- _is_current_empty = false;
- _eos = false;
+ RETURN_IF_ERROR(TableFunction::process_row(row_idx));
+ if (_is_const) {
+ return Status::OK();
+ }
StringRef value = _value_column->get_data_at(row_idx);
-
- if (value.data == nullptr) {
- _is_current_empty = true;
- _cur_size = 0;
- _cur_offset = 0;
- } else {
- _cur_size = *reinterpret_cast<const int*>(value.data);
- _cur_offset = 0;
- _is_current_empty = (_cur_size <= 0);
+ if (value.data != nullptr) {
+ _cur_size = std::max(0, *reinterpret_cast<const int*>(value.data));
}
return Status::OK();
}
@@ -62,28 +79,21 @@ Status VExplodeNumbersTableFunction::process_close() {
return Status::OK();
}
-Status VExplodeNumbersTableFunction::reset() {
- _eos = false;
- _cur_offset = 0;
- return Status::OK();
-}
-
-Status VExplodeNumbersTableFunction::get_value(void** output) {
- if (_is_current_empty) {
- *output = nullptr;
+void VExplodeNumbersTableFunction::get_value(MutableColumnPtr& column) {
+ if (current_empty()) {
+ column->insert_default();
} else {
- *output = &_cur_offset;
+ if (_is_nullable) {
+ static_cast<ColumnInt32*>(
+
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+ ->insert_value(_cur_offset);
+ static_cast<ColumnUInt8*>(
+
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+ ->insert_default();
+ } else {
+ static_cast<ColumnInt32*>(column.get())->insert_value(_cur_offset);
+ }
}
- return Status::OK();
-}
-
-Status VExplodeNumbersTableFunction::get_value_length(int64_t* length) {
- if (_is_current_empty) {
- *length = -1;
- } else {
- *length = sizeof(int);
- }
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_numbers.h
b/be/src/vec/exprs/table_function/vexplode_numbers.h
index e125e5d5b8..3e93471b47 100644
--- a/be/src/vec/exprs/table_function/vexplode_numbers.h
+++ b/be/src/vec/exprs/table_function/vexplode_numbers.h
@@ -25,17 +25,37 @@ namespace doris::vectorized {
class VExplodeNumbersTableFunction : public TableFunction {
public:
VExplodeNumbersTableFunction();
- virtual ~VExplodeNumbersTableFunction() = default;
+ ~VExplodeNumbersTableFunction() override = default;
- virtual Status process_init(vectorized::Block* block) override;
- virtual Status process_row(size_t row_idx) override;
- virtual Status process_close() override;
- virtual Status reset() override;
- virtual Status get_value(void** output) override;
- virtual Status get_value_length(int64_t* length) override;
+ Status process_init(Block* block) override;
+ Status process_row(size_t row_idx) override;
+ Status process_close() override;
+ void get_value(MutableColumnPtr& column) override;
+ int get_value(MutableColumnPtr& column, int max_step) override {
+ if (_is_const) {
+ max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
+ if (_is_nullable) {
+ static_cast<ColumnInt32*>(
+
static_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get())
+ ->insert_many_from(*_elements_column, _cur_offset,
max_step);
+ static_cast<ColumnUInt8*>(
+
static_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
+ ->insert_many_defaults(max_step);
+ } else {
+ static_cast<ColumnInt32*>(column.get())
+ ->insert_many_from(*_elements_column, _cur_offset,
max_step);
+ }
+
+ forward(max_step);
+ return max_step;
+ }
+
+ return TableFunction::get_value(column, max_step);
+ }
private:
ColumnPtr _value_column;
+ ColumnPtr _elements_column = ColumnInt32::create();
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_split.cpp
b/be/src/vec/exprs/table_function/vexplode_split.cpp
index 1bceffeeba..d2438d8c9a 100644
--- a/be/src/vec/exprs/table_function/vexplode_split.cpp
+++ b/be/src/vec/exprs/table_function/vexplode_split.cpp
@@ -32,15 +32,7 @@ Status VExplodeSplitTableFunction::open() {
return Status::OK();
}
-Status VExplodeSplitTableFunction::reset() {
- _eos = false;
- if (!_is_current_empty) {
- _cur_offset = 0;
- }
- return Status::OK();
-}
-
-Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) {
+Status VExplodeSplitTableFunction::process_init(Block* block) {
CHECK(_vexpr_context->root()->children().size() == 2)
<< "VExplodeSplitTableFunction must be have 2 children but have "
<< _vexpr_context->root()->children().size();
@@ -77,14 +69,9 @@ Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) {
}
Status VExplodeSplitTableFunction::process_row(size_t row_idx) {
- _is_current_empty = false;
- _eos = false;
+ RETURN_IF_ERROR(TableFunction::process_row(row_idx));
- if ((_test_null_map and _test_null_map[row_idx]) || _delimiter.data == nullptr) {
- _is_current_empty = true;
- _cur_size = 0;
- _cur_offset = 0;
- } else {
+ if (!(_test_null_map && _test_null_map[row_idx]) && _delimiter.data != nullptr) {
// TODO: use the function to be better string_view/StringRef split
auto split = [](std::string_view strv, std::string_view delims = " ") {
std::vector<std::string_view> output;
@@ -113,8 +100,6 @@ Status VExplodeSplitTableFunction::process_row(size_t row_idx) {
_backup = split(_real_text_column->get_data_at(row_idx), _delimiter);
_cur_size = _backup.size();
- _cur_offset = 0;
- _is_current_empty = (_cur_size == 0);
}
return Status::OK();
}
@@ -127,22 +112,13 @@ Status VExplodeSplitTableFunction::process_close() {
return Status::OK();
}
-Status VExplodeSplitTableFunction::get_value(void** output) {
- if (_is_current_empty) {
- *output = nullptr;
+void VExplodeSplitTableFunction::get_value(MutableColumnPtr& column) {
+ if (current_empty()) {
+ column->insert_default();
} else {
- *output = const_cast<char*>(_backup[_cur_offset].data());
+ column->insert_data(const_cast<char*>(_backup[_cur_offset].data()),
+ _backup[_cur_offset].length());
}
- return Status::OK();
-}
-
-Status VExplodeSplitTableFunction::get_value_length(int64_t* length) {
- if (_is_current_empty) {
- *length = -1;
- } else {
- *length = _backup[_cur_offset].length();
- }
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/table_function/vexplode_split.h b/be/src/vec/exprs/table_function/vexplode_split.h
index 53935b6a0b..5881ef9b6e 100644
--- a/be/src/vec/exprs/table_function/vexplode_split.h
+++ b/be/src/vec/exprs/table_function/vexplode_split.h
@@ -30,12 +30,10 @@ public:
~VExplodeSplitTableFunction() override = default;
Status open() override;
- Status process_init(vectorized::Block* block) override;
+ Status process_init(Block* block) override;
Status process_row(size_t row_idx) override;
Status process_close() override;
- Status get_value(void** output) override;
- Status get_value_length(int64_t* length) override;
- Status reset() override;
+ void get_value(MutableColumnPtr& column) override;
private:
std::vector<std::string_view> _backup;
diff --git a/be/test/vec/function/function_test_util.cpp b/be/test/vec/function/function_test_util.cpp
index e1551fe77f..da975a2c84 100644
--- a/be/test/vec/function/function_test_util.cpp
+++ b/be/test/vec/function/function_test_util.cpp
@@ -335,6 +335,9 @@ Block* process_table_function(TableFunction* fn, Block* input_block,
// prepare output column
vectorized::MutableColumnPtr column = descs[0].data_type->create_column();
+ if (column->is_nullable()) {
+ fn->set_nullable();
+ }
// process table function for all rows
for (size_t row = 0; row < input_block->rows(); ++row) {
@@ -348,25 +351,10 @@ Block* process_table_function(TableFunction* fn, Block* input_block,
continue;
}
- bool tmp_eos = false;
do {
- void* cell = nullptr;
- int64_t cell_len = 0;
- if (fn->get_value(&cell) != Status::OK() ||
- fn->get_value_length(&cell_len) != Status::OK()) {
- LOG(WARNING) << "TableFunction get_value or get_value_length failed";
- return nullptr;
- }
-
- // copy data from input block
- if (cell == nullptr) {
- column->insert_default();
- } else {
- column->insert_data(reinterpret_cast<char*>(cell), cell_len);
- }
-
- fn->forward(&tmp_eos);
- } while (!tmp_eos);
+ fn->get_value(column);
+ fn->forward();
+ } while (!fn->eos());
}
std::unique_ptr<Block> output_block(new Block());
diff --git a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
index 2b2b0619fd..a3a66c68fd 100644
--- a/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
+++ b/regression-test/suites/schema_change_p0/test_varchar_schema_change.groovy
@@ -111,7 +111,7 @@ suite ("test_varchar_schema_change") {
logger.info(res[2][1])
assertEquals(res[2][1].toLowerCase(),"varchar(30)")
- qt_sc " select * from ${tableName} order by 1; "
+ qt_sc " select * from ${tableName} order by 1,2; "
// test { //没捕获到异常
// sql """ insert into ${tableName} values(92,'2017-12-01',483647,'sdafdsaf') """
@@ -140,7 +140,7 @@ suite ("test_varchar_schema_change") {
logger.info(res[2][1])
assertEquals(res[2][1].toLowerCase(),"varchar(30)")
- qt_sc " select * from ${tableName} where c2 like '%1%' order by 1; "
+ qt_sc " select * from ${tableName} where c2 like '%1%' order by 1,2; "
sql """ insert into ${tableName} values(22,'2011-12-01','12f2','fdsaf') """
sql """ insert into ${tableName} values(55,'2009-11-21','12d1d113','123aa') """
@@ -196,9 +196,9 @@ suite ("test_varchar_schema_change") {
} while (running)
}
- qt_sc " select * from ${tableName} order by 1; "
- qt_sc " select min(c2),max(c2) from ${tableName} order by 1; "
- qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 1; "
+ qt_sc " select * from ${tableName} order by 1,2; "
+ qt_sc " select min(c2),max(c2) from ${tableName} order by 1,2; "
+ qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 1,2; "
sleep(5000)
sql """ alter table ${tableName}
@@ -222,7 +222,7 @@ suite ("test_varchar_schema_change") {
logger.info(res[2][1])
assertEquals(res[2][1].toLowerCase(),"varchar(40)")
- qt_sc " select * from ${tableName} order by 1; "
+ qt_sc " select * from ${tableName} order by 1,2; "
// test{
// sql """ alter table t0 modify column c1 varchar(20) NOT NULL """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]