This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09a17203702 [improve] [broker] Not close the socket if lookup failed 
caused by bundle unloading or metadata ex (#21211)
09a17203702 is described below

commit 09a17203702a032274a56d7d0ff03c9e32b4529f
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 28 00:19:20 2023 +0800

    [improve] [broker] Not close the socket if lookup failed caused by bundle 
unloading or metadata ex (#21211)
    
    ### Motivation
    
    **Background**: The Pulsar client closes the socket if it receives a 
`ServiceNotReady` error when doing a lookup.
    Closing the socket causes the other consumers and producers on that socket 
to reconnect, and it does not make the lookup any more efficient.
    
    There are two cases that should be improved:
    - If the broker gets a metadata read/write error, the broker responds with 
a `ServiceNotReady` error, but it should respond with a `MetadataError`
    - If the topic is unloading, the broker responds with a `ServiceNotReady` 
error, but it should likewise respond with a `MetadataError`.
    
    ### Modifications
    - Respond to the client with a `MetadataError` if the broker gets a 
metadata read/write error.
    - Respond to the client with a `MetadataError` if the topic is unloading.
---
 .../pulsar/broker/lookup/TopicLookupBase.java      |  46 ++++-----
 .../pulsar/broker/namespace/ServiceUnitUtils.java  |   2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |   1 +
 .../loadbalance/LeaderElectionServiceTest.java     |   6 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 103 +++++++++++++++++++++
 5 files changed, 132 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index bd70201cba5..a8dda145f6b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -26,7 +26,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
@@ -48,6 +47,7 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -318,35 +318,37 @@ public class TopicLookupBase extends PulsarWebResource {
                                         requestId, 
shouldRedirectThroughServiceUrl(conf, lookupData)));
                             }
                         }).exceptionally(ex -> {
-                    if (ex instanceof CompletionException && ex.getCause() 
instanceof IllegalStateException) {
-                        log.info("Failed to lookup {} for topic {} with error 
{}", clientAppId,
-                                topicName.toString(), 
ex.getCause().getMessage());
-                    } else {
-                        log.warn("Failed to lookup {} for topic {} with error 
{}", clientAppId,
-                                topicName.toString(), ex.getMessage(), ex);
-                    }
-                    lookupfuture.complete(
-                            
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), 
requestId));
-                    return null;
-                });
+                            handleLookupError(lookupfuture, 
topicName.toString(), clientAppId, requestId, ex);
+                            return null;
+                        });
             }
-
         }).exceptionally(ex -> {
-            if (ex instanceof CompletionException && ex.getCause() instanceof 
IllegalStateException) {
-                log.info("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName.toString(),
-                        ex.getCause().getMessage());
-            } else {
-                log.warn("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName.toString(),
-                        ex.getMessage(), ex);
-            }
-
-            
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, 
ex.getMessage(), requestId));
+            handleLookupError(lookupfuture, topicName.toString(), clientAppId, 
requestId, ex);
             return null;
         });
 
         return lookupfuture;
     }
 
+    private static void handleLookupError(CompletableFuture<ByteBuf> 
lookupFuture, String topicName, String clientAppId,
+                                   long requestId, Throwable ex){
+        final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+        final String errorMsg = unwrapEx.getMessage();
+        if (unwrapEx instanceof IllegalStateException) {
+            // Current broker still hold the bundle's lock, but the bundle is 
being unloading.
+            log.info("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName, errorMsg);
+            
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, 
errorMsg, requestId));
+        } else if (unwrapEx instanceof MetadataStoreException){
+            // Load bundle ownership or acquire lock failed.
+            // Differ with "IllegalStateException", print warning log.
+            log.warn("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName, errorMsg);
+            
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, 
errorMsg, requestId));
+        } else {
+            log.warn("Failed to lookup {} for topic {} with error {}", 
clientAppId, topicName, errorMsg);
+            
lookupFuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, 
errorMsg, requestId));
+        }
+    }
+
     protected TopicName getTopicName(String topicDomain, String tenant, String 
cluster, String namespace,
             @Encoded String encodedTopic) {
         String decodedName = Codec.decode(encodedTopic);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
index c86aac5316f..432aa29798e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/ServiceUnitUtils.java
@@ -36,7 +36,7 @@ public final class ServiceUnitUtils {
      */
     private static final String OWNER_INFO_ROOT = "/namespace";
 
-    static String path(NamespaceBundle suname) {
+    public static String path(NamespaceBundle suname) {
         // The ephemeral node path for new namespaces should always have 
bundle name appended
         return OWNER_INFO_ROOT + "/" + suname.toString();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e65ef50c72a..0e25c5ce9e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -530,6 +530,7 @@ public abstract class PulsarWebResource {
         
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
                 .whenComplete((clusterDataResult, ex) -> {
                     if (ex != null) {
+                        log.warn("[{}] Load cluster data failed: 
requested={}", clientAppId, cluster);
                         
clusterDataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                         return;
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 62faa70bbcb..008897136f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -129,10 +129,10 @@ public class LeaderElectionServiceTest {
                     .topic("persistent://" + tenant + "/" + namespace + "/1p")
                     .create();
         } catch (PulsarClientException t) {
-            Assert.assertTrue(t instanceof 
PulsarClientException.LookupException);
+            Assert.assertTrue(t instanceof 
PulsarClientException.BrokerMetadataException
+                    || t instanceof PulsarClientException.LookupException);
             Assert.assertTrue(
-                    t.getMessage().contains(
-                            "java.lang.IllegalStateException: The leader 
election has not yet been completed!"));
+                    t.getMessage().contains("The leader election has not yet 
been completed"));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 6becc9cb578..a632608bf70 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.handler.codec.http.HttpRequest;
@@ -71,9 +72,13 @@ import 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.OwnedBundle;
+import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -87,6 +92,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.zookeeper.KeeperException;
 import org.asynchttpclient.AsyncCompletionHandler;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.AsyncHttpClientConfig;
@@ -1105,4 +1111,101 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
             return "invalid";
         }
     }
+
+    @Test
+    public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() 
throws Exception {
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(tpName);
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        Producer<String> producer = 
pulsarClientImpl.newProducer(Schema.STRING).topic(tpName).create();
+        Consumer<String> consumer = 
pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
+                .subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
+        LookupService lookupService = pulsarClientImpl.getLookup();
+        assertTrue(lookupService instanceof BinaryProtoLookupService);
+        ClientCnx lookupConnection = 
pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();
+
+        // Verify the socket will not be closed if the bundle is unloading.
+        BundleOfTopic bundleOfTopic = new BundleOfTopic(tpName);
+        bundleOfTopic.setBundleIsUnloading();
+        try {
+            lookupService.getBroker(TopicName.get(tpName)).get();
+            fail("It should failed due to the namespace bundle is unloading.");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("is being unloaded"));
+        }
+        // Do unload topic, trigger producer & consumer reconnection.
+        pulsar.getBrokerService().getTopic(tpName, 
false).join().get().close(true);
+        assertTrue(lookupConnection.ctx().channel().isActive());
+        bundleOfTopic.setBundleIsNotUnloading();
+        //  Assert producer & consumer could reconnect successful.
+        producer.send("1");
+        HashSet<String> messagesReceived = new HashSet<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            messagesReceived.add(msg.getValue());
+        }
+        assertTrue(messagesReceived.contains("1"));
+
+        // Verify the socket will not be closed if get a metadata ex.
+        bundleOfTopic.releaseBundleLockAndMakeAcquireFail();
+        try {
+            lookupService.getBroker(TopicName.get(tpName)).get();
+            fail("It should failed due to the acquire bundle lock fail.");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("OperationTimeout"));
+        }
+        // Do unload topic, trigger producer & consumer reconnection.
+        pulsar.getBrokerService().getTopic(tpName, 
false).join().get().close(true);
+        assertTrue(lookupConnection.ctx().channel().isActive());
+        bundleOfTopic.makeAcquireBundleLockSuccess();
+        // Assert producer could reconnect successful.
+        producer.send("2");
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            messagesReceived.add(msg.getValue());
+        }
+        assertTrue(messagesReceived.contains("2"));
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(tpName);
+    }
+
+    private class BundleOfTopic {
+
+        private NamespaceBundle namespaceBundle;
+        private OwnershipCache ownershipCache;
+        private AsyncLoadingCache<NamespaceBundle, OwnedBundle> 
ownedBundlesCache;
+
+        public BundleOfTopic(String tpName) {
+            namespaceBundle = 
pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
+            ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
+            ownedBundlesCache = WhiteboxImpl.getInternalState(ownershipCache, 
"ownedBundlesCache");
+        }
+
+        private void setBundleIsUnloading() {
+            ownedBundlesCache.get(namespaceBundle).join().setActive(false);
+        }
+
+        private void setBundleIsNotUnloading() {
+            ownedBundlesCache.get(namespaceBundle).join().setActive(true);
+        }
+
+        private void releaseBundleLockAndMakeAcquireFail() throws Exception {
+            ownedBundlesCache.synchronous().invalidateAll();
+            mockZooKeeper.delete(ServiceUnitUtils.path(namespaceBundle), -1);
+            mockZooKeeper.setAlwaysFail(KeeperException.Code.OPERATIONTIMEOUT);
+        }
+
+        private void makeAcquireBundleLockSuccess() throws Exception {
+            mockZooKeeper.unsetAlwaysFail();
+        }
+    }
 }

Reply via email to