This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c85731dd135d0feb4d917d55e524daec588e0993 Author: ZhangJian He <[email protected]> AuthorDate: Fri Oct 14 19:14:56 2022 +0800 [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856) ### Motivation https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297 The `pendingLookupRequestSemaphore` leaks permits when the channel becomes inactive: the permit acquired for a lookup request is not released when that request is removed from `pendingRequests`. ### Modifications We can't easily release the semaphore in `channelInactive`, because the pending requests handled there are not only lookup requests. Instead, release the semaphore when the lookup future completes with a `ConnectException`. ### Verifying this change Added unit test cases to cover this change. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc-required` (Your PR needs to update docs and you will update later) - [x] `doc-not-needed` bug fix, no doc needed - [ ] `doc` (Your PR contains doc changes) - [ ] `doc-complete` (Docs have been already added) (cherry picked from commit b4518802f0dfad857abf3575758a1f69aa9457f8) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 8 +++ .../apache/pulsar/client/impl/ClientCnxTest.java | 66 ++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 506a0d36ca7..7598da17bd5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -132,6 +132,9 @@ public class ClientCnx extends PulsarHandler { private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>(); private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>(); + + @VisibleForTesting + 
@Getter(AccessLevel.PACKAGE) private final Semaphore pendingLookupRequestSemaphore; private final Semaphore maxLookupRequestSemaphore; private final EventLoopGroup eventLoopGroup; @@ -760,6 +763,11 @@ public class ClientCnx extends PulsarHandler { TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>(); if (pendingLookupRequestSemaphore.tryAcquire()) { + future.whenComplete((lookupDataResult, throwable) -> { + if (throwable instanceof ConnectException) { + pendingLookupRequestSemaphore.release(); + } + }); addPendingLookupRequests(requestId, future); ctx.writeAndFlush(request).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 6ce4afecd02..c46101fd47f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -32,6 +32,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import org.apache.pulsar.client.api.PulsarClientException; @@ -44,6 +45,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class ClientCnxTest { @@ -74,6 +76,70 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } + @Test + public void testPendingLookupRequestSemaphore() throws Exception { + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, 
new DefaultThreadFactory("testClientCnxTimeout")); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(10_000); + conf.setKeepAliveIntervalSeconds(0); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + ChannelFuture listenerFuture = mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + cnx.channelActive(ctx); + CountDownLatch countDownLatch = new CountDownLatch(1); + CompletableFuture<Exception> completableFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(1_000); + CompletableFuture<BinaryProtoLookupService.LookupDataResult> future = + cnx.newLookup(null, 123); + countDownLatch.countDown(); + future.get(); + } catch (Exception e) { + completableFuture.complete(e); + } + }).start(); + countDownLatch.await(); + cnx.channelInactive(ctx); + assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.ConnectException); + // wait for subsequent calls over + Awaitility.await().untilAsserted(() -> { + assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest()); + }); + eventLoop.shutdownGracefully(); + } + + @Test + public void testPendingWaitingLookupRequestSemaphore() throws Exception { + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout")); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(10_000); + conf.setKeepAliveIntervalSeconds(0); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + ChannelFuture listenerFuture = 
mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + cnx.channelActive(ctx); + for (int i = 0; i < 5001; i++) { + cnx.newLookup(null, i); + } + cnx.channelInactive(ctx); + // wait for subsequent calls over + Awaitility.await().untilAsserted(() -> { + assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest()); + }); + eventLoop.shutdownGracefully(); + } + @Test public void testReceiveErrorAtSendConnectFrameState() throws Exception { ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
