bbeaudreault commented on a change in pull request #4146: URL: https://github.com/apache/hbase/pull/4146#discussion_r819724988
########## File path: hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java ########## @@ -478,53 +472,123 @@ public void testCheckAndMutateMetaTable() throws IOException { any(ClientProtos.MultiRequest.class), any()); } + private void mockScan(int scanPriority) { + int scannerId = 1; + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback<ScanResponse> done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + assertFalse("close scanner should not come in with scan priority " + scanPriority, + req.hasCloseScanner() && req.getCloseScanner()); + + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); + } + }); + return null; + } + }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); + + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback<ScanResponse> done = invocation.getArgument(2); + assertTrue("close request should have scannerId", req.hasScannerId()); + assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); + assertTrue("close request should have 
closerScanner set", req.hasCloseScanner() && req.getCloseScanner()); + + done.run(ScanResponse.getDefaultInstance()); + }); + return null; + } + }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + } + @Test public void testScan() throws IOException, InterruptedException { + mockScan(19); try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { assertNotNull(scanner.next()); Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any()); + // ensures the close thread has time to finish before asserting Review comment: I spent a long time on these tests already, trying to figure out why AsyncTableResultScanner wasn't working properly due to being single-threaded. So I was hesitant to fully refactor them, since they don't currently seem to be a source of test flakiness (if it ain't broke...). I can give it a quick look though. I believe the reason for the sleeps is to give time for the renew lease to occur when the prefetcher is paused. I'll need to see how easily I can mock that out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org