This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new 81c213d007f HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817) 81c213d007f is described below commit 81c213d007f3a986d69c352056245b56741b23ea Author: chenglei <cheng...@apache.org> AuthorDate: Sat Apr 20 09:54:55 2024 +0800 HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817) --- .../AsyncScanSingleRegionRpcRetryingCaller.java | 37 +++++++++++---- .../hbase/client/AsyncTableResultScanner.java | 24 ++++++++-- .../TestAsyncTableScannerCloseWhileSuspending.java | 54 ++++++++++++++++------ 3 files changed, 89 insertions(+), 26 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 6ef7d6a216a..428a3c65507 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -230,7 +230,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { // Notice that, the public methods of this class is supposed to be called by upper layer only, and // package private methods can only be called within the implementation of // AsyncScanSingleRegionRpcRetryingCaller. 
- private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { + @InterfaceAudience.Private + final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { // INITIALIZED -> SUSPENDED -> RESUMED // INITIALIZED -> RESUMED @@ -250,6 +251,18 @@ class AsyncScanSingleRegionRpcRetryingCaller { @Override public void resume() { + doResume(false); + } + + /** + * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a + * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results. + */ + public void terminate() { + doResume(true); + } + + private void doResume(boolean stopScan) { // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we // just return at the first if condition without loading the resp and numValidResuls field. If // resume is called after suspend, then it is also safe to just reference resp and @@ -274,7 +287,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { localResp = this.resp; localNumberOfCompleteRows = this.numberOfCompleteRows; } - completeOrNext(localResp, localNumberOfCompleteRows); + if (stopScan) { + stopScan(localResp); + } else { + completeOrNext(localResp, localNumberOfCompleteRows); + } } private void scheduleRenewLeaseTask() { @@ -535,12 +552,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } ScanControllerState state = scanController.destroy(); if (state == ScanControllerState.TERMINATED) { - if (resp.getMoreResultsInRegion()) { - // we have more results in region but user request to stop the scan, so we need to close the - // scanner explicitly. 
- closeScanner(); - } - completeNoMoreResults(); + stopScan(resp); return; } int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; @@ -552,6 +564,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeOrNext(resp, numberOfCompleteRows); } + private void stopScan(ScanResponse resp) { + if (resp.getMoreResultsInRegion()) { + // we have more results in region but user request to stop the scan, so we need to close the + // scanner explicitly. + closeScanner(); + } + completeNoMoreResults(); + } + private void call() { // As we have a call sequence for scan, it is useless to have a different rpc timeout which is // less than the scan timeout. If the server does not respond in time(usually this will not diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index 85f344b10b5..e36b1893e0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -25,6 +25,7 @@ import java.io.InterruptedIOException; import java.util.ArrayDeque; import java.util.Queue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -143,6 +144,25 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum resumer = null; } + private void terminateResumerIfPossible() { + if (resumer == null) { + return; + } + // AsyncTableResultScanner.close means we do not need scan results any more, but for + // ScanResumerImpl.resume, it would perform another scan on RegionServer and call + // AsyncTableResultScanner.onNext again when 
ScanResponse is received. This time + // AsyncTableResultScanner.onNext would do nothing else but just discard ScanResponse + // because AsyncTableResultScanner.closed is true. So here we would better save this + // unnecessary scan on RegionServer and introduce ScanResumerImpl.terminate to close + // scanner directly. + if (resumer instanceof ScanResumerImpl) { + ((ScanResumerImpl) resumer).terminate(); + } else { + resumePrefetch(); + } + resumer = null; + } + @Override public synchronized Result next() throws IOException { while (queue.isEmpty()) { @@ -173,9 +193,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum closed = true; queue.clear(); cacheSize = 0; - if (resumer != null) { - resumePrefetch(); - } + terminateResumerIfPossible(); notifyAll(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java index 6b51b582207..f1897c30f99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -78,21 +80,41 @@ public class TestAsyncTableScannerCloseWhileSuspending { @Test public void testCloseScannerWhileSuspending() throws Exception { - try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { - TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws 
Exception { - return ((AsyncTableResultScanner) scanner).isSuspended(); - } - - @Override - public String explainFailure() throws Exception { - return "The given scanner has been suspended in time"; - } - }); - assertEquals(1, getScannersCount()); - } + final AtomicInteger onNextCounter = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + final Scan scan = new Scan().setMaxResultSize(1); + final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) { + @Override + public void onNext(Result[] results, ScanController controller) { + onNextCounter.incrementAndGet(); + super.onNext(results, controller); + } + + @Override + public void onComplete() { + super.onComplete(); + latch.countDown(); + } + }; + + CONN.getTable(TABLE_NAME).scan(scan, scanner); + + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return scanner.isSuspended(); + } + + @Override + public String explainFailure() throws Exception { + return "The given scanner has been suspended in time"; + } + }); + assertEquals(1, getScannersCount()); + assertEquals(1, onNextCounter.get()); + + scanner.close(); TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { @Override @@ -105,5 +127,7 @@ public class TestAsyncTableScannerCloseWhileSuspending { return "Still have " + getScannersCount() + " scanners opened"; } }); + latch.await(); + assertEquals(1, onNextCounter.get()); } }