This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 094742d5fa6 [fix][broker] Do not migrate internal topics during Blue-Green Migration when ExtensibleLoadBalancer is used (#22478) 094742d5fa6 is described below commit 094742d5fa6f07b5ceed581876c45564fa0379bd Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Thu Apr 11 16:21:45 2024 -0700 [fix][broker] Do not migrate internal topics during Blue-Green Migration when ExtensibleLoadBalancer is used (#22478) --- .../apache/pulsar/PulsarClusterMetadataSetup.java | 4 +- .../extensions/ExtensibleLoadManagerImpl.java | 4 +- .../channel/ServiceUnitStateChannelImpl.java | 4 ++ .../pulsar/broker/service/BrokerService.java | 3 ++ .../service/nonpersistent/NonPersistentTopic.java | 4 ++ .../broker/service/persistent/PersistentTopic.java | 4 ++ .../broker/service/ClusterMigrationTest.java | 47 +++++++++++++++++----- 7 files changed, 57 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index e8efeabcdd3..d5b8df43a47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -358,8 +358,8 @@ public class PulsarClusterMetadataSetup { log.info("Cluster metadata for '{}' setup correctly", arguments.cluster); } - static void createTenantIfAbsent(PulsarResources resources, String tenant, String cluster) throws IOException, - InterruptedException, ExecutionException { + public static void createTenantIfAbsent(PulsarResources resources, String tenant, String cluster) + throws IOException, InterruptedException, ExecutionException { TenantResources tenantResources = resources.getTenantResources(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index c35dc11d7ef..0c9448ab69c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -825,11 +825,11 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { + initWaiter.await(); if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; } - initWaiter.await(); // Confirm the system topics have been created or create them if they do not exist. // If the leader has changed, the new leader need to reset // the local brokerService.topics (by this topic creations). @@ -875,11 +875,11 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { + initWaiter.await(); if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; } - initWaiter.await(); unloadScheduler.close(); serviceUnitStateChannel.cancelOwnershipMonitor(); brokerLoadDataStore.init(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 71ddb3acb28..68b38080e73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -293,6 +293,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { log.info("Closed the channel producer."); } } + + 
PulsarClusterMetadataSetup.createTenantIfAbsent + (pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName()); + PulsarClusterMetadataSetup.createNamespaceIfAbsent (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 549dfef896c..b4d0f38b4a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1785,6 +1785,9 @@ public class BrokerService implements Closeable { } private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName topicName) { + if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) { + return CompletableFuture.completedFuture(null); + } CompletableFuture<Void> result = new CompletableFuture<>(); AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> { if (isMigrated) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 88f8c698950..0ac06d6883f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -996,6 +996,10 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol @Override public CompletableFuture<Void> checkClusterMigration() { + if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) { + return CompletableFuture.completedFuture(null); + } + Optional<ClusterUrl> url = getMigratedClusterUrl(); if (url.isPresent()) { this.migrated = true; diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b21cd165402..3ceecd7f4aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2740,6 +2740,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture<Void> checkClusterMigration() { + if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) { + return CompletableFuture.completedFuture(null); + } + Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl(); if (!clusterUrl.isPresent()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 7bd82cdd840..20e13023cac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -35,6 +35,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; @@ -53,6 +55,7 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; 
@Test(groups = "cluster-migration") @@ -86,6 +89,8 @@ public class ClusterMigrationTest { PulsarService pulsar4; PulsarAdmin admin4; + String loadManagerClassName; + @DataProvider(name="NamespaceMigrationTopicSubscriptionTypes") public Object[][] namespaceMigrationSubscriptionTypes() { return new Object[][] { @@ -95,15 +100,28 @@ public class ClusterMigrationTest { }; } + @DataProvider(name = "loadManagerClassName") + public static Object[][] loadManagerClassName() { + return new Object[][]{ + {ModularLoadManagerImpl.class.getName()}, + {ExtensibleLoadManagerImpl.class.getName()} + }; + } + + @Factory(dataProvider = "loadManagerClassName") + public ClusterMigrationTest(String loadManagerClassName) { + this.loadManagerClassName = loadManagerClassName; + } + @BeforeMethod(alwaysRun = true, timeOut = 300000) public void setup() throws Exception { log.info("--- Starting ReplicatorTestBase::setup ---"); - broker1 = new TestBroker("r1"); - broker2 = new TestBroker("r2"); - broker3 = new TestBroker("r3"); - broker4 = new TestBroker("r4"); + broker1 = new TestBroker("r1", loadManagerClassName); + broker2 = new TestBroker("r2", loadManagerClassName); + broker3 = new TestBroker("r3", loadManagerClassName); + broker4 = new TestBroker("r4", loadManagerClassName); pulsar1 = broker1.getPulsarService(); url1 = new URL(pulsar1.getWebServiceAddress()); @@ -163,9 +181,9 @@ public class ClusterMigrationTest { .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build()); // Setting r3 as replication cluster for r1 - admin1.tenants().createTenant("pulsar", + updateTenantInfo(admin1, "pulsar", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3"))); - admin3.tenants().createTenant("pulsar", + updateTenantInfo(admin3, "pulsar", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3"))); admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r3")); admin3.namespaces().createNamespace(namespace); 
@@ -175,9 +193,9 @@ public class ClusterMigrationTest { admin1.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, Sets.newHashSet("r1", "r3")); // Setting r4 as replication cluster for r2 - admin2.tenants().createTenant("pulsar", + updateTenantInfo(admin2, "pulsar", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4"))); - admin4.tenants().createTenant("pulsar", + updateTenantInfo(admin4,"pulsar", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4"))); admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", "r4")); admin4.namespaces().createNamespace(namespace); @@ -200,6 +218,14 @@ public class ClusterMigrationTest { } + protected void updateTenantInfo(PulsarAdmin admin, String tenant, TenantInfoImpl tenantInfo) throws Exception { + if (!admin.tenants().getTenants().contains(tenant)) { + admin.tenants().createTenant(tenant, tenantInfo); + } else { + admin.tenants().updateTenant(tenant, tenantInfo); + } + } + @AfterMethod(alwaysRun = true, timeOut = 300000) protected void cleanup() throws Exception { log.info("--- Shutting down ---"); @@ -1059,9 +1085,11 @@ public class ClusterMigrationTest { static class TestBroker extends MockedPulsarServiceBaseTest { private String clusterName; + private String loadManagerClassName; - public TestBroker(String clusterName) throws Exception { + public TestBroker(String clusterName, String loadManagerClassName) throws Exception { this.clusterName = clusterName; + this.loadManagerClassName = loadManagerClassName; setup(); } @@ -1073,6 +1101,7 @@ public class ClusterMigrationTest { @Override protected void doInitConf() throws Exception { super.doInitConf(); + this.conf.setLoadManagerClassName(loadManagerClassName); this.conf.setWebServicePortTls(Optional.of(0)); this.conf.setBrokerServicePortTls(Optional.of(0)); }