This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0a44de67bf6bcd0b41ef066c681cb97694ca8d53
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Mar 26 17:34:19 2024 +0800

    [bug](distinct agg) fix distinct streaming agg not output all data (#32760)
    
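    Fix distinct streaming aggregation not outputting all data: when
    _stop_emplace_flag is set and out_block still holds rows, those rows
    are now output first instead of being replaced by the column swap.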
---
 be/src/vec/exec/distinct_vaggregation_node.cpp          | 14 +++++++++++++-
 .../data/variant_github_events_p0_new/load.out          |  3 +++
 .../{load.out => sql/test_distinct_streaming_agg.out}   |  7 ++-----
 .../suites/variant_github_events_p0_new/load.groovy     | 17 +++++++++++++++++
 .../sql/test_distinct_streaming_agg.sql                 |  5 +++++
 5 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp 
b/be/src/vec/exec/distinct_vaggregation_node.cpp
index 66368fe3a11..8d4f71ec88a 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.cpp
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -17,8 +17,11 @@
 
 #include "vec/exec/distinct_vaggregation_node.h"
 
+#include <glog/logging.h>
+
 #include "runtime/runtime_state.h"
 #include "vec/aggregate_functions/aggregate_function_uniq.h"
+#include "vec/columns/column.h"
 #include "vec/exec/vaggregation_node.h"
 
 namespace doris {
@@ -64,9 +67,18 @@ Status 
DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
     SCOPED_TIMER(_insert_keys_to_column_timer);
     bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
     if (mem_reuse) {
+        if (_stop_emplace_flag && !out_block->empty()) {
+            // out_block is pushed to data_queue only once its rows >= batch_size, so when _stop_emplace_flag = true the block may still hold some data;
+            // that data needs to be output first
+            for (int i = 0; i < rows; ++i) {
+                _distinct_row.push_back(i);
+            }
+        }
         for (int i = 0; i < key_size; ++i) {
             auto output_column = out_block->get_by_position(i).column;
-            if (_stop_emplace_flag) { // swap the column directly, to solve 
Check failed: d.column->use_count() == 1 (2 vs. 1)
+            if (_stop_emplace_flag && _distinct_row.empty()) {
+                // this means it is streaming and out_block has no data.
+                // swap the column directly so the data is not inserted again; this also solves Check failed: d.column->use_count() == 1 (2 vs. 1)
                 out_block->replace_by_position(i, 
key_columns[i]->assume_mutable());
                 in_block->replace_by_position(result_idxs[i], output_column);
             } else {
diff --git a/regression-test/data/variant_github_events_p0_new/load.out 
b/regression-test/data/variant_github_events_p0_new/load.out
index 13ce3dfca08..0aeaaeed024 100644
--- a/regression-test/data/variant_github_events_p0_new/load.out
+++ b/regression-test/data/variant_github_events_p0_new/load.out
@@ -5,3 +5,6 @@
 \N
 4748
 
+-- !sql_select_count --
+67843
+
diff --git a/regression-test/data/variant_github_events_p0_new/load.out 
b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out
similarity index 70%
copy from regression-test/data/variant_github_events_p0_new/load.out
copy to 
regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out
index 13ce3dfca08..20dee980ec8 100644
--- a/regression-test/data/variant_github_events_p0_new/load.out
+++ 
b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out
@@ -1,7 +1,4 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
--- !sql --
-\N
-\N
-\N
-4748
+-- !test_distinct_streaming_agg --
+31481
 
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy 
b/regression-test/suites/variant_github_events_p0_new/load.groovy
index 777befbd160..0be0f205b69 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -80,4 +80,21 @@ suite("regression_test_variant_github_events_p0", 
"nonConcurrent"){
     // TODO fix compaction issue, this case could be stable
     qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int)  
from github_events where cast(v["repo"]["name"] as string) = 
'xpressengine/xe-core' order by 1;"""
     // TODO add test case that some certain columns are materialized in some 
file while others are not materilized(sparse)
+
+    sql """DROP TABLE IF EXISTS github_events_2"""
+    sql """
+        CREATE TABLE IF NOT EXISTS `github_events_2` (
+        `k` BIGINT NULL,
+        `v` text NULL,
+        INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") 
COMMENT ''
+        ) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY 
HASH(`k`) BUCKETS 4 PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        insert into github_events_2 select 1, cast(v["repo"]["name"] as 
string) FROM github_events;
+    """
+
+    qt_sql_select_count """ select count(*) from github_events_2; """
 }
diff --git 
a/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql
 
b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql
new file mode 100644
index 00000000000..40854a2d535
--- /dev/null
+++ 
b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql
@@ -0,0 +1,5 @@
+SELECT
+    
/*+SET_VAR(batch_size=50,experimental_enable_pipeline_x_engine=false,parallel_pipeline_task_num=1,disable_streaming_preaggregations=false)
 */
+    count(distinct v)
+FROM
+    github_events_2;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to