This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b1ad91dd5e0f5878f883593fd3e2faa3e5722f28 Author: Heesung Sohn <[email protected]> AuthorDate: Fri Jun 7 10:12:37 2024 -0700 [fix][broker] Support advertised listeners when gracefully transferring topics (ExtensibleLoadManagerImpl only) (#22862) (cherry picked from commit 5af05951754f526c6066b16dc6e56797f041bca7) --- .../apache/pulsar/broker/service/ServerCnx.java | 41 +++++++--- .../ExtensibleLoadManagerImplBaseTest.java | 14 +++- .../extensions/ExtensibleLoadManagerImplTest.java | 52 ++++++++++-- ...LoadManagerImplWithAdvertisedListenersTest.java | 92 ++++++++++++++++++++++ ...dManagerImplWithTransactionCoordinatorTest.java | 4 +- 5 files changed, 181 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c7d68a30e72..e50e5c3cd1b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -28,7 +28,6 @@ import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; -import static org.apache.pulsar.common.protocol.Commands.newCloseConsumer; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -70,6 +69,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; @@ -81,6 +81,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -146,6 +147,7 @@ import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; @@ -3116,15 +3118,28 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData); } + private LookupData getLookupData(BrokerLookupData lookupData) { + LookupOptions.LookupOptionsBuilder builder = LookupOptions.builder(); + if (StringUtils.isNotBlank((listenerName))) { + builder.advertisedListenerName(listenerName); + } + try { + return lookupData.toLookupResult(builder.build()).getLookupData(); + } catch (PulsarServerException e) { + log.error("Failed to get lookup data", e); + throw new RuntimeException(e); + } + } + private void closeProducer(long producerId, long epoch, Optional<BrokerLookupData> assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - if (assignedBrokerLookupData.isPresent()) { - writeAndFlush(Commands.newCloseProducer(producerId, -1L, - assignedBrokerLookupData.get().pulsarServiceUrl(), - assignedBrokerLookupData.get().pulsarServiceUrlTls())); - } else { - writeAndFlush(Commands.newCloseProducer(producerId, -1L)); - } + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseProducer(producerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseProducer(producerId, -1L))); // The client does not necessarily know that the producer is closed, but the connection is still // active, and there could be messages in flight already. We want to ignore these messages for a time @@ -3150,9 +3165,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private void closeConsumer(long consumerId, Optional<BrokerLookupData> assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - writeAndFlush(newCloseConsumer(consumerId, -1L, - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrl).orElse(null), - assignedBrokerLookupData.map(BrokerLookupData::pulsarServiceUrlTls).orElse(null))); + assignedBrokerLookupData.ifPresentOrElse(lookup -> { + LookupData lookupData = getLookupData(lookup); + writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, + lookupData.getBrokerUrl(), + lookupData.getBrokerUrlTls())); + }, + () -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null))); } else { close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index a77af54daa3..3b611ef62c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -63,7 +63,14 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ this.defaultTestNamespace = defaultTestNamespace; } - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + updateConfig(conf); + } + + + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { conf.setForceDeleteNamespaceAllowed(true); conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); @@ -75,10 +82,9 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ @Override @BeforeClass(alwaysRun = true) protected void setup() throws Exception { - initConfig(conf); super.internalSetup(conf); pulsar1 = pulsar; - var conf2 = initConfig(getDefaultConf()); + var conf2 = updateConfig(getDefaultConf()); additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2); pulsar2 = additionalPulsarTestContext.getPulsarService(); @@ -141,7 +147,7 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); } - protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) { + protected static CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) { return pulsar.getNamespaceService().getBundleAsync(topic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e4ab71ba2a6..c85eeed0e22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -109,6 +109,7 @@ import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -390,6 +391,19 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(enabled = false) + public static void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, String brokerServiceUrl, + PulsarService pulsar1, PulsarService pulsar2, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager) + throws Exception { var id = String.format("test-tx-client-reconnect-%s-%s", subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain.toString(), defaultTestNamespace, id); var topicName = TopicName.get(topic); @@ -399,7 +413,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase var consumers = new ArrayList<Consumer<String>>(); try { var lookups = new ArrayList<LookupService>(); - + var pulsarClient = pulsarClient(brokerServiceUrl, 0); + clients.add(pulsarClient); @Cleanup var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); lookups.add(spyLookupService(pulsarClient)); @@ -407,7 +422,7 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; for (int i = 0; i < consumerCount; i++) { - var client = newPulsarClient(lookupUrl.toString(), 0); + var client = pulsarClient(brokerServiceUrl, 0); clients.add(client); var consumer = client.newConsumer(Schema.STRING). subscriptionName(id). @@ -434,7 +449,7 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase dstBrokerUrl = pulsar1.getBrokerId(); dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl(); } - checkOwnershipState(broker, bundle); + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); var messageCountBeforeUnloading = 100; var messageCountAfterUnloading = 100; @@ -528,6 +543,17 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { + testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1); + } + + @Test(enabled = false) + public static void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType, + String defaultTestNamespace, + PulsarAdmin admin, + String brokerServiceUrl, + PulsarService pulsar1) throws Exception { var id = String.format("test-unload-%s-client-reconnect-%s-%s", topicDomain, subscriptionType, UUID.randomUUID()); var topic = String.format("%s://%s/%s", topicDomain, defaultTestNamespace, id); @@ -536,6 +562,7 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase var consumers = new ArrayList<Consumer<String>>(); try { @Cleanup + var pulsarClient = pulsarClient(brokerServiceUrl, 0); var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; @@ -600,13 +627,16 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase } } - private LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { + private static LookupService spyLookupService(PulsarClient client) throws IllegalAccessException { LookupService svc = (LookupService) FieldUtils.readDeclaredField(client, "lookup", true); var lookup = spy(svc); FieldUtils.writeDeclaredField(client, "lookup", lookup, true); return lookup; } - private void checkOwnershipState(String broker, NamespaceBundle bundle) + + protected static void checkOwnershipState(String broker, NamespaceBundle bundle, + ExtensibleLoadManager primaryLoadManager, + ExtensibleLoadManager secondaryLoadManager, PulsarService pulsar1) throws ExecutionException, InterruptedException { var targetLoadManager = secondaryLoadManager; var otherLoadManager = primaryLoadManager; @@ -618,6 +648,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase assertFalse(otherLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); } + protected void checkOwnershipState(String broker, NamespaceBundle bundle) + throws ExecutionException, InterruptedException { + checkOwnershipState(broker, bundle, primaryLoadManager, secondaryLoadManager, pulsar1); + } + @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { final String namespace = "public/testSplitBundleAdminAPI"; @@ -1576,4 +1611,11 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase } + protected static PulsarClient pulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return + PulsarClient.builder() + .serviceUrl(url) + .statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java new file mode 100644 index 00000000000..087aefa1cfc --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicDomain; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Unit test for {@link ExtensibleLoadManagerImpl with AdvertisedListeners broker configs}. + */ +@Slf4j +@Test(groups = "flaky") +@SuppressWarnings("unchecked") +public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends ExtensibleLoadManagerImplBaseTest { + + public String brokerServiceUrl; + public ExtensibleLoadManagerImplWithAdvertisedListenersTest() { + super("public/test"); + } + + @Override + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + super.updateConfig(conf); + int privatePulsarPort = nextLockedFreePort(); + int publicPulsarPort = nextLockedFreePort(); + conf.setInternalListenerName("internal"); + conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort); + conf.setAdvertisedListeners( + "external:pulsar://localhost:" + publicPulsarPort + + ",internal:pulsar://localhost:" + privatePulsarPort); + conf.setWebServicePortTls(Optional.empty()); + conf.setBrokerServicePortTls(Optional.empty()); + conf.setBrokerServicePort(Optional.of(privatePulsarPort)); + conf.setWebServicePort(Optional.of(0)); + brokerServiceUrl = conf.getBindAddresses().replaceAll("external:", ""); + return conf; + } + + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") + public Object[][] isPersistentTopicSubscriptionTypeTest() { + return new Object[][]{ + {TopicDomain.non_persistent, SubscriptionType.Exclusive}, + {TopicDomain.persistent, SubscriptionType.Key_Shared} + }; + } + + @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) + throws Exception { + ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } + + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") + public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, + SubscriptionType subscriptionType) throws Exception { + ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1); + } + + @DataProvider(name = "isPersistentTopicTest") + public Object[][] isPersistentTopicTest() { + return new Object[][]{{TopicDomain.persistent}, {TopicDomain.non_persistent}}; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java index 0c95dd85f28..ed99b502b7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java @@ -31,8 +31,8 @@ public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends Ext } @Override - protected ServiceConfiguration initConfig(ServiceConfiguration conf) { - conf = super.initConfig(conf); + protected ServiceConfiguration updateConfig(ServiceConfiguration conf) { + conf = super.updateConfig(conf); conf.setTransactionCoordinatorEnabled(true); return conf; }
