This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f98297f3c9c [fix][client] Use dedicated executor for requests in
BinaryProtoLookupService (#23378)
f98297f3c9c is described below
commit f98297f3c9c052d7ddd8444bc0ef876ceeb924b1
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Oct 15 21:41:26 2024 +0800
[fix][client] Use dedicated executor for requests in
BinaryProtoLookupService (#23378)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../org/apache/pulsar/broker/PulsarService.java | 5 ++
.../client/impl/BinaryProtoLookupService.java | 65 +++++++++++++++++-----
.../pulsar/client/impl/PulsarClientImpl.java | 38 +++++++++++--
.../client/impl/BinaryProtoLookupServiceTest.java | 55 +++++++++++++++++-
4 files changed, 140 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index dcc0e961275..05491d9c281 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final ScheduledExecutorProvider
brokerClientSharedScheduledExecutorProvider;
private final Timer brokerClientSharedTimer;
+ private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
@@ -388,6 +389,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
new ScheduledExecutorProvider(1,
"broker-client-shared-scheduled-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new
DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
+ this.brokerClientSharedLookupExecutorProvider =
+ new ScheduledExecutorProvider(1,
"broker-client-shared-lookup-executor");
// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null,
0);
@@ -696,6 +699,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
+ brokerClientSharedLookupExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();
@@ -1687,6 +1691,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
+
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
.build();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index b45d6e9f6a8..795cdc6d693 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static java.lang.String.format;
import static
org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -29,6 +30,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements
LookupService {
private final PulsarClientImpl client;
private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
- private final ExecutorService executor;
+ private final ExecutorService scheduleExecutor;
private final String listenerName;
private final int maxLookupRedirects;
+ private final ExecutorService lookupPinnedExecutor;
+ private final boolean createdLookupPinnedExecutor;
private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>,
CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
@@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements
LookupService {
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;
+ /**
+ * @deprecated use {@link
+ * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean,
ExecutorService, ExecutorService)} instead.
+ */
+ @Deprecated
+ public BinaryProtoLookupService(PulsarClientImpl client,
+ String serviceUrl,
+ boolean useTls,
+ ExecutorService scheduleExecutor)
+ throws PulsarClientException {
+ this(client, serviceUrl, null, useTls, scheduleExecutor);
+ }
+
+ /**
+ * @deprecated use {@link
+ * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean,
ExecutorService, ExecutorService)} instead.
+ */
+ @Deprecated
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
+ String listenerName,
boolean useTls,
- ExecutorService executor)
+ ExecutorService scheduleExecutor)
throws PulsarClientException {
- this(client, serviceUrl, null, useTls, executor);
+ this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
}
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
- ExecutorService executor)
+ ExecutorService scheduleExecutor,
+ ExecutorService lookupPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
- this.executor = executor;
+ this.scheduleExecutor = scheduleExecutor;
this.maxLookupRedirects =
client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
@@ -103,6 +127,15 @@ public class BinaryProtoLookupService implements
LookupService {
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type",
"metadata").build());
histoGetSchema =
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type",
"schema").build());
histoListTopics =
histo.withAttributes(Attributes.builder().put("pulsar.lookup.type",
"list-topics").build());
+
+ if (lookupPinnedExecutor == null) {
+ this.createdLookupPinnedExecutor = true;
+ this.lookupPinnedExecutor =
+ Executors.newSingleThreadExecutor(new
DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
+ } else {
+ this.createdLookupPinnedExecutor = false;
+ this.lookupPinnedExecutor = lookupPinnedExecutor;
+ }
}
@Override
@@ -180,7 +213,7 @@ public class BinaryProtoLookupService implements
LookupService {
return addressFuture;
}
- client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(),
listenerName, authoritative, requestId,
properties);
@@ -247,7 +280,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
client.getCnxPool().releaseConnection(clientCnx);
});
- }).exceptionally(connectionException -> {
+ }, lookupPinnedExecutor).exceptionally(connectionException -> {
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
@@ -260,7 +293,7 @@ public class BinaryProtoLookupService implements
LookupService {
long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new
CompletableFuture<>();
- client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
if (!metadataAutoCreationEnabled &&
!clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
if (useFallbackForNonPIP344Brokers) {
@@ -301,7 +334,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
client.getCnxPool().releaseConnection(clientCnx);
});
- }).exceptionally(connectionException -> {
+ }, lookupPinnedExecutor).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
@@ -324,7 +357,7 @@ public class BinaryProtoLookupService implements
LookupService {
return schemaFuture;
}
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
- client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId,
topicName.toString(),
Optional.ofNullable(BytesSchemaVersion.of(version)));
@@ -340,7 +373,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
client.getCnxPool().releaseConnection(clientCnx);
});
- }).exceptionally(ex -> {
+ }, lookupPinnedExecutor).exceptionally(ex -> {
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
@@ -385,7 +418,7 @@ public class BinaryProtoLookupService implements
LookupService {
String topicsHash) {
long startTime = System.nanoTime();
- client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern,
topicsHash);
@@ -404,7 +437,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
client.getCnxPool().releaseConnection(clientCnx);
});
- }).exceptionally((e) -> {
+ }, lookupPinnedExecutor).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
getTopicsResultFuture.completeExceptionally(
@@ -414,7 +447,7 @@ public class BinaryProtoLookupService implements
LookupService {
return null;
}
- ((ScheduledExecutorService) executor).schedule(() -> {
+ ((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
log.warn("[namespace: {}] Could not get connection while
getTopicsUnderNamespace -- Will try again in"
+ " {} ms", namespace, nextDelay);
remainingTime.addAndGet(-nextDelay);
@@ -428,7 +461,9 @@ public class BinaryProtoLookupService implements
LookupService {
@Override
public void close() throws Exception {
- // no-op
+ if (createdLookupPinnedExecutor && lookupPinnedExecutor != null &&
!lookupPinnedExecutor.isShutdown()) {
+ lookupPinnedExecutor.shutdown();
+ }
}
public static class LookupDataResult {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e0d4bf35f8a..603844eeb78 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdExecutorProviders;
private final boolean createdScheduledProviders;
+ private final boolean createdLookupProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new
ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
@@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
+ private final ExecutorProvider lookupExecutorProvider;
private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
@@ -163,29 +165,39 @@ public class PulsarClientImpl implements PulsarClient {
private TransactionCoordinatorClientImpl tcClient;
public PulsarClientImpl(ClientConfigurationData conf) throws
PulsarClientException {
- this(conf, null, null, null, null, null, null);
+ this(conf, null, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, null, null, null, null, null);
+ this(conf, eventLoopGroup, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, null, null, null, null);
+ this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
+ this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
+ }
+
+ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
+ Timer timer, ExecutorProvider
externalExecutorProvider,
+ ExecutorProvider internalExecutorProvider,
+ ScheduledExecutorProvider
scheduledExecutorProvider)
+ throws PulsarClientException {
+ this(conf, eventLoopGroup, connectionPool, timer,
externalExecutorProvider, internalExecutorProvider,
+ scheduledExecutorProvider, null);
}
@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider
externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
- ScheduledExecutorProvider
scheduledExecutorProvider) throws PulsarClientException {
+ ScheduledExecutorProvider
scheduledExecutorProvider,
+ ExecutorProvider lookupExecutorProvider) throws
PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
@@ -198,6 +210,7 @@ public class PulsarClientImpl implements PulsarClient {
}
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
+ this.createdLookupProviders = lookupExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup
: getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl())) {
@@ -218,11 +231,14 @@ public class PulsarClientImpl implements PulsarClient {
new ExecutorProvider(conf.getNumListenerThreads(),
"pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ?
internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-internal");
+ this.lookupExecutorProvider = lookupExecutorProvider != null ?
lookupExecutorProvider :
+ new ExecutorProvider(1, "pulsar-client-lookup");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(instrumentProvider, conf,
this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this,
conf.getServiceUrl(), conf.getListenerName(),
- conf.isUseTls(),
this.scheduledExecutorProvider.getExecutor());
+ conf.isUseTls(),
this.scheduledExecutorProvider.getExecutor(),
+ this.lookupExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
@@ -976,6 +992,16 @@ public class PulsarClientImpl implements PulsarClient {
pulsarClientException = PulsarClientException.unwrap(t);
}
}
+
+ if (createdLookupProviders && lookupExecutorProvider != null &&
!lookupExecutorProvider.isShutdown()) {
+ try {
+ lookupExecutorProvider.shutdownNow();
+ } catch (Throwable t) {
+ log.warn("Failed to shutdown lookupExecutorProvider", t);
+ pulsarClientException = PulsarClientException.unwrap(t);
+ }
+ }
+
if (pulsarClientException != null) {
throw pulsarClientException;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index f691215b04e..11e00eefcfd 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -25,25 +25,41 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class BinaryProtoLookupServiceTest {
private BinaryProtoLookupService lookup;
private TopicName topicName;
+ private ExecutorService internalExecutor;
+
+ @AfterMethod
+ public void cleanup() throws Exception {
+ internalExecutor.shutdown();
+ lookup.close();
+ }
@BeforeMethod
public void setup() throws Exception {
@@ -72,9 +88,13 @@ public class BinaryProtoLookupServiceTest {
doReturn(1L).when(client).newRequestId();
ClientConfigurationData data = new ClientConfigurationData();
doReturn(data).when(client).getConfiguration();
+ internalExecutor =
+ Executors.newSingleThreadExecutor(new
DefaultThreadFactory("pulsar-client-test-internal-executor"));
+ doReturn(internalExecutor).when(client).getInternalExecutorService();
+
+ lookup = spy(new BinaryProtoLookupService(client,
"pulsar://localhost:6650", null, false,
+ mock(ExecutorService.class), internalExecutor));
- lookup = spy(
- new BinaryProtoLookupService(client,
"pulsar://localhost:6650", false, mock(ExecutorService.class)));
topicName = TopicName.get("persistent://tenant1/ns1/t1");
}
@@ -118,6 +138,37 @@ public class BinaryProtoLookupServiceTest {
}
}
+ @Test
+ public void testCommandUnChangedInDifferentThread() throws Exception {
+ BaseCommand successCommand = Commands.newSuccessCommand(10000);
+ lookup.getBroker(topicName).get();
+ assertEquals(successCommand.getType(), Type.SUCCESS);
+ lookup.getPartitionedTopicMetadata(topicName, true, true).get();
+ assertEquals(successCommand.getType(), Type.SUCCESS);
+ }
+
+ @Test
+ public void testCommandChangedInSameThread() throws Exception {
+ AtomicReference<BaseCommand> successCommand = new AtomicReference<>();
+ internalExecutor.execute(() ->
successCommand.set(Commands.newSuccessCommand(10000)));
+ Awaitility.await().untilAsserted(() -> {
+ BaseCommand baseCommand = successCommand.get();
+ assertNotNull(baseCommand);
+ assertEquals(baseCommand.getType(), Type.SUCCESS);
+ });
+ lookup.getBroker(topicName).get();
+ assertEquals(successCommand.get().getType(), Type.LOOKUP);
+
+ internalExecutor.execute(() ->
successCommand.set(Commands.newSuccessCommand(10000)));
+ Awaitility.await().untilAsserted(() -> {
+ BaseCommand baseCommand = successCommand.get();
+ assertNotNull(baseCommand);
+ assertEquals(baseCommand.getType(), Type.SUCCESS);
+ });
+ lookup.getPartitionedTopicMetadata(topicName, true, true).get();
+ assertEquals(successCommand.get().getType(),
Type.PARTITIONED_METADATA);
+ }
+
private static LookupDataResult createLookupDataResult(String brokerUrl,
boolean redirect) throws Exception {
LookupDataResult lookupResult = new LookupDataResult(-1);