This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f98297f3c9c [fix][client] Use dedicated executor for requests in 
BinaryProtoLookupService (#23378)
f98297f3c9c is described below

commit f98297f3c9c052d7ddd8444bc0ef876ceeb924b1
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Oct 15 21:41:26 2024 +0800

    [fix][client] Use dedicated executor for requests in 
BinaryProtoLookupService (#23378)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../org/apache/pulsar/broker/PulsarService.java    |  5 ++
 .../client/impl/BinaryProtoLookupService.java      | 65 +++++++++++++++++-----
 .../pulsar/client/impl/PulsarClientImpl.java       | 38 +++++++++++--
 .../client/impl/BinaryProtoLookupServiceTest.java  | 55 +++++++++++++++++-
 4 files changed, 140 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index dcc0e961275..05491d9c281 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
     private final ScheduledExecutorProvider 
brokerClientSharedScheduledExecutorProvider;
     private final Timer brokerClientSharedTimer;
+    private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
 
     private MetricsGenerator metricsGenerator;
     private final PulsarBrokerOpenTelemetry openTelemetry;
@@ -388,6 +389,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 new ScheduledExecutorProvider(1, 
"broker-client-shared-scheduled-executor");
         this.brokerClientSharedTimer =
                 new HashedWheelTimer(new 
DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
+        this.brokerClientSharedLookupExecutorProvider =
+                new ScheduledExecutorProvider(1, 
"broker-client-shared-lookup-executor");
 
         // here in the constructor we don't have the offloader scheduler yet
         this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 
0);
@@ -696,6 +699,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             brokerClientSharedExternalExecutorProvider.shutdownNow();
             brokerClientSharedInternalExecutorProvider.shutdownNow();
             brokerClientSharedScheduledExecutorProvider.shutdownNow();
+            brokerClientSharedLookupExecutorProvider.shutdownNow();
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
 
@@ -1687,6 +1691,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
                 
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
                 
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
+                
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
                 .build();
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index b45d6e9f6a8..795cdc6d693 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static java.lang.String.format;
 import static 
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.common.Attributes;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -29,6 +30,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final PulsarClientImpl client;
     private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
-    private final ExecutorService executor;
+    private final ExecutorService scheduleExecutor;
     private final String listenerName;
     private final int maxLookupRedirects;
+    private final ExecutorService lookupPinnedExecutor;
+    private final boolean createdLookupPinnedExecutor;
 
     private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, 
CompletableFuture<LookupTopicResult>>
             lookupInProgress = new ConcurrentHashMap<>();
@@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements 
LookupService {
     private final LatencyHistogram histoGetSchema;
     private final LatencyHistogram histoListTopics;
 
+    /**
+     * @deprecated use {@link
+     * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, 
ExecutorService, ExecutorService)} instead.
+     */
+    @Deprecated
+    public BinaryProtoLookupService(PulsarClientImpl client,
+                                    String serviceUrl,
+                                    boolean useTls,
+                                    ExecutorService scheduleExecutor)
+            throws PulsarClientException {
+        this(client, serviceUrl, null, useTls, scheduleExecutor);
+    }
+
+    /**
+     * @deprecated use {@link
+     * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, 
ExecutorService, ExecutorService)} instead.
+     */
+    @Deprecated
     public BinaryProtoLookupService(PulsarClientImpl client,
                                     String serviceUrl,
+                                    String listenerName,
                                     boolean useTls,
-                                    ExecutorService executor)
+                                    ExecutorService scheduleExecutor)
             throws PulsarClientException {
-        this(client, serviceUrl, null, useTls, executor);
+        this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
     }
 
     public BinaryProtoLookupService(PulsarClientImpl client,
                                     String serviceUrl,
                                     String listenerName,
                                     boolean useTls,
-                                    ExecutorService executor)
+                                    ExecutorService scheduleExecutor,
+                                    ExecutorService lookupPinnedExecutor)
             throws PulsarClientException {
         this.client = client;
         this.useTls = useTls;
-        this.executor = executor;
+        this.scheduleExecutor = scheduleExecutor;
         this.maxLookupRedirects = 
client.getConfiguration().getMaxLookupRedirects();
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
@@ -103,6 +127,15 @@ public class BinaryProtoLookupService implements 
LookupService {
                 
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", 
"metadata").build());
         histoGetSchema = 
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", 
"schema").build());
         histoListTopics = 
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", 
"list-topics").build());
+
+        if (lookupPinnedExecutor == null) {
+            this.createdLookupPinnedExecutor = true;
+            this.lookupPinnedExecutor =
+                    Executors.newSingleThreadExecutor(new 
DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
+        } else {
+            this.createdLookupPinnedExecutor = false;
+            this.lookupPinnedExecutor = lookupPinnedExecutor;
+        }
     }
 
     @Override
@@ -180,7 +213,7 @@ public class BinaryProtoLookupService implements 
LookupService {
             return addressFuture;
         }
 
-        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newLookup(topicName.toString(), 
listenerName, authoritative, requestId,
                     properties);
@@ -247,7 +280,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 }
                 client.getCnxPool().releaseConnection(clientCnx);
             });
-        }).exceptionally(connectionException -> {
+        }, lookupPinnedExecutor).exceptionally(connectionException -> {
             
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
@@ -260,7 +293,7 @@ public class BinaryProtoLookupService implements 
LookupService {
         long startTime = System.nanoTime();
         CompletableFuture<PartitionedTopicMetadata> partitionFuture = new 
CompletableFuture<>();
 
-        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
             boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
             if (!metadataAutoCreationEnabled && 
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
                 if (useFallbackForNonPIP344Brokers) {
@@ -301,7 +334,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 }
                 client.getCnxPool().releaseConnection(clientCnx);
             });
-        }).exceptionally(connectionException -> {
+        }, lookupPinnedExecutor).exceptionally(connectionException -> {
             
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
@@ -324,7 +357,7 @@ public class BinaryProtoLookupService implements 
LookupService {
             return schemaFuture;
         }
         InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
-        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, 
topicName.toString(),
                 Optional.ofNullable(BytesSchemaVersion.of(version)));
@@ -340,7 +373,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 }
                 client.getCnxPool().releaseConnection(clientCnx);
             });
-        }).exceptionally(ex -> {
+        }, lookupPinnedExecutor).exceptionally(ex -> {
             
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
@@ -385,7 +418,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                                          String topicsHash) {
         long startTime = System.nanoTime();
 
-        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+        
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
                 namespace.toString(), requestId, mode, topicsPattern, 
topicsHash);
@@ -404,7 +437,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 }
                 client.getCnxPool().releaseConnection(clientCnx);
             });
-        }).exceptionally((e) -> {
+        }, lookupPinnedExecutor).exceptionally((e) -> {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             if (nextDelay <= 0) {
                 getTopicsResultFuture.completeExceptionally(
@@ -414,7 +447,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 return null;
             }
 
-            ((ScheduledExecutorService) executor).schedule(() -> {
+            ((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
                 log.warn("[namespace: {}] Could not get connection while 
getTopicsUnderNamespace -- Will try again in"
                                 + " {} ms", namespace, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
@@ -428,7 +461,9 @@ public class BinaryProtoLookupService implements 
LookupService {
 
     @Override
     public void close() throws Exception {
-        // no-op
+        if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && 
!lookupPinnedExecutor.isShutdown()) {
+            lookupPinnedExecutor.shutdown();
+        }
     }
 
     public static class LookupDataResult {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e0d4bf35f8a..603844eeb78 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient {
     private final boolean createdExecutorProviders;
 
     private final boolean createdScheduledProviders;
+    private final boolean createdLookupProviders;
     private LookupService lookup;
     private Map<String, LookupService> urlLookupMap = new 
ConcurrentHashMap<>();
     private final ConnectionPool cnxPool;
@@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient {
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
+    private final ExecutorProvider lookupExecutorProvider;
 
     private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
@@ -163,29 +165,39 @@ public class PulsarClientImpl implements PulsarClient {
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws 
PulsarClientException {
-        this(conf, null, null, null, null, null, null);
+        this(conf, null, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, null, null, null, null, null);
+        this(conf, eventLoopGroup, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, null, null, null);
+        this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
                             Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
+    }
+
+    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
+                            Timer timer, ExecutorProvider 
externalExecutorProvider,
+                            ExecutorProvider internalExecutorProvider,
+                            ScheduledExecutorProvider 
scheduledExecutorProvider)
+            throws PulsarClientException {
+        this(conf, eventLoopGroup, connectionPool, timer, 
externalExecutorProvider, internalExecutorProvider,
+                scheduledExecutorProvider, null);
     }
 
     @Builder(builderClassName = "PulsarClientImplBuilder")
     private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
                              Timer timer, ExecutorProvider 
externalExecutorProvider,
                              ExecutorProvider internalExecutorProvider,
-                             ScheduledExecutorProvider 
scheduledExecutorProvider) throws PulsarClientException {
+                             ScheduledExecutorProvider 
scheduledExecutorProvider,
+                             ExecutorProvider lookupExecutorProvider) throws 
PulsarClientException {
 
         EventLoopGroup eventLoopGroupReference = null;
         ConnectionPool connectionPoolReference = null;
@@ -198,6 +210,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
             this.createdExecutorProviders = externalExecutorProvider == null;
             this.createdScheduledProviders = scheduledExecutorProvider == null;
+            this.createdLookupProviders = lookupExecutorProvider == null;
             eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup 
: getEventLoopGroup(conf);
             this.eventLoopGroup = eventLoopGroupReference;
             if (conf == null || isBlank(conf.getServiceUrl())) {
@@ -218,11 +231,14 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), 
"pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? 
internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");
+            this.lookupExecutorProvider = lookupExecutorProvider != null ? 
lookupExecutorProvider :
+                    new ExecutorProvider(1, "pulsar-client-lookup");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(instrumentProvider, conf, 
this.eventLoopGroup);
             } else {
                 lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(),
-                        conf.isUseTls(), 
this.scheduledExecutorProvider.getExecutor());
+                        conf.isUseTls(), 
this.scheduledExecutorProvider.getExecutor(),
+                        this.lookupExecutorProvider.getExecutor());
             }
             if (timer == null) {
                 this.timer = new 
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
@@ -976,6 +992,16 @@ public class PulsarClientImpl implements PulsarClient {
                 pulsarClientException = PulsarClientException.unwrap(t);
             }
         }
+
+        if (createdLookupProviders && lookupExecutorProvider != null && 
!lookupExecutorProvider.isShutdown()) {
+            try {
+                lookupExecutorProvider.shutdownNow();
+            } catch (Throwable t) {
+                log.warn("Failed to shutdown lookupExecutorProvider", t);
+                pulsarClientException = PulsarClientException.unwrap(t);
+            }
+        }
+
         if (pulsarClientException != null) {
             throw pulsarClientException;
         }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index f691215b04e..11e00eefcfd 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -25,25 +25,41 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.PulsarClientException.LookupException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class BinaryProtoLookupServiceTest {
     private BinaryProtoLookupService lookup;
     private TopicName topicName;
+    private ExecutorService internalExecutor;
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        internalExecutor.shutdown();
+        lookup.close();
+    }
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -72,9 +88,13 @@ public class BinaryProtoLookupServiceTest {
         doReturn(1L).when(client).newRequestId();
         ClientConfigurationData data = new ClientConfigurationData();
         doReturn(data).when(client).getConfiguration();
+        internalExecutor =
+                Executors.newSingleThreadExecutor(new 
DefaultThreadFactory("pulsar-client-test-internal-executor"));
+        doReturn(internalExecutor).when(client).getInternalExecutorService();
+
+        lookup = spy(new BinaryProtoLookupService(client, 
"pulsar://localhost:6650", null, false,
+                mock(ExecutorService.class), internalExecutor));
 
-        lookup = spy(
-                new BinaryProtoLookupService(client, 
"pulsar://localhost:6650", false, mock(ExecutorService.class)));
         topicName = TopicName.get("persistent://tenant1/ns1/t1");
     }
 
@@ -118,6 +138,37 @@ public class BinaryProtoLookupServiceTest {
         }
     }
 
+    @Test
+    public void testCommandUnChangedInDifferentThread() throws Exception {
+        BaseCommand successCommand = Commands.newSuccessCommand(10000);
+        lookup.getBroker(topicName).get();
+        assertEquals(successCommand.getType(), Type.SUCCESS);
+        lookup.getPartitionedTopicMetadata(topicName, true, true).get();
+        assertEquals(successCommand.getType(), Type.SUCCESS);
+    }
+
+    @Test
+    public void testCommandChangedInSameThread() throws Exception {
+        AtomicReference<BaseCommand> successCommand = new AtomicReference<>();
+        internalExecutor.execute(() -> 
successCommand.set(Commands.newSuccessCommand(10000)));
+        Awaitility.await().untilAsserted(() -> {
+            BaseCommand baseCommand = successCommand.get();
+            assertNotNull(baseCommand);
+            assertEquals(baseCommand.getType(), Type.SUCCESS);
+        });
+        lookup.getBroker(topicName).get();
+        assertEquals(successCommand.get().getType(), Type.LOOKUP);
+
+        internalExecutor.execute(() -> 
successCommand.set(Commands.newSuccessCommand(10000)));
+        Awaitility.await().untilAsserted(() -> {
+            BaseCommand baseCommand = successCommand.get();
+            assertNotNull(baseCommand);
+            assertEquals(baseCommand.getType(), Type.SUCCESS);
+        });
+        lookup.getPartitionedTopicMetadata(topicName, true, true).get();
+        assertEquals(successCommand.get().getType(), 
Type.PARTITIONED_METADATA);
+    }
+
     private static LookupDataResult createLookupDataResult(String brokerUrl, 
boolean redirect) throws Exception {
         LookupDataResult lookupResult = new LookupDataResult(-1);
 

Reply via email to