This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 22115c39454 [improve][broker] Gracefully shut down load balancer
extension (#20315)
22115c39454 is described below
commit 22115c394541e4f6f4890d429bcca455ebd0d1ec
Author: Heesung Sohn <[email protected]>
AuthorDate: Sun May 21 17:50:44 2023 -0700
[improve][broker] Gracefully shut down load balancer extension (#20315)
### Motivation
The load balancer extension needs to shut down gracefully, especially when
the leader broker is shutting down. If the leader broker closes its leader
election service too late, service unit lookups routed to it can fail
during the shutdown, which delays client reconnection.
Lookup failure logs:
```
(shutdown starts)
pulsar-broker-8 pulsar-broker 2023-04-22T00:19:52,630+0000
[pulsar-service-shutdown] INFO org.apache.pulsar.broker.PulsarService -
Closing PulsarService
pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,690+0000 [pulsar-io-4-5]
INFO org.apache.pulsar.client.impl.ConnectionHandler -
[persistent://pulsar/system/loadbalancer-service-unit-state] [pulsar-18-9]
Closed connection [id: 0x907e74b0, L:/10.0.13.6:35838 !
R:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local/10.0.18.18:6650] --
Will try again in 0.1 s
pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,691+0000 [pulsar-io-4-5]
INFO org.apache.pulsar.client.impl.ConnectionHandler -
[persistent://pulsar/system/loadbalancer-service-unit-state]
[reader-30e6b40dd9] Closed connection [id: 0x907e74b0, L:/10.0.13.6:35838 !
R:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local/10.0.18.18:6650] --
Will try again in 0.1 s
(znode is gone)
pulsar-broker-8 pulsar-broker 2023-04-22T00:19:52,652+0000
[pulsar-load-manager-1-1] INFO
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl
- BrokerRegistry detected the
broker:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 registry has
been deleted.
(lookup failure)
org.apache.pulsar.client.impl.ConnectionHandler -
[persistent://pulsar/system/loadbalancer-service-unit-state]
[reader-30e6b40dd9] Could not get connection to broker:
org.apache.pulsar.client.api.PulsarClientException$ConnectException:
Disconnected from server at
pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local/10.0.13.6:6650 -- Will
try again in 0.193 s
pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,827+0000
[main-EventThread] INFO org.apache.pulsar.broker.lookup.TopicLookupBase -
Failed to lookup null for topic
persistent://pulsar/system/loadbalancer-service-unit-state with error Failed to
look up a broker
registry:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 for
bundle:pulsar/system/0x40000000_0x50000000
pulsar-broker-3 pulsar-broker 2023-04-22T00:19:52,827+0000
[main-EventThread] INFO org.apache.pulsar.broker.lookup.TopicLookupBase -
Failed to lookup null for topic
persistent://pulsar/system/loadbalancer-service-unit-state with error Failed to
look up a broker
registry:pulsar-broker-8.pulsar-broker.pulsar.svc.cluster.local:8080 for
bundle:pulsar/system/0x40000000_0x50000000
(leader election service has been closed)
pulsar-broker-3 pulsar-broker 2023-04-22T00:19:55,569+0000
[pulsar-load-manager-1-1] INFO
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl -
This broker:pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:8080 plays
the leader now.
```
### Modifications
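During the shutdown flow, when `LoadManager.disableBroker()` is called, the
load balancer extension first gracefully transfers its owned bundles to other
brokers. Non-system bundles are transferred first; system bundles, which can
contain the load balancer's own channel topic, are transferred last to avoid
destabilizing the cluster. After that, the broker closes its leader election
service and removes its registration from the metadata store.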
---
.../org/apache/pulsar/broker/PulsarService.java | 22 +++-
.../extensions/ExtensibleLoadManagerImpl.java | 17 ++++
.../extensions/ExtensibleLoadManagerWrapper.java | 2 +-
.../channel/ServiceUnitStateChannel.java | 5 +
.../channel/ServiceUnitStateChannelImpl.java | 112 +++++++++++++++++----
.../extensions/ExtensibleLoadManagerImplTest.java | 45 +++++++++
.../channel/ServiceUnitStateChannelTest.java | 65 ++++++++----
7 files changed, 222 insertions(+), 46 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f833674ceeb..62d4634fa2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -382,6 +382,17 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
localMetadataStore.close();
}
+ private void closeLeaderElectionService() throws Exception {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
+ } else {
+ if (this.leaderElectionService != null) {
+ this.leaderElectionService.close();
+ this.leaderElectionService = null;
+ }
+ }
+ }
+
@Override
public void close() throws PulsarServerException {
try {
@@ -502,10 +513,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.bkClientFactory = null;
}
- if (this.leaderElectionService != null) {
- this.leaderElectionService.close();
- this.leaderElectionService = null;
- }
+ closeLeaderElectionService();
if (adminClient != null) {
adminClient.close();
@@ -1316,7 +1324,11 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
* @return a reference of the current <code>LeaderElectionService</code>
instance.
*/
public LeaderElectionService getLeaderElectionService() {
- return this.leaderElectionService;
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
+ } else {
+ return this.leaderElectionService;
+ }
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 348098df874..531ab18938a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -26,6 +26,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecisi
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -397,12 +398,22 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ return selectAsync(bundle, Collections.emptySet());
+ }
+
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle,
+ Set<String>
excludeBrokerSet) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
LoadManagerContext context = this.getContext();
Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
+ if (!excludeBrokerSet.isEmpty()) {
+ for (String exclude : excludeBrokerSet) {
+ availableBrokerCandidates.remove(exclude);
+ }
+ }
// Filter out brokers that do not meet the rules.
List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
@@ -702,4 +713,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
log.error("Failed to get the channel ownership.", e);
}
}
+
+ public void disableBroker() throws Exception {
+ serviceUnitStateChannel.cleanOwnerships();
+ leaderElectionService.close();
+ brokerRegistry.unregister();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 1eabbe620e2..18e949537de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -74,7 +74,7 @@ public class ExtensibleLoadManagerWrapper implements
LoadManager {
@Override
public void disableBroker() throws Exception {
- this.loadManager.getBrokerRegistry().unregister();
+ this.loadManager.disableBroker();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 4782a31fe0c..6e75fe91a91 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -206,4 +206,9 @@ public interface ServiceUnitStateChannel extends Closeable {
* Cancels the ownership monitor.
*/
void cancelOwnershipMonitor();
+
+ /**
+ * Cleans the service unit ownerships from the current broker's channel.
+ */
+ void cleanOwnerships();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f476675d01b..489a0085105 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -91,7 +91,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
@@ -110,6 +109,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE =
CompressionType.ZSTD;
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30
* 1000; // 30sec
+
+ private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
+ private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS =
100;
+ private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS =
3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3
mins
@@ -257,6 +260,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ @Override
+ public void cleanOwnerships() {
+ doCleanup(lookupServiceAddress);
+ }
+
public synchronized void start() throws PulsarServerException {
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" +
channelState.name());
@@ -284,7 +292,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
PulsarClusterMetadataSetup.createNamespaceIfAbsent
- (pulsar.getPulsarResources(),
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
+ (pulsar.getPulsarResources(), SYSTEM_NAMESPACE,
config.getClusterName());
ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
@@ -696,6 +704,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
lastOwnEventHandledAt = System.currentTimeMillis();
+ } else if (data.force() && isTargetBroker(data.sourceBroker())) {
+ closeServiceUnit(serviceUnit);
}
}
@@ -1110,21 +1120,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private ServiceUnitStateData
getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
- String
selectedBroker) {
+ String
selectedBroker,
+ String
inactiveBroker) {
if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(),
selectedBroker,
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
- return new ServiceUnitStateData(Owned, selectedBroker, true,
getNextVersionId(orphanData));
+ return new ServiceUnitStateData(Owned, selectedBroker,
inactiveBroker,
+ true, getNextVersionId(orphanData));
}
}
- private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData) {
-
- Optional<String> selectedBroker = selectBroker(serviceUnit);
+ private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, String inactiveBroker) {
+ Optional<String> selectedBroker = selectBroker(serviceUnit,
inactiveBroker);
if (selectedBroker.isPresent()) {
- var override = getOverrideInactiveBrokerStateData(orphanData,
selectedBroker.get());
+ var override = getOverrideInactiveBrokerStateData(
+ orphanData, selectedBroker.get(), inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
@@ -1142,26 +1154,69 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ private void waitForCleanups(String broker, boolean excludeSystemTopics,
int maxWaitTimeInMillis) {
+ long started = System.currentTimeMillis();
+ while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
+ boolean cleaned = true;
+ for (var etr : tableview.entrySet()) {
+ var serviceUnit = etr.getKey();
+ var data = etr.getValue();
+
+ if (excludeSystemTopics &&
serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+ continue;
+ }
+
+ if (data.state() == Owned && broker.equals(data.dstBroker())) {
+ cleaned = false;
+ break;
+ }
+ }
+ if (cleaned) {
+ try {
+
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while gracefully waiting for the
cleanup convergence.");
+ }
+ break;
+ } else {
+ try {
+
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while delaying the next service unit
clean-up. Cleaning broker:{}",
+ lookupServiceAddress);
+ }
+ }
+ }
+ }
- private void doCleanup(String broker) {
+ private synchronized void doCleanup(String broker) {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
+ Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new
HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
if (StringUtils.equals(broker, stateData.dstBroker())) {
if (isActiveState(state)) {
- overrideOwnership(serviceUnit, stateData);
+ if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+ orphanSystemServiceUnits.put(serviceUnit, stateData);
+ } else {
+ overrideOwnership(serviceUnit, stateData, broker);
+ }
orphanServiceUnitCleanupCnt++;
}
} else if (StringUtils.equals(broker, stateData.sourceBroker())) {
if (isInFlightState(state)) {
- overrideOwnership(serviceUnit, stateData);
+ if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
+ orphanSystemServiceUnits.put(serviceUnit, stateData);
+ } else {
+ overrideOwnership(serviceUnit, stateData, broker);
+ }
orphanServiceUnitCleanupCnt++;
}
}
@@ -1170,14 +1225,33 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
try {
producer.flush();
} catch (PulsarClientException e) {
- log.error("Failed to flush the in-flight messages.", e);
+ log.error("Failed to flush the in-flight non-system bundle
override messages.", e);
}
+
if (orphanServiceUnitCleanupCnt > 0) {
+ // System bundles can contain this channel's system topic and
other important system topics.
+ // Cleaning such system bundles(closing the system topics)
together with the non-system bundles
+ // can cause the cluster to be temporarily unstable.
+ // Hence, we clean the non-system bundles first and gracefully
wait for them.
+ // After that, we clean the system bundles, if any.
+ waitForCleanups(broker, true,
OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
this.totalOrphanServiceUnitCleanupCnt +=
orphanServiceUnitCleanupCnt;
this.totalInactiveBrokerCleanupCnt++;
}
+ // clean system bundles in the end
+ for (var orphanSystemServiceUnit :
orphanSystemServiceUnits.entrySet()) {
+ log.info("Overriding orphan system service unit:{}",
orphanSystemServiceUnit.getKey());
+ overrideOwnership(orphanSystemServiceUnit.getKey(),
orphanSystemServiceUnit.getValue(), broker);
+ }
+
+ try {
+ producer.flush();
+ } catch (PulsarClientException e) {
+ log.error("Failed to flush the in-flight system bundle override
messages.", e);
+ }
+
double cleanupTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
@@ -1196,9 +1270,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
- private Optional<String> selectBroker(String serviceUnit) {
+ private Optional<String> selectBroker(String serviceUnit, String
inactiveBroker) {
try {
- return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
+ return loadManager.selectAsync(getNamespaceBundle(serviceUnit),
Set.of(inactiveBroker))
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}",
serviceUnit);
@@ -1206,8 +1280,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return Optional.empty();
}
- private Optional<ServiceUnitStateData> getRollForwardStateData(String
serviceUnit, long nextVersionId) {
- Optional<String> selectedBroker = selectBroker(serviceUnit);
+ private Optional<ServiceUnitStateData> getRollForwardStateData(String
serviceUnit,
+ String
inactiveBroker,
+ long
nextVersionId) {
+ Optional<String> selectedBroker = selectBroker(serviceUnit,
inactiveBroker);
if (selectedBroker.isEmpty()) {
return Optional.empty();
}
@@ -1222,7 +1298,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
var state = orphanData.state();
switch (state) {
case Assigning: {
- return getRollForwardStateData(serviceUnit, nextVersionId);
+ return getRollForwardStateData(serviceUnit,
orphanData.dstBroker(), nextVersionId);
}
case Splitting: {
return Optional.of(new ServiceUnitStateData(Splitting,
@@ -1235,7 +1311,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
// rollback to the src
return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true, nextVersionId));
} else {
- return getRollForwardStateData(serviceUnit, nextVersionId);
+ return getRollForwardStateData(serviceUnit,
orphanData.sourceBroker(), nextVersionId);
}
}
default: {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d72ce9661b9..498f48d16d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -882,6 +882,51 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(actual, expected);
}
+ @Test
+ public void testDisableBroker() throws Exception {
+ // Test rollback to modular load manager.
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ try (var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf)) {
+ var pulsar3 = additionalPulsarTestContext.getPulsarService();
+ ExtensibleLoadManagerImpl ternaryLoadManager =
spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(pulsar3.getLoadManager().get(),
"loadManager", true));
+ String topic = "persistent://public/default/test";
+
+ String lookupResult1 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+ TopicName topicName = TopicName.get("test");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
+
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange(),
+ pulsar3.getLookupServiceAddress());
+ lookupResult1 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+ }
+ String lookupResult2 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult3 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+
+ assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+ assertEquals(lookupResult1, lookupResult2);
+ assertEquals(lookupResult1, lookupResult3);
+
+
+
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
assertTrue(ternaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
+ ternaryLoadManager.disableBroker();
+
+
assertFalse(ternaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ if (primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get()) {
+
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ } else {
+
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ }
+ }
+ }
+
private static abstract class MockBrokerFilter implements BrokerFilter {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 0f682cf048f..1263170bec4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -517,7 +517,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// recovered, check the monitor update state : Assigned -> Owned
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
- .when(loadManager).selectAsync(any());
+ .when(loadManager).selectAsync(any(), any());
FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1 , true);
@@ -715,8 +715,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var cleanupJobs1 = getCleanupJobs(channel1);
var cleanupJobs2 = getCleanupJobs(channel2);
- var leaderCleanupJobs = spy(cleanupJobs1);
- var followerCleanupJobs = spy(cleanupJobs2);
+ var leaderCleanupJobsTmp = spy(cleanupJobs1);
+ var followerCleanupJobsTmp = spy(cleanupJobs2);
var leaderChannel = channel1;
var followerChannel = channel2;
String leader = channel1.getChannelOwnerAsync().get(2,
TimeUnit.SECONDS).get();
@@ -725,10 +725,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
if (leader.equals(lookupServiceAddress2)) {
leaderChannel = channel2;
followerChannel = channel1;
- var tmp = followerCleanupJobs;
- followerCleanupJobs = leaderCleanupJobs;
- leaderCleanupJobs = tmp;
+ var tmp = followerCleanupJobsTmp;
+ followerCleanupJobsTmp = leaderCleanupJobsTmp;
+ leaderCleanupJobsTmp = tmp;
}
+ final var leaderCleanupJobs = leaderCleanupJobsTmp;
+ final var followerCleanupJobs = followerCleanupJobsTmp;
FieldUtils.writeDeclaredField(leaderChannel, "cleanupJobs",
leaderCleanupJobs,
true);
FieldUtils.writeDeclaredField(followerChannel, "cleanupJobs",
followerCleanupJobs,
@@ -737,7 +739,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
- .when(loadManager).selectAsync(any());
+ .when(loadManager).selectAsync(any(), any());
assertTrue(owner1.get().isEmpty());
assertTrue(owner2.get().isEmpty());
@@ -771,9 +773,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
- assertEquals(0, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
validateMonitorCounters(leaderChannel,
1,
0,
@@ -799,8 +803,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
- assertEquals(1, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(1, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
+
validateMonitorCounters(leaderChannel,
1,
0,
@@ -816,8 +824,12 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(2)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
- assertEquals(0, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
+
validateMonitorCounters(leaderChannel,
1,
0,
@@ -835,8 +847,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
- assertEquals(1, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(1, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
+
validateMonitorCounters(leaderChannel,
1,
0,
@@ -854,8 +869,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
- assertEquals(0, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
+
validateMonitorCounters(leaderChannel,
2,
0,
@@ -880,8 +898,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any());
verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker),
any());
- assertEquals(0, leaderCleanupJobs.size());
- assertEquals(0, followerCleanupJobs.size());
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(0, leaderCleanupJobs.size());
+ assertEquals(0, followerCleanupJobs.size());
+ });
+
validateMonitorCounters(leaderChannel,
2,
0,
@@ -1103,7 +1124,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
- .when(loadManager).selectAsync(any());
+ .when(loadManager).selectAsync(any(), any());
channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
// channel1 is broken. the assign won't be complete.
waitUntilState(channel1, bundle);
@@ -1442,7 +1463,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// test stable metadata state
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
- .when(loadManager).selectAsync(any());
+ .when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
followerChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel,
"lastMetadataSessionEventTimestamp",
@@ -1507,7 +1528,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// test stable metadata state
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2)))
- .when(loadManager).selectAsync(any());
+ .when(loadManager).selectAsync(any(), any());
FieldUtils.writeDeclaredField(leaderChannel,
"inFlightStateWaitingTimeInMillis",
-1, true);
FieldUtils.writeDeclaredField(followerChannel,
"inFlightStateWaitingTimeInMillis",