This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b1ad91dd5e0f5878f883593fd3e2faa3e5722f28
Author: Heesung Sohn <[email protected]>
AuthorDate: Fri Jun 7 10:12:37 2024 -0700

    [fix][broker] Support advertised listeners when gracefully transferring topics (ExtensibleLoadManagerImpl only) (#22862)
    
    (cherry picked from commit 5af05951754f526c6066b16dc6e56797f041bca7)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 41 +++++++---
 .../ExtensibleLoadManagerImplBaseTest.java         | 14 +++-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 52 ++++++++++--
 ...LoadManagerImplWithAdvertisedListenersTest.java | 92 ++++++++++++++++++++++
 ...dManagerImplWithTransactionCoordinatorTest.java |  4 +-
 5 files changed, 181 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c7d68a30e72..e50e5c3cd1b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -28,7 +28,6 @@ import static 
org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
 import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
-import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer;
 import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -70,6 +69,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
@@ -81,6 +81,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.limiter.ConnectionController;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -146,6 +147,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -3116,15 +3118,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         closeProducer(producer.getProducerId(), producer.getEpoch(), 
assignedBrokerLookupData);
     }
 
+    private LookupData getLookupData(BrokerLookupData lookupData) {
+        LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder();
+        if (StringUtils.isNotBlank((listenerName))) {
+            builder.advertisedListenerName(listenerName);
+        }
+        try {
+            return lookupData.toLookupResult(builder.build()).getLookupData();
+        } catch (PulsarServerException e) {
+            log.error("Failed to get lookup data", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     private void closeProducer(long producerId, long epoch, 
Optional<BrokerLookupData> assignedBrokerLookupData) {
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
-            if (assignedBrokerLookupData.isPresent()) {
-                writeAndFlush(Commands.newCloseProducer(producerId, -1L,
-                        assignedBrokerLookupData.get().pulsarServiceUrl(),
-                        assignedBrokerLookupData.get().pulsarServiceUrlTls()));
-            } else {
-                writeAndFlush(Commands.newCloseProducer(producerId, -1L));
-            }
+            assignedBrokerLookupData.ifPresentOrElse(lookup -> {
+                        LookupData lookupData = getLookupData(lookup);
+                        writeAndFlush(Commands.newCloseProducer(producerId, 
-1L,
+                                lookupData.getBrokerUrl(),
+                                lookupData.getBrokerUrlTls()));
+                    },
+                    () -> writeAndFlush(Commands.newCloseProducer(producerId, 
-1L)));
 
             // The client does not necessarily know that the producer is 
closed, but the connection is still
             // active, and there could be messages in flight already. We want 
to ignore these messages for a time
@@ -3150,9 +3165,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     private void closeConsumer(long consumerId, Optional<BrokerLookupData> 
assignedBrokerLookupData) {
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
-            writeAndFlush(newCloseConsumer(consumerId, -1L,
-                    
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null),
-                    
assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null)));
+            assignedBrokerLookupData.ifPresentOrElse(lookup -> {
+                        LookupData lookupData = getLookupData(lookup);
+                        writeAndFlush(Commands.newCloseConsumer(consumerId, 
-1L,
+                                lookupData.getBrokerUrl(),
+                                lookupData.getBrokerUrlTls()));
+                    },
+                    () -> writeAndFlush(Commands.newCloseConsumer(consumerId, 
-1L, null, null)));
         } else {
             close();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index a77af54daa3..3b611ef62c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -63,7 +63,14 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
         this.defaultTestNamespace = defaultTestNamespace;
     }
 
-    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        updateConfig(conf);
+    }
+
+
+    protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
         conf.setForceDeleteNamespaceAllowed(true);
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
@@ -75,10 +82,9 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
     @Override
     @BeforeClass(alwaysRun = true)
     protected void setup() throws Exception {
-        initConfig(conf);
         super.internalSetup(conf);
         pulsar1 = pulsar;
-        var conf2 = initConfig(getDefaultConf());
+        var conf2 = updateConfig(getDefaultConf());
         additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
         pulsar2 = additionalPulsarTestContext.getPulsarService();
 
@@ -141,7 +147,7 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
                 FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
     }
 
-    protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
+    protected static CompletableFuture<NamespaceBundle> 
getBundleAsync(PulsarService pulsar, TopicName topic) {
         return pulsar.getNamespaceService().getBundleAsync(topic);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index e4ab71ba2a6..c85eeed0e22 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -109,6 +109,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -390,6 +391,19 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     @Test(timeOut = 30_000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testTransferClientReconnectionWithoutLookup(TopicDomain 
topicDomain, SubscriptionType subscriptionType)
             throws Exception {
+        testTransferClientReconnectionWithoutLookup(topicDomain, 
subscriptionType, defaultTestNamespace, admin,
+                lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, 
secondaryLoadManager);
+    }
+
+    @Test(enabled = false)
+    public static void testTransferClientReconnectionWithoutLookup(TopicDomain 
topicDomain,
+                                                                   
SubscriptionType subscriptionType,
+                                                                   String 
defaultTestNamespace,
+                                                                   PulsarAdmin 
admin, String brokerServiceUrl,
+                                                                   
PulsarService pulsar1, PulsarService pulsar2,
+                                                                   
ExtensibleLoadManager primaryLoadManager,
+                                                                   
ExtensibleLoadManager secondaryLoadManager)
+            throws Exception {
         var id = String.format("test-tx-client-reconnect-%s-%s", 
subscriptionType, UUID.randomUUID());
         var topic = String.format("%s://%s/%s", topicDomain.toString(), 
defaultTestNamespace, id);
         var topicName = TopicName.get(topic);
@@ -399,7 +413,8 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         var consumers = new ArrayList<Consumer<String>>();
         try {
             var lookups = new ArrayList<LookupService>();
-
+            var pulsarClient = pulsarClient(brokerServiceUrl, 0);
+            clients.add(pulsarClient);
             @Cleanup
             var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
             lookups.add(spyLookupService(pulsarClient));
@@ -407,7 +422,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             var consumerCount = subscriptionType == SubscriptionType.Exclusive 
? 1 : 3;
 
             for (int i = 0; i < consumerCount; i++) {
-                var client = newPulsarClient(lookupUrl.toString(), 0);
+                var client = pulsarClient(brokerServiceUrl, 0);
                 clients.add(client);
                 var consumer = client.newConsumer(Schema.STRING).
                         subscriptionName(id).
@@ -434,7 +449,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 dstBrokerUrl = pulsar1.getBrokerId();
                 dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl();
             }
-            checkOwnershipState(broker, bundle);
+            checkOwnershipState(broker, bundle, primaryLoadManager, 
secondaryLoadManager, pulsar1);
 
             var messageCountBeforeUnloading = 100;
             var messageCountAfterUnloading = 100;
@@ -528,6 +543,17 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     @Test(timeOut = 30 * 1000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
                                                        SubscriptionType 
subscriptionType) throws Exception {
+        testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, 
defaultTestNamespace, admin,
+                lookupUrl.toString(), pulsar1);
+    }
+
+    @Test(enabled = false)
+    public static void testUnloadClientReconnectionWithLookup(TopicDomain 
topicDomain,
+                                                              SubscriptionType 
subscriptionType,
+                                                              String 
defaultTestNamespace,
+                                                              PulsarAdmin 
admin,
+                                                              String 
brokerServiceUrl,
+                                                              PulsarService 
pulsar1) throws Exception {
         var id = String.format("test-unload-%s-client-reconnect-%s-%s",
                 topicDomain, subscriptionType, UUID.randomUUID());
         var topic = String.format("%s://%s/%s", topicDomain, 
defaultTestNamespace, id);
@@ -536,6 +562,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         var consumers = new ArrayList<Consumer<String>>();
         try {
             @Cleanup
+            var pulsarClient = pulsarClient(brokerServiceUrl, 0);
             var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
 
             var consumerCount = subscriptionType == SubscriptionType.Exclusive 
? 1 : 3;
@@ -600,13 +627,16 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         }
     }
 
-    private LookupService spyLookupService(PulsarClient client) throws 
IllegalAccessException {
+    private static LookupService spyLookupService(PulsarClient client) throws 
IllegalAccessException {
         LookupService svc = (LookupService) 
FieldUtils.readDeclaredField(client, "lookup", true);
         var lookup = spy(svc);
         FieldUtils.writeDeclaredField(client, "lookup", lookup, true);
         return lookup;
     }
-    private void checkOwnershipState(String broker, NamespaceBundle bundle)
+
+    protected static void checkOwnershipState(String broker, NamespaceBundle 
bundle,
+                                              ExtensibleLoadManager 
primaryLoadManager,
+                                              ExtensibleLoadManager 
secondaryLoadManager, PulsarService pulsar1)
             throws ExecutionException, InterruptedException {
         var targetLoadManager = secondaryLoadManager;
         var otherLoadManager = primaryLoadManager;
@@ -618,6 +648,11 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
     }
 
+    protected void checkOwnershipState(String broker, NamespaceBundle bundle)
+            throws ExecutionException, InterruptedException {
+        checkOwnershipState(broker, bundle, primaryLoadManager, 
secondaryLoadManager, pulsar1);
+    }
+
     @Test(timeOut = 30 * 1000)
     public void testSplitBundleAdminAPI() throws Exception {
         final String namespace = "public/testSplitBundleAdminAPI";
@@ -1576,4 +1611,11 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
 
     }
 
+    protected static PulsarClient pulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return
+                PulsarClient.builder()
+                        .serviceUrl(url)
+                        .statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
new file mode 100644
index 00000000000..087aefa1cfc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link ExtensibleLoadManagerImpl} with AdvertisedListeners 
broker configs.
+ */
+@Slf4j
+@Test(groups = "flaky")
+@SuppressWarnings("unchecked")
+public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends 
ExtensibleLoadManagerImplBaseTest {
+
+    public String brokerServiceUrl;
+    public ExtensibleLoadManagerImplWithAdvertisedListenersTest() {
+        super("public/test");
+    }
+
+    @Override
+    protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
+        super.updateConfig(conf);
+        int privatePulsarPort = nextLockedFreePort();
+        int publicPulsarPort = nextLockedFreePort();
+        conf.setInternalListenerName("internal");
+        conf.setBindAddresses("external:pulsar://localhost:" + 
publicPulsarPort);
+        conf.setAdvertisedListeners(
+                "external:pulsar://localhost:" + publicPulsarPort +
+                        ",internal:pulsar://localhost:" + privatePulsarPort);
+        conf.setWebServicePortTls(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setBrokerServicePort(Optional.of(privatePulsarPort));
+        conf.setWebServicePort(Optional.of(0));
+        brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", "");
+        return conf;
+    }
+
+    @DataProvider(name = "isPersistentTopicSubscriptionTypeTest")
+    public Object[][] isPersistentTopicSubscriptionTypeTest() {
+        return new Object[][]{
+                {TopicDomain.non_persistent, SubscriptionType.Exclusive},
+                {TopicDomain.persistent, SubscriptionType.Key_Shared}
+        };
+    }
+
+    @Test(timeOut = 30_000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
+    public void testTransferClientReconnectionWithoutLookup(TopicDomain 
topicDomain, SubscriptionType subscriptionType)
+            throws Exception {
+        
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain,
 subscriptionType,
+                defaultTestNamespace, admin,
+                brokerServiceUrl,
+                pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
+    }
+
+    @Test(timeOut = 30 * 1000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
+    public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
+                                                       SubscriptionType 
subscriptionType) throws Exception {
+        
ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain,
 subscriptionType,
+                defaultTestNamespace, admin,
+                brokerServiceUrl,
+                pulsar1);
+    }
+
+    @DataProvider(name = "isPersistentTopicTest")
+    public Object[][] isPersistentTopicTest() {
+        return new Object[][]{{TopicDomain.persistent}, 
{TopicDomain.non_persistent}};
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
index 0c95dd85f28..ed99b502b7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
@@ -31,8 +31,8 @@ public class 
ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends Ext
     }
 
     @Override
-    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
-        conf = super.initConfig(conf);
+    protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
+        conf = super.updateConfig(conf);
         conf.setTransactionCoordinatorEnabled(true);
         return conf;
     }

Reply via email to