Repository: hbase
Updated Branches:
  refs/heads/master d0941127d -> 624652373


HBASE-18489 Expose scan cursor in RawScanResultConsumer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62465237
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62465237
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62465237

Branch: refs/heads/master
Commit: 624652373e6fb11a7aee0ce8bcfb7a394ed89dcb
Parents: d094112
Author: zhangduo <zhang...@apache.org>
Authored: Thu Aug 10 10:10:45 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Aug 10 10:11:40 2017 +0800

----------------------------------------------------------------------
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  18 +-
 .../hbase/client/RawScanResultConsumer.java     |  20 +-
 .../hbase/regionserver/RSRpcServices.java       |  15 +-
 .../hbase/regionserver/ScannerContext.java      |  12 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |   7 +-
 .../hbase/client/AbstractTestScanCursor.java    | 146 ++++++++++++++
 .../hbase/client/TestRawAsyncScanCursor.java    | 107 +++++++++++
 .../hadoop/hbase/client/TestScanCursor.java     |  90 +++++++++
 .../hbase/regionserver/TestScannerCursor.java   | 191 -------------------
 9 files changed, 387 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index e5448d9..89b3afc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -53,6 +53,7 @@ import 
org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -144,7 +145,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private final class ScanControllerImpl implements 
RawScanResultConsumer.ScanController {
 
     // Make sure the methods are only called in this thread.
-    private final Thread callerThread = Thread.currentThread();
+    private final Thread callerThread;
+
+    private final Optional<Cursor> cursor;
 
     // INITIALIZED -> SUSPENDED -> DESTROYED
     // INITIALIZED -> TERMINATED -> DESTROYED
@@ -154,6 +157,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
     private ScanResumerImpl resumer;
 
+    public ScanControllerImpl(ScanResponse resp) {
+      callerThread = Thread.currentThread();
+      cursor = resp.hasCursor() ? 
Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
+          : Optional.empty();
+    }
+
     private void preCheck() {
       Preconditions.checkState(Thread.currentThread() == callerThread,
         "The current thread is %s, expected thread is %s, " +
@@ -184,6 +193,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       this.state = ScanControllerState.DESTROYED;
       return state;
     }
+
+    @Override
+    public Optional<Cursor> cursor() {
+        return cursor;
+    }
   }
 
   private enum ScanResumerState {
@@ -479,7 +493,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
 
-    ScanControllerImpl scanController = new ScanControllerImpl();
+    ScanControllerImpl scanController = new ScanControllerImpl(resp);
     if (results.length > 0) {
       updateNextStartRowWhenError(results[results.length - 1]);
       consumer.onNext(results, scanController);

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
index 820960b..54d4887 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.Optional;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -47,14 +49,14 @@ public interface RawScanResultConsumer {
   }
 
   /**
-   * Used to suspend or stop a scan.
+   * Used to suspend or stop a scan, or get a scan cursor if available.
    * <p>
-   * Notice that, you should only call the methods below inside onNext or 
onHeartbeat method. A
-   * IllegalStateException will be thrown if you call them at other places.
+   * Notice that, you should only call the {@link #suspend()} or {@link 
#terminate()} inside onNext
+   * or onHeartbeat method. A IllegalStateException will be thrown if you call 
them at other places.
    * <p>
-   * You can only call one of the methods below, i.e., call suspend or 
terminate(of course you are
-   * free to not call them both), and the methods are not reentrant. A 
IllegalStateException will be
-   * thrown if you have already called one of the methods.
+   * You can only call one of the {@link #suspend()} and {@link #terminate()} 
methods(of course you
+   * are free to not call them both), and the methods are not reentrant. An 
IllegalStateException
+   * will be thrown if you have already called one of the methods.
    */
   @InterfaceAudience.Public
   interface ScanController {
@@ -75,6 +77,12 @@ public interface RawScanResultConsumer {
      * or you want to stop the scan in onHeartbeat method because it has spent 
too many time.
      */
     void terminate();
+
+    /**
+     * Get the scan cursor if available.
+     * @return The scan cursor.
+     */
+    Optional<Cursor> cursor();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bc196a6..cc85ddb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache;
-import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -119,6 +115,9 @@ import 
org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache;
+import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
@@ -221,8 +220,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.zookeeper.KeeperException;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Implements the regionserver RPC services.
  */
@@ -3108,9 +3105,9 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
               // Heartbeat messages occur when the time limit has been reached.
               builder.setHeartbeatMessage(timeLimitReached);
               if (timeLimitReached && rsh.needCursor) {
-                Cell readingCell = scannerContext.getPeekedCellInHeartbeat();
-                if (readingCell != null ) {
-                  builder.setCursor(ProtobufUtil.toCursor(readingCell));
+                Cell cursorCell = scannerContext.getLastPeekedCell();
+                if (cursorCell != null ) {
+                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 2bab82e..bc7b597 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -97,7 +95,7 @@ public class ScannerContext {
   boolean keepProgress;
   private static boolean DEFAULT_KEEP_PROGRESS = false;
 
-  private Cell peekedCellInHeartbeat = null;
+  private Cell lastPeekedCell = null;
 
   /**
    * Tracks the relevant server side metrics during scans. null when metrics 
should not be tracked
@@ -333,12 +331,12 @@ public class ScannerContext {
         || checkTimeLimit(checkerScope);
   }
 
-  public Cell getPeekedCellInHeartbeat() {
-    return peekedCellInHeartbeat;
+  Cell getLastPeekedCell() {
+    return lastPeekedCell;
   }
 
-  public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) {
-    this.peekedCellInHeartbeat = peekedCellInHeartbeat;
+  void setLastPeekedCell(Cell lastPeekedCell) {
+    this.lastPeekedCell = lastPeekedCell;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 969d485..5286c39 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -48,11 +48,10 @@ import 
org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc
 import 
org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream 
into List&lt;KeyValue&gt;
  * for a single row.
@@ -105,7 +104,7 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
    * KVs skipped via seeking to next row/column. TODO: estimate them?
    */
   private long kvsScanned = 0;
-  private Cell prevCell = null;
+  protected Cell prevCell = null;
 
   private final long preadMaxBytes;
   private long bytesRead;
@@ -593,7 +592,6 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
         scannerContext.updateTimeProgress();
         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
-          scannerContext.setPeekedCellInHeartbeat(prevCell);
           return 
scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }
       }
@@ -605,6 +603,7 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
       int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
       bytesRead += cellSize;
       prevCell = cell;
+      scannerContext.setLastPeekedCell(cell);
       topChanged = false;
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
       switch (qcode) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java
new file mode 100644
index 0000000..ffd8c01
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class AbstractTestScanCursor {
+
+  protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  /**
+   * Table configuration
+   */
+  protected static TableName TABLE_NAME = TableName.valueOf("TestScanCursor");
+
+  protected static int NUM_ROWS = 5;
+  protected static byte[] ROW = Bytes.toBytes("testRow");
+  protected static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
+
+  protected static int NUM_FAMILIES = 2;
+  protected static byte[] FAMILY = Bytes.toBytes("testFamily");
+  protected static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 
NUM_FAMILIES);
+
+  protected static int NUM_QUALIFIERS = 2;
+  protected static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  protected static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 
NUM_QUALIFIERS);
+
+  protected static int VALUE_SIZE = 10;
+  protected static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+  protected static final int TIMEOUT = 4000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
+
+    // Check the timeout condition after every cell
+    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
+    TEST_UTIL.startMiniCluster(1);
+
+    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+  }
+
+  private static void createTestTable(TableName name, byte[][] rows, byte[][] 
families,
+      byte[][] qualifiers, byte[] cellValue) throws IOException {
+    TEST_UTIL.createTable(name, families).put(createPuts(rows, families, 
qualifiers, cellValue));
+  }
+
+  private static List<Put> createPuts(byte[][] rows, byte[][] families, 
byte[][] qualifiers,
+      byte[] value) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (int row = 0; row < rows.length; row++) {
+      Put put = new Put(rows[row]);
+      for (int fam = 0; fam < families.length; fam++) {
+        for (int qual = 0; qual < qualifiers.length; qual++) {
+          KeyValue kv = new KeyValue(rows[row], families[fam], 
qualifiers[qual], qual, value);
+          put.add(kv);
+        }
+      }
+      puts.add(put);
+    }
+    return puts;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  public static final class SparseFilter extends FilterBase {
+
+    private final boolean reversed;
+
+    public SparseFilter(boolean reversed) {
+      this.reversed = reversed;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+      Threads.sleep(TIMEOUT / 2 + 100);
+      return Bytes.equals(CellUtil.cloneRow(v), ROWS[reversed ? 0 : NUM_ROWS - 
1])
+          ? ReturnCode.INCLUDE
+          : ReturnCode.SKIP;
+    }
+
+    @Override
+    public byte[] toByteArray() throws IOException {
+      return reversed ? new byte[] { 1 } : new byte[] { 0 };
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new SparseFilter(pbBytes[0] != 0);
+    }
+  }
+
+  protected Scan createScanWithSparseFilter() {
+    return new 
Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE)
+        .setNeedCursorResult(true).setAllowPartialResults(true).setFilter(new 
SparseFilter(false));
+  }
+
+  protected Scan createReversedScanWithSparseFilter() {
+    return new 
Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE)
+        
.setReversed(true).setNeedCursorResult(true).setAllowPartialResults(true)
+        .setFilter(new SparseFilter(true));
+  }
+
+  protected Scan createScanWithSizeLimit() {
+    return new 
Scan().setMaxResultSize(1).setCaching(Integer.MAX_VALUE).setNeedCursorResult(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
new file mode 100644
index 0000000..9caf942
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
+
+  private void doTest(boolean reversed)
+      throws InterruptedException, ExecutionException, IOException {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try (AsyncConnection conn =
+        
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      RawAsyncTable table = conn.getRawTable(TABLE_NAME);
+      table.scan(reversed ? createReversedScanWithSparseFilter() : 
createScanWithSparseFilter(),
+        new RawScanResultConsumer() {
+
+          private int count;
+
+          @Override
+          public void onHeartbeat(ScanController controller) {
+            int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
+            if (reversed) {
+              row = NUM_ROWS - 1 - row;
+            }
+            try {
+              assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
+              count++;
+            } catch (Throwable e) {
+              future.completeExceptionally(e);
+              throw e;
+            }
+          }
+
+          @Override
+          public void onNext(Result[] results, ScanController controller) {
+            try {
+              assertEquals(1, results.length);
+              assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / 
NUM_QUALIFIERS);
+              // we will always provide a scan cursor if time limit is reached.
+              if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
+                assertFalse(controller.cursor().isPresent());
+              } else {
+                assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
+                  controller.cursor().get().getRow());
+              }
+              assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], 
results[0].getRow());
+              count++;
+            } catch (Throwable e) {
+              future.completeExceptionally(e);
+              throw e;
+            }
+          }
+
+          @Override
+          public void onError(Throwable error) {
+            future.completeExceptionally(error);
+          }
+
+          @Override
+          public void onComplete() {
+            future.complete(null);
+          }
+        });
+      future.get();
+    }
+  }
+
+  @Test
+  public void testHeartbeatWithSparseFilter()
+      throws IOException, InterruptedException, ExecutionException {
+    doTest(false);
+  }
+
+  @Test
+  public void testHeartbeatWithSparseFilterReversed()
+      throws IOException, InterruptedException, ExecutionException {
+    doTest(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
new file mode 100644
index 0000000..f7798f0
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestScanCursor extends AbstractTestScanCursor {
+
+  @Test
+  public void testHeartbeatWithSparseFilter() throws Exception {
+    try (ResultScanner scanner =
+        
TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter()))
 {
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
+          Assert.assertTrue(r.isCursor());
+          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          Assert.assertFalse(r.isCursor());
+          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+
+  @Test
+  public void testHeartbeatWithSparseFilterReversed() throws Exception {
+    try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME)
+        .getScanner(createReversedScanWithSparseFilter())) {
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
+          Assert.assertTrue(r.isCursor());
+          Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / 
NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          Assert.assertFalse(r.isCursor());
+          Assert.assertArrayEquals(ROWS[0], r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+
+  @Test
+  public void testSizeLimit() throws IOException {
+    try (ResultScanner scanner =
+        
TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit()))
 {
+      int num = 0;
+      Result r;
+      while ((r = scanner.next()) != null) {
+        if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * 
NUM_QUALIFIERS) - 1) {
+          Assert.assertTrue(r.isCursor());
+          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
+            r.getCursor().getRow());
+        } else {
+          Assert.assertFalse(r.isCursor());
+          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getRow());
+        }
+        num++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62465237/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java
deleted file mode 100644
index e40b808..0000000
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTestConst;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(MediumTests.class)
-public class TestScannerCursor {
-
-  private static final Log LOG =
-      LogFactory.getLog(TestScannerCursor.class);
-
-  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-
-  private static Table TABLE = null;
-
-  /**
-   * Table configuration
-   */
-  private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor");
-
-  private static int NUM_ROWS = 5;
-  private static byte[] ROW = Bytes.toBytes("testRow");
-  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
-
-  private static int NUM_FAMILIES = 2;
-  private static byte[] FAMILY = Bytes.toBytes("testFamily");
-  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 
NUM_FAMILIES);
-
-  private static int NUM_QUALIFIERS = 2;
-  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
-  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 
NUM_QUALIFIERS);
-
-  private static int VALUE_SIZE = 10;
-  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
-
-  private static final int TIMEOUT = 4000;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-
-    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
-
-    // Check the timeout condition after every cell
-    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
-    TEST_UTIL.startMiniCluster(1);
-
-    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
-
-  }
-
-  static Table createTestTable(TableName name, byte[][] rows, byte[][] 
families,
-      byte[][] qualifiers, byte[] cellValue) throws IOException {
-    Table ht = TEST_UTIL.createTable(name, families);
-    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
-    ht.put(puts);
-    return ht;
-  }
-
-  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] 
qualifiers,
-      byte[] value) throws IOException {
-    Put put;
-    ArrayList<Put> puts = new ArrayList<>();
-
-    for (int row = 0; row < rows.length; row++) {
-      put = new Put(rows[row]);
-      for (int fam = 0; fam < families.length; fam++) {
-        for (int qual = 0; qual < qualifiers.length; qual++) {
-          KeyValue kv = new KeyValue(rows[row], families[fam], 
qualifiers[qual], qual, value);
-          put.add(kv);
-        }
-      }
-      puts.add(put);
-    }
-
-    return puts;
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  public static class SparseFilter extends FilterBase {
-
-    @Override
-    public ReturnCode filterKeyValue(Cell v) throws IOException {
-      Threads.sleep(TIMEOUT / 2 + 100);
-      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? 
ReturnCode.INCLUDE
-          : ReturnCode.SKIP;
-    }
-
-    public static Filter parseFrom(final byte[] pbBytes) {
-      return new SparseFilter();
-    }
-  }
-
-  @Test
-  public void testHeartbeatWithSparseFilter() throws Exception {
-    Scan scan = new Scan();
-    scan.setMaxResultSize(Long.MAX_VALUE);
-    scan.setCaching(Integer.MAX_VALUE);
-    scan.setNeedCursorResult(true);
-    scan.setAllowPartialResults(true);
-    scan.setFilter(new SparseFilter());
-    try(ResultScanner scanner = TABLE.getScanner(scan)) {
-      int num = 0;
-      Result r;
-      while ((r = scanner.next()) != null) {
-
-        if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getCursor().getRow());
-        } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getRow());
-        }
-        num++;
-      }
-    }
-  }
-
-  @Test
-  public void testSizeLimit() throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxResultSize(1);
-    scan.setCaching(Integer.MAX_VALUE);
-    scan.setNeedCursorResult(true);
-    try (ResultScanner scanner = TABLE.getScanner(scan)) {
-      int num = 0;
-      Result r;
-      while ((r = scanner.next()) != null) {
-
-        if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * 
NUM_QUALIFIERS)-1) {
-          Assert.assertTrue(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getCursor().getRow());
-        } else {
-          Assert.assertFalse(r.isCursor());
-          Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], 
r.getRow());
-        }
-        num++;
-      }
-    }
-  }
-
-}

Reply via email to