This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 81c213d007f HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817)
81c213d007f is described below

commit 81c213d007f3a986d69c352056245b56741b23ea
Author: chenglei <cheng...@apache.org>
AuthorDate: Sat Apr 20 09:54:55 2024 +0800

    HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817)
---
 .../AsyncScanSingleRegionRpcRetryingCaller.java    | 37 +++++++++++----
 .../hbase/client/AsyncTableResultScanner.java      | 24 ++++++++--
 .../TestAsyncTableScannerCloseWhileSuspending.java | 54 ++++++++++++++++------
 3 files changed, 89 insertions(+), 26 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 6ef7d6a216a..428a3c65507 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -230,7 +230,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   // Notice that, the public methods of this class is supposed to be called by 
upper layer only, and
   // package private methods can only be called within the implementation of
   // AsyncScanSingleRegionRpcRetryingCaller.
-  private final class ScanResumerImpl implements 
AdvancedScanResultConsumer.ScanResumer {
+  @InterfaceAudience.Private
+  final class ScanResumerImpl implements 
AdvancedScanResultConsumer.ScanResumer {
 
     // INITIALIZED -> SUSPENDED -> RESUMED
     // INITIALIZED -> RESUMED
@@ -250,6 +251,18 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
     @Override
     public void resume() {
+      doResume(false);
+    }
+
+    /**
+     * This method is used when {@link ScanControllerImpl#suspend} had ever 
been called to get a
+     * {@link ScanResumerImpl}, but now user stops scan and does not need any 
more scan results.
+     */
+    public void terminate() {
+      doResume(true);
+    }
+
+    private void doResume(boolean stopScan) {
       // just used to fix findbugs warnings. In fact, if resume is called 
before prepare, then we
       // just return at the first if condition without loading the resp and 
numValidResuls field. If
       // resume is called after suspend, then it is also safe to just 
reference resp and
@@ -274,7 +287,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
         localResp = this.resp;
         localNumberOfCompleteRows = this.numberOfCompleteRows;
       }
-      completeOrNext(localResp, localNumberOfCompleteRows);
+      if (stopScan) {
+        stopScan(localResp);
+      } else {
+        completeOrNext(localResp, localNumberOfCompleteRows);
+      }
     }
 
     private void scheduleRenewLeaseTask() {
@@ -535,12 +552,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     }
     ScanControllerState state = scanController.destroy();
     if (state == ScanControllerState.TERMINATED) {
-      if (resp.getMoreResultsInRegion()) {
-        // we have more results in region but user request to stop the scan, 
so we need to close the
-        // scanner explicitly.
-        closeScanner();
-      }
-      completeNoMoreResults();
+      stopScan(resp);
       return;
     }
     int numberOfCompleteRows = resultCache.numberOfCompleteRows() - 
numberOfCompleteRowsBefore;
@@ -552,6 +564,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     completeOrNext(resp, numberOfCompleteRows);
   }
 
+  private void stopScan(ScanResponse resp) {
+    if (resp.getMoreResultsInRegion()) {
+      // we have more results in region but user request to stop the scan, so 
we need to close the
+      // scanner explicitly.
+      closeScanner();
+    }
+    completeNoMoreResults();
+  }
+
   private void call() {
     // As we have a call sequence for scan, it is useless to have a different 
rpc timeout which is
     // less than the scan timeout. If the server does not respond in 
time(usually this will not
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 85f344b10b5..e36b1893e0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -25,6 +25,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import org.apache.hadoop.hbase.TableName;
+import 
org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -143,6 +144,25 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
     resumer = null;
   }
 
+  private void terminateResumerIfPossible() {
+    if (resumer == null) {
+      return;
+    }
+    // AsyncTableResultScanner.close means we do not need scan results any 
more, but for
+    // ScanResumerImpl.resume, it would perform another scan on RegionServer 
and call
+    // AsyncTableResultScanner.onNext again when ScanResponse is received. 
This time
+    // AsyncTableResultScanner.onNext would do nothing else but just discard 
ScanResponse
+    // because AsyncTableResultScanner.closed is true. So here we would better 
save this
+    // unnecessary scan on RegionServer and introduce 
ScanResumerImpl.terminate to close
+    // scanner directly.
+    if (resumer instanceof ScanResumerImpl) {
+      ((ScanResumerImpl) resumer).terminate();
+    } else {
+      resumePrefetch();
+    }
+    resumer = null;
+  }
+
   @Override
   public synchronized Result next() throws IOException {
     while (queue.isEmpty()) {
@@ -173,9 +193,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
     closed = true;
     queue.clear();
     cacheSize = 0;
-    if (resumer != null) {
-      resumePrefetch();
-    }
+    terminateResumerIfPossible();
     notifyAll();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
index 6b51b582207..f1897c30f99 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -78,21 +80,41 @@ public class TestAsyncTableScannerCloseWhileSuspending {
 
   @Test
   public void testCloseScannerWhileSuspending() throws Exception {
-    try (ResultScanner scanner = TABLE.getScanner(new 
Scan().setMaxResultSize(1))) {
-      TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
-
-        @Override
-        public boolean evaluate() throws Exception {
-          return ((AsyncTableResultScanner) scanner).isSuspended();
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "The given scanner has been suspended in time";
-        }
-      });
-      assertEquals(1, getScannersCount());
-    }
+    final AtomicInteger onNextCounter = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(1);
+    final Scan scan = new Scan().setMaxResultSize(1);
+    final AsyncTableResultScanner scanner = new 
AsyncTableResultScanner(TABLE_NAME, scan, 1) {
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        onNextCounter.incrementAndGet();
+        super.onNext(results, controller);
+      }
+
+      @Override
+      public void onComplete() {
+        super.onComplete();
+        latch.countDown();
+      }
+    };
+
+    CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+    TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return scanner.isSuspended();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The given scanner has been suspended in time";
+      }
+    });
+    assertEquals(1, getScannersCount());
+    assertEquals(1, onNextCounter.get());
+
+    scanner.close();
     TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
 
       @Override
@@ -105,5 +127,7 @@ public class TestAsyncTableScannerCloseWhileSuspending {
         return "Still have " + getScannersCount() + " scanners opened";
       }
     });
+    latch.await();
+    assertEquals(1, onNextCounter.get());
   }
 }

Reply via email to