This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4be5fd8896dcd445a6379bdcda4bdcf318f24511 Author: Yida Wu <[email protected]> AuthorDate: Fri Mar 29 08:41:23 2024 -0700 IMPALA-12960: Fix Incorrect RowsPassedThrough Metric in Streaming Aggregation This patch fixes a bug in the RowsPassedThrough metric within the query profile when using Streaming Aggregation. The issue stems from the logic of the AddBatchStreaming() function: the output batch does not necessarily contain 0 rows on entry, yet the function uses num_rows() of the output batch directly as the actual number of rows returned and passed through by this specific aggregator. This discrepancy can significantly impact the accuracy of the returned and passed-through row counts, as well as the calculation of reduction rates during hash table expansion in Streaming Aggregation. Huge differences can be observed, especially when using the rollup function. The solution is to calculate the actual number of rows added to the output batch during each invocation of the AddBatchStreaming() function. Tests: Passed exhaustive tests. Added a corresponding case in tpch-passthrough-aggregations.test. 
Change-Id: I59205a4b06824ee1607a25e906db1f96dc4eda9f Reviewed-on: http://gerrit.cloudera.org:8080/21235 Reviewed-by: Wenzhe Zhou <[email protected]> Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/grouping-aggregator.cc | 5 +++-- .../queries/tpch-passthrough-aggregations.test | 24 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc index 074167dc0..61ef2d948 100644 --- a/be/src/exec/grouping-aggregator.cc +++ b/be/src/exec/grouping-aggregator.cc @@ -505,6 +505,7 @@ Status GroupingAggregator::AddBatchStreaming( } TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; + int64_t num_row_out_batch_old = out_batch->num_rows(); GroupingAggregatorConfig::AddBatchStreamingImplFn fn = add_batch_streaming_impl_fn_.load(); if (fn != nullptr) { @@ -515,8 +516,8 @@ Status GroupingAggregator::AddBatchStreaming( child_batch, out_batch, ht_ctx_.get(), remaining_capacity)); } *eos = (streaming_idx_ == 0); - - num_rows_returned_ += out_batch->num_rows(); + DCHECK_GE(out_batch->num_rows(), num_row_out_batch_old); + num_rows_returned_ += out_batch->num_rows() - num_row_out_batch_old; COUNTER_SET(num_passthrough_rows_, num_rows_returned_); return Status::OK(); } diff --git a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test index 5b9b1c5c9..8c25faf47 100644 --- a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test +++ b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test @@ -50,6 +50,30 @@ bigint,bigint,bigint,string row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\) ==== ---- QUERY +# IMPALA-12960: Test the case of rollup. 
+select l_orderkey, l_partkey, count(*) +from tpch.lineitem +group by rollup(1, 2) +order by 3 desc, 1, 2 +limit 10; +---- RESULTS +NULL,NULL,6001215 +7,NULL,7 +68,NULL,7 +129,NULL,7 +164,NULL,7 +194,NULL,7 +225,NULL,7 +226,NULL,7 +322,NULL,7 +326,NULL,7 +---- TYPES +bigint,bigint,bigint +---- RUNTIME_PROFILE +# Verify that at least one passthrough number should be 0 for rollup. +row_regex: .*RowsPassedThrough: 0 +==== +---- QUERY # Test for preaggregation passthrough optimization: two-phase count distinct aggregation. select count(distinct p_comment) from part
