This is an automated email from the ASF dual-hosted git repository.
jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc351b HIVE-22925: Implement TopNKeyFilter efficiency check (Attila
Magyar, reviewed by Jesus Camacho Rodriguez)
1fc351b is described below
commit 1fc351b632d9cc7aa953769858c5e4992bf80b25
Author: Attila Magyar <[email protected]>
AuthorDate: Fri Feb 28 13:35:03 2020 -0800
HIVE-22925: Implement TopNKeyFilter efficiency check (Attila Magyar,
reviewed by Jesus Camacho Rodriguez)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../apache/hadoop/hive/ql/exec/TopNKeyFilter.java | 21 +++++-
.../hadoop/hive/ql/exec/TopNKeyOperator.java | 46 +++++++++++---
.../hive/ql/exec/vector/VectorTopNKeyOperator.java | 74 ++++++++++++++++++----
.../ql/optimizer/topnkey/TopNKeyProcessor.java | 25 ++++++--
.../apache/hadoop/hive/ql/parse/TezCompiler.java | 6 +-
.../apache/hadoop/hive/ql/plan/TopNKeyDesc.java | 47 +++++++++++---
.../hadoop/hive/ql/exec/TestTopNKeyFilter.java | 56 +++++++++++++++-
8 files changed, 237 insertions(+), 41 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1a4d71b..0eda0d2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2416,6 +2416,9 @@ public class HiveConf extends Configuration {
HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable
top n key optimizer."),
HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN
value allowed by top n key optimizer.\n" +
"If the LIMIT is greater than this value then top n key optimization
won't be used."),
+
HIVE_TOPN_EFFICIENCY_THRESHOLD("hive.optimize.topnkey.efficiency.threshold",
0.6f, "Disable topN key filter if the ratio between forwarded and total rows
reaches this limit."),
+
HIVE_TOPN_EFFICIENCY_CHECK_BATCHES("hive.optimize.topnkey.efficiency.check.nbatches",
8, "Check topN key filter efficiency after a specific number of batches."),
+ HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS("hive.optimize.topnkey.partitions.max",
64, "Limit the maximum number of partitions used by the top N key operator."),
HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
"Whether to enable shared work optimizer. The optimizer finds scan
operator over the same table\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
index 38d2e08..361dbc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
@@ -73,11 +73,30 @@ public final class TopNKeyFilter {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("TopNKeyFilter{");
- sb.append("topN=").append(topN);
+ sb.append("id=").append(super.toString());
+ sb.append(", topN=").append(topN);
sb.append(", repeated=").append(repeated);
sb.append(", added=").append(added);
sb.append(", total=").append(total);
+ sb.append(", forwardingRatio=").append(forwardingRatio());
sb.append('}');
return sb.toString();
}
+
+ /**
+ * Ratio between the forwarded rows and the total incoming rows.
+ * The higher the number is, the less is the efficiency of the filter.
+ * 1 means all rows should be forwarded.
+ * @return the forwarding ratio in the range [0, 1]; 0 when no rows have been processed yet
+ */
+ public float forwardingRatio() {
+ if (total == 0) {
+ return 0;
+ }
+ return ((repeated + added) / (float)total);
+ }
+
+ public long getTotal() {
+ return total;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index dd66dfc..f09867b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hive.ql.exec;
+import static
org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency;
import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -47,6 +50,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc>
implements Serializab
private transient KeyWrapper keyWrapper;
private transient KeyWrapperComparator keyWrapperComparator;
+ private transient Set<KeyWrapper> disabledPartitions;
/** Kryo ctor. */
public TopNKeyOperator() {
@@ -82,6 +86,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc>
implements Serializab
keyObjectInspectors, currentKeyObjectInspectors, columnSortOrder,
nullSortOrder);
this.topNKeyFilters = new HashMap<>();
+ this.disabledPartitions = new HashSet<>();
}
private KeyWrapper initObjectInspectors(Configuration hconf,
@@ -105,29 +110,54 @@ public class TopNKeyOperator extends
Operator<TopNKeyDesc> implements Serializab
@Override
public void process(Object row, int tag) throws HiveException {
+ if (!disabledPartitions.isEmpty() && disabledPartitions.size() ==
topNKeyFilters.size()) { // all filters are disabled due to efficiency check
+ forward(row, outputObjInspector);
+ return;
+ }
+
partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]);
partitionKeyWrapper.setHashKey();
+ if (disabledPartitions.contains(partitionKeyWrapper)) { // filter for this
partition is disabled
+ forward(row, outputObjInspector);
+ return;
+ }
+
TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper);
- if (topNKeyFilter == null) {
+ if (topNKeyFilter == null && topNKeyFilters.size() <
conf.getMaxNumberOfPartitions()) {
topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator);
topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter);
}
-
- keyWrapper.getNewKey(row, inputObjInspectors[tag]);
- keyWrapper.setHashKey();
-
- if (topNKeyFilter.canForward(keyWrapper)) {
+ if (topNKeyFilter == null) {
forward(row, outputObjInspector);
+ } else {
+ keyWrapper.getNewKey(row, inputObjInspectors[tag]);
+ keyWrapper.setHashKey();
+ if (topNKeyFilter.canForward(keyWrapper)) {
+ forward(row, outputObjInspector);
+ }
+ }
+
+ if (runTimeNumRows % conf.getCheckEfficiencyNumRows() == 0) { // check the
efficiency at every nth rows
+ checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions,
conf.getEfficiencyThreshold(), LOG);
}
}
@Override
protected final void closeOp(boolean abort) throws HiveException {
- for (TopNKeyFilter each : topNKeyFilters.values()) {
- each.clear();
+ if (topNKeyFilters.size() == 1) {
+ TopNKeyFilter filter = topNKeyFilters.values().iterator().next();
+ LOG.info("Closing TopNKeyFilter: {}", filter);
+ filter.clear();
+ } else {
+ LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size());
+ for (TopNKeyFilter each : topNKeyFilters.values()) {
+ LOG.debug("Closing TopNKeyFilter: {}", each);
+ each.clear();
+ }
}
topNKeyFilters.clear();
+ disabledPartitions.clear();
super.closeOp(abort);
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 7feadd3..0f8eb17 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -19,13 +19,14 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
-import org.apache.hadoop.hive.ql.exec.KeyWrapperComparator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TopNKeyFilter;
import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
@@ -38,6 +39,9 @@ import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.VectorDesc;
import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.slf4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* VectorTopNKeyOperator passes rows that contains top N keys only.
@@ -54,8 +58,9 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
private transient VectorHashKeyWrapperBatch partitionKeyWrapperBatch;
private transient VectorHashKeyWrapperBatch keyWrappersBatch;
private transient Map<KeyWrapper, TopNKeyFilter> topNKeyFilters;
+ private transient Set<KeyWrapper> disabledPartitions;
private transient Comparator<VectorHashKeyWrapperBase> keyWrapperComparator;
-
+ private transient long incomingBatches;
public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
VectorizationContext vContext, VectorDesc vectorDesc) {
@@ -92,6 +97,8 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
keyWrapperComparator =
keyWrappersBatch.getComparator(conf.getColumnSortOrder(), conf.getNullOrder());
partitionKeyWrapperBatch =
VectorHashKeyWrapperBatch.compileKeyWrapperBatch(partitionKeyExpressions);
topNKeyFilters = new HashMap<>();
+ disabledPartitions = new HashSet<>();
+ incomingBatches = 0;
}
private void initKeyExpressions(Configuration hconf, VectorExpression[]
keyExpressions) throws HiveException {
@@ -104,7 +111,11 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
@Override
public void process(Object data, int tag) throws HiveException {
VectorizedRowBatch batch = (VectorizedRowBatch) data;
-
+ if (!disabledPartitions.isEmpty() && disabledPartitions.size() ==
topNKeyFilters.size()) { // all filters are disabled due to efficiency check
+ vectorForward(batch);
+ return;
+ }
+ incomingBatches++;
// The selected vector represents selected rows.
// Clone the selected vector
System.arraycopy(batch.selected, 0, temporarySelected, 0, batch.size);
@@ -134,15 +145,18 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
j = i;
}
- // Select a row in the priority queue
- TopNKeyFilter topNKeyFilter =
topNKeyFilters.get(partitionKeyWrappers[i]);
- if (topNKeyFilter == null) {
- topNKeyFilter = new TopNKeyFilter(conf.getTopN(),
keyWrapperComparator);
- topNKeyFilters.put(partitionKeyWrappers[i].copyKey(), topNKeyFilter);
- }
-
- if (topNKeyFilter.canForward(keyWrappers[i])) {
+ VectorHashKeyWrapperBase partitionKey = partitionKeyWrappers[i];
+ if (disabledPartitions.contains(partitionKey)) { // filter for this
partition is disabled
selected[size++] = j;
+ } else {
+ TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKey);
+ if (topNKeyFilter == null && topNKeyFilters.size() <
conf.getMaxNumberOfPartitions()) {
+ topNKeyFilter = new TopNKeyFilter(conf.getTopN(),
keyWrapperComparator);
+ topNKeyFilters.put(partitionKey.copyKey(), topNKeyFilter);
+ }
+ if (topNKeyFilter == null || topNKeyFilter.canForward(keyWrappers[i]))
{
+ selected[size++] = j;
+ }
}
}
@@ -162,6 +176,28 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
batch.selected = selectedBackup;
batch.size = sizeBackup;
batch.selectedInUse = selectedInUseBackup;
+
+ if (incomingBatches % conf.getCheckEfficiencyNumBatches() == 0) {
+ checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions,
conf.getEfficiencyThreshold(), LOG);
+ }
+ }
+
+ public static void checkTopNFilterEfficiency(Map<KeyWrapper, TopNKeyFilter>
filters,
+ Set<KeyWrapper>
disabledPartitions,
+ float efficiencyThreshold,
+ Logger log)
+ {
+ Iterator<Map.Entry<KeyWrapper, TopNKeyFilter>> iterator =
filters.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<KeyWrapper, TopNKeyFilter> each = iterator.next();
+ KeyWrapper partitionKey = each.getKey();
+ TopNKeyFilter filter = each.getValue();
+ log.debug("Checking TopN Filter efficiency {}, threshold: {}", filter,
efficiencyThreshold);
+ if (filter.forwardingRatio() >= efficiencyThreshold) {
+ log.info("Disabling TopN Filter {}", filter);
+ disabledPartitions.add(partitionKey);
+ }
+ }
}
@Override
@@ -195,9 +231,19 @@ public class VectorTopNKeyOperator extends
Operator<TopNKeyDesc> implements Vect
@Override
protected void closeOp(boolean abort) throws HiveException {
// LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter);
- for (TopNKeyFilter topNKeyFilter : topNKeyFilters.values()) {
- topNKeyFilter.clear();
+ if (topNKeyFilters.size() == 1) {
+ TopNKeyFilter filter = topNKeyFilters.values().iterator().next();
+ LOG.info("Closing TopNKeyFilter: {}", filter);
+ filter.clear();
+ } else {
+ LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size());
+ for (TopNKeyFilter each : topNKeyFilters.values()) {
+ LOG.debug("Closing TopNKeyFilter: {}", each);
+ each.clear();
+ }
}
+ topNKeyFilters.clear();
+ disabledPartitions.clear();
super.closeOp(abort);
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index 3869ffa..bf5083f 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hive.ql.optimizer.topnkey;
+import java.util.Collections;
+import java.util.List;
+import java.util.Stack;
+
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -33,20 +37,25 @@ import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.List;
-import java.util.Stack;
-
/**
* TopNKeyProcessor is a processor for TopNKeyOperator.
* A TopNKeyOperator will be placed before any ReduceSinkOperator which has a
topN property >= 0.
*/
public class TopNKeyProcessor implements SemanticNodeProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(TopNKeyProcessor.class);
- private final int maxTopNAllowed;
+ private float efficiencyThreshold;
+ private long checkEfficiencyNumBatches;
+ private int maxTopNAllowed;
+ private int maxNumberOfPartitions;
- public TopNKeyProcessor(int maxTopNAllowed) {
+ public TopNKeyProcessor() {
+ }
+
+ public TopNKeyProcessor(int maxTopNAllowed, float efficiencyThreshold, long
checkEfficiencyNumBatches, int maxNumberOfPartitions) {
this.maxTopNAllowed = maxTopNAllowed;
+ this.efficiencyThreshold = efficiencyThreshold;
+ this.checkEfficiencyNumBatches = checkEfficiencyNumBatches;
+ this.maxNumberOfPartitions = maxNumberOfPartitions;
}
@Override
@@ -84,7 +93,9 @@ public class TopNKeyProcessor implements
SemanticNodeProcessor {
}
TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(),
reduceSinkDesc.getOrder(),
- reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(),
partitionCols);
+ reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(), partitionCols,
+ efficiencyThreshold, checkEfficiencyNumBatches, maxNumberOfPartitions);
+
copyDown(reduceSinkOperator, topNKeyDesc);
return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 31735c9..caab056 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1289,7 +1289,11 @@ public class TezCompiler extends TaskCompiler {
Map<SemanticRule, SemanticNodeProcessor> opRules = new
LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
opRules.put(
new RuleRegExp("Top n key optimization",
ReduceSinkOperator.getOperatorName() + "%"),
- new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf,
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED)));
+ new TopNKeyProcessor(
+ HiveConf.getIntVar(procCtx.conf,
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED),
+ HiveConf.getFloatVar(procCtx.conf,
ConfVars.HIVE_TOPN_EFFICIENCY_THRESHOLD),
+ HiveConf.getIntVar(procCtx.conf,
ConfVars.HIVE_TOPN_EFFICIENCY_CHECK_BATCHES),
+ HiveConf.getIntVar(procCtx.conf,
ConfVars.HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS)));
opRules.put(
new RuleRegExp("Top n key pushdown",
TopNKeyOperator.getOperatorName() + "%"),
new TopNKeyPushdownProcessor());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
index 19910a3..ddd657e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.hive.ql.plan;
-import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
/**
* TopNKeyDesc.
*
@@ -38,19 +39,30 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
private String nullOrder;
private List<ExprNodeDesc> keyColumns;
private List<ExprNodeDesc> partitionKeyColumns;
+ private float efficiencyThreshold;
+ private long checkEfficiencyNumBatches;
+ private long checkEfficiencyNumRows;
+ private int maxNumberOfPartitions;
public TopNKeyDesc() {
}
public TopNKeyDesc(
- final int topN,
- final String columnSortOrder,
- final String nullOrder,
- final List<ExprNodeDesc> keyColumns,
- final List<ExprNodeDesc> partitionKeyColumns) {
+ final int topN,
+ final String columnSortOrder,
+ final String nullOrder,
+ final List<ExprNodeDesc> keyColumns,
+ final List<ExprNodeDesc> partitionKeyColumns,
+ float efficiencyThreshold,
+ long checkEfficiencyNumBatches,
+ int maxNumberOfPartitions) {
this.topN = topN;
this.keyColumns = new ArrayList<>(keyColumns.size());
+ this.efficiencyThreshold = efficiencyThreshold;
+ this.checkEfficiencyNumBatches = checkEfficiencyNumBatches;
+ this.checkEfficiencyNumRows = checkEfficiencyNumBatches *
VectorizedRowBatch.DEFAULT_SIZE;
+ this.maxNumberOfPartitions = maxNumberOfPartitions;
StringBuilder sortOrder = new StringBuilder(columnSortOrder.length());
StringBuilder nullSortOrder = new StringBuilder(nullOrder.length());
this.partitionKeyColumns = new ArrayList<>(partitionKeyColumns.size());
@@ -81,6 +93,22 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
return topN;
}
+ public float getEfficiencyThreshold() {
+ return efficiencyThreshold;
+ }
+
+ public long getCheckEfficiencyNumBatches() {
+ return checkEfficiencyNumBatches;
+ }
+
+ public long getCheckEfficiencyNumRows() {
+ return checkEfficiencyNumRows;
+ }
+
+ public int getMaxNumberOfPartitions() {
+ return maxNumberOfPartitions;
+ }
+
public void setTopN(int topN) {
this.topN = topN;
}
@@ -198,7 +226,8 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
public TopNKeyDesc combine(CommonKeyPrefix commonKeyPrefix) {
return new TopNKeyDesc(topN, commonKeyPrefix.getMappedOrder(),
commonKeyPrefix.getMappedNullOrder(),
commonKeyPrefix.getMappedColumns(),
- commonKeyPrefix.getMappedColumns().subList(0,
partitionKeyColumns.size()));
+ commonKeyPrefix.getMappedColumns().subList(0,
partitionKeyColumns.size()),
+ efficiencyThreshold, checkEfficiencyNumBatches,
maxNumberOfPartitions);
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
index 95cd459..a91bc73 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
@@ -17,21 +17,30 @@
*/
package org.apache.hadoop.hive.ql.exec;
+import static
org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Unit test of TopNKeyFilter.
*/
public class TestTopNKeyFilter {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(TestTopNKeyFilter.class.getName());
public static final Comparator<TestKeyWrapper> TEST_KEY_WRAPPER_COMPARATOR =
Comparator.comparingInt(o -> o.keyValue);
@Test
@@ -69,6 +78,51 @@ public class TestTopNKeyFilter {
assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
}
+ @Test
+ public void testEfficiencyWhenEverythingIsForwarded() {
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2,
TEST_KEY_WRAPPER_COMPARATOR);
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(true));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(true));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(true));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
+ assertThat(topNKeyFilter.forwardingRatio(), is(1.0f));
+ }
+
+ @Test
+ public void testEfficiencyWhenOnlyOneIsForwarded() {
+ TopNKeyFilter topNKeyFilter = new TopNKeyFilter(1,
TEST_KEY_WRAPPER_COMPARATOR);
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(false));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(false));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(false));
+ assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(false));
+ assertThat(topNKeyFilter.forwardingRatio(), is(1/5f));
+ }
+
+ @Test
+ public void testDisabling() {
+ TopNKeyFilter efficientFilter = new TopNKeyFilter(1,
TEST_KEY_WRAPPER_COMPARATOR);
+ efficientFilter.canForward(new TestKeyWrapper(1));
+ efficientFilter.canForward(new TestKeyWrapper(2));
+ efficientFilter.canForward(new TestKeyWrapper(3));
+
+ TopNKeyFilter inefficientFilter = new TopNKeyFilter(1,
TEST_KEY_WRAPPER_COMPARATOR);
+ inefficientFilter.canForward(new TestKeyWrapper(3));
+ inefficientFilter.canForward(new TestKeyWrapper(2));
+ inefficientFilter.canForward(new TestKeyWrapper(1));
+
+ Map<KeyWrapper, TopNKeyFilter> filters = new HashMap<KeyWrapper,
TopNKeyFilter>() {{
+ put(new TestKeyWrapper(100), efficientFilter);
+ put(new TestKeyWrapper(200), inefficientFilter);
+ }};
+
+ Set<KeyWrapper> disabled = new HashSet<>();
+ checkTopNFilterEfficiency(filters, disabled, 0.6f, LOG);
+ assertThat(disabled, hasSize(1));
+ assertThat(disabled, hasItem(new TestKeyWrapper(200)));
+ }
+
/**
* Test implementation of KeyWrapper.
*/