HBASE-18160 Fix incorrect logic in FilterList.filterKeyValue

Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/24a7ce84
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/24a7ce84
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/24a7ce84

Branch: refs/heads/HBASE-18410
Commit: 24a7ce849f0e951ba0a84337681033c541d46276
Parents: 5c9523b
Author: huzheng <open...@gmail.com>
Authored: Thu Jun 8 15:58:42 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Oct 24 11:37:45 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/filter/FilterList.java  | 541 ++++++++++++-------
 .../hadoop/hbase/filter/TestFilterList.java     | 148 ++++-
 2 files changed, 471 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/24a7ce84/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
index 3ff978d..3147ab0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
@@ -90,62 +90,53 @@ final public class FilterList extends FilterBase {
   private Cell transformedCell = null;
 
   /**
-   * Constructor that takes a set of {@link Filter}s. The default operator
-   * MUST_PASS_ALL is assumed.
+   * Constructor that takes a set of {@link Filter}s and an operator.
+   * @param operator Operator to process filter set with.
+   * @param rowFilters Set of row filters.
+   */
+  public FilterList(final Operator operator, final List<Filter> rowFilters) {
+    reversed = checkAndGetReversed(rowFilters, reversed);
+    this.filters = new ArrayList<>(rowFilters);
+    this.operator = operator;
+    initPrevListForMustPassOne(rowFilters.size());
+  }
+
+  /**
+   * Constructor that takes a set of {@link Filter}s. The default operator 
MUST_PASS_ALL is assumed.
    * All filters are cloned to internal list.
    * @param rowFilters list of filters
    */
   public FilterList(final List<Filter> rowFilters) {
-    reversed = getReversed(rowFilters, reversed);
-    this.filters = new ArrayList<>(rowFilters);
-    initPrevListForMustPassOne(rowFilters.size());
+    this(Operator.MUST_PASS_ALL, rowFilters);
   }
 
   /**
-   * Constructor that takes a var arg number of {@link Filter}s. The fefault 
operator
-   * MUST_PASS_ALL is assumed.
+   * Constructor that takes a var arg number of {@link Filter}s. The default 
operator MUST_PASS_ALL
+   * is assumed.
    * @param rowFilters
    */
   public FilterList(final Filter... rowFilters) {
-    this(Arrays.asList(rowFilters));
+    this(Operator.MUST_PASS_ALL, Arrays.asList(rowFilters));
   }
 
   /**
    * Constructor that takes an operator.
-   *
    * @param operator Operator to process filter set with.
    */
   public FilterList(final Operator operator) {
-    this.operator = operator;
-    this.filters = new ArrayList<>();
-    initPrevListForMustPassOne(filters.size());
-  }
-
-  /**
-   * Constructor that takes a set of {@link Filter}s and an operator.
-   *
-   * @param operator Operator to process filter set with.
-   * @param rowFilters Set of row filters.
-   */
-  public FilterList(final Operator operator, final List<Filter> rowFilters) {
-    this(rowFilters);
-    this.operator = operator;
-    initPrevListForMustPassOne(rowFilters.size());
+    this(operator, new ArrayList<>());
   }
 
   /**
    * Constructor that takes a var arg number of {@link Filter}s and an 
operator.
-   *
    * @param operator Operator to process filter set with.
    * @param rowFilters Filters to use
    */
   public FilterList(final Operator operator, final Filter... rowFilters) {
-    this(rowFilters);
-    this.operator = operator;
-    initPrevListForMustPassOne(rowFilters.length);
+    this(operator, Arrays.asList(rowFilters));
   }
 
-  public void initPrevListForMustPassOne(int size) {
+  private void initPrevListForMustPassOne(int size) {
     if (operator == Operator.MUST_PASS_ONE) {
       if (this.prevFilterRCList == null) {
         prevFilterRCList = new ArrayList<>(Collections.nCopies(size, null));
@@ -156,10 +147,8 @@ final public class FilterList extends FilterBase {
     }
   }
 
-
   /**
    * Get the operator.
-   *
    * @return operator
    */
   public Operator getOperator() {
@@ -168,7 +157,6 @@ final public class FilterList extends FilterBase {
 
   /**
    * Get the filters.
-   *
    * @return filters
    */
   public List<Filter> getFilters() {
@@ -183,33 +171,22 @@ final public class FilterList extends FilterBase {
     return filters.isEmpty();
   }
 
-  private static boolean getReversed(List<Filter> rowFilters, boolean 
defaultValue) {
-    boolean rval = defaultValue;
-    boolean isFirst = true;
-    for (Filter f : rowFilters) {
-      if (isFirst) {
-        rval = f.isReversed();
-        isFirst = false;
-        continue;
-      }
-      if (rval != f.isReversed()) {
-        throw new IllegalArgumentException("Filters in the list must have the 
same reversed flag");
-      }
+  private static boolean checkAndGetReversed(List<Filter> rowFilters, boolean 
defaultValue) {
+    if (rowFilters.isEmpty()) {
+      return defaultValue;
     }
-    return rval;
-  }
-  private static void checkReversed(List<Filter> rowFilters, boolean expected) 
{
-    for (Filter filter : rowFilters) {
-      if (expected != filter.isReversed()) {
-        throw new IllegalArgumentException(
-            "Filters in the list must have the same reversed flag, expected="
-                + expected);
-      }
+    Boolean retValue = rowFilters.get(0).isReversed();
+    boolean allEqual = 
rowFilters.stream().map(Filter::isReversed).allMatch(retValue::equals);
+    if (!allEqual) {
+      throw new IllegalArgumentException("Filters in the list must have the 
same reversed flag");
     }
+    return retValue;
   }
 
   public void addFilter(List<Filter> filters) {
-    checkReversed(filters, isReversed());
+    if (checkAndGetReversed(filters, isReversed()) != isReversed()) {
+      throw new IllegalArgumentException("Filters in the list must have the 
same reversed flag");
+    }
     this.filters.addAll(filters);
     if (operator == Operator.MUST_PASS_ONE) {
       this.prevFilterRCList.addAll(Collections.nCopies(filters.size(), null));
@@ -219,7 +196,6 @@ final public class FilterList extends FilterBase {
 
   /**
    * Add a filter.
-   *
    * @param filter another filter
    */
   public void addFilter(Filter filter) {
@@ -228,8 +204,7 @@ final public class FilterList extends FilterBase {
 
   @Override
   public void reset() throws IOException {
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       filters.get(i).reset();
       if (operator == Operator.MUST_PASS_ONE) {
         prevFilterRCList.set(i, null);
@@ -245,18 +220,15 @@ final public class FilterList extends FilterBase {
       return super.filterRowKey(rowKey, offset, length);
     }
     boolean flag = this.operator == Operator.MUST_PASS_ONE;
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       Filter filter = filters.get(i);
       if (this.operator == Operator.MUST_PASS_ALL) {
-        if (filter.filterAllRemaining() ||
-            filter.filterRowKey(rowKey, offset, length)) {
-          flag =  true;
+        if (filter.filterAllRemaining() || filter.filterRowKey(rowKey, offset, 
length)) {
+          flag = true;
         }
       } else if (this.operator == Operator.MUST_PASS_ONE) {
-        if (!filter.filterAllRemaining() &&
-            !filter.filterRowKey(rowKey, offset, length)) {
-          flag =  false;
+        if (!filter.filterAllRemaining() && !filter.filterRowKey(rowKey, 
offset, length)) {
+          flag = false;
         }
       }
     }
@@ -269,8 +241,7 @@ final public class FilterList extends FilterBase {
       return super.filterRowKey(firstRowCell);
     }
     boolean flag = this.operator == Operator.MUST_PASS_ONE;
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       Filter filter = filters.get(i);
       if (this.operator == Operator.MUST_PASS_ALL) {
         if (filter.filterAllRemaining() || filter.filterRowKey(firstRowCell)) {
@@ -290,8 +261,7 @@ final public class FilterList extends FilterBase {
     if (isEmpty()) {
       return super.filterAllRemaining();
     }
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       if (filters.get(i).filterAllRemaining()) {
         if (operator == Operator.MUST_PASS_ALL) {
           return true;
@@ -311,10 +281,15 @@ final public class FilterList extends FilterBase {
       return super.transformCell(c);
     }
     if (!CellUtil.equals(c, referenceCell)) {
-      throw new IllegalStateException("Reference Cell: " + this.referenceCell 
+ " does not match: "
-          + c);
+      throw new IllegalStateException(
+          "Reference Cell: " + this.referenceCell + " does not match: " + c);
     }
-    return this.transformedCell;
+    // Copy transformedCell into a new cell and reset transformedCell & 
referenceCell to null for
+    // Java GC optimization
+    Cell cell = KeyValueUtil.copyToNewKeyValue(this.transformedCell);
+    this.transformedCell = null;
+    this.referenceCell = null;
+    return cell;
   }
 
   /**
@@ -352,116 +327,315 @@ final public class FilterList extends FilterBase {
     }
   }
 
-  @Override
-  
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
-    justification="Intentional")
-  public ReturnCode filterKeyValue(Cell c) throws IOException {
-    if (isEmpty()) {
-      return ReturnCode.INCLUDE;
+  /**
+   * A FilterList with MUST_PASS_ALL chooses the maximal forward step among the sub-filters in
+   * the filter list. Let's call it the Maximal Step Rule. So if filter-A in the filter list
+   * returns INCLUDE and filter-B returns INCLUDE_AND_NEXT_COL, then the filter list should
+   * return INCLUDE_AND_NEXT_COL. SEEK_NEXT_USING_HINT is a special case: in method
+   * filterKeyValueWithMustPassAll(), if any sub-filter returns SEEK_NEXT_USING_HINT, then the
+   * filter list returns SEEK_NEXT_USING_HINT, so we don't care about SEEK_NEXT_USING_HINT
+   * here. <br/>
+   * <br/>
+   * The jump step will be:
+   *
+   * <pre>
+   * INCLUDE &lt; SKIP &lt; INCLUDE_AND_NEXT_COL &lt; NEXT_COL &lt; 
INCLUDE_AND_SEEK_NEXT_ROW &lt; NEXT_ROW &lt; SEEK_NEXT_USING_HINT
+   * </pre>
+   *
+   * Here, we have the following map to describe The Maximal Step Rule. if 
current return code (for
+   * previous sub-filters in filter list) is <strong>ReturnCode</strong>, and 
current filter returns
+   * <strong>localRC</strong>, then we should return map[ReturnCode][localRC] 
for the merged result,
+   * according to The Maximal Step Rule. <br/>
+   *
+   * <pre>
+   * LocalCode\ReturnCode       INCLUDE                    
INCLUDE_AND_NEXT_COL      INCLUDE_AND_SEEK_NEXT_ROW  SKIP                  
NEXT_COL              NEXT_ROW              SEEK_NEXT_USING_HINT
+   * INCLUDE                    INCLUDE                    
INCLUDE_AND_NEXT_COL      INCLUDE_AND_SEEK_NEXT_ROW  SKIP                  
NEXT_COL              NEXT_ROW              SEEK_NEXT_USING_HINT
+   * INCLUDE_AND_NEXT_COL       INCLUDE_AND_NEXT_COL       
INCLUDE_AND_NEXT_COL      INCLUDE_AND_SEEK_NEXT_ROW  NEXT_COL              
NEXT_COL              NEXT_ROW              SEEK_NEXT_USING_HINT
+   * INCLUDE_AND_SEEK_NEXT_ROW  INCLUDE_AND_SEEK_NEXT_ROW  
INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW  NEXT_ROW              
NEXT_ROW              NEXT_ROW              SEEK_NEXT_USING_HINT
+   * SKIP                       SKIP                       NEXT_COL            
      NEXT_ROW                   SKIP                  NEXT_COL              
NEXT_ROW              SEEK_NEXT_USING_HINT
+   * NEXT_COL                   NEXT_COL                   NEXT_COL            
      NEXT_ROW                   NEXT_COL              NEXT_COL              
NEXT_ROW              SEEK_NEXT_USING_HINT
+   * NEXT_ROW                   NEXT_ROW                   NEXT_ROW            
      NEXT_ROW                   NEXT_ROW              NEXT_ROW              
NEXT_ROW              SEEK_NEXT_USING_HINT
+   * SEEK_NEXT_USING_HINT       SEEK_NEXT_USING_HINT       
SEEK_NEXT_USING_HINT      SEEK_NEXT_USING_HINT       SEEK_NEXT_USING_HINT  
SEEK_NEXT_USING_HINT  SEEK_NEXT_USING_HINT  SEEK_NEXT_USING_HINT
+   * </pre>
+   * @param rc Return code which is calculated by previous sub-filter(s) in 
filter list.
+   * @param localRC Return code of the current sub-filter in filter list.
+   * @return Return code which is merged by the return code of previous 
sub-filter(s) and the return
+   *         code of current sub-filter.
+   */
+  private ReturnCode mergeReturnCodeForAndOperator(ReturnCode rc, ReturnCode 
localRC) {
+    if (rc == ReturnCode.SEEK_NEXT_USING_HINT || localRC == 
ReturnCode.SEEK_NEXT_USING_HINT) {
+      return ReturnCode.SEEK_NEXT_USING_HINT;
     }
-    this.referenceCell = c;
-    seekHintFilters.clear();
+    switch (localRC) {
+    case INCLUDE:
+      return rc;
+    case INCLUDE_AND_NEXT_COL:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, 
ReturnCode.INCLUDE_AND_NEXT_COL)) {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.NEXT_COL)) {
+        return ReturnCode.NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_ROW;
+      }
+      break;
+    case INCLUDE_AND_SEEK_NEXT_ROW:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, 
ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.NEXT_COL, 
ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_ROW;
+      }
+      break;
+    case SKIP:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.SKIP)) {
+        return ReturnCode.SKIP;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_NEXT_COL, 
ReturnCode.NEXT_COL)) {
+        return ReturnCode.NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, 
ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_ROW;
+      }
+      break;
+    case NEXT_COL:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, 
ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.SKIP,
+        ReturnCode.NEXT_COL)) {
+        return ReturnCode.NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, 
ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_ROW;
+      }
+      break;
+    case NEXT_ROW:
+      return ReturnCode.NEXT_ROW;
+    }
+    throw new IllegalStateException("Received code is not valid. rc: " + rc + 
", localRC: "
+        + localRC);
+  }
 
-    // Accumulates successive transformation of every filter that includes the 
Cell:
+  private ReturnCode filterKeyValueWithMustPassAll(Cell c) throws IOException {
+    ReturnCode rc = ReturnCode.INCLUDE;
     Cell transformed = c;
+    this.seekHintFilters.clear();
+    for (int i = 0, n = filters.size(); i < n; i++) {
+      Filter filter = filters.get(i);
+      if (filter.filterAllRemaining()) {
+        return ReturnCode.NEXT_ROW;
+      }
+      ReturnCode localRC = filter.filterKeyValue(c);
+      rc = mergeReturnCodeForAndOperator(rc, localRC);
 
-    ReturnCode rc = operator == Operator.MUST_PASS_ONE?
-        ReturnCode.SKIP: ReturnCode.INCLUDE;
-    int listize = filters.size();
-    /*
-     * When all filters in a MUST_PASS_ONE FilterList return a 
SEEK_USING_NEXT_HINT code,
-     * we should return SEEK_NEXT_USING_HINT from the FilterList to utilize 
the lowest seek value.
-     * 
-     * The following variable tracks whether any of the Filters returns 
ReturnCode other than
-     * SEEK_NEXT_USING_HINT for MUST_PASS_ONE FilterList, in which case the 
optimization would
-     * be skipped.
-     */
-    boolean seenNonHintReturnCode = false;
-    for (int i = 0; i < listize; i++) {
+      // For INCLUDE* case, we need to update the transformed cell.
+      if (isInReturnCodes(localRC, ReturnCode.INCLUDE, 
ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        transformed = filter.transformCell(transformed);
+      }
+      if (localRC == ReturnCode.SEEK_NEXT_USING_HINT) {
+        seekHintFilters.add(filter);
+      }
+    }
+    this.transformedCell = transformed;
+    if (!seekHintFilters.isEmpty()) {
+      return ReturnCode.SEEK_NEXT_USING_HINT;
+    }
+    return rc;
+  }
+
+  private void updatePrevFilterRCList(int index, ReturnCode currentRC) {
+    prevFilterRCList.set(index, currentRC);
+  }
+
+  private void updatePrevCellList(int index, Cell currentCell, ReturnCode 
currentRC) {
+    if (currentCell == null || currentRC == ReturnCode.INCLUDE || currentRC == 
ReturnCode.SKIP) {
+      // If previous return code is INCLUDE or SKIP, we should always pass the 
next cell to the
+      // corresponding sub-filter(need not test 
shouldPassCurrentCellToFilter() method), So we
+      // need not save current cell to prevCellList for saving heap memory.
+      prevCellList.set(index, null);
+    } else {
+      prevCellList.set(index, KeyValueUtil.toNewKeyCell(currentCell));
+    }
+  }
+
+  private static boolean isInReturnCodes(ReturnCode testRC, ReturnCode... 
returnCodes) {
+    return Arrays.stream(returnCodes).anyMatch(testRC::equals);
+  }
+
+  /**
+   * A FilterList with MUST_PASS_ONE chooses the minimal forward step among the sub-filters in
+   * the filter list. Let's call it the Minimal Step Rule. So if filter-A in the filter list
+   * returns INCLUDE and filter-B returns INCLUDE_AND_NEXT_COL, then the filter list should
+   * return INCLUDE. SEEK_NEXT_USING_HINT is a special case: because we do not know how far it
+   * will move forward, we use SKIP by default.<br/>
+   * <br/>
+   * The jump step will be:
+   *
+   * <pre>
+   * INCLUDE &lt; SKIP &lt; INCLUDE_AND_NEXT_COL &lt; NEXT_COL &lt; 
INCLUDE_AND_SEEK_NEXT_ROW &lt; NEXT_ROW &lt; SEEK_NEXT_USING_HINT
+   * </pre>
+   *
+   * Here, we have the following map to describe The Minimal Step Rule. if 
current return code (for
+   * previous sub-filters in filter list) is <strong>ReturnCode</strong>, and 
current filter returns
+   * <strong>localRC</strong>, then we should return map[ReturnCode][localRC] 
for the merged result,
+   * according to The Minimal Step Rule.<br/>
+   *
+   * <pre>
+   * LocalCode\ReturnCode       INCLUDE INCLUDE_AND_NEXT_COL     
INCLUDE_AND_SEEK_NEXT_ROW  SKIP      NEXT_COL              NEXT_ROW             
     SEEK_NEXT_USING_HINT
+   * INCLUDE                    INCLUDE INCLUDE                  INCLUDE       
             INCLUDE   INCLUDE               INCLUDE                   INCLUDE
+   * INCLUDE_AND_NEXT_COL       INCLUDE INCLUDE_AND_NEXT_COL     
INCLUDE_AND_NEXT_COL       INCLUDE   INCLUDE_AND_NEXT_COL  INCLUDE_AND_NEXT_COL 
     INCLUDE
+   * INCLUDE_AND_SEEK_NEXT_ROW  INCLUDE INCLUDE_AND_NEXT_COL     
INCLUDE_AND_SEEK_NEXT_ROW  INCLUDE   INCLUDE_AND_NEXT_COL  
INCLUDE_AND_SEEK_NEXT_ROW INCLUDE
+   * SKIP                       INCLUDE INCLUDE                  INCLUDE       
             SKIP      SKIP                  SKIP                      SKIP
+   * NEXT_COL                   INCLUDE INCLUDE_AND_NEXT_COL     
INCLUDE_AND_NEXT_COL       SKIP      NEXT_COL              NEXT_COL             
     SKIP
+   * NEXT_ROW                   INCLUDE INCLUDE_AND_NEXT_COL     
INCLUDE_AND_SEEK_NEXT_ROW  SKIP      NEXT_COL              NEXT_ROW             
     SKIP
+   * SEEK_NEXT_USING_HINT       INCLUDE INCLUDE                  INCLUDE       
             SKIP      SKIP                  SKIP                      
SEEK_NEXT_USING_HINT
+   * </pre>
+   * @param rc Return code which is calculated by previous sub-filter(s) in 
filter list.
+   * @param localRC Return code of the current sub-filter in filter list.
+   * @return Return code which is merged by the return code of previous 
sub-filter(s) and the return
+   *         code of current sub-filter.
+   */
+  private ReturnCode mergeReturnCodeForOrOperator(ReturnCode rc, ReturnCode localRC) {
+    // rc is null until the first sub-filter has been evaluated; nothing to merge with yet.
+    if (rc == null) return localRC;
+    switch (localRC) {
+    case INCLUDE:
+      return ReturnCode.INCLUDE;
+    case INCLUDE_AND_NEXT_COL:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.SKIP,
+        ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.INCLUDE;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW,
+        ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+      break;
+    case INCLUDE_AND_SEEK_NEXT_ROW:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.SKIP,
+        ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.INCLUDE;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.NEXT_COL)) {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW;
+      }
+      break;
+    case SKIP:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW,
+        ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.SKIP;
+      }
+      break;
+    case NEXT_COL:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE)) {
+        return ReturnCode.INCLUDE;
+      }
+      // rc is NEXT_COL or NEXT_ROW: every sub-filter so far wants to leave this column, so
+      // NEXT_COL is the minimal step. These two cases used to be unhandled and ended in the
+      // IllegalStateException below.
+      if (isInReturnCodes(rc, ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.SKIP;
+      }
+      break;
+    case NEXT_ROW:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE)) {
+        return ReturnCode.INCLUDE;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_NEXT_COL)) {
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.SKIP;
+      }
+      if (isInReturnCodes(rc, ReturnCode.NEXT_COL)) {
+        return ReturnCode.NEXT_COL;
+      }
+      if (isInReturnCodes(rc, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.NEXT_ROW;
+      }
+      // Explicit break so an unexpected rc reaches the exception instead of falling through
+      // into the SEEK_NEXT_USING_HINT rules.
+      break;
+    case SEEK_NEXT_USING_HINT:
+      if (isInReturnCodes(rc, ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        return ReturnCode.INCLUDE;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SKIP, ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW)) {
+        return ReturnCode.SKIP;
+      }
+      if (isInReturnCodes(rc, ReturnCode.SEEK_NEXT_USING_HINT)) {
+        return ReturnCode.SEEK_NEXT_USING_HINT;
+      }
+      break;
+    }
+    // Reached only when a (rc, localRC) pair has no rule above.
+    throw new IllegalStateException(
+        "Received code is not valid. rc: " + rc + ", localRC: " + localRC);
+  }
+
+  private ReturnCode filterKeyValueWithMustPassOne(Cell c) throws IOException {
+    ReturnCode rc = null;
+    boolean everyFilterReturnHint = true;
+    Cell transformed = c;
+    for (int i = 0, n = filters.size(); i < n; i++) {
       Filter filter = filters.get(i);
-      if (operator == Operator.MUST_PASS_ALL) {
-        if (filter.filterAllRemaining()) {
-          return ReturnCode.NEXT_ROW;
-        }
-        ReturnCode code = filter.filterKeyValue(c);
-        switch (code) {
-        // Override INCLUDE and continue to evaluate.
-        case INCLUDE_AND_NEXT_COL:
-          rc = ReturnCode.INCLUDE_AND_NEXT_COL; // FindBugs 
SF_SWITCH_FALLTHROUGH
-        case INCLUDE:
-          transformed = filter.transformCell(transformed);
-          continue;
-        case SEEK_NEXT_USING_HINT:
-          seekHintFilters.add(filter);
-          continue;
-        default:
-          if (seekHintFilters.isEmpty()) {
-            return code;
-          }
-        }
-      } else if (operator == Operator.MUST_PASS_ONE) {
-        Cell prevCell = this.prevCellList.get(i);
-        if (filter.filterAllRemaining() || 
!shouldPassCurrentCellToFilter(prevCell, c, i)) {
-          seenNonHintReturnCode = true;
-          continue;
-        }
+      Cell prevCell = this.prevCellList.get(i);
+      if (filter.filterAllRemaining() || 
!shouldPassCurrentCellToFilter(prevCell, c, i)) {
+        everyFilterReturnHint = false;
+        continue;
+      }
 
-        ReturnCode localRC = filter.filterKeyValue(c);
-        // Update previous cell and return code we encountered.
-        prevFilterRCList.set(i, localRC);
-        if (c == null || localRC == ReturnCode.INCLUDE || localRC == 
ReturnCode.SKIP) {
-          // If previous return code is INCLUDE or SKIP, we should always pass 
the next cell to the
-          // corresponding sub-filter(need not test 
shouldPassCurrentCellToFilter() method), So we
-          // need not save current cell to prevCellList for saving heap memory.
-          prevCellList.set(i, null);
-        } else {
-          prevCellList.set(i, KeyValueUtil.toNewKeyCell(c));
-        }
+      ReturnCode localRC = filter.filterKeyValue(c);
 
-        if (localRC != ReturnCode.SEEK_NEXT_USING_HINT) {
-          seenNonHintReturnCode = true;
-        }
-        switch (localRC) {
-        case INCLUDE:
-          if (rc != ReturnCode.INCLUDE_AND_NEXT_COL) {
-            rc = ReturnCode.INCLUDE;
-          }
-          transformed = filter.transformCell(transformed);
-          break;
-        case INCLUDE_AND_NEXT_COL:
-          rc = ReturnCode.INCLUDE_AND_NEXT_COL;
-          transformed = filter.transformCell(transformed);
-          // must continue here to evaluate all filters
-          break;
-        case NEXT_ROW:
-          break;
-        case SKIP:
-          break;
-        case NEXT_COL:
-          break;
-        case SEEK_NEXT_USING_HINT:
-          break;
-        default:
-          throw new IllegalStateException("Received code is not valid.");
-        }
+      // Update previous return code and previous cell for filter[i].
+      updatePrevFilterRCList(i, localRC);
+      updatePrevCellList(i, c, localRC);
+
+      if (localRC != ReturnCode.SEEK_NEXT_USING_HINT) {
+        everyFilterReturnHint = false;
+      }
+
+      rc = mergeReturnCodeForOrOperator(rc, localRC);
+
+      // For INCLUDE* case, we need to update the transformed cell.
+      if (isInReturnCodes(localRC, ReturnCode.INCLUDE, 
ReturnCode.INCLUDE_AND_NEXT_COL,
+        ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+        transformed = filter.transformCell(transformed);
       }
     }
 
-    if (!seekHintFilters.isEmpty()) {
+    this.transformedCell = transformed;
+    if (everyFilterReturnHint) {
       return ReturnCode.SEEK_NEXT_USING_HINT;
+    } else if (rc == null) {
+      // Each sub-filter in filter list got true for filterAllRemaining().
+      return ReturnCode.SKIP;
+    } else {
+      return rc;
     }
+  }
 
-    // Save the transformed Cell for transform():
-    this.transformedCell = transformed;
+  @Override
+  public ReturnCode filterKeyValue(Cell c) throws IOException {
+    if (isEmpty()) {
+      return ReturnCode.INCLUDE;
+    }
+    this.referenceCell = c;
 
-    /*
-     * The seenNonHintReturnCode flag is intended only for 
Operator.MUST_PASS_ONE branch.
-     * If we have seen non SEEK_NEXT_USING_HINT ReturnCode, respect that 
ReturnCode.
-     */
-    if (operator == Operator.MUST_PASS_ONE && !seenNonHintReturnCode) {
-      return ReturnCode.SEEK_NEXT_USING_HINT;
+    if (operator == Operator.MUST_PASS_ALL) {
+      return filterKeyValueWithMustPassAll(c);
+    } else {
+      return filterKeyValueWithMustPassOne(c);
     }
-    return rc;
   }
 
   /**
@@ -472,16 +646,14 @@ final public class FilterList extends FilterBase {
    */
   @Override
   public void filterRowCells(List<Cell> cells) throws IOException {
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       filters.get(i).filterRowCells(cells);
     }
   }
 
   @Override
   public boolean hasFilterRow() {
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       if (filters.get(i).hasFilterRow()) {
         return true;
       }
@@ -494,8 +666,7 @@ final public class FilterList extends FilterBase {
     if (isEmpty()) {
       return super.filterRow();
     }
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       Filter filter = filters.get(i);
       if (operator == Operator.MUST_PASS_ALL) {
         if (filter.filterRow()) {
@@ -507,18 +678,16 @@ final public class FilterList extends FilterBase {
         }
       }
     }
-    return  operator == Operator.MUST_PASS_ONE;
+    return operator == Operator.MUST_PASS_ONE;
   }
 
   /**
    * @return The filter serialized using pb
    */
   public byte[] toByteArray() throws IOException {
-    FilterProtos.FilterList.Builder builder =
-      FilterProtos.FilterList.newBuilder();
+    FilterProtos.FilterList.Builder builder = 
FilterProtos.FilterList.newBuilder();
     
builder.setOperator(FilterProtos.FilterList.Operator.valueOf(operator.name()));
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       builder.addFilters(ProtobufUtil.toFilter(filters.get(i)));
     }
     return builder.build().toByteArray();
@@ -531,7 +700,7 @@ final public class FilterList extends FilterBase {
    * @see #toByteArray
    */
   public static FilterList parseFrom(final byte [] pbBytes)
-  throws DeserializationException {
+      throws DeserializationException {
     FilterProtos.FilterList proto;
     try {
       proto = FilterProtos.FilterList.parseFrom(pbBytes);
@@ -542,14 +711,13 @@ final public class FilterList extends FilterBase {
     List<Filter> rowFilters = new ArrayList<>(proto.getFiltersCount());
     try {
       List<FilterProtos.Filter> filtersList = proto.getFiltersList();
-      int listSize = filtersList.size();
-      for (int i = 0; i < listSize; i++) {
+      for (int i = 0, n = filtersList.size(); i < n; i++) {
         rowFilters.add(ProtobufUtil.toFilter(filtersList.get(i)));
       }
     } catch (IOException ioe) {
       throw new DeserializationException(ioe);
     }
-    return new 
FilterList(Operator.valueOf(proto.getOperator().name()),rowFilters);
+    return new FilterList(Operator.valueOf(proto.getOperator().name()), 
rowFilters);
   }
 
   /**
@@ -589,8 +757,7 @@ final public class FilterList extends FilterBase {
     }
 
     // If any condition can pass, we need to keep the min hint
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       if (filters.get(i).filterAllRemaining()) {
         continue;
       }
@@ -616,8 +783,7 @@ final public class FilterList extends FilterBase {
     if (isEmpty()) {
       return super.isFamilyEssential(name);
     }
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       if (filters.get(i).isFamilyEssential(name)) {
         return true;
       }
@@ -627,8 +793,7 @@ final public class FilterList extends FilterBase {
 
   @Override
   public void setReversed(boolean reversed) {
-    int listize = filters.size();
-    for (int i = 0; i < listize; i++) {
+    for (int i = 0, n = filters.size(); i < n; i++) {
       filters.get(i).setReversed(reversed);
     }
     this.reversed = reversed;

http://git-wip-us.apache.org/repos/asf/hbase/blob/24a7ce84/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java
index e414729..f7cba18 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterList.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,21 +45,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
-/**
- * Tests filter sets
- *
- */
+
 @Category({FilterTests.class, SmallTests.class})
 public class TestFilterList {
   static final int MAX_PAGES = 2;
-  static final char FIRST_CHAR = 'a';
-  static final char LAST_CHAR = 'e';
-  static byte[] GOOD_BYTES = Bytes.toBytes("abc");
-  static byte[] BAD_BYTES = Bytes.toBytes("def");
-
 
   @Test
   public void testAddFilter() throws Exception {
@@ -311,23 +304,23 @@ public class TestFilterList {
     FilterList flist = new FilterList(FilterList.Operator.MUST_PASS_ONE);
     flist.addFilter(new PrefixFilter(r1));
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r1));
-    assertEquals(flist.filterKeyValue(new KeyValue(r1,r1,r1)), 
ReturnCode.INCLUDE);
-    assertEquals(flist.filterKeyValue(new KeyValue(r11,r11,r11)), 
ReturnCode.INCLUDE);
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r1, r1, 
r1)));
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r11, 
r11, r11)));
 
     flist.reset();
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r2));
-    assertEquals(flist.filterKeyValue(new KeyValue(r2,r2,r2)), 
ReturnCode.SKIP);
+    assertEquals(ReturnCode.SKIP, flist.filterKeyValue(new KeyValue(r2, r2, 
r2)));
 
     flist = new FilterList(FilterList.Operator.MUST_PASS_ONE);
     flist.addFilter(new AlwaysNextColFilter());
     flist.addFilter(new PrefixFilter(r1));
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r1));
-    assertEquals(flist.filterKeyValue(new KeyValue(r1,r1,r1)), 
ReturnCode.INCLUDE);
-    assertEquals(flist.filterKeyValue(new KeyValue(r11,r11,r11)), 
ReturnCode.INCLUDE);
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r1, r1, 
r1)));
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r11, 
r11, r11)));
 
     flist.reset();
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r2));
-    assertEquals(flist.filterKeyValue(new KeyValue(r2,r2,r2)), 
ReturnCode.SKIP);
+    assertEquals(ReturnCode.NEXT_COL, flist.filterKeyValue(new KeyValue(r2, 
r2, r2)));
   }
 
   /**
@@ -345,12 +338,12 @@ public class TestFilterList {
     flist.addFilter(new AlwaysNextColFilter());
     flist.addFilter(new InclusiveStopFilter(r1));
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r1));
-    assertEquals(flist.filterKeyValue(new KeyValue(r1,r1,r1)), 
ReturnCode.INCLUDE);
-    assertEquals(flist.filterKeyValue(new KeyValue(r11,r11,r11)), 
ReturnCode.INCLUDE);
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r1, r1, 
r1)));
+    assertEquals(ReturnCode.INCLUDE, flist.filterKeyValue(new KeyValue(r11, 
r11, r11)));
 
     flist.reset();
     flist.filterRowKey(KeyValueUtil.createFirstOnRow(r2));
-    assertEquals(flist.filterKeyValue(new KeyValue(r2,r2,r2)), 
ReturnCode.SKIP);
+    assertEquals(ReturnCode.NEXT_COL, flist.filterKeyValue(new KeyValue(r2, 
r2, r2)));
   }
 
   public static class AlwaysNextColFilter extends FilterBase {
@@ -432,7 +425,7 @@ public class TestFilterList {
     FilterList mpOnefilterList = new FilterList(Operator.MUST_PASS_ONE,
         Arrays.asList(new Filter[] { includeFilter, alternateIncludeFilter, 
alternateFilter }));
     // INCLUDE, INCLUDE, INCLUDE_AND_NEXT_COL.
-    assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, 
mpOnefilterList.filterKeyValue(null));
+    assertEquals(ReturnCode.INCLUDE, mpOnefilterList.filterKeyValue(null));
     // INCLUDE, SKIP, INCLUDE.
     assertEquals(Filter.ReturnCode.INCLUDE, 
mpOnefilterList.filterKeyValue(null));
 
@@ -607,16 +600,16 @@ public class TestFilterList {
     KeyValue kv3 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"), 
Bytes.toBytes("qual"),
         3, Bytes.toBytes("value"));
 
-    assertEquals(filterList01.filterKeyValue(kv1), 
ReturnCode.INCLUDE_AND_NEXT_COL);
-    assertEquals(filterList01.filterKeyValue(kv2), ReturnCode.SKIP);
-    assertEquals(filterList01.filterKeyValue(kv3), ReturnCode.SKIP);
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList01.filterKeyValue(kv1));
+    assertEquals(ReturnCode.SKIP, filterList01.filterKeyValue(kv2));
+    assertEquals(ReturnCode.SKIP, filterList01.filterKeyValue(kv3));
 
     FilterList filterList11 =
         new FilterList(Operator.MUST_PASS_ONE, new ColumnPaginationFilter(1, 
1));
 
-    assertEquals(filterList11.filterKeyValue(kv1), ReturnCode.SKIP);
-    assertEquals(filterList11.filterKeyValue(kv2), ReturnCode.SKIP);
-    assertEquals(filterList11.filterKeyValue(kv3), ReturnCode.SKIP);
+    assertEquals(ReturnCode.NEXT_COL, filterList11.filterKeyValue(kv1));
+    assertEquals(ReturnCode.SKIP, filterList11.filterKeyValue(kv2));
+    assertEquals(ReturnCode.SKIP, filterList11.filterKeyValue(kv3));
   }
 
   @Test
@@ -634,10 +627,10 @@ public class TestFilterList {
     KeyValue kv4 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"), 
Bytes.toBytes("c"), 4,
         Bytes.toBytes("value"));
 
-    assertEquals(filterList.filterKeyValue(kv1), 
ReturnCode.SEEK_NEXT_USING_HINT);
-    assertEquals(filterList.filterKeyValue(kv2), ReturnCode.SKIP);
-    assertEquals(filterList.filterKeyValue(kv3), 
ReturnCode.INCLUDE_AND_NEXT_COL);
-    assertEquals(filterList.filterKeyValue(kv4), ReturnCode.SKIP);
+    assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, 
filterList.filterKeyValue(kv1));
+    assertEquals(ReturnCode.SKIP, filterList.filterKeyValue(kv2));
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList.filterKeyValue(kv3));
+    assertEquals(ReturnCode.SKIP, filterList.filterKeyValue(kv4));
   }
 
   private static class MockFilter extends FilterBase {
@@ -710,6 +703,101 @@ public class TestFilterList {
     mockFilter.didCellPassToTheFilter = false;
     filter.filterKeyValue(kv4);
     assertTrue(mockFilter.didCellPassToTheFilter);
+
+    mockFilter = new MockFilter(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    filter = new FilterList(Operator.MUST_PASS_ONE, mockFilter);
+    filter.filterKeyValue(kv1);
+    assertTrue(mockFilter.didCellPassToTheFilter);
+
+    mockFilter.didCellPassToTheFilter = false;
+    filter.filterKeyValue(kv2);
+    assertFalse(mockFilter.didCellPassToTheFilter);
+
+    mockFilter.didCellPassToTheFilter = false;
+    filter.filterKeyValue(kv3);
+    assertFalse(mockFilter.didCellPassToTheFilter);
+
+    mockFilter.didCellPassToTheFilter = false;
+    filter.filterKeyValue(kv4);
+    assertTrue(mockFilter.didCellPassToTheFilter);
+  }
+
+  @Test
+  public void testTheMaximalRule() throws IOException { // pins the one ReturnCode a MUST_PASS_ALL list reports per mix of sub-filter codes
+    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"), 
Bytes.toBytes("a"), 1,
+        Bytes.toBytes("value"));
+    MockFilter filter1 = new MockFilter(ReturnCode.INCLUDE); // NOTE(review): MockFilter presumably always answers with its constructor code - confirm
+    MockFilter filter2 = new MockFilter(ReturnCode.INCLUDE_AND_NEXT_COL);
+    MockFilter filter3 = new MockFilter(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    MockFilter filter4 = new MockFilter(ReturnCode.NEXT_COL);
+    MockFilter filter5 = new MockFilter(ReturnCode.SKIP);
+    MockFilter filter6 = new MockFilter(ReturnCode.SEEK_NEXT_USING_HINT);
+    MockFilter filter7 = new MockFilter(ReturnCode.NEXT_ROW);
+
+    FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filter1, 
filter2);
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter2, filter3); // INCLUDE_AND_NEXT_COL + INCLUDE_AND_SEEK_NEXT_ROW
+    assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter5, 
filter6);
+    assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter4, filter6); // NEXT_COL + SEEK_NEXT_USING_HINT
+    assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter1); // INCLUDE_AND_SEEK_NEXT_ROW listed before INCLUDE
+    assertEquals(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter2, 
filter1, filter5);
+    assertEquals(ReturnCode.NEXT_ROW, filterList.filterKeyValue(kv1)); // filter5 (SKIP) is the only non-including code in this list
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter2,
+        new FilterList(Operator.MUST_PASS_ALL, filter3, filter4)); // the nested list is itself one sub-filter of the outer list
+    assertEquals(ReturnCode.NEXT_ROW, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ALL, filter3, filter7); // INCLUDE_AND_SEEK_NEXT_ROW + NEXT_ROW
+    assertEquals(ReturnCode.NEXT_ROW, filterList.filterKeyValue(kv1));
+  }
+
+  @Test
+  public void testTheMinimalRule() throws IOException { // pins the one ReturnCode a MUST_PASS_ONE list reports per mix of sub-filter codes
+    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"), 
Bytes.toBytes("a"), 1,
+        Bytes.toBytes("value"));
+    MockFilter filter1 = new MockFilter(ReturnCode.INCLUDE); // NOTE(review): MockFilter presumably always answers with its constructor code - confirm
+    MockFilter filter2 = new MockFilter(ReturnCode.INCLUDE_AND_NEXT_COL);
+    MockFilter filter3 = new MockFilter(ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    MockFilter filter4 = new MockFilter(ReturnCode.NEXT_COL);
+    MockFilter filter5 = new MockFilter(ReturnCode.SKIP);
+    MockFilter filter6 = new MockFilter(ReturnCode.SEEK_NEXT_USING_HINT);
+    FilterList filterList = new FilterList(Operator.MUST_PASS_ONE, filter1, 
filter2);
+    assertEquals(ReturnCode.INCLUDE, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter2, filter3); // INCLUDE_AND_NEXT_COL + INCLUDE_AND_SEEK_NEXT_ROW
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter4, filter5, 
filter6);
+    assertEquals(ReturnCode.SKIP, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter4, filter6); // NEXT_COL + SEEK_NEXT_USING_HINT
+    assertEquals(ReturnCode.SKIP, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter3, filter1); // INCLUDE_AND_SEEK_NEXT_ROW listed before INCLUDE
+    assertEquals(ReturnCode.INCLUDE, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter3, filter2, 
filter1, filter5);
+    assertEquals(ReturnCode.INCLUDE, filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter2,
+        new FilterList(Operator.MUST_PASS_ONE, filter3, filter4)); // the nested list is itself one sub-filter of the outer list
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter2,
+        new FilterList(Operator.MUST_PASS_ONE, filter3, filter4)); // NOTE(review): repeats the previous case verbatim - confirm it is intentional
+    assertEquals(ReturnCode.INCLUDE_AND_NEXT_COL, 
filterList.filterKeyValue(kv1));
+
+    filterList = new FilterList(Operator.MUST_PASS_ONE, filter6, filter6); // the same SEEK_NEXT_USING_HINT filter passed twice
+    assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, 
filterList.filterKeyValue(kv1));
   }
 }
 

Reply via email to