This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09a17203702 [improve] [broker] Not close the socket if lookup failed
caused by bundle unloading or metadata ex (#21211)
09a17203702 is described below
commit 09a17203702a032274a56d7d0ff03c9e32b4529f
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 28 00:19:20 2023 +0800
[improve] [broker] Not close the socket if lookup failed caused by bundle
unloading or metadata ex (#21211)
### Motivation
**Background**: The Pulsar client closes the socket if it receives a
`ServiceNotReady` error when doing a lookup.
Closing the socket forces the other consumers and producers on that
connection to reconnect, and does not make the lookup more efficient.
There are two cases that should be improved:
- If the broker gets a metadata read/write error, the broker responds with
a `ServiceNotReady` error, but it should respond with a `MetadataError`
- If the topic is unloading, the broker responds with a `ServiceNotReady`
error, but it should respond with a `MetadataError` as well.
### Modifications
- Respond to the client with a `MetadataError` if the broker gets a
metadata read/write error.
- Respond to the client with a `MetadataError` instead of `ServiceNotReady` if the topic is unloading.
---
.../pulsar/broker/lookup/TopicLookupBase.java | 46 ++++-----
.../pulsar/broker/namespace/ServiceUnitUtils.java | 2 +-
.../pulsar/broker/web/PulsarWebResource.java | 1 +
.../loadbalance/LeaderElectionServiceTest.java | 6 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 103 +++++++++++++++++++++
5 files changed, 132 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bd70201cba5..a8dda145f6b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -26,7 +26,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
@@ -48,6 +47,7 @@ import
org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,35 +318,37 @@ public class TopicLookupBase extends PulsarWebResource {
requestId,
shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause()
instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(),
ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
-
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(),
requestId));
- return null;
- });
+ handleLookupError(lookupfuture,
topicName.toString(), clientAppId, requestId, ex);
+ return null;
+ });
}
-
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof
IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getMessage(), ex);
- }
-
-
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
+ handleLookupError(lookupfuture, topicName.toString(), clientAppId,
requestId, ex);
return null;
});
return lookupfuture;
}
+ private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
+ long requestId, Throwable ex){
+ final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof IllegalStateException) {
+ // Current broker still hold the bundle's lock, but the bundle is
being unloading.
+ log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
+ } else if (unwrapEx instanceof MetadataStoreException){
+ // Load bundle ownership or acquire lock failed.
+ // Differ with "IllegalStateException", print warning log.
+ log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
+ } else {
+ log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
errorMsg, requestId));
+ }
+ }
+
protected TopicName getTopicName(String topicDomain, String tenant, String
cluster, String namespace,
@Encoded String encodedTopic) {
String decodedName = Codec.decode(encodedTopic);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
index c86aac5316f..432aa29798e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
@@ -36,7 +36,7 @@ public final class ServiceUnitUtils {
*/
private static final String OWNER_INFO_ROOT = "/namespace";
- static String path(NamespaceBundle suname) {
+ public static String path(NamespaceBundle suname) {
// The ephemeral node path for new namespaces should always have
bundle name appended
return OWNER_INFO_ROOT + "/" + suname.toString();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e65ef50c72a..0e25c5ce9e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -530,6 +530,7 @@ public abstract class PulsarWebResource {
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
.whenComplete((clusterDataResult, ex) -> {
if (ex != null) {
+ log.warn("[{}] Load cluster data failed:
requested={}", clientAppId, cluster);
clusterDataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 62faa70bbcb..008897136f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -129,10 +129,10 @@ public class LeaderElectionServiceTest {
.topic("persistent://" + tenant + "/" + namespace + "/1p")
.create();
} catch (PulsarClientException t) {
- Assert.assertTrue(t instanceof
PulsarClientException.LookupException);
+ Assert.assertTrue(t instanceof
PulsarClientException.BrokerMetadataException
+ || t instanceof PulsarClientException.LookupException);
Assert.assertTrue(
- t.getMessage().contains(
- "java.lang.IllegalStateException: The leader
election has not yet been completed!"));
+ t.getMessage().contains("The leader election has not yet
been completed"));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 6becc9cb578..a632608bf70 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.handler.codec.http.HttpRequest;
@@ -71,9 +72,13 @@ import
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.OwnedBundle;
+import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -87,6 +92,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.zookeeper.KeeperException;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -1105,4 +1111,101 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
return "invalid";
}
}
+
+ @Test
+ public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx()
throws Exception {
+ String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(tpName);
+ PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+ Producer<String> producer =
pulsarClientImpl.newProducer(Schema.STRING).topic(tpName).create();
+ Consumer<String> consumer =
pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
+ .subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
+ LookupService lookupService = pulsarClientImpl.getLookup();
+ assertTrue(lookupService instanceof BinaryProtoLookupService);
+ ClientCnx lookupConnection =
pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();
+
+ // Verify the socket will not be closed if the bundle is unloading.
+ BundleOfTopic bundleOfTopic = new BundleOfTopic(tpName);
+ bundleOfTopic.setBundleIsUnloading();
+ try {
+ lookupService.getBroker(TopicName.get(tpName)).get();
+ fail("It should failed due to the namespace bundle is unloading.");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("is being unloaded"));
+ }
+ // Do unload topic, trigger producer & consumer reconnection.
+ pulsar.getBrokerService().getTopic(tpName,
false).join().get().close(true);
+ assertTrue(lookupConnection.ctx().channel().isActive());
+ bundleOfTopic.setBundleIsNotUnloading();
+ // Assert producer & consumer could reconnect successful.
+ producer.send("1");
+ HashSet<String> messagesReceived = new HashSet<>();
+ while (true) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ messagesReceived.add(msg.getValue());
+ }
+ assertTrue(messagesReceived.contains("1"));
+
+ // Verify the socket will not be closed if get a metadata ex.
+ bundleOfTopic.releaseBundleLockAndMakeAcquireFail();
+ try {
+ lookupService.getBroker(TopicName.get(tpName)).get();
+ fail("It should failed due to the acquire bundle lock fail.");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("OperationTimeout"));
+ }
+ // Do unload topic, trigger producer & consumer reconnection.
+ pulsar.getBrokerService().getTopic(tpName,
false).join().get().close(true);
+ assertTrue(lookupConnection.ctx().channel().isActive());
+ bundleOfTopic.makeAcquireBundleLockSuccess();
+ // Assert producer could reconnect successful.
+ producer.send("2");
+ while (true) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ messagesReceived.add(msg.getValue());
+ }
+ assertTrue(messagesReceived.contains("2"));
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(tpName);
+ }
+
+ private class BundleOfTopic {
+
+ private NamespaceBundle namespaceBundle;
+ private OwnershipCache ownershipCache;
+ private AsyncLoadingCache<NamespaceBundle, OwnedBundle>
ownedBundlesCache;
+
+ public BundleOfTopic(String tpName) {
+ namespaceBundle =
pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
+ ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
+ ownedBundlesCache = WhiteboxImpl.getInternalState(ownershipCache,
"ownedBundlesCache");
+ }
+
+ private void setBundleIsUnloading() {
+ ownedBundlesCache.get(namespaceBundle).join().setActive(false);
+ }
+
+ private void setBundleIsNotUnloading() {
+ ownedBundlesCache.get(namespaceBundle).join().setActive(true);
+ }
+
+ private void releaseBundleLockAndMakeAcquireFail() throws Exception {
+ ownedBundlesCache.synchronous().invalidateAll();
+ mockZooKeeper.delete(ServiceUnitUtils.path(namespaceBundle), -1);
+ mockZooKeeper.setAlwaysFail(KeeperException.Code.OPERATIONTIMEOUT);
+ }
+
+ private void makeAcquireBundleLockSuccess() throws Exception {
+ mockZooKeeper.unsetAlwaysFail();
+ }
+ }
}