This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 021b417 HIVE-22726 : TopN Key optimizer should use array instead of priority queue (Attila Magyar via Gopal V)
021b417 is described below
commit 021b4175f9d596188785d22f3cf4f158c6abd975
Author: Attila Magyar <[email protected]>
AuthorDate: Wed Jan 29 15:26:59 2020 -0800
HIVE-22726 : TopN Key optimizer should use array instead of priority queue (Attila Magyar via Gopal V)
Signed-off-by: Ashutosh Chauhan <[email protected]>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../apache/hadoop/hive/ql/exec/TopNKeyFilter.java | 61 ++++++++++++++++------
.../hadoop/hive/ql/exec/TopNKeyOperator.java | 23 ++++----
.../hive/ql/exec/vector/VectorTopNKeyOperator.java | 5 +-
.../vector/wrapper/VectorHashKeyWrapperBatch.java | 4 +-
.../VectorHashKeyWrapperGeneralComparator.java | 4 ++
.../ql/optimizer/topnkey/TopNKeyProcessor.java | 8 ++-
.../apache/hadoop/hive/ql/parse/TezCompiler.java | 2 +-
.../hadoop/hive/ql/exec/TestTopNKeyFilter.java | 8 +--
9 files changed, 82 insertions(+), 35 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c4682c..93d6ff6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2384,6 +2384,8 @@ public class HiveConf extends Configuration {
"would change the query plan to take care of it, and
hive.optimize.skewjoin will be a no-op."),
HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable
top n key optimizer."),
+ HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN
value allowed by top n key optimizer.\n" +
+ "If the LIMIT is greater than this value then top n key optimization
won't be used."),
HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
"Whether to enable shared work optimizer. The optimizer finds scan
operator over the same table\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
index 4998766..38d2e08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
@@ -17,38 +17,67 @@
*/
package org.apache.hadoop.hive.ql.exec;
+import static java.util.Arrays.binarySearch;
+
+import java.util.Arrays;
import java.util.Comparator;
-import java.util.PriorityQueue;
/**
* Implementation of filtering out keys.
* An instance of this class is wrapped in {@link TopNKeyOperator} and
* {@link org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator}
- * @param <T> - Type of {@link KeyWrapper}. Each key is stored in a KeyWrapper
instance.
*/
-public class TopNKeyFilter<T extends KeyWrapper> {
- private final PriorityQueue<T> priorityQueue;
+public final class TopNKeyFilter {
private final int topN;
+ private Comparator<? extends KeyWrapper> comparator;
+ private KeyWrapper[] sortedTopItems;
+ private int size = 0;
+ private long repeated = 0;
+ private long added = 0;
+ private long total = 0;
- public TopNKeyFilter(int topN, Comparator<T> comparator) {
- // We need a reversed comparator because the PriorityQueue.poll() method
is used for filtering out keys.
- // Ex.: When ORDER BY key1 ASC then call of poll() should remove the
largest key.
- this.priorityQueue = new PriorityQueue<>(topN + 1, comparator.reversed());
+ public TopNKeyFilter(int topN, Comparator<? extends KeyWrapper> comparator) {
+ this.comparator = comparator;
+ this.sortedTopItems = new KeyWrapper[topN +1];
this.topN = topN;
}
- public boolean canForward(T kw) {
- if (!priorityQueue.contains(kw)) {
- priorityQueue.offer((T) kw.copyKey());
+ public final boolean canForward(KeyWrapper kw) {
+ total++;
+ int pos = binarySearch(sortedTopItems, 0, size, kw, (Comparator<? super
KeyWrapper>) comparator);
+ if (pos >= 0) { // found
+ repeated++;
+ return true;
}
- if (priorityQueue.size() > topN) {
- priorityQueue.poll();
+ pos = -pos -1; // not found, calculate insertion point
+ if (pos >= topN) { // would be inserted to the end, there are topN
elements which are smaller/larger
+ return false;
}
-
- return priorityQueue.contains(kw);
+ System.arraycopy(sortedTopItems, pos, sortedTopItems, pos +1, size - pos);
// make space by shifting
+ sortedTopItems[pos] = kw.copyKey();
+ added++;
+ if (size < topN) {
+ size++;
+ }
+ return true;
}
public void clear() {
- priorityQueue.clear();
+ this.size = 0;
+ this.repeated = 0;
+ this.added = 0;
+ this.total = 0;
+ Arrays.fill(sortedTopItems, null);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("TopNKeyFilter{");
+ sb.append("topN=").append(topN);
+ sb.append(", repeated=").append(repeated);
+ sb.append(", added=").append(added);
+ sb.append(", total=").append(total);
+ sb.append('}');
+ return sb.toString();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index 0ccaeea..bd8ff62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -18,6 +18,13 @@
package org.apache.hadoop.hive.ql.exec;
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -27,13 +34,6 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
-
/**
* TopNKeyOperator passes rows that contains top N keys only.
*/
@@ -41,7 +41,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc>
implements Serializab
private static final long serialVersionUID = 1L;
- private transient Map<KeyWrapper, TopNKeyFilter<KeyWrapper>> topNKeyFilters;
+ private transient Map<KeyWrapper, TopNKeyFilter> topNKeyFilters;
private transient KeyWrapper partitionKeyWrapper;
private transient KeyWrapper keyWrapper;
@@ -108,9 +108,9 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc>
implements Serializab
partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]);
partitionKeyWrapper.setHashKey();
- TopNKeyFilter<KeyWrapper> topNKeyFilter =
topNKeyFilters.get(partitionKeyWrapper);
+ TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper);
if (topNKeyFilter == null) {
- topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(),
keyWrapperComparator);
+ topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator);
topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter);
}
@@ -124,6 +124,9 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc>
implements Serializab
@Override
protected final void closeOp(boolean abort) throws HiveException {
+ for (TopNKeyFilter each : topNKeyFilters.values()) {
+ each.clear();
+ }
topNKeyFilters.clear();
super.closeOp(abort);
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 5faa038..f03d650 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -46,7 +46,7 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
// Batch processing
private transient int[] temporarySelected;
private transient VectorHashKeyWrapperBatch keyWrappersBatch;
- private transient TopNKeyFilter<VectorHashKeyWrapperBase> topNKeyFilter;
+ private transient TopNKeyFilter topNKeyFilter;
public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
VectorizationContext vContext, VectorDesc vectorDesc) {
@@ -80,7 +80,7 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
keyWrappersBatch =
VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
- this.topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(),
keyWrappersBatch.getComparator(
+ this.topNKeyFilter = new TopNKeyFilter(conf.getTopN(),
keyWrappersBatch.getComparator(
conf.getColumnSortOrder(),
conf.getNullOrder()));
}
@@ -169,6 +169,7 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
@Override
protected void closeOp(boolean abort) throws HiveException {
+ LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter);
topNKeyFilter.clear();
super.closeOp(abort);
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
index 0786c82..b487480 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java
@@ -1084,7 +1084,9 @@ public class VectorHashKeyWrapperBatch extends
VectorColumnSetInfo {
comparator.addColumnComparator(
i, columnTypeSpecificIndex, columnVectorType,
columnSortOrder.charAt(i), nullOrder.charAt(i));
}
-
+ if (comparator.getComparators().size() == 1) { // don't use the composite
comparator for n=1
+ return comparator.getComparators().get(0);
+ }
return comparator;
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
index 8cb4847..06ac661 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java
@@ -133,4 +133,8 @@ public class VectorHashKeyWrapperGeneralComparator
}
return 0;
}
+
+ public List<VectorHashKeyWrapperBaseComparator> getComparators() {
+ return comparators;
+ }
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index a9ff6b4..8ad52d0 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -43,8 +43,10 @@ import java.util.Stack;
*/
public class TopNKeyProcessor implements NodeProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(TopNKeyProcessor.class);
+ private final int maxTopNAllowed;
- public TopNKeyProcessor() {
+ public TopNKeyProcessor(int maxTopNAllowed) {
+ this.maxTopNAllowed = maxTopNAllowed;
}
@Override
@@ -60,6 +62,10 @@ public class TopNKeyProcessor implements NodeProcessor {
return null;
}
+ if (reduceSinkDesc.getTopN() > maxTopNAllowed) {
+ return null;
+ }
+
// Check whether there already is a top n key operator
Operator<? extends OperatorDesc> parentOperator =
reduceSinkOperator.getParentOperators().get(0);
if (parentOperator instanceof TopNKeyOperator) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 5a78ed5..f1ebc2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1289,7 +1289,7 @@ public class TezCompiler extends TaskCompiler {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule,
NodeProcessor>();
opRules.put(
new RuleRegExp("Top n key optimization",
ReduceSinkOperator.getOperatorName() + "%"),
- new TopNKeyProcessor());
+ new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf,
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED)));
opRules.put(
new RuleRegExp("Top n key pushdown",
TopNKeyOperator.getOperatorName() + "%"),
new TopNKeyPushdownProcessor());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
index fce850f..95cd459 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
@@ -36,14 +36,14 @@ public class TestTopNKeyFilter {
@Test
public void testNothingCanBeForwardedIfTopNIs0() {
- TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(0,
TEST_KEY_WRAPPER_COMPARATOR);
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(0,
TEST_KEY_WRAPPER_COMPARATOR);
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(false));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(-1)), is(false));
}
@Test
public void testFirstTopNKeysCanBeForwarded() {
- TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(3,
TEST_KEY_WRAPPER_COMPARATOR);
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(3,
TEST_KEY_WRAPPER_COMPARATOR);
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(10)), is(true));
@@ -52,7 +52,7 @@ public class TestTopNKeyFilter {
@Test
public void testKeyCanNotBeForwardedIfItIsDroppedOutFromTopNKeys() {
- TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(2,
TEST_KEY_WRAPPER_COMPARATOR);
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2,
TEST_KEY_WRAPPER_COMPARATOR);
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(true));
@@ -61,7 +61,7 @@ public class TestTopNKeyFilter {
@Test
public void testMembersOfTopNKeysStillCanBeForwardedAfterNonTopNKeysTried() {
- TopNKeyFilter<TestKeyWrapper> topNKeyFilter = new TopNKeyFilter<>(2,
TEST_KEY_WRAPPER_COMPARATOR);
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2,
TEST_KEY_WRAPPER_COMPARATOR);
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(false));