This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 15db04d9aee8817fcac591ca9287b1db30dda33a
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Apr 16 00:34:59 2024 -0700

    [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is 
enabled (#22496)
    
    (cherry picked from commit 203f305bf449dd335b39501177f210cfcb73d5fa)
---
 .../channel/ServiceUnitStateChannelImpl.java          | 13 ++++++-------
 .../pulsar/broker/namespace/NamespaceService.java     |  5 +++++
 .../service/nonpersistent/NonPersistentTopic.java     |  3 ++-
 .../broker/service/persistent/PersistentTopic.java    |  3 ++-
 .../pulsar/broker/service/ReplicatorGlobalNSTest.java | 16 ++++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java     | 19 +++++++++++++++++--
 6 files changed, 48 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 477a9239538..f6fb4503b03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -88,7 +88,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1377,8 +1376,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, 
MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight non-system bundle 
override messages.", e);
         }
 
@@ -1401,8 +1400,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, 
MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight system bundle override 
messages.", e);
         }
 
@@ -1580,8 +1579,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, 
MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 6844b44419d..187290566d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -795,6 +795,11 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle 
bundle) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
+            return extensibleLoadManager.getOwnershipAsync(Optional.empty(), 
bundle)
+                    .thenApply(Optional::isPresent);
+        }
         return 
pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 8af654633b4..22ee11d8eaf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -580,7 +580,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f363132f944..bd53191a261 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1817,7 +1817,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index 479c1e616e3..514e0207fbf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Future;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -38,6 +40,8 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
@@ -52,6 +56,18 @@ import java.util.concurrent.TimeUnit;
 public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
 
     protected String methodName;
+    @DataProvider(name = "loadManagerClassName")
+    public static Object[][] loadManagerClassName() {
+        return new Object[][]{
+                {ModularLoadManagerImpl.class.getName()},
+                {ExtensibleLoadManagerImpl.class.getName()}
+        };
+    }
+
+    @Factory(dataProvider = "loadManagerClassName")
+    public ReplicatorGlobalNSTest(String loadManagerClassName) {
+        this.loadManagerClassName = loadManagerClassName;
+    }
 
     @BeforeMethod
     public void beforeMethod(Method m) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 39cd13fbba5..d87f896e31a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     protected final String cluster2 = "r2";
     protected final String cluster3 = "r3";
     protected final String cluster4 = "r4";
+    protected String loadManagerClassName;
+
+    protected String getLoadManagerClassName() {
+        return loadManagerClassName;
+    }
 
     // Default frequency
     public int getBrokerServicePurgeInactiveFrequency() {
@@ -271,8 +276,9 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
                 .brokerClientTlsTrustStoreType(keyStoreType)
                 .build());
 
-        admin1.tenants().createTenant("pulsar",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"), Sets.newHashSet("r1", "r2", "r3")));
+        updateTenantInfo("pulsar",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", 
"appid3"),
+                        Sets.newHashSet("r1", "r2", "r3")));
         admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", 
"r2", "r3"));
         admin1.namespaces().createNamespace("pulsar/ns1", 
Sets.newHashSet("r1", "r2"));
 
@@ -344,6 +350,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
         config.setEnableReplicatedSubscriptions(true);
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setLoadManagerClassName(getLoadManagerClassName());
     }
 
     public void resetConfig1() {
@@ -436,6 +443,14 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         resetConfig4();
     }
 
+    protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) 
throws Exception {
+        if (!admin1.tenants().getTenants().contains(tenant)) {
+            admin1.tenants().createTenant(tenant, tenantInfo);
+        } else {
+            admin1.tenants().updateTenant(tenant, tenantInfo);
+        }
+    }
+
     static class MessageProducer implements AutoCloseable {
         URL url;
         String namespace;

Reply via email to