This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 021b417  HIVE-22726 : TopN Key optimizer should use array instead of priority queue (Attila Magyar via Gopal V)
021b417 is described below

commit 021b4175f9d596188785d22f3cf4f158c6abd975
Author: Attila Magyar <[email protected]>
AuthorDate: Wed Jan 29 15:26:59 2020 -0800

    HIVE-22726 : TopN Key optimizer should use array instead of priority queue (Attila Magyar via Gopal V)
    
    Signed-off-by: Ashutosh Chauhan <[email protected]>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  2 +
 .../apache/hadoop/hive/ql/exec/TopNKeyFilter.java  | 61 ++++++++++++++++------
 .../hadoop/hive/ql/exec/TopNKeyOperator.java       | 23 ++++----
 .../hive/ql/exec/vector/VectorTopNKeyOperator.java |  5 +-
 .../vector/wrapper/VectorHashKeyWrapperBatch.java  |  4 +-
 .../VectorHashKeyWrapperGeneralComparator.java     |  4 ++
 .../ql/optimizer/topnkey/TopNKeyProcessor.java     |  8 ++-
 .../apache/hadoop/hive/ql/parse/TezCompiler.java   |  2 +-
 .../hadoop/hive/ql/exec/TestTopNKeyFilter.java     |  8 +--
 9 files changed, 82 insertions(+), 35 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c4682c..93d6ff6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2384,6 +2384,8 @@ public class HiveConf extends Configuration {
         "would change the query plan to take care of it, and 
hive.optimize.skewjoin will be a no-op."),
 
     HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable 
top n key optimizer."),
+    HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN 
value allowed by top n key optimizer.\n" +
+      "If the LIMIT is greater than this value then top n key optimization 
won't be used."),
 
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan 
operator over the same table\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
index 4998766..38d2e08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
@@ -17,38 +17,67 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import static java.util.Arrays.binarySearch;
+
+import java.util.Arrays;
 import java.util.Comparator;
-import java.util.PriorityQueue;
 
 /**
  * Implementation of filtering out keys.
  * An instance of this class is wrapped in {@link TopNKeyOperator} and
  * {@link org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator}
- * @param <T> - Type of {@link KeyWrapper}. Each key is stored in a KeyWrapper 
instance.
  */
-public class TopNKeyFilter<T extends KeyWrapper> {
-  private final PriorityQueue<T> priorityQueue;
+public final class TopNKeyFilter {
   private final int topN;
+  private Comparator<? extends KeyWrapper> comparator;
+  private KeyWrapper[] sortedTopItems;
+  private int size = 0;
+  private long repeated = 0;
+  private long added = 0;
+  private long total = 0;
 
-  public TopNKeyFilter(int topN, Comparator<T> comparator) {
-    // We need a reversed comparator because the PriorityQueue.poll() method 
is used for filtering out keys.
-    // Ex.: When ORDER BY key1 ASC then call of poll() should remove the 
largest key.
-    this.priorityQueue = new PriorityQueue<>(topN + 1, comparator.reversed());
+  public TopNKeyFilter(int topN, Comparator<? extends KeyWrapper> comparator) {
+    this.comparator = comparator;
+    this.sortedTopItems = new KeyWrapper[topN +1];
     this.topN = topN;
   }
 
-  public boolean canForward(T kw) {
-    if (!priorityQueue.contains(kw)) {
-      priorityQueue.offer((T) kw.copyKey());
+  public final boolean canForward(KeyWrapper kw) {
+    total++;
+    int pos = binarySearch(sortedTopItems, 0, size, kw, (Comparator<? super 
KeyWrapper>) comparator);
+    if (pos >= 0) { // found
+      repeated++;
+      return true;
     }
-    if (priorityQueue.size() > topN) {
-      priorityQueue.poll();
+    pos = -pos -1; // not found, calculate insertion point
+    if (pos >= topN) { // would be inserted to the end, there are topN 
elements which are smaller/larger
+      return false;
     }
-
-    return priorityQueue.contains(kw);
+    System.arraycopy(sortedTopItems, pos, sortedTopItems, pos +1, size - pos); 
// make space by shifting
+    sortedTopItems[pos] = kw.copyKey();
+    added++;
+    if (size < topN) {
+      size++;
+    }
+    return true;
   }
 
   public void clear() {
-    priorityQueue.clear();
+    this.size = 0;
+    this.repeated = 0;
+    this.added = 0;
+    this.total = 0;
+    Arrays.fill(sortedTopItems, null);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("TopNKeyFilter{");
+    sb.append("topN=").append(topN);
+    sb.append(", repeated=").append(repeated);
+    sb.append(", added=").append(added);
+    sb.append(", total=").append(total);
+    sb.append('}');
+    return sb.toString();
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index 0ccaeea..bd8ff62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -27,13 +34,6 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
-
 /**
  * TopNKeyOperator passes rows that contains top N keys only.
  */
@@ -41,7 +41,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> 
implements Serializab
 
   private static final long serialVersionUID = 1L;
 
-  private transient Map<KeyWrapper, TopNKeyFilter<KeyWrapper>> topNKeyFilters;
+  private transient Map<KeyWrapper, TopNKeyFilter> topNKeyFilters;
 
   private transient KeyWrapper partitionKeyWrapper;
   private transient KeyWrapper keyWrapper;
@@ -108,9 +108,9 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> 
implements Serializab
     partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]);
     partitionKeyWrapper.setHashKey();
 
-    TopNKeyFilter<KeyWrapper> topNKeyFilter = 
topNKeyFilters.get(partitionKeyWrapper);
+    TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper);
     if (topNKeyFilter == null) {
-      topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), 
keyWrapperComparator);
+      topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator);
       topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter);
     }
 
@@ -124,6 +124,9 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> 
implements Serializab
 
   @Override
   protected final void closeOp(boolean abort) throws HiveException {
+    for (TopNKeyFilter each : topNKeyFilters.values()) {
+      each.clear();
+    }
     topNKeyFilters.clear();
     super.closeOp(abort);
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 5faa038..f03d650 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -46,7 +46,7 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
   // Batch processing
   private transient int[] temporarySelected;
   private transient VectorHashKeyWrapperBatch keyWrappersBatch;
-  private transient TopNKeyFilter<VectorHashKeyWrapperBase> topNKeyFilter;
+  private transient TopNKeyFilter topNKeyFilter;
 
   public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
       VectorizationContext vContext, VectorDesc vectorDesc) {
@@ -80,7 +80,7 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
     temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
     keyWrappersBatch = 
VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-    this.topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), 
keyWrappersBatch.getComparator(
+    this.topNKeyFilter = new TopNKeyFilter(conf.getTopN(), 
keyWrappersBatch.getComparator(
             conf.getColumnSortOrder(),
             conf.getNullOrder()));
   }
@@ -169,6 +169,7 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
+    LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter);
     topNKeyFilter.clear();
     super.closeOp(abort);
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
index 0786c82..b487480 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
@@ -1084,7 +1084,9 @@ public class VectorHashKeyWrapperBatch extends 
VectorColumnSetInfo {
       comparator.addColumnComparator(
               i, columnTypeSpecificIndex, columnVectorType, 
columnSortOrder.charAt(i), nullOrder.charAt(i));
     }
-
+    if (comparator.getComparators().size() == 1) { // don't use the composite 
comparator for n=1
+      return comparator.getComparators().get(0);
+    }
     return comparator;
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
index 8cb4847..06ac661 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
@@ -133,4 +133,8 @@ public class VectorHashKeyWrapperGeneralComparator
     }
     return 0;
   }
+
+  public List<VectorHashKeyWrapperBaseComparator> getComparators() {
+    return comparators;
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index a9ff6b4..8ad52d0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -43,8 +43,10 @@ import java.util.Stack;
  */
 public class TopNKeyProcessor implements NodeProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TopNKeyProcessor.class);
+  private final int maxTopNAllowed;
 
-  public TopNKeyProcessor() {
+  public TopNKeyProcessor(int maxTopNAllowed) {
+    this.maxTopNAllowed = maxTopNAllowed;
   }
 
   @Override
@@ -60,6 +62,10 @@ public class TopNKeyProcessor implements NodeProcessor {
       return null;
     }
 
+    if (reduceSinkDesc.getTopN() > maxTopNAllowed) {
+      return null;
+    }
+
     // Check whether there already is a top n key operator
     Operator<? extends OperatorDesc> parentOperator = 
reduceSinkOperator.getParentOperators().get(0);
     if (parentOperator instanceof TopNKeyOperator) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 5a78ed5..f1ebc2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1289,7 +1289,7 @@ public class TezCompiler extends TaskCompiler {
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, 
NodeProcessor>();
     opRules.put(
         new RuleRegExp("Top n key optimization", 
ReduceSinkOperator.getOperatorName() + "%"),
-        new TopNKeyProcessor());
+        new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf, 
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED)));
     opRules.put(
             new RuleRegExp("Top n key pushdown", 
TopNKeyOperator.getOperatorName() + "%"),
             new TopNKeyPushdownProcessor());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
index fce850f..95cd459 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
@@ -36,14 +36,14 @@ public class TestTopNKeyFilter {
 
   @Test
   public void testNothingCanBeForwardedIfTopNIs0() {
-    TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(0, 
TEST_KEY_WRAPPER_COMPARATOR);
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(0, 
TEST_KEY_WRAPPER_COMPARATOR);
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(false));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(-1)), is(false));
   }
 
   @Test
   public void testFirstTopNKeysCanBeForwarded() {
-    TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(3, 
TEST_KEY_WRAPPER_COMPARATOR);
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(3, 
TEST_KEY_WRAPPER_COMPARATOR);
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(10)), is(true));
@@ -52,7 +52,7 @@ public class TestTopNKeyFilter {
 
   @Test
   public void testKeyCanNotBeForwardedIfItIsDroppedOutFromTopNKeys() {
-    TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(2, 
TEST_KEY_WRAPPER_COMPARATOR);
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2, 
TEST_KEY_WRAPPER_COMPARATOR);
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(true));
@@ -61,7 +61,7 @@ public class TestTopNKeyFilter {
 
   @Test
   public void testMembersOfTopNKeysStillCanBeForwardedAfterNonTopNKeysTried() {
-    TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(2, 
TEST_KEY_WRAPPER_COMPARATOR);
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2, 
TEST_KEY_WRAPPER_COMPARATOR);
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(false));

Reply via email to