Repository: hbase
Updated Branches:
  refs/heads/branch-1 9b4d78c33 -> 71f22ebfb


HBASE-13193 RegionScannerImpl filters should not be reset if a partial Result 
is returned (Jonathan Lawlor)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/71f22ebf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/71f22ebf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/71f22ebf

Branch: refs/heads/branch-1
Commit: 71f22ebfb89933acd142d285cac141e25844a920
Parents: 9b4d78c
Author: stack <[email protected]>
Authored: Mon Mar 16 13:26:34 2015 -0700
Committer: stack <[email protected]>
Committed: Mon Mar 16 13:27:02 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 123 ++++++++++++-------
 .../hadoop/hbase/regionserver/HRegion.java      |  14 ++-
 .../hbase/regionserver/InternalScanner.java     |   5 +
 .../hbase/TestPartialResultsFromClientSide.java |  44 ++++++-
 4 files changed, 133 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/71f22ebf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 4394b19..e08c1d4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -72,6 +72,12 @@ public class ClientScanner extends AbstractClientScanner {
      * result.
      */
     protected final LinkedList<Result> partialResults = new 
LinkedList<Result>();
+    /**
+     * The row for which we are accumulating partial Results (i.e. the row of 
the Results stored
+     * inside partialResults). Changes to partialResultsRow and partialResults 
are kept in sync
+     * via the methods {@link #addToPartialResults(Result)} and {@link 
#clearPartialResults()}
+     */
+    protected byte[] partialResultsRow = null;
     protected final int caching;
     protected long lastNext;
     // Keep lastResult returned successfully in case we have to reset scanner.
@@ -379,7 +385,7 @@ public class ClientScanner extends AbstractClientScanner {
           } catch (DoNotRetryIOException e) {
             // An exception was thrown which makes any partial results that we 
were collecting
             // invalid. The scanner will need to be reset to the beginning of 
a row.
-            partialResults.clear();
+            clearPartialResults();
 
             // DNRIOEs are thrown to make us break out of retries. Some types 
of DNRIOEs want us
             // to reset the scanner and come back in again.
@@ -516,7 +522,7 @@ public class ClientScanner extends AbstractClientScanner {
     if (resultsFromServer == null || resultsFromServer.length == 0) {
       if (!partialResults.isEmpty()) {
         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        partialResults.clear();
+        clearPartialResults();
       }
 
       return resultsToAddToCache;
@@ -535,67 +541,65 @@ public class ClientScanner extends AbstractClientScanner {
       LOG.trace(sb.toString());
     }
 
-    // There are four possibilities cases that can occur while handling 
partial results
+    // There are three possible cases that can occur while handling 
partial results
     //
     // 1. (partial != null && partialResults.isEmpty())
     // This is the first partial result that we have received. It should be 
added to
     // the list of partialResults and await the next RPC request at which 
point another
     // portion of the complete result will be received
     //
-    // 2. (partial != null && !partialResults.isEmpty())
-    // a. values.length == 1
-    // Since partialResults contains some elements, it means that we are 
expecting to receive
-    // the remainder of the complete result within this RPC response. The fact 
that a partial result
-    // was returned and it's the ONLY result returned indicates that we are 
still receiving
-    // fragments of the complete result. The Result can be completely formed 
only when we have
-    // received all of the fragments and thus in this case we simply add the 
partial result to
-    // our list.
-    //
-    // b. values.length > 1
-    // More than one result has been returned from the server. The fact that 
we are accumulating
-    // partials in partialList and we just received more than one result back 
from the server
-    // indicates that the FIRST result we received from the server must be the 
final fragment that
-    // can be used to complete our result. What this means is that the partial 
that we received is
-    // a partial result for a different row, and at this point we should 
combine the existing
-    // partials into a complete result, clear the partialList, and begin 
accumulating partials for
-    // a new row
+    // 2. !partialResults.isEmpty()
+    // Since our partialResults list is not empty it means that we have been 
accumulating partial
+    // Results for a particular row. We cannot form the complete/whole Result 
for that row until
+    // all partials for the row have been received. Thus we loop through all 
of the Results
+    // returned from the server and determine whether or not all partial 
Results for the row have
+    // been received. We know that we have received all of the partial Results 
for the row when:
+    // i) We notice a row change in the Results
+    // ii) We see a Result for the partial row that is NOT marked as a partial 
Result
     //
-    // 3. (partial == null && !partialResults.isEmpty())
-    // No partial was received but we are accumulating partials in our list. 
That means the final
-    // fragment of the complete result will be the first Result in values[]. 
We use it to create the
-    // complete Result, clear the list, and add it to the list of Results that 
must be added to the
-    // cache. All other Results in values[] are added after the complete 
result to maintain proper
-    // ordering
-    //
-    // 4. (partial == null && partialResults.isEmpty())
+    // 3. (partial == null && partialResults.isEmpty())
     // Business as usual. We are not accumulating partial results and there 
wasn't a partial result
     // in the RPC response. This means that all of the results we received 
from the server are
     // complete and can be added directly to the cache
     if (partial != null && partialResults.isEmpty()) {
-      partialResults.add(partial);
+      addToPartialResults(partial);
 
       // Exclude the last result, it's a partial
       addResultsToList(resultsToAddToCache, resultsFromServer, 0, 
resultsFromServer.length - 1);
-    } else if (partial != null && !partialResults.isEmpty()) {
-      if (resultsFromServer.length > 1) {
-        Result finalResult = resultsFromServer[0];
-        partialResults.add(finalResult);
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        partialResults.clear();
+    } else if (!partialResults.isEmpty()) {
+      for (int i = 0; i < resultsFromServer.length; i++) {
+        Result result = resultsFromServer[i];
+
+        // This result is from the same row as the partial Results. Add it to 
the list of partials
+        // and check if it was the last partial Result for that row
+        if (Bytes.equals(partialResultsRow, result.getRow())) {
+          addToPartialResults(result);
+
+          // If the result is not a partial, it is a signal to us that it is 
the last Result we
+          // need to form the complete Result client-side
+          if (!result.isPartial()) {
+            
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+            clearPartialResults();
+          }
+        } else {
+          // The row of this result differs from the row of the partial 
results we have received so
+          // far. If our list of partials isn't empty, this is a signal to 
form the complete Result
+          // since the row has now changed
+          if (!partialResults.isEmpty()) {
+            
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+            clearPartialResults();
+          }
 
-        // Exclude first result, it was used to form our complete result
-        // Exclude last result, it's a partial result
-        addResultsToList(resultsToAddToCache, resultsFromServer, 1, 
resultsFromServer.length - 1);
+          // It's possible that in one response from the server we receive the 
final partial for
+          // one row and receive a partial for a different row. Thus, make 
sure that all Results
+          // are added to the proper list
+          if (result.isPartial()) {
+            addToPartialResults(result);
+          } else {
+            resultsToAddToCache.add(result);
+          }
+        }
       }
-      partialResults.add(partial);
-    } else if (partial == null && !partialResults.isEmpty()) {
-      Result finalResult = resultsFromServer[0];
-      partialResults.add(finalResult);
-      resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-      partialResults.clear();
-
-      // Exclude the first result, it was used to form our complete result
-      addResultsToList(resultsToAddToCache, resultsFromServer, 1, 
resultsFromServer.length);
     } else { // partial == null && partialResults.isEmpty() -- business as 
usual
       addResultsToList(resultsToAddToCache, resultsFromServer, 0, 
resultsFromServer.length);
     }
@@ -604,6 +608,31 @@ public class ClientScanner extends AbstractClientScanner {
   }
 
   /**
+   * A convenience method for adding a Result to our list of partials. This 
method ensures that only
+   * Results that belong to the same row as the other partials can be added to 
the list.
+   * @param result The result that we want to add to our list of partial 
Results
+   * @throws IOException
+   */
+  private void addToPartialResults(final Result result) throws IOException {
+    final byte[] row = result.getRow();
+    if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
+      throw new IOException("Partial result row does not match. All partial 
results must come "
+          + "from the same row. partialResultsRow: " + 
Bytes.toString(partialResultsRow) + "row: "
+          + Bytes.toString(row));
+    }
+    partialResultsRow = row;
+    partialResults.add(result);
+  }
+
+  /**
+   * Convenience method for clearing the list of partials and resetting the 
partialResultsRow.
+   */
+  private void clearPartialResults() {
+    partialResults.clear();
+    partialResultsRow = null;
+  }
+
+  /**
    * Helper method for adding results between the indices [start, end) to the 
outputList
    * @param outputList the list that results will be added to
    * @param inputArray the array that results are taken from

http://git-wip-us.apache.org/repos/asf/hbase/blob/71f22ebf/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 653cafe..704947d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5539,13 +5539,17 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver { //
         state = nextInternal(tmpList, batchLimit, remainingResultSize);
         outResults.addAll(tmpList);
       }
-      // State should never be null, this is a precautionary measure
-      if (state == null) {
-        if (LOG.isTraceEnabled()) LOG.trace("State was null. Defaulting to no 
more values state");
-        state = NextState.makeState(NextState.State.NO_MORE_VALUES);
+      // Invalid states should never be returned. Receiving an invalid state 
means that we have
+      // no clue how to proceed. Throw an exception.
+      if (!NextState.isValidState(state)) {
+        throw new IOException("Invalid state returned from nextInternal. 
state:" + state);
       }
 
-      resetFilters();
+      // If the size limit was reached it means a partial Result is being 
returned. Returning a
+      // partial Result means that we should not reset the filters; filters 
should only be reset in
+      // between rows
+      if (!state.sizeLimitReached()) resetFilters();
+
       if (isFilterDoneInternal()) {
         state = NextState.makeState(NextState.State.NO_MORE_VALUES, 
state.getResultSize());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/71f22ebf/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index e68dc75..ea5a75f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -201,6 +201,11 @@ public interface InternalScanner extends Closeable {
       return resultSize >= 0;
     }
 
+    @Override
+    public String toString() {
+      return "State: " + state + " resultSize: " + resultSize;
+    }
+
     /**
      * Helper method to centralize all checks as to whether or not the state 
is valid.
      * @param state

http://git-wip-us.apache.org/repos/asf/hbase/blob/71f22ebf/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index 75fa6e3..e7c3813 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +37,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -501,7 +508,7 @@ public class TestPartialResultsFromClientSide {
    * because the entire row needs to be read for the include/exclude decision 
to be made
    */
   @Test
-  public void testNoPartialResultsWhenFilterPresent() throws Exception {
+  public void testNoPartialResultsWhenRowFilterPresent() throws Exception {
     Scan scan = new Scan();
     scan.setMaxResultSize(1);
     scan.setAllowPartialResults(true);
@@ -784,4 +791,39 @@ public class TestPartialResultsFromClientSide {
     scanner.close();
     return numCells;
   }
+
+  /**
+   * Test partial Result re-assembly in the presence of different filters. The 
Results from the
+   * partial scanner should match the Results returned from a scanner that 
receives all of the
+   * results in one RPC to the server. The partial scanner is tested with a 
variety of different
+   * result sizes (all of which are less than the size necessary to fetch an 
entire row)
+   * @throws Exception
+   */
+  @Test
+  public void testPartialResultsWithColumnFilter() throws Exception {
+    testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter());
+    testPartialResultsWithColumnFilter(new 
ColumnPrefixFilter(Bytes.toBytes("testQualifier5")));
+    testPartialResultsWithColumnFilter(new 
ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true,
+        Bytes.toBytes("testQualifier7"), true));
+
+    Set<byte[]> qualifiers = new LinkedHashSet<>();
+    qualifiers.add(Bytes.toBytes("testQualifier5"));
+    testPartialResultsWithColumnFilter(new 
FirstKeyValueMatchingQualifiersFilter(qualifiers));
+  }
+
+  public void testPartialResultsWithColumnFilter(Filter filter) throws 
Exception {
+    assertTrue(!filter.hasFilterRow());
+
+    Scan partialScan = new Scan();
+    partialScan.setFilter(filter);
+
+    Scan oneshotScan = new Scan();
+    oneshotScan.setFilter(filter);
+    oneshotScan.setMaxResultSize(Long.MAX_VALUE);
+
+    for (int i = 1; i <= NUM_COLS; i++) {
+      partialScan.setMaxResultSize(getResultSizeForNumberOfCells(i));
+      testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
+    }
+  }
 }

Reply via email to