This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14395295b49 [fix][client] Fix concurrent lookup with properties might
have different results (#23260)
14395295b49 is described below
commit 14395295b4996dcfb7eac288d92baf1104c9c576
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 6 12:01:09 2024 +0800
[fix][client] Fix concurrent lookup with properties might have different
results (#23260)
---
.../pulsar/client/api/LookupPropertiesTest.java | 43 ++++++++++++++++++++++
.../client/impl/BinaryProtoLookupService.java | 20 ++++++----
2 files changed, 55 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
index cb8b2d1e526..768dc29731f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java
@@ -19,23 +19,30 @@
package org.apache.pulsar.client.api;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -72,6 +79,7 @@ public class LookupPropertiesTest extends MultiBrokerBaseTest
{
@Test
public void testLookupProperty() throws Exception {
+ admin.namespaces().unload("public/default");
final var topic = "test-lookup-property";
admin.topics().createPartitionedTopic(topic, 16);
@Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
@@ -89,7 +97,35 @@ public class LookupPropertiesTest extends
MultiBrokerBaseTest {
Assert.assertEquals(port,
additionalBrokers.get(0).getBrokerListenPort().orElseThrow());
}
+ @Test
+ public void testConcurrentLookupProperties() throws Exception {
+ @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .build();
+ final var futures = new
ArrayList<CompletableFuture<LookupTopicResult>>();
+ BrokerIdAwareLoadManager.clientIdList.clear();
+
+ final var clientIdList = IntStream.range(0, 10).mapToObj(i -> "key-" +
i).toList();
+ for (var clientId : clientIdList) {
+
client.getConfiguration().setLookupProperties(Collections.singletonMap(CLIENT_KEY,
clientId));
+
futures.add(client.getLookup().getBroker(TopicName.get("test-concurrent-lookup-properties")));
+
client.getConfiguration().setLookupProperties(Collections.emptyMap());
+ }
+ FutureUtil.waitForAll(futures).get();
+ Assert.assertEquals(clientIdList,
BrokerIdAwareLoadManager.clientIdList);
+ }
+
public static class BrokerIdAwareLoadManager extends
ExtensibleLoadManagerImpl {
+
+ static final List<String> clientIdList =
Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+
ServiceUnitId serviceUnit, LookupOptions options) {
+ getClientId(options).ifPresent(clientIdList::add);
+ return super.assign(topic, serviceUnit, options);
+ }
+
@Override
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle, Set<String> excludeBrokerSet,
LookupOptions
options) {
@@ -106,5 +142,12 @@ public class LookupPropertiesTest extends
MultiBrokerBaseTest {
.orElseGet(() -> super.selectAsync(bundle,
excludeBrokerSet, options));
});
}
+
+ private static Optional<String> getClientId(LookupOptions options) {
+ if (options.getProperties() == null) {
+ return Optional.empty();
+ }
+ return
Optional.ofNullable(options.getProperties().get(CLIENT_KEY));
+ }
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 9dd04acce7e..b45d6e9f6a8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -60,7 +62,7 @@ public class BinaryProtoLookupService implements
LookupService {
private final String listenerName;
private final int maxLookupRedirects;
- private final ConcurrentHashMap<TopicName,
CompletableFuture<LookupTopicResult>>
+ private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>,
CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();
private final ConcurrentHashMap<TopicName,
CompletableFuture<PartitionedTopicMetadata>>
@@ -118,10 +120,12 @@ public class BinaryProtoLookupService implements
LookupService {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName)
{
long startTime = System.nanoTime();
final MutableObject<CompletableFuture> newFutureCreated = new
MutableObject<>();
+ final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
+ client.getConfiguration().getLookupProperties());
try {
- return lookupInProgress.computeIfAbsent(topicName, tpName -> {
- CompletableFuture<LookupTopicResult> newFuture =
- findBroker(serviceNameResolver.resolveHost(), false,
topicName, 0);
+ return lookupInProgress.computeIfAbsent(key, tpName -> {
+ CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false,
+ topicName, 0, key.getRight());
newFutureCreated.setValue(newFuture);
newFuture.thenRun(() -> {
@@ -135,7 +139,7 @@ public class BinaryProtoLookupService implements
LookupService {
} finally {
if (newFutureCreated.getValue() != null) {
newFutureCreated.getValue().whenComplete((v, ex) -> {
- lookupInProgress.remove(topicName,
newFutureCreated.getValue());
+ lookupInProgress.remove(key, newFutureCreated.getValue());
});
}
}
@@ -167,7 +171,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress
socketAddress,
- boolean authoritative, TopicName topicName, final int
redirectCount) {
+ boolean authoritative, TopicName topicName, final int
redirectCount, Map<String, String> properties) {
CompletableFuture<LookupTopicResult> addressFuture = new
CompletableFuture<>();
if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
@@ -179,7 +183,7 @@ public class BinaryProtoLookupService implements
LookupService {
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx
-> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(),
listenerName, authoritative, requestId,
- client.getConfiguration().getLookupProperties());
+ properties);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
// lookup failed
@@ -204,7 +208,7 @@ public class BinaryProtoLookupService implements
LookupService {
// (2) redirect to given address if response is:
redirect
if (r.redirect) {
- findBroker(responseBrokerAddress, r.authoritative,
topicName, redirectCount + 1)
+ findBroker(responseBrokerAddress, r.authoritative,
topicName, redirectCount + 1, properties)
.thenAccept(addressFuture::complete)
.exceptionally((lookupException) -> {
Throwable cause =
FutureUtil.unwrapCompletionException(lookupException);