This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c2a75473b [improve][broker] Do not close the socket if lookup failed 
due to LockBusyException (#21993)
e7c2a75473b is described below

commit e7c2a75473b545134a3b292ae0e87a79d65cb756
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Feb 2 10:12:19 2024 +0800

    [improve][broker] Do not close the socket if lookup failed due to 
LockBusyException (#21993)
---
 .../pulsar/broker/lookup/TopicLookupBase.java      |  8 +++--
 .../pulsar/client/api/BrokerServiceLookupTest.java | 39 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c4a39cd0d44..7b2c7774148 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -30,6 +30,7 @@ import javax.ws.rs.Encoded;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -333,13 +334,16 @@ public class TopicLookupBase extends PulsarWebResource {
 
     private static void handleLookupError(CompletableFuture<ByteBuf> 
lookupFuture, String topicName, String clientAppId,
                                    long requestId, Throwable ex){
-        final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+        Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
         final String errorMsg = unwrapEx.getMessage();
+        if (unwrapEx instanceof PulsarServerException) {
+            unwrapEx = 
FutureUtil.unwrapCompletionException(unwrapEx.getCause());
+        }
         if (unwrapEx instanceof IllegalStateException) {
             // Current broker still hold the bundle's lock, but the bundle is 
being unloading.
             log.info("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName, errorMsg);
             
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, 
errorMsg, requestId));
-        } else if (unwrapEx instanceof MetadataStoreException){
+        } else if (unwrapEx instanceof MetadataStoreException) {
             // Load bundle ownership or acquire lock failed.
             // Differ with "IllegalStateException", print warning log.
             log.warn("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName, errorMsg);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index dab4fe9087e..0a4c5b7a318 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
@@ -1208,4 +1209,42 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
             mockZooKeeper.unsetAlwaysFail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void 
testLookupConnectionNotCloseIfFailedToAcquireOwnershipOfBundle() throws 
Exception {
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(tpName);
+        final var pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        final var cache = pulsar.getNamespaceService().getOwnershipCache();
+        final var bundle = 
pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
+        final var value = cache.getOwnerAsync(bundle).get().orElse(null);
+        assertNotNull(value);
+
+        cache.invalidateLocalOwnerCache();
+        final var lock = 
pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class)
+                .acquireLock(ServiceUnitUtils.path(bundle), new 
NamespaceEphemeralData()).join();
+        lock.updateValue(null);
+        log.info("Updated bundle {} with null", bundle.getBundleRange());
+
+        // wait for the system topic reader to __change_events is closed, 
otherwise the test will be affected
+        Thread.sleep(500);
+
+        final var future = 
pulsarClientImpl.getLookup().getBroker(TopicName.get(tpName));
+        final var cnx = 
pulsarClientImpl.getCnxPool().getConnections().stream().findAny()
+                .map(CompletableFuture::join).orElse(null);
+        assertNotNull(cnx);
+
+        try {
+            future.get();
+            fail();
+        } catch (ExecutionException e) {
+            log.info("getBroker failed with {}: {}", 
e.getCause().getClass().getName(), e.getMessage());
+            assertTrue(e.getCause() instanceof 
PulsarClientException.BrokerMetadataException);
+            assertTrue(cnx.ctx().channel().isActive());
+            lock.updateValue(value);
+            lock.release();
+            assertTrue(e.getMessage().contains("Failed to acquire ownership"));
+            
pulsarClientImpl.getLookup().getBroker(TopicName.get(tpName)).get();
+        }
+    }
 }

Reply via email to