This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc351b  HIVE-22925: Implement TopNKeyFilter efficiency check (Attila Magyar, reviewed by Jesus Camacho Rodriguez)
1fc351b is described below

commit 1fc351b632d9cc7aa953769858c5e4992bf80b25
Author: Attila Magyar <[email protected]>
AuthorDate: Fri Feb 28 13:35:03 2020 -0800

    HIVE-22925: Implement TopNKeyFilter efficiency check (Attila Magyar, reviewed by Jesus Camacho Rodriguez)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  3 +
 .../apache/hadoop/hive/ql/exec/TopNKeyFilter.java  | 21 +++++-
 .../hadoop/hive/ql/exec/TopNKeyOperator.java       | 46 +++++++++++---
 .../hive/ql/exec/vector/VectorTopNKeyOperator.java | 74 ++++++++++++++++++----
 .../ql/optimizer/topnkey/TopNKeyProcessor.java     | 25 ++++++--
 .../apache/hadoop/hive/ql/parse/TezCompiler.java   |  6 +-
 .../apache/hadoop/hive/ql/plan/TopNKeyDesc.java    | 47 +++++++++++---
 .../hadoop/hive/ql/exec/TestTopNKeyFilter.java     | 56 +++++++++++++++-
 8 files changed, 237 insertions(+), 41 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1a4d71b..0eda0d2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2416,6 +2416,9 @@ public class HiveConf extends Configuration {
     HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
     HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN value allowed by top n key optimizer.\n" +
       "If the LIMIT is greater than this value then top n key optimization won't be used."),
+    HIVE_TOPN_EFFICIENCY_THRESHOLD("hive.optimize.topnkey.efficiency.threshold", 0.6f, "Disable topN key filter if the ratio between forwarded and total rows reaches this limit."),
+    HIVE_TOPN_EFFICIENCY_CHECK_BATCHES("hive.optimize.topnkey.efficiency.check.nbatches", 8, "Check topN key filter efficiency after a specific number of batches."),
+    HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS("hive.optimize.topnkey.partitions.max", 64, "Limit the maximum number of partitions used by the top N key operator."),
 
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
index 38d2e08..361dbc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
@@ -73,11 +73,30 @@ public final class TopNKeyFilter {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("TopNKeyFilter{");
-    sb.append("topN=").append(topN);
+    sb.append("id=").append(super.toString());
+    sb.append(", topN=").append(topN);
     sb.append(", repeated=").append(repeated);
     sb.append(", added=").append(added);
     sb.append(", total=").append(total);
+    sb.append(", forwardingRatio=").append(forwardingRatio());
     sb.append('}');
     return sb.toString();
   }
+
+  /**
+   * Ratio between the forwarded rows and the total incoming rows.
+   * The higher the number is, the less is the efficiency of the filter.
+   * 1 means all rows should be forwarded.
+   * @return
+   */
+  public float forwardingRatio() {
+    if (total == 0) {
+      return 0;
+    }
+    return ((repeated + added) / (float)total);
+  }
+
+  public long getTotal() {
+    return total;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index dd66dfc..f09867b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency;
 import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -47,6 +50,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
   private transient KeyWrapper keyWrapper;
 
   private transient KeyWrapperComparator keyWrapperComparator;
+  private transient Set<KeyWrapper> disabledPartitions;
 
   /** Kryo ctor. */
   public TopNKeyOperator() {
@@ -82,6 +86,7 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> 
implements Serializab
             keyObjectInspectors, currentKeyObjectInspectors, columnSortOrder, 
nullSortOrder);
 
     this.topNKeyFilters = new HashMap<>();
+    this.disabledPartitions = new HashSet<>();
   }
 
   private KeyWrapper initObjectInspectors(Configuration hconf,
@@ -105,29 +110,54 @@ public class TopNKeyOperator extends 
Operator<TopNKeyDesc> implements Serializab
 
   @Override
   public void process(Object row, int tag) throws HiveException {
+    if (!disabledPartitions.isEmpty() && disabledPartitions.size() == topNKeyFilters.size()) { // all filters are disabled due to efficiency check
+      forward(row, outputObjInspector);
+      return;
+    }
+
     partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]);
     partitionKeyWrapper.setHashKey();
 
+    if (disabledPartitions.contains(partitionKeyWrapper)) { // filter for this partition is disabled
+      forward(row, outputObjInspector);
+      return;
+    }
+
     TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper);
-    if (topNKeyFilter == null) {
+    if (topNKeyFilter == null && topNKeyFilters.size() < conf.getMaxNumberOfPartitions()) {
       topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator);
       topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter);
     }
-
-    keyWrapper.getNewKey(row, inputObjInspectors[tag]);
-    keyWrapper.setHashKey();
-
-    if (topNKeyFilter.canForward(keyWrapper)) {
+    if (topNKeyFilter == null) {
       forward(row, outputObjInspector);
+    } else {
+      keyWrapper.getNewKey(row, inputObjInspectors[tag]);
+      keyWrapper.setHashKey();
+      if (topNKeyFilter.canForward(keyWrapper)) {
+        forward(row, outputObjInspector);
+      }
+    }
+
+    if (runTimeNumRows % conf.getCheckEfficiencyNumRows() == 0) { // check the efficiency at every nth rows
+      checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions, conf.getEfficiencyThreshold(), LOG);
     }
   }
 
   @Override
   protected final void closeOp(boolean abort) throws HiveException {
-    for (TopNKeyFilter each : topNKeyFilters.values()) {
-      each.clear();
+    if (topNKeyFilters.size() == 1) {
+      TopNKeyFilter filter = topNKeyFilters.values().iterator().next();
+      LOG.info("Closing TopNKeyFilter: {}", filter);
+      filter.clear();
+    } else {
+      LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size());
+      for (TopNKeyFilter each : topNKeyFilters.values()) {
+        LOG.debug("Closing TopNKeyFilter: {}", each);
+        each.clear();
+      }
     }
     topNKeyFilters.clear();
+    disabledPartitions.clear();
     super.closeOp(abort);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 7feadd3..0f8eb17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -19,13 +19,14 @@ package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
-import org.apache.hadoop.hive.ql.exec.KeyWrapperComparator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TopNKeyFilter;
 import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
@@ -38,6 +39,9 @@ import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.slf4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * VectorTopNKeyOperator passes rows that contains top N keys only.
@@ -54,8 +58,9 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
   private transient VectorHashKeyWrapperBatch partitionKeyWrapperBatch;
   private transient VectorHashKeyWrapperBatch keyWrappersBatch;
   private transient Map<KeyWrapper, TopNKeyFilter> topNKeyFilters;
+  private transient Set<KeyWrapper> disabledPartitions;
   private transient Comparator<VectorHashKeyWrapperBase> keyWrapperComparator;
-
+  private transient long incomingBatches;
 
   public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
       VectorizationContext vContext, VectorDesc vectorDesc) {
@@ -92,6 +97,8 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
     keyWrapperComparator = 
keyWrappersBatch.getComparator(conf.getColumnSortOrder(), conf.getNullOrder());
     partitionKeyWrapperBatch = 
VectorHashKeyWrapperBatch.compileKeyWrapperBatch(partitionKeyExpressions);
     topNKeyFilters = new HashMap<>();
+    disabledPartitions = new HashSet<>();
+    incomingBatches = 0;
   }
 
   private void initKeyExpressions(Configuration hconf, VectorExpression[] 
keyExpressions) throws HiveException {
@@ -104,7 +111,11 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
   @Override
   public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) data;
-
+    if (!disabledPartitions.isEmpty() && disabledPartitions.size() == topNKeyFilters.size()) { // all filters are disabled due to efficiency check
+      vectorForward(batch);
+      return;
+    }
+    incomingBatches++;
     // The selected vector represents selected rows.
     // Clone the selected vector
     System.arraycopy(batch.selected, 0, temporarySelected, 0, batch.size);
@@ -134,15 +145,18 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
         j = i;
       }
 
-      // Select a row in the priority queue
-      TopNKeyFilter topNKeyFilter = 
topNKeyFilters.get(partitionKeyWrappers[i]);
-      if (topNKeyFilter == null) {
-        topNKeyFilter = new TopNKeyFilter(conf.getTopN(), 
keyWrapperComparator);
-        topNKeyFilters.put(partitionKeyWrappers[i].copyKey(), topNKeyFilter);
-      }
-
-      if (topNKeyFilter.canForward(keyWrappers[i])) {
+      VectorHashKeyWrapperBase partitionKey = partitionKeyWrappers[i];
+      if (disabledPartitions.contains(partitionKey)) { // filter for this partition is disabled
         selected[size++] = j;
+      } else {
+        TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKey);
+        if (topNKeyFilter == null && topNKeyFilters.size() < 
conf.getMaxNumberOfPartitions()) {
+          topNKeyFilter = new TopNKeyFilter(conf.getTopN(), 
keyWrapperComparator);
+          topNKeyFilters.put(partitionKey.copyKey(), topNKeyFilter);
+        }
+        if (topNKeyFilter == null || topNKeyFilter.canForward(keyWrappers[i])) {
+          selected[size++] = j;
+        }
       }
     }
 
@@ -162,6 +176,28 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
     batch.selected = selectedBackup;
     batch.size = sizeBackup;
     batch.selectedInUse = selectedInUseBackup;
+
+    if (incomingBatches % conf.getCheckEfficiencyNumBatches() == 0) {
+      checkTopNFilterEfficiency(topNKeyFilters, disabledPartitions, conf.getEfficiencyThreshold(), LOG);
+    }
+  }
+
+  public static void checkTopNFilterEfficiency(Map<KeyWrapper, TopNKeyFilter> 
filters,
+                                                Set<KeyWrapper> 
disabledPartitions,
+                                                float efficiencyThreshold,
+                                                Logger log)
+  {
+    Iterator<Map.Entry<KeyWrapper, TopNKeyFilter>> iterator = 
filters.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<KeyWrapper, TopNKeyFilter> each = iterator.next();
+      KeyWrapper partitionKey = each.getKey();
+      TopNKeyFilter filter = each.getValue();
+      log.debug("Checking TopN Filter efficiency {}, threshold: {}", filter, 
efficiencyThreshold);
+      if (filter.forwardingRatio() >= efficiencyThreshold) {
+        log.info("Disabling TopN Filter {}", filter);
+        disabledPartitions.add(partitionKey);
+      }
+    }
   }
 
   @Override
@@ -195,9 +231,19 @@ public class VectorTopNKeyOperator extends 
Operator<TopNKeyDesc> implements Vect
   @Override
   protected void closeOp(boolean abort) throws HiveException {
 //    LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter);
-    for (TopNKeyFilter topNKeyFilter : topNKeyFilters.values()) {
-      topNKeyFilter.clear();
+    if (topNKeyFilters.size() == 1) {
+      TopNKeyFilter filter = topNKeyFilters.values().iterator().next();
+      LOG.info("Closing TopNKeyFilter: {}", filter);
+      filter.clear();
+    } else {
+      LOG.info("Closing {} TopNKeyFilters", topNKeyFilters.size());
+      for (TopNKeyFilter each : topNKeyFilters.values()) {
+        LOG.debug("Closing TopNKeyFilter: {}", each);
+        each.clear();
+      }
     }
+    topNKeyFilters.clear();
+    disabledPartitions.clear();
     super.closeOp(abort);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index 3869ffa..bf5083f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.topnkey;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Stack;
+
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -33,20 +37,25 @@ import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Stack;
-
 /**
  * TopNKeyProcessor is a processor for TopNKeyOperator.
  * A TopNKeyOperator will be placed before any ReduceSinkOperator which has a 
topN property >= 0.
  */
 public class TopNKeyProcessor implements SemanticNodeProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TopNKeyProcessor.class);
-  private final int maxTopNAllowed;
+  private float efficiencyThreshold;
+  private long checkEfficiencyNumBatches;
+  private int maxTopNAllowed;
+  private int maxNumberOfPartitions;
 
-  public TopNKeyProcessor(int maxTopNAllowed) {
+  public TopNKeyProcessor() {
+  }
+
+  public TopNKeyProcessor(int maxTopNAllowed, float efficiencyThreshold, long 
checkEfficiencyNumBatches, int maxNumberOfPartitions) {
     this.maxTopNAllowed = maxTopNAllowed;
+    this.efficiencyThreshold = efficiencyThreshold;
+    this.checkEfficiencyNumBatches = checkEfficiencyNumBatches;
+    this.maxNumberOfPartitions = maxNumberOfPartitions;
   }
 
   @Override
@@ -84,7 +93,9 @@ public class TopNKeyProcessor implements 
SemanticNodeProcessor {
     }
 
     TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(), 
reduceSinkDesc.getOrder(),
-            reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(), 
partitionCols);
+    reduceSinkDesc.getNullOrder(), reduceSinkDesc.getKeyCols(), partitionCols,
+      efficiencyThreshold, checkEfficiencyNumBatches, maxNumberOfPartitions);
+
 
     copyDown(reduceSinkOperator, topNKeyDesc);
     return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 31735c9..caab056 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1289,7 +1289,11 @@ public class TezCompiler extends TaskCompiler {
     Map<SemanticRule, SemanticNodeProcessor> opRules = new 
LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
     opRules.put(
         new RuleRegExp("Top n key optimization", 
ReduceSinkOperator.getOperatorName() + "%"),
-        new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf, 
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED)));
+        new TopNKeyProcessor(
+          HiveConf.getIntVar(procCtx.conf, 
HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED),
+          HiveConf.getFloatVar(procCtx.conf, 
ConfVars.HIVE_TOPN_EFFICIENCY_THRESHOLD),
+          HiveConf.getIntVar(procCtx.conf, 
ConfVars.HIVE_TOPN_EFFICIENCY_CHECK_BATCHES),
+          HiveConf.getIntVar(procCtx.conf, 
ConfVars.HIVE_TOPN_MAX_NUMBER_OF_PARTITIONS)));
     opRules.put(
             new RuleRegExp("Top n key pushdown", 
TopNKeyOperator.getOperatorName() + "%"),
             new TopNKeyPushdownProcessor());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
index 19910a3..ddd657e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.plan;
 
-import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.optimizer.topnkey.CommonKeyPrefix;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
 /**
  * TopNKeyDesc.
  *
@@ -38,19 +39,30 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
   private String nullOrder;
   private List<ExprNodeDesc> keyColumns;
   private List<ExprNodeDesc> partitionKeyColumns;
+  private float efficiencyThreshold;
+  private long checkEfficiencyNumBatches;
+  private long checkEfficiencyNumRows;
+  private int maxNumberOfPartitions;
 
   public TopNKeyDesc() {
   }
 
   public TopNKeyDesc(
-      final int topN,
-      final String columnSortOrder,
-      final String nullOrder,
-      final List<ExprNodeDesc> keyColumns,
-      final List<ExprNodeDesc> partitionKeyColumns) {
+    final int topN,
+    final String columnSortOrder,
+    final String nullOrder,
+    final List<ExprNodeDesc> keyColumns,
+    final List<ExprNodeDesc> partitionKeyColumns,
+    float efficiencyThreshold,
+    long checkEfficiencyNumBatches,
+    int maxNumberOfPartitions) {
 
     this.topN = topN;
     this.keyColumns = new ArrayList<>(keyColumns.size());
+    this.efficiencyThreshold = efficiencyThreshold;
+    this.checkEfficiencyNumBatches = checkEfficiencyNumBatches;
+    this.checkEfficiencyNumRows = checkEfficiencyNumBatches * 
VectorizedRowBatch.DEFAULT_SIZE;
+    this.maxNumberOfPartitions = maxNumberOfPartitions;
     StringBuilder sortOrder = new StringBuilder(columnSortOrder.length());
     StringBuilder nullSortOrder = new StringBuilder(nullOrder.length());
     this.partitionKeyColumns = new ArrayList<>(partitionKeyColumns.size());
@@ -81,6 +93,22 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
     return topN;
   }
 
+  public float getEfficiencyThreshold() {
+    return efficiencyThreshold;
+  }
+
+  public long getCheckEfficiencyNumBatches() {
+    return checkEfficiencyNumBatches;
+  }
+
+  public long getCheckEfficiencyNumRows() {
+    return checkEfficiencyNumRows;
+  }
+
+  public int getMaxNumberOfPartitions() {
+    return maxNumberOfPartitions;
+  }
+
   public void setTopN(int topN) {
     this.topN = topN;
   }
@@ -198,7 +226,8 @@ public class TopNKeyDesc extends AbstractOperatorDesc {
   public TopNKeyDesc combine(CommonKeyPrefix commonKeyPrefix) {
     return new TopNKeyDesc(topN, commonKeyPrefix.getMappedOrder(),
             commonKeyPrefix.getMappedNullOrder(), 
commonKeyPrefix.getMappedColumns(),
-            commonKeyPrefix.getMappedColumns().subList(0, 
partitionKeyColumns.size()));
+            commonKeyPrefix.getMappedColumns().subList(0, 
partitionKeyColumns.size()),
+            efficiencyThreshold, checkEfficiencyNumBatches, 
maxNumberOfPartitions);
   }
 
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
index 95cd459..a91bc73 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestTopNKeyFilter.java
@@ -17,21 +17,30 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator.checkTopNFilterEfficiency;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unit test of TopNKeyFilter.
  */
 public class TestTopNKeyFilter {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestTopNKeyFilter.class.getName());
   public static final Comparator<TestKeyWrapper> TEST_KEY_WRAPPER_COMPARATOR = 
Comparator.comparingInt(o -> o.keyValue);
 
   @Test
@@ -69,6 +78,51 @@ public class TestTopNKeyFilter {
     assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
   }
 
+  @Test
+  public void testEfficiencyWhenEverythingIsForwarded() {
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(2, 
TEST_KEY_WRAPPER_COMPARATOR);
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(true));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(true));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(true));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(true));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
+    assertThat(topNKeyFilter.forwardingRatio(), is(1.0f));
+  }
+
+  @Test
+  public void testEfficiencyWhenOnlyOneIsForwarded() {
+    TopNKeyFilter topNKeyFilter = new TopNKeyFilter(1, 
TEST_KEY_WRAPPER_COMPARATOR);
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(1)), is(true));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(2)), is(false));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(3)), is(false));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(4)), is(false));
+    assertThat(topNKeyFilter.canForward(new TestKeyWrapper(5)), is(false));
+    assertThat(topNKeyFilter.forwardingRatio(), is(1/5f));
+  }
+
+  @Test
+  public void testDisabling() {
+    TopNKeyFilter efficientFilter = new TopNKeyFilter(1, 
TEST_KEY_WRAPPER_COMPARATOR);
+    efficientFilter.canForward(new TestKeyWrapper(1));
+    efficientFilter.canForward(new TestKeyWrapper(2));
+    efficientFilter.canForward(new TestKeyWrapper(3));
+
+    TopNKeyFilter inefficientFilter = new TopNKeyFilter(1, 
TEST_KEY_WRAPPER_COMPARATOR);
+    inefficientFilter.canForward(new TestKeyWrapper(3));
+    inefficientFilter.canForward(new TestKeyWrapper(2));
+    inefficientFilter.canForward(new TestKeyWrapper(1));
+
+    Map<KeyWrapper, TopNKeyFilter> filters = new HashMap<KeyWrapper, 
TopNKeyFilter>() {{
+      put(new TestKeyWrapper(100), efficientFilter);
+      put(new TestKeyWrapper(200), inefficientFilter);
+    }};
+
+    Set<KeyWrapper> disabled = new HashSet<>();
+    checkTopNFilterEfficiency(filters, disabled, 0.6f, LOG);
+    assertThat(disabled, hasSize(1));
+    assertThat(disabled, hasItem(new TestKeyWrapper(200)));
+  }
+
   /**
    * Test implementation of KeyWrapper.
    */

Reply via email to