This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 856d8070f5b HIVE-26671: Incorrect results with Top N Key optimization (Stephen Carlin, reviewed by Krisztian Kasa)
856d8070f5b is described below

commit 856d8070f5bcc9d0d12d46d453a455304fb8009f
Author: scarlin-cloudera <[email protected]>
AuthorDate: Sat Oct 29 23:18:29 2022 -0700

    HIVE-26671: Incorrect results with Top N Key optimization (Stephen Carlin, reviewed by Krisztian Kasa)
---
 .../ql/optimizer/topnkey/TopNKeyProcessor.java     |   9 ++
 .../apache/hadoop/hive/ql/plan/ReduceSinkDesc.java |  13 +++
 ql/src/test/queries/clientpositive/topnkey.q       |   8 ++
 .../test/results/clientpositive/llap/topnkey.q.out | 116 +++++++++++++++++++++
 4 files changed, 146 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index 47fb9cb7702..ac397a513d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -75,6 +75,15 @@ public class TopNKeyProcessor implements SemanticNodeProcessor {
       return null;
     }
 
+    // HIVE-26671: We do not want to create a TopNKey processor when the reduce sink
+    // operator contains a count distinct. This would result in a topnkey operator
+    // with an extra group in its sort order. The TopNKey Pushdown Processor could then
+    // push down this operator and it would be incorrect since the count distinct adds
+    // a group that is only temporarily used for calculating a value.
+    if (reduceSinkDesc.hasADistinctColumnIndex()) {
+      return null;
+    }
+
     // Check whether there already is a top n key operator
     Operator<? extends OperatorDesc> parentOperator = reduceSinkOperator.getParentOperators().get(0);
     if (parentOperator instanceof TopNKeyOperator) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 028b83a2af6..b120ae951fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
@@ -430,6 +431,18 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     this.distinctColumnIndices = distinctColumnIndices;
   }
 
+  public boolean hasADistinctColumnIndex() {
+    if (this.distinctColumnIndices == null) {
+      return false;
+    }
+    for (List<Integer> distinctColumnIndex : this.distinctColumnIndices) {
+      if (CollectionUtils.isNotEmpty(distinctColumnIndex)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Explain(displayName = "outputname", explainLevels = { Level.USER })
   public String getOutputName() {
     return outputName;
diff --git a/ql/src/test/queries/clientpositive/topnkey.q b/ql/src/test/queries/clientpositive/topnkey.q
index 6b53d6c6b1a..8a946b77655 100644
--- a/ql/src/test/queries/clientpositive/topnkey.q
+++ b/ql/src/test/queries/clientpositive/topnkey.q
@@ -65,4 +65,12 @@ SELECT a, b FROM t_test GROUP BY a, b ORDER BY a, b LIMIT 3;
 SET hive.optimize.topnkey=false;
 SELECT a, b FROM t_test GROUP BY a, b ORDER BY a, b LIMIT 3;
 
+SET hive.optimize.topnkey=true;
+EXPLAIN
+SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;
+SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;
+
+SET hive.optimize.topnkey=false;
+SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3;
+
 DROP TABLE t_test;
diff --git a/ql/src/test/results/clientpositive/llap/topnkey.q.out b/ql/src/test/results/clientpositive/llap/topnkey.q.out
index b75589ef75a..777d559f535 100644
--- a/ql/src/test/results/clientpositive/llap/topnkey.q.out
+++ b/ql/src/test/results/clientpositive/llap/topnkey.q.out
@@ -671,6 +671,122 @@ POSTHOOK: Input: default@t_test
 5      1
 5      2
 6      2
+PREHOOK: query: EXPLAIN
+SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t_test
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a ORDER BY a LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t_test
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: t_test
+                  Statistics: Num rows: 8 Data size: 96 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Top N Key Operator
+                    sort order: +
+                    keys: a (type: int)
+                    null sort order: z
+                    Statistics: Num rows: 8 Data size: 96 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    top n: 3
+                    Select Operator
+                      expressions: a (type: int), b (type: int), c (type: int)
+                      outputColumnNames: a, b, c
+                      Statistics: Num rows: 8 Data size: 96 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        aggregations: count(DISTINCT b), min(c)
+                        keys: a (type: int), b (type: int)
+                        minReductionHashAggr: 0.4
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2, _col3
+                        Statistics: Num rows: 4 Data size: 80 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: _col0 (type: int), _col1 (type: int)
+                          null sort order: zz
+                          sort order: ++
+                          Map-reduce partition columns: _col0 (type: int)
+                          Statistics: Num rows: 4 Data size: 80 Basic stats: 
COMPLETE Column stats: COMPLETE
+                          TopN Hash Memory Usage: 0.1
+                          value expressions: _col3 (type: int)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(DISTINCT KEY._col1:0._col0), 
min(VALUE._col1)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE 
Column stats: COMPLETE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  null sort order: z
+                  sort order: +
+                  Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  value expressions: _col1 (type: bigint), _col2 (type: int)
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 
(type: bigint), VALUE._col1 (type: int)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE 
Column stats: COMPLETE
+                Limit
+                  Number of rows: 3
+                  Statistics: Num rows: 3 Data size: 48 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 3 Data size: 48 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a 
ORDER BY a LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t_test
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a 
ORDER BY a LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t_test
+#### A masked pattern was here ####
+5      2       2
+6      1       1
+7      1       4
+PREHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a 
ORDER BY a LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t_test
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a, count(distinct b), min(c) FROM t_test GROUP BY a 
ORDER BY a LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t_test
+#### A masked pattern was here ####
+5      2       2
+6      1       1
+7      1       4
 PREHOOK: query: DROP TABLE t_test
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@t_test

Reply via email to