This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f7cf4f8446642b7c50c54c5cba977a4fcdf9a6e3
Author: Riza Suminto <[email protected]>
AuthorDate: Tue May 13 10:50:32 2025 -0700

    IMPALA-14070: Use checkedMultiply in SortNode.java
    
    maxRowsInHeaps calculation may overflow because it uses simple
    multiplication. This patch fixes the bug by calculating it using
    checkedMultiply(). A broader refactoring will be done by IMPALA-14071.
    
    Testing:
    Add ee test TestTopNHighNdv that exercises the issue.
    
    Change-Id: Ic6712b94f4704fd8016829b2538b1be22baaf2f7
    Reviewed-on: http://gerrit.cloudera.org:8080/22896
    Reviewed-by: Abhishek Rawat <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/planner/SortNode.java   |  2 +-
 .../QueryTest/partitioned-top-n-high-ndv.test      | 74 ++++++++++++++++++++++
 tests/query_test/test_queries.py                   | 15 +++++
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 43295763e..8ad65ce3f 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -312,7 +312,7 @@ public class SortNode extends PlanNode implements SpillableOperator {
       List<Expr> partExprs = info_.getSortExprs().subList(0, numPartitionExprs_);
       long partNdv = numPartitionExprs_ == 0 ? 1 : Expr.getNumDistinctValues(partExprs);
       if (partNdv >= 0) {
-        long maxRowsInHeaps = partNdv * getPerPartitionLimit();
+        long maxRowsInHeaps = checkedMultiply(partNdv, getPerPartitionLimit());
         if (cardinality_ < 0 || cardinality_ > maxRowsInHeaps) {
           cardinality_ = maxRowsInHeaps;
         }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-high-ndv.test b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-high-ndv.test
new file mode 100644
index 000000000..1da2ee52d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-high-ndv.test
@@ -0,0 +1,74 @@
+====
+---- QUERY
+# IMPALA-14070: Create table with artificially high NDV.
+CREATE TABLE $DATABASE.high_ndv_cols (part1 STRING, part2 STRING, part3 STRING, col4 STRING, col5 STRING, col6 STRING,
+  PRIMARY KEY (part1, part2, part3) )
+PARTITION BY HASH (part1, part2, part3)
+PARTITIONS 3 STORED AS KUDU;
+insert into $DATABASE.high_ndv_cols values ("A", "A", "A", "A", "A", "A");
+alter table $DATABASE.high_ndv_cols set column stats part1 ('numDVs'='7792','numNulls'='0');
+alter table $DATABASE.high_ndv_cols set column stats part2 ('numDVs'='12502840','numNulls'='0');
+alter table $DATABASE.high_ndv_cols set column stats part3 ('numDVs'='4245286','numNulls'='0');
+alter table $DATABASE.high_ndv_cols set column stats col4 ('numDVs'='963','numNulls'='0');
+alter table $DATABASE.high_ndv_cols set column stats col5 ('numDVs'='2','numNulls'='0');
+====
+---- QUERY
+# IMPALA-14070: Run EXPLAIN.
+EXPLAIN with vw1 as (select part1, part2, part3, col4,
+    rank() OVER (PARTITION BY part1, part2, part3, col4, col5 ORDER BY col6 ASC) rk
+from $DATABASE.high_ndv_cols)
+select * from vw1 where rk<=2 LIMIT 100;
+---- RESULTS
+'Max Per-Host Resource Reservation: Memory=32.00MB Threads=4'
+'Per-Host Resource Estimates: Memory=8192.00PB'
+'WARNING: The following tables are missing relevant table and/or column statistics.'
+'$DATABASE.high_ndv_cols'
+''
+'PLAN-ROOT SINK'
+'|'
+'06:EXCHANGE [UNPARTITIONED]'
+'|  limit: 100'
+'|'
+'03:SELECT'
+'|  predicates: rank() <= 2'
+'|  limit: 100'
+'|  row-size=80B cardinality=100'
+'|'
+'02:ANALYTIC'
+'|  functions: rank()'
+'|  partition by: part1, part2, part3, col4, col5'
+'|  order by: col6 ASC'
+'|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW'
+'|  row-size=80B cardinality=9223372.04T'
+'|'
+'05:TOP-N'
+'|  partition by: part1, part2, part3, col4, col5'
+'|  order by: col6 ASC'
+'|  partition limit: 2 (include ties)'
+'|  row-size=72B cardinality=9223372.04T'
+'|'
+'04:EXCHANGE [HASH(part1,part2,part3,col4,col5)]'
+'|'
+'01:TOP-N'
+'|  partition by: part1, part2, part3, col4, col5'
+'|  order by: col6 ASC'
+'|  partition limit: 2 (include ties)'
+'|  source expr: rank() <= CAST(2 AS BIGINT)'
+'|  row-size=72B cardinality=9223372.04T'
+'|'
+'00:SCAN KUDU [$DATABASE.high_ndv_cols]'
+'   row-size=96B cardinality=unavailable'
+---- TYPES
+STRING
+====
+---- QUERY
+# IMPALA-14070: Run query.
+with vw1 as (select part1, part2, part3, col4,
+    rank() OVER (PARTITION BY part1, part2, part3, col4, col5 ORDER BY col6 ASC) rk
+from $DATABASE.high_ndv_cols)
+select * from vw1 where rk<=2 LIMIT 100;
+---- RESULTS
+'A','A','A','A',1
+---- TYPES
+STRING, STRING, STRING, STRING, BIGINT
+====
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 94801670a..f1efb6e91 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -31,6 +31,7 @@ from tests.common.test_dimensions import (
     create_client_protocol_dimension,
     create_exec_option_dimension,
     create_exec_option_dimension_from_dict,
+    create_single_exec_option_dimension,
     create_uncompressed_json_dimension,
     create_uncompressed_text_dimension,
     default_protocol_or_parquet_constraint,
@@ -451,3 +452,17 @@ class TestAnalyticFnsTpch(ImpalaTestSuite):
 
   def test_analytic_predicate(self, vector):
     self.run_test_case('analytic-fns', vector)
+
+
+class TestTopNHighNdv(ImpalaTestSuite):
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTopNHighNdv, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+      create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_topn_high_ndv(self, vector, unique_database):
+    self.run_test_case(
+      'QueryTest/partitioned-top-n-high-ndv', vector, use_db=unique_database)

Reply via email to