This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 094742d5fa6 [fix][broker] Do not migrate internal topics during 
Blue-Green Migration when ExtensibleLoadBalancer is used (#22478)
094742d5fa6 is described below

commit 094742d5fa6f07b5ceed581876c45564fa0379bd
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Thu Apr 11 16:21:45 2024 -0700

    [fix][broker] Do not migrate internal topics during Blue-Green Migration 
when ExtensibleLoadBalancer is used (#22478)
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |  4 +-
 .../extensions/ExtensibleLoadManagerImpl.java      |  4 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  4 ++
 .../pulsar/broker/service/BrokerService.java       |  3 ++
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../broker/service/persistent/PersistentTopic.java |  4 ++
 .../broker/service/ClusterMigrationTest.java       | 47 +++++++++++++++++-----
 7 files changed, 57 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index e8efeabcdd3..d5b8df43a47 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -358,8 +358,8 @@ public class PulsarClusterMetadataSetup {
         log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);
     }
 
-    static void createTenantIfAbsent(PulsarResources resources, String tenant, 
String cluster) throws IOException,
-            InterruptedException, ExecutionException {
+    public static void createTenantIfAbsent(PulsarResources resources, String 
tenant, String cluster)
+            throws IOException, InterruptedException, ExecutionException {
 
         TenantResources tenantResources = resources.getTenantResources();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index c35dc11d7ef..0c9448ab69c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -825,11 +825,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         boolean becameFollower = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
+                initWaiter.await();
                 if (!serviceUnitStateChannel.isChannelOwner()) {
                     becameFollower = true;
                     break;
                 }
-                initWaiter.await();
                 // Confirm the system topics have been created or create them 
if they do not exist.
                 // If the leader has changed, the new leader need to reset
                 // the local brokerService.topics (by this topic creations).
@@ -875,11 +875,11 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         boolean becameLeader = false;
         while (!Thread.currentThread().isInterrupted()) {
             try {
+                initWaiter.await();
                 if (serviceUnitStateChannel.isChannelOwner()) {
                     becameLeader = true;
                     break;
                 }
-                initWaiter.await();
                 unloadScheduler.close();
                 serviceUnitStateChannel.cancelOwnershipMonitor();
                 brokerLoadDataStore.init();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 71ddb3acb28..68b38080e73 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -293,6 +293,10 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     log.info("Closed the channel producer.");
                 }
             }
+
+            PulsarClusterMetadataSetup.createTenantIfAbsent
+                    (pulsar.getPulsarResources(), 
SYSTEM_NAMESPACE.getTenant(), config.getClusterName());
+
             PulsarClusterMetadataSetup.createNamespaceIfAbsent
                     (pulsar.getPulsarResources(), SYSTEM_NAMESPACE, 
config.getClusterName());
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 549dfef896c..b4d0f38b4a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1785,6 +1785,9 @@ public class BrokerService implements Closeable {
     }
 
     private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName 
topicName) {
+        if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
+            return CompletableFuture.completedFuture(null);
+        }
         CompletableFuture<Void> result = new CompletableFuture<>();
         AbstractTopic.isClusterMigrationEnabled(pulsar, 
topicName.toString()).handle((isMigrated, ex) -> {
             if (isMigrated) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 88f8c698950..0ac06d6883f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -996,6 +996,10 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
     @Override
     public CompletableFuture<Void> checkClusterMigration() {
+        if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         Optional<ClusterUrl> url = getMigratedClusterUrl();
         if (url.isPresent()) {
             this.migrated = true;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b21cd165402..3ceecd7f4aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2740,6 +2740,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public CompletableFuture<Void> checkClusterMigration() {
+        if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
 
         if (!clusterUrl.isPresent()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 7bd82cdd840..20e13023cac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -35,6 +35,8 @@ import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
@@ -53,6 +55,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 @Test(groups = "cluster-migration")
@@ -86,6 +89,8 @@ public class ClusterMigrationTest {
     PulsarService pulsar4;
     PulsarAdmin admin4;
 
+    String loadManagerClassName;
+
     @DataProvider(name="NamespaceMigrationTopicSubscriptionTypes")
     public Object[][] namespaceMigrationSubscriptionTypes() {
         return new Object[][] {
@@ -95,15 +100,28 @@ public class ClusterMigrationTest {
         };
     }
 
+    @DataProvider(name = "loadManagerClassName")
+    public static Object[][] loadManagerClassName() {
+        return new Object[][]{
+                {ModularLoadManagerImpl.class.getName()},
+                {ExtensibleLoadManagerImpl.class.getName()}
+        };
+    }
+
+    @Factory(dataProvider = "loadManagerClassName")
+    public ClusterMigrationTest(String loadManagerClassName) {
+        this.loadManagerClassName = loadManagerClassName;
+    }
+
     @BeforeMethod(alwaysRun = true, timeOut = 300000)
     public void setup() throws Exception {
 
         log.info("--- Starting ReplicatorTestBase::setup ---");
 
-        broker1 = new TestBroker("r1");
-        broker2 = new TestBroker("r2");
-        broker3 = new TestBroker("r3");
-        broker4 = new TestBroker("r4");
+        broker1 = new TestBroker("r1", loadManagerClassName);
+        broker2 = new TestBroker("r2", loadManagerClassName);
+        broker3 = new TestBroker("r3", loadManagerClassName);
+        broker4 = new TestBroker("r4", loadManagerClassName);
 
         pulsar1 = broker1.getPulsarService();
         url1 = new URL(pulsar1.getWebServiceAddress());
@@ -163,9 +181,9 @@ public class ClusterMigrationTest {
                         
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build());
 
         // Setting r3 as replication cluster for r1
-        admin1.tenants().createTenant("pulsar",
+        updateTenantInfo(admin1, "pulsar",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"), Sets.newHashSet("r1", "r3")));
-        admin3.tenants().createTenant("pulsar",
+        updateTenantInfo(admin3, "pulsar",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"), Sets.newHashSet("r1", "r3")));
         admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", 
"r3"));
         admin3.namespaces().createNamespace(namespace);
@@ -175,9 +193,9 @@ public class ClusterMigrationTest {
         
admin1.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, 
Sets.newHashSet("r1", "r3"));
 
         // Setting r4 as replication cluster for r2
-        admin2.tenants().createTenant("pulsar",
+        updateTenantInfo(admin2, "pulsar",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"), Sets.newHashSet("r2", "r4")));
-        admin4.tenants().createTenant("pulsar",
+        updateTenantInfo(admin4,"pulsar",
                 new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"), Sets.newHashSet("r2", "r4")));
         admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", 
"r4"));
         admin4.namespaces().createNamespace(namespace);
@@ -200,6 +218,14 @@ public class ClusterMigrationTest {
 
     }
 
+    protected void updateTenantInfo(PulsarAdmin admin, String tenant, 
TenantInfoImpl tenantInfo) throws Exception {
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants().createTenant(tenant, tenantInfo);
+        } else {
+            admin.tenants().updateTenant(tenant, tenantInfo);
+        }
+    }
+
     @AfterMethod(alwaysRun = true, timeOut = 300000)
     protected void cleanup() throws Exception {
         log.info("--- Shutting down ---");
@@ -1059,9 +1085,11 @@ public class ClusterMigrationTest {
     static class TestBroker extends MockedPulsarServiceBaseTest {
 
         private String clusterName;
+        private String loadManagerClassName;
 
-        public TestBroker(String clusterName) throws Exception {
+        public TestBroker(String clusterName, String loadManagerClassName) 
throws Exception {
             this.clusterName = clusterName;
+            this.loadManagerClassName = loadManagerClassName;
             setup();
         }
 
@@ -1073,6 +1101,7 @@ public class ClusterMigrationTest {
         @Override
         protected void doInitConf() throws Exception {
             super.doInitConf();
+            this.conf.setLoadManagerClassName(loadManagerClassName);
             this.conf.setWebServicePortTls(Optional.of(0));
             this.conf.setBrokerServicePortTls(Optional.of(0));
         }

Reply via email to