This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
new 8cce760493 [hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430)
8cce760493 is described below
commit 8cce760493b4a598c0ed93fbd5b7ffd2c6bdc642
Author: starocean999 <[email protected]>
AuthorDate: Tue Jun 28 16:29:51 2022 +0800
[hotfix](dev-1.0.1) fix agg node produce nullable value bug (#10430)
---
be/src/vec/exec/vaggregation_node.cpp | 118 +++++++++++++---------------------
be/src/vec/exec/vaggregation_node.h | 9 +--
2 files changed, 48 insertions(+), 79 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index def5f56ff7..37be1f5648 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -224,15 +224,22 @@ Status AggregationNode::prepare(RuntimeState* state) {
auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
if (nullable_output != nullable_input) {
DCHECK(nullable_output);
- _make_nullable_keys.emplace_back(i);
+ _make_nullable_output_column_pos.emplace_back(i);
}
}
+ int probe_expr_count = _probe_expr_ctxs.size();
for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
SlotDescriptor* intermediate_slot_desc =
_intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state,
child(0)->row_desc(),
_mem_pool.get(),
intermediate_slot_desc,
output_slot_desc,
mem_tracker()));
+ auto nullable_output = output_slot_desc->is_nullable();
+ auto nullable_agg_output =
_aggregate_evaluators[i]->data_type()->is_nullable();
+ if (nullable_output != nullable_agg_output) {
+ DCHECK(nullable_output);
+ _make_nullable_output_column_pos.emplace_back(i +
probe_expr_count);
+ }
}
// set profile timer to evaluators
@@ -384,11 +391,11 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
}
// pre stream agg need use _num_row_return to decide whether to do pre
stream agg
_num_rows_returned += block->rows();
- _make_nullable_output_key(block);
+ _make_nullable_output_column(block);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
} else {
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
- _make_nullable_output_key(block);
+ _make_nullable_output_column(block);
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block,
block->columns()));
reached_limit(block, eos);
@@ -526,10 +533,12 @@ Status AggregationNode::_merge_without_key(Block* block) {
std::unique_ptr<char[]> deserialize_buffer(new
char[_total_size_of_aggregate_states]);
int rows = block->rows();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
-
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
- int col_id =
-
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id();
+ int col_id = i;
+ if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
+
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) {
+ col_id =
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+ ->column_id();
+ }
if (_aggregate_evaluators[i]->is_merge()) {
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
@@ -569,13 +578,16 @@ void AggregationNode::_close_without_key() {
release_tracker();
}
-void AggregationNode::_make_nullable_output_key(Block* block) {
+void AggregationNode::_make_nullable_output_column(Block* block) {
if (block->rows() != 0) {
- for (auto cid : _make_nullable_keys) {
- block->get_by_position(cid).column =
- make_nullable(block->get_by_position(cid).column);
- block->get_by_position(cid).type =
- make_nullable(block->get_by_position(cid).type);
+ for (auto cid : _make_nullable_output_column_pos) {
+ if (!block->get_by_position(cid).column->is_nullable()) {
+ block->get_by_position(cid).column =
+ make_nullable(block->get_by_position(cid).column);
+ }
+ if (!block->get_by_position(cid).type->is_nullable()) {
+ block->get_by_position(cid).type =
make_nullable(block->get_by_position(cid).type);
+ }
}
}
}
@@ -688,7 +700,7 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// will serialize value data to string column
std::vector<VectorBufferWriter> value_buffer_writers;
- bool mem_reuse = out_block->mem_reuse();
+ bool mem_reuse = out_block->mem_reuse() &&
_make_nullable_output_column_pos.empty();
auto serialize_string_type =
std::make_shared<DataTypeString>();
MutableColumns value_columns;
for (int i = 0; i < _aggregate_evaluators.size(); ++i)
{
@@ -839,49 +851,42 @@ Status
AggregationNode::_execute_with_serialized_key(Block* block) {
Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state,
Block* block,
bool* eos) {
- bool mem_reuse = block->mem_reuse();
+ bool mem_reuse = block->mem_reuse() &&
_make_nullable_output_column_pos.empty();
auto column_withschema =
VectorizedUtils::create_columns_with_type_and_name(row_desc());
int key_size = _probe_expr_ctxs.size();
MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
if (!mem_reuse) {
-
key_columns.emplace_back(column_withschema[i].type->create_column());
+
key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
} else {
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
}
- MutableColumns temp_key_columns = _create_temp_key_columns();
- DCHECK(temp_key_columns.size() == key_size);
-
MutableColumns value_columns;
for (int i = key_size; i < column_withschema.size(); ++i) {
if (!mem_reuse) {
-
value_columns.emplace_back(column_withschema[i].type->create_column());
+ value_columns.emplace_back(
+ _aggregate_evaluators[i -
key_size]->data_type()->create_column());
} else {
value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
}
- MutableColumns temp_value_columns = _create_temp_value_columns();
- DCHECK(temp_value_columns.size() == _aggregate_evaluators.size() &&
- _aggregate_evaluators.size() == column_withschema.size() -
key_size);
-
SCOPED_TIMER(_get_results_timer);
std::visit(
[&](auto&& agg_method) -> void {
auto& data = agg_method.data;
auto& iter = agg_method.iterator;
agg_method.init_once();
- while (iter != data.end() && temp_key_columns[0]->size() <
state->batch_size()) {
+ while (iter != data.end() && key_columns[0]->size() <
state->batch_size()) {
const auto& key = iter->get_first();
auto& mapped = iter->get_second();
- agg_method.insert_key_into_columns(key, temp_key_columns,
_probe_key_sz);
+ agg_method.insert_key_into_columns(key, key_columns,
_probe_key_sz);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
_aggregate_evaluators[i]->insert_result_info(
- mapped + _offsets_of_aggregate_states[i],
- temp_value_columns[i].get());
+ mapped + _offsets_of_aggregate_states[i],
value_columns[i].get());
++iter;
}
@@ -889,15 +894,15 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
if (agg_method.data.has_null_key_data()) {
// only one key of group by support wrap null key
// here need additional processing logic on the null
key / value
- DCHECK(temp_key_columns.size() == 1);
- DCHECK(temp_key_columns[0]->is_nullable());
- if (temp_key_columns[0]->size() < state->batch_size())
{
- temp_key_columns[0]->insert_data(nullptr, 0);
+ DCHECK(key_columns.size() == 1);
+ DCHECK(key_columns[0]->is_nullable());
+ if (key_columns[0]->size() < state->batch_size()) {
+ key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.data.get_null_key_data();
for (size_t i = 0; i <
_aggregate_evaluators.size(); ++i)
_aggregate_evaluators[i]->insert_result_info(
mapped +
_offsets_of_aggregate_states[i],
- temp_value_columns[i].get());
+ value_columns[i].get());
*eos = true;
}
} else {
@@ -907,25 +912,6 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
},
_agg_data._aggregated_method_variant);
- for (int i = 0; i < key_size; ++i) {
- if (key_columns[i]->is_nullable() xor
temp_key_columns[i]->is_nullable()) {
- DCHECK(key_columns[i]->is_nullable() &&
!temp_key_columns[i]->is_nullable());
- key_columns[i] =
(*std::move(make_nullable(std::move(temp_key_columns[i])))).mutate();
- } else {
- key_columns[i] = std::move(temp_key_columns[i]);
- }
- }
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- if (value_columns[i]->is_nullable() xor
temp_value_columns[i]->is_nullable()) {
- DCHECK(value_columns[i]->is_nullable() &&
!temp_value_columns[i]->is_nullable());
- value_columns[i] =
-
(*std::move(make_nullable(std::move(temp_value_columns[i])))).mutate();
- } else {
- value_columns[i] = std::move(temp_value_columns[i]);
- }
- }
-
if (!mem_reuse) {
*block = column_withschema;
MutableColumns columns(block->columns());
@@ -949,7 +935,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
MutableColumns value_columns(agg_size);
DataTypes value_data_types(agg_size);
- bool mem_reuse = block->mem_reuse();
+ bool mem_reuse = block->mem_reuse() &&
_make_nullable_output_column_pos.empty();
MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
@@ -1078,10 +1064,12 @@ Status
AggregationNode::_merge_with_serialized_key(Block* block) {
std::unique_ptr<char[]> deserialize_buffer(new
char[_total_size_of_aggregate_states]);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
-
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
- int col_id =
-
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id();
+ int col_id = i + key_size;
+ if (_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
+
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()) {
+ col_id =
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+ ->column_id();
+ }
if (_aggregate_evaluators[i]->is_merge()) {
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
@@ -1143,20 +1131,4 @@ void AggregationNode::release_tracker() {
mem_tracker()->Release(_mem_usage_record.used_in_state +
_mem_usage_record.used_in_arena);
}
-MutableColumns AggregationNode::_create_temp_key_columns() {
- MutableColumns key_columns;
- for (const auto& expr_ctx : _probe_expr_ctxs) {
- key_columns.push_back(expr_ctx->root()->data_type()->create_column());
- }
- return key_columns;
-}
-
-MutableColumns AggregationNode::_create_temp_value_columns() {
- MutableColumns key_columns;
- for (const auto& agg : _aggregate_evaluators) {
- key_columns.push_back(agg->data_type()->create_column());
- }
- return key_columns;
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index d2f580d327..149836ade7 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -417,9 +417,9 @@ public:
private:
// group by k1,k2
std::vector<VExprContext*> _probe_expr_ctxs;
- // left / full join will change the key nullable make output/input solt
+ // left / full join will change the output nullable make output/input solt
// nullable diff. so we need make nullable of it.
- std::vector<size_t> _make_nullable_keys;
+ std::vector<size_t> _make_nullable_output_column_pos;
std::vector<size_t> _probe_key_sz;
std::vector<AggFnEvaluator*> _aggregate_evaluators;
@@ -461,7 +461,7 @@ private:
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();
- void _make_nullable_output_key(Block* block);
+ void _make_nullable_output_column(Block* block);
Status _create_agg_status(AggregateDataPtr data);
Status _destory_agg_status(AggregateDataPtr data);
@@ -484,9 +484,6 @@ private:
void release_tracker();
- MutableColumns _create_temp_key_columns();
- MutableColumns _create_temp_value_columns();
-
using vectorized_execute = std::function<Status(Block* block)>;
using vectorized_pre_agg = std::function<Status(Block* in_block, Block*
out_block)>;
using vectorized_get_result =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]