This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2557db63d48a6534951d9a988c1d7afb5fa64dce
Author: Heesung Sohn <[email protected]>
AuthorDate: Tue Apr 16 00:34:59 2024 -0700

    [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
    
    (cherry picked from commit 203f305bf449dd335b39501177f210cfcb73d5fa)
    (cherry picked from commit f467f37a75cae11841161a5a7c6e4be5671100fd)
---
 .../extensions/channel/ServiceUnitStateChannelImpl.java     | 13 ++++++-------
 .../apache/pulsar/broker/namespace/NamespaceService.java    |  5 +++++
 .../broker/service/nonpersistent/NonPersistentTopic.java    |  4 +++-
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  3 ++-
 .../pulsar/broker/service/ReplicatorGlobalNSTest.java       |  7 +++----
 5 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f66ed2a5c90..7cd0e226700 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -88,7 +88,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1338,8 +1337,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight non-system bundle override messages.", e);
         }
 
@@ -1362,8 +1361,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight system bundle override messages.", e);
         }
 
@@ -1541,8 +1540,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 18ea233a971..41ad834304c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -800,6 +800,11 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
+            return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
+                    .thenApply(Optional::isPresent);
+        }
         return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c98e901ada4..36a4369e076 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.AbstractReplicator;
@@ -553,7 +554,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 73ff8cef893..df16883b5c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1736,7 +1736,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index 057126e6981..c09809d4ad8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -20,14 +20,16 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -40,9 +42,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Method;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
  * a lot of topic deletion and makes namespace policies being incorrect.

Reply via email to