This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e7c2a75473b [improve][broker] Do not close the socket if lookup failed
due to LockBusyException (#21993)
e7c2a75473b is described below
commit e7c2a75473b545134a3b292ae0e87a79d65cb756
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Feb 2 10:12:19 2024 +0800
[improve][broker] Do not close the socket if lookup failed due to
LockBusyException (#21993)
---
.../pulsar/broker/lookup/TopicLookupBase.java | 8 +++--
.../pulsar/client/api/BrokerServiceLookupTest.java | 39 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c4a39cd0d44..7b2c7774148 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -30,6 +30,7 @@ import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -333,13 +334,16 @@ public class TopicLookupBase extends PulsarWebResource {
private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
- final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof PulsarServerException) {
+ unwrapEx =
FutureUtil.unwrapCompletionException(unwrapEx.getCause());
+ }
if (unwrapEx instanceof IllegalStateException) {
// Current broker still hold the bundle's lock, but the bundle is
being unloading.
log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
- } else if (unwrapEx instanceof MetadataStoreException){
+ } else if (unwrapEx instanceof MetadataStoreException) {
// Load bundle ownership or acquire lock failed.
// Differ with "IllegalStateException", print warning log.
log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index dab4fe9087e..0a4c5b7a318 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
@@ -1208,4 +1209,42 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
mockZooKeeper.unsetAlwaysFail();
}
}
+
+ @Test(timeOut = 30000)
+ public void
testLookupConnectionNotCloseIfFailedToAcquireOwnershipOfBundle() throws
Exception {
+ String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(tpName);
+ final var pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+ final var cache = pulsar.getNamespaceService().getOwnershipCache();
+ final var bundle =
pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
+ final var value = cache.getOwnerAsync(bundle).get().orElse(null);
+ assertNotNull(value);
+
+ cache.invalidateLocalOwnerCache();
+ final var lock =
pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class)
+ .acquireLock(ServiceUnitUtils.path(bundle), new
NamespaceEphemeralData()).join();
+ lock.updateValue(null);
+ log.info("Updated bundle {} with null", bundle.getBundleRange());
+
+ // wait for the system topic reader to __change_events is closed,
otherwise the test will be affected
+ Thread.sleep(500);
+
+ final var future =
pulsarClientImpl.getLookup().getBroker(TopicName.get(tpName));
+ final var cnx =
pulsarClientImpl.getCnxPool().getConnections().stream().findAny()
+ .map(CompletableFuture::join).orElse(null);
+ assertNotNull(cnx);
+
+ try {
+ future.get();
+ fail();
+ } catch (ExecutionException e) {
+ log.info("getBroker failed with {}: {}",
e.getCause().getClass().getName(), e.getMessage());
+ assertTrue(e.getCause() instanceof
PulsarClientException.BrokerMetadataException);
+ assertTrue(cnx.ctx().channel().isActive());
+ lock.updateValue(value);
+ lock.release();
+ assertTrue(e.getMessage().contains("Failed to acquire ownership"));
+
pulsarClientImpl.getLookup().getBroker(TopicName.get(tpName)).get();
+ }
+ }
}