This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8c179d449f10f68d88a3df0406cfec0bd9a346dc Author: ZhangJian He <[email protected]> AuthorDate: Thu Oct 27 23:01:37 2022 +0800 [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219) ### Motivation https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774 The `pendingLookupRequestSemaphore` will leak when handleError. There are `LookUpRequestSemaphore` not released when removing it from `pendingRequests` related PR: #17856 ### Modifications We can't easily release the semaphore in `handleError`, because there are not only `LookUpRequest`. So release the semaphore when LookupException ### Verifying this change Add unit test case to cover this change ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc-required` (Your PR needs to update docs and you will update later) - [x] `doc-not-needed` bug fixs, no need doc - [ ] `doc` (Your PR contains doc changes) - [ ] `doc-complete` (Docs have been already added) (cherry picked from commit fad3cccf87480a7a8c3a938cf5ca539b9a033106) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../apache/pulsar/client/impl/ClientCnxTest.java | 43 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 7598da17bd5..2dd4cf334bf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -764,7 +764,8 @@ public class ClientCnx extends PulsarHandler { if (pendingLookupRequestSemaphore.tryAcquire()) { future.whenComplete((lookupDataResult, throwable) -> { - if (throwable instanceof ConnectException) { + if (throwable instanceof ConnectException + || throwable instanceof PulsarClientException.LookupException) { pendingLookupRequestSemaphore.release(); } }); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index c46101fd47f..a33d338fa22 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -114,6 +114,49 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } + @Test + public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception { + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout")); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(10_000); + conf.setKeepAliveIntervalSeconds(0); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + ChannelFuture listenerFuture = mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + cnx.channelActive(ctx); + cnx.state = ClientCnx.State.Ready; + CountDownLatch countDownLatch = new CountDownLatch(1); + CompletableFuture<Exception> completableFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(1_000); + CompletableFuture<BinaryProtoLookupService.LookupDataResult> future = + cnx.newLookup(null, 123); + countDownLatch.countDown(); + future.get(); + } catch (Exception e) { + completableFuture.complete(e); + } + }).start(); + countDownLatch.await(); + CommandError commandError = new CommandError(); + commandError.setRequestId(123L); + commandError.setError(ServerError.ServiceNotReady); + commandError.setMessage("Service not ready"); + cnx.handleError(commandError); + assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.LookupException); + // wait for subsequent calls over + Awaitility.await().untilAsserted(() -> { + assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest()); + }); + eventLoop.shutdownGracefully(); + } + @Test public void testPendingWaitingLookupRequestSemaphore() throws Exception { EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
