This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2557db63d48a6534951d9a988c1d7afb5fa64dce Author: Heesung Sohn <[email protected]> AuthorDate: Tue Apr 16 00:34:59 2024 -0700 [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496) (cherry picked from commit 203f305bf449dd335b39501177f210cfcb73d5fa) (cherry picked from commit f467f37a75cae11841161a5a7c6e4be5671100fd) --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 13 ++++++------- .../apache/pulsar/broker/namespace/NamespaceService.java | 5 +++++ .../broker/service/nonpersistent/NonPersistentTopic.java | 4 +++- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 ++- .../pulsar/broker/service/ReplicatorGlobalNSTest.java | 7 +++---- 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f66ed2a5c90..7cd0e226700 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -88,7 +88,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1338,8 +1337,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight non-system bundle override messages.", e); } @@ -1362,8 +1361,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight system bundle override messages.", e); } @@ -1541,8 +1540,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight messages.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 18ea233a971..41ad834304c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -800,6 +800,11 @@ public class NamespaceService implements AutoCloseable { } public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) + .thenApply(Optional::isPresent); + } return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c98e901ada4..36a4369e076 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.AbstractReplicator; @@ -553,7 +554,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol @Override public CompletableFuture<Void> checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 73ff8cef893..df16883b5c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1736,7 +1736,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture<Void> checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java index 057126e6981..c09809d4ad8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -20,14 +20,16 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -40,9 +42,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Method; -import java.util.concurrent.TimeUnit; - /** * The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to * a lot of topic deletion and makes namespace policies being incorrect.
