This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0a44de67bf6bcd0b41ef066c681cb97694ca8d53 Author: zhangstar333 <[email protected]> AuthorDate: Tue Mar 26 17:34:19 2024 +0800 [bug](distinct agg) fix distinct streaming agg not output all data (#32760) fix distinct streaming agg not output all data --- be/src/vec/exec/distinct_vaggregation_node.cpp | 14 +++++++++++++- .../data/variant_github_events_p0_new/load.out | 3 +++ .../{load.out => sql/test_distinct_streaming_agg.out} | 7 ++----- .../suites/variant_github_events_p0_new/load.groovy | 17 +++++++++++++++++ .../sql/test_distinct_streaming_agg.sql | 5 +++++ 5 files changed, 40 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp index 66368fe3a11..8d4f71ec88a 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -17,8 +17,11 @@ #include "vec/exec/distinct_vaggregation_node.h" +#include <glog/logging.h> + #include "runtime/runtime_state.h" #include "vec/aggregate_functions/aggregate_function_uniq.h" +#include "vec/columns/column.h" #include "vec/exec/vaggregation_node.h" namespace doris { @@ -64,9 +67,18 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key( SCOPED_TIMER(_insert_keys_to_column_timer); bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse(); if (mem_reuse) { + if (_stop_emplace_flag && !out_block->empty()) { + // when out_block row >= batch_size, push it to data_queue, so when _stop_emplace_flag = true, maybe have some data in block + // need output those data firstly + for (int i = 0; i < rows; ++i) { + _distinct_row.push_back(i); + } + } for (int i = 0; i < key_size; ++i) { auto output_column = out_block->get_by_position(i).column; - if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1) + if (_stop_emplace_flag && _distinct_row.empty()) { + // means it's streaming and out_block have no data. + // swap the column directly, so not insert data again. and solve Check failed: d.column->use_count() == 1 (2 vs. 1) out_block->replace_by_position(i, key_columns[i]->assume_mutable()); in_block->replace_by_position(result_idxs[i], output_column); } else { diff --git a/regression-test/data/variant_github_events_p0_new/load.out b/regression-test/data/variant_github_events_p0_new/load.out index 13ce3dfca08..0aeaaeed024 100644 --- a/regression-test/data/variant_github_events_p0_new/load.out +++ b/regression-test/data/variant_github_events_p0_new/load.out @@ -5,3 +5,6 @@ \N 4748 +-- !sql_select_count -- +67843 + diff --git a/regression-test/data/variant_github_events_p0_new/load.out b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out similarity index 70% copy from regression-test/data/variant_github_events_p0_new/load.out copy to regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out index 13ce3dfca08..20dee980ec8 100644 --- a/regression-test/data/variant_github_events_p0_new/load.out +++ b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out @@ -1,7 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- -\N -\N -\N -4748 +-- !test_distinct_streaming_agg -- +31481 diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy index 777befbd160..0be0f205b69 100644 --- a/regression-test/suites/variant_github_events_p0_new/load.groovy +++ b/regression-test/suites/variant_github_events_p0_new/load.groovy @@ -80,4 +80,21 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){ // TODO fix compaction issue, this case could be stable qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;""" // TODO add test case that some certain columns are materialized in some file while others are not materilized(sparse) + + sql """DROP TABLE IF EXISTS github_events_2""" + sql """ + CREATE TABLE IF NOT EXISTS `github_events_2` ( + `k` BIGINT NULL, + `v` text NULL, + INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 4 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events; + """ + + qt_sql_select_count """ select count(*) from github_events_2; """ } diff --git a/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql new file mode 100644 index 00000000000..40854a2d535 --- /dev/null +++ b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql @@ -0,0 +1,5 @@ +SELECT + /*+SET_VAR(batch_size=50,experimental_enable_pipeline_x_engine=false,parallel_pipeline_task_num=1,disable_streaming_preaggregations=false) */ + count(distinct v) +FROM + github_events_2; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
