This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8cff1238465 [improve][broker] PIP-192 Made only the leader consume
TopBundlesLoadDataStore (#19730)
8cff1238465 is described below
commit 8cff1238465ca8b1315d8b5b48388d4b61ec74b6
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 15 17:22:12 2023 -0700
[improve][broker] PIP-192 Made only the leader consume
TopBundlesLoadDataStore (#19730)
Master Issue: https://github.com/apache/pulsar/issues/16691
### Motivation
Raising a PR to implement https://github.com/apache/pulsar/issues/16691.
Based on the current design, only the leader needs to consume the top-k
bundle load data.
### Modifications
This PR:
- makes only the leader consume TopBundlesLoadDataStore
- moves LeaderElectionService from ServiceUnitStateChannel to
ExtensibleLoadManager.
---
.../extensions/ExtensibleLoadManagerImpl.java | 113 +++++++++++++-
.../channel/ServiceUnitStateChannel.java | 9 ++
.../channel/ServiceUnitStateChannelImpl.java | 79 +++++-----
.../extensions/scheduler/UnloadScheduler.java | 11 +-
.../extensions/store/LoadDataStore.java | 12 ++
.../store/TableViewLoadDataStoreImpl.java | 46 +++++-
.../extensions/ExtensibleLoadManagerImplTest.java | 169 +++++++++++++++++++--
.../channel/ServiceUnitStateChannelTest.java | 15 ++
.../extensions/filter/BrokerFilterTestBase.java | 11 ++
.../extensions/scheduler/TransferShedderTest.java | 21 +++
.../extensions/store/LoadDataStoreTest.java | 27 ++++
.../strategy/LeastResourceUsageWithWeightTest.java | 12 ++
12 files changed, 465 insertions(+), 60 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 7895f06fd50..716be3718bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions;
+import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
+import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,6 +29,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,6 +39,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
@@ -70,6 +75,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@Slf4j
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@@ -84,6 +90,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
NamespaceName.SYSTEM_NAMESPACE,
"loadbalancer-top-bundles-load-data").toString();
+ private static final long MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS = 200;
+
private PulsarService pulsar;
private ServiceConfiguration conf;
@@ -102,6 +110,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private LoadManagerScheduler unloadScheduler;
+ @Getter
+ private LeaderElectionService leaderElectionService;
+
@Getter
private LoadManagerContext context;
@@ -142,7 +153,14 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
lookupRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
.build();
+ private final CountDownLatch loadStoreInitWaiter = new CountDownLatch(1);
+
+ public enum Role {
+ Leader,
+ Follower
+ }
+ private Role role;
/**
* Life cycle: Constructor -> initialize -> start -> close.
@@ -173,12 +191,24 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return;
}
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(),
pulsar.getSafeWebServiceAddress(),
+ state -> {
+ pulsar.getLoadManagerExecutor().execute(() -> {
+ if (state == LeaderElectionState.Leading) {
+ playLeader();
+ } else {
+ playFollower();
+ }
+ });
+ });
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.unloadManager = new UnloadManager();
this.splitManager = new SplitManager(splitCounter);
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
+ this.leaderElectionService.start();
this.serviceUnitStateChannel.start();
this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar,
serviceUnitStateChannel);
@@ -189,8 +219,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
+ this.brokerLoadDataStore.startTableView();
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(),
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ this.loadStoreInitWaiter.countDown();
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
@@ -409,8 +441,15 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
this.serviceUnitStateChannel.close();
} finally {
this.unloadManager.close();
- this.started = false;
+ try {
+ this.leaderElectionService.close();
+ } catch (Exception e) {
+ throw new PulsarServerException(e);
+ } finally {
+ this.started = false;
+ }
}
+
}
}
}
@@ -421,6 +460,78 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
+ @VisibleForTesting
+ void playLeader() {
+ if (role != Leader) {
+ log.info("This broker:{} is changing the role from {} to {}",
+ pulsar.getLookupServiceAddress(), role, Leader);
+ int retry = 0;
+ while (true) {
+ try {
+ serviceUnitStateChannel.scheduleOwnershipMonitor();
+ loadStoreInitWaiter.await();
+ topBundlesLoadDataStore.startTableView();
+ unloadScheduler.start();
+ break;
+ } catch (Throwable e) {
+ log.error("The broker:{} failed to change the role.
Retrying {} th ...",
+ pulsar.getLookupServiceAddress(), ++retry, e);
+ try {
+ Thread.sleep(Math.min(retry * 10,
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ }
+ }
+ }
+ role = Leader;
+ log.info("This broker:{} plays the leader now.",
pulsar.getLookupServiceAddress());
+ }
+
+ // flush the load data when the leader is elected.
+ if (brokerLoadDataReporter != null) {
+ brokerLoadDataReporter.reportAsync(true);
+ }
+ if (topBundleLoadDataReporter != null) {
+ topBundleLoadDataReporter.reportAsync(true);
+ }
+ }
+
+ @VisibleForTesting
+ void playFollower() {
+ if (role != Follower) {
+ log.info("This broker:{} is changing the role from {} to {}",
+ pulsar.getLookupServiceAddress(), role, Follower);
+ int retry = 0;
+ while (true) {
+ try {
+ serviceUnitStateChannel.cancelOwnershipMonitor();
+ loadStoreInitWaiter.await();
+ topBundlesLoadDataStore.closeTableView();
+ unloadScheduler.close();
+ break;
+ } catch (Throwable e) {
+ log.error("The broker:{} failed to change the role.
Retrying {} th ...",
+ pulsar.getLookupServiceAddress(), ++retry, e);
+ try {
+ Thread.sleep(Math.min(retry * 10,
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ }
+ }
+ }
+ role = Follower;
+ log.info("This broker:{} plays a follower now.",
pulsar.getLookupServiceAddress());
+ }
+
+ // flush the load data when the leader is elected.
+ if (brokerLoadDataReporter != null) {
+ brokerLoadDataReporter.reportAsync(true);
+ }
+ if (topBundleLoadDataReporter != null) {
+ topBundleLoadDataReporter.reportAsync(true);
+ }
+ }
+
void updateBrokerLoadMetrics(BrokerLoadData loadData) {
this.brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 4e92ad791ab..719c72a67b4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -172,4 +172,13 @@ public interface ServiceUnitStateChannel extends Closeable
{
*/
Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet();
+ /**
+ * Schedules ownership monitor to periodically check and correct invalid
ownership states.
+ */
+ void scheduleOwnershipMonitor();
+
+ /**
+ * Cancels the ownership monitor.
+ */
+ void cancelOwnershipMonitor();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index f8686c07f05..791d02649ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -96,7 +96,6 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
-import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
@Slf4j
@@ -119,9 +118,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private final String lookupServiceAddress;
private final ConcurrentOpenHashMap<String, CompletableFuture<Void>>
cleanupJobs;
private final StateChangeListeners stateChangeListeners;
- private final LeaderElectionService leaderElectionService;
private BrokerSelectionStrategy brokerSelector;
private BrokerRegistry brokerRegistry;
+ private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
private Producer<ServiceUnitStateData> producer;
private ScheduledFuture<?> monitorTask;
@@ -205,31 +204,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;
- this.leaderElectionService = new LeaderElectionService(
- pulsar.getCoordinationService(),
pulsar.getSafeWebServiceAddress(),
- state -> {
- if (state == LeaderElectionState.Leading) {
- log.info("This broker:{} is the leader now.",
lookupServiceAddress);
- this.monitorTask = this.pulsar.getLoadManagerExecutor()
- .scheduleWithFixedDelay(() -> {
- try {
-
monitorOwnerships(brokerRegistry.getAvailableBrokersAsync()
-
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
- } catch (Exception e) {
- log.info("Failed to monitor
the ownerships. will retry..", e);
- }
- },
- ownershipMonitorDelayTimeInSecs,
ownershipMonitorDelayTimeInSecs, SECONDS);
- } else {
- log.info("This broker:{} is a follower now.",
lookupServiceAddress);
- if (monitorTask != null) {
- monitorTask.cancel(false);
- monitorTask = null;
- log.info("This previous leader broker:{} stopped
the channel clean-up monitor",
- lookupServiceAddress);
- }
- }
- });
+
Map<ServiceUnitState, AtomicLong> tmpOwnerLookUpCounters = new
HashMap<>();
Map<ServiceUnitState, Counters> tmpHandlerCounters = new HashMap<>();
Map<EventType, Counters> tmpEventCounters = new HashMap<>();
@@ -246,6 +221,32 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
this.channelState = Constructed;
}
+ public void scheduleOwnershipMonitor() {
+ if (monitorTask == null) {
+ this.monitorTask = this.pulsar.getLoadManagerExecutor()
+ .scheduleWithFixedDelay(() -> {
+ try {
+
monitorOwnerships(brokerRegistry.getAvailableBrokersAsync()
+
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
+ } catch (Exception e) {
+ log.info("Failed to monitor the
ownerships. will retry..", e);
+ }
+ },
+ ownershipMonitorDelayTimeInSecs,
ownershipMonitorDelayTimeInSecs, SECONDS);
+ log.info("This leader broker:{} started the ownership monitor.",
+ lookupServiceAddress);
+ }
+ }
+
+ public void cancelOwnershipMonitor() {
+ if (monitorTask != null) {
+ monitorTask.cancel(false);
+ monitorTask = null;
+ log.info("This previous leader broker:{} stopped the ownership
monitor.",
+ lookupServiceAddress);
+ }
+ }
+
public synchronized void start() throws PulsarServerException {
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" +
channelState.name());
@@ -255,11 +256,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
try {
this.brokerRegistry = getBrokerRegistry();
this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
- leaderElectionService.start();
- this.channelState = LeaderElectionServiceStarted;
- if (debug) {
- log.info("Successfully started the channel leader election
service.");
+ this.leaderElectionService = getLeaderElectionService();
+ var leader = leaderElectionService.readCurrentLeader().get(
+ MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS,
TimeUnit.SECONDS);
+ if (leader.isPresent()) {
+ log.info("Successfully found the channel leader:{}.",
leader.get());
+ } else {
+ log.warn("Failed to find the channel leader.");
}
+ this.channelState = LeaderElectionServiceStarted;
brokerSelector = getBrokerSelector();
if (producer != null) {
@@ -327,15 +332,17 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return new LeastResourceUsageWithWeight();
}
+ @VisibleForTesting
+ protected LeaderElectionService getLeaderElectionService() {
+ return ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get())
+ .get().getLeaderElectionService();
+ }
+
public synchronized void close() throws PulsarServerException {
channelState = Closed;
boolean debug = debug();
try {
- leaderElectionService.close();
- if (debug) {
- log.info("Successfully closed the channel leader election
service.");
- }
-
+ leaderElectionService = null;
if (tableview != null) {
tableview.close();
tableview = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
index e6460269787..bc3c8eb6a94 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -156,16 +156,19 @@ public class UnloadScheduler implements
LoadManagerScheduler {
@Override
public void start() {
- long loadSheddingInterval = TimeUnit.MINUTES
- .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
- this.task = loadManagerExecutor.scheduleAtFixedRate(
- this::execute, loadSheddingInterval, loadSheddingInterval,
TimeUnit.MILLISECONDS);
+ if (this.task == null) {
+ long loadSheddingInterval = TimeUnit.MINUTES
+ .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
+ this.task = loadManagerExecutor.scheduleAtFixedRate(
+ this::execute, loadSheddingInterval, loadSheddingInterval,
TimeUnit.MILLISECONDS);
+ }
}
@Override
public void close() {
if (this.task != null) {
this.task.cancel(false);
+ this.task = null;
}
this.recentlyUnloadedBundles.clear();
this.recentlyUnloadedBrokers.clear();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 512811e1019..680a36523a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.store;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -74,4 +75,15 @@ public interface LoadDataStore<T> extends Closeable {
*/
int size();
+
+ /**
+ * Closes the table view.
+ */
+ void closeTableView() throws IOException;
+
+ /**
+ * Starts the table view.
+ */
+ void startTableView() throws LoadDataStoreException;
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 3909de2afa2..a400163ebf1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
@@ -36,14 +37,22 @@ import org.apache.pulsar.client.api.TableView;
*/
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
- private final TableView<T> tableView;
+ private TableView<T> tableView;
private final Producer<T> producer;
+ private final PulsarClient client;
+
+ private final String topic;
+
+ private final Class<T> clazz;
+
public TableViewLoadDataStoreImpl(PulsarClient client, String topic,
Class<T> clazz) throws LoadDataStoreException {
try {
- this.tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+ this.client = client;
this.producer =
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+ this.topic = topic;
+ this.clazz = clazz;
} catch (Exception e) {
throw new LoadDataStoreException(e);
}
@@ -61,30 +70,59 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public Optional<T> get(String key) {
+ validateTableViewStart();
return Optional.ofNullable(tableView.get(key));
}
@Override
public void forEach(BiConsumer<String, T> action) {
+ validateTableViewStart();
tableView.forEach(action);
}
public Set<Map.Entry<String, T>> entrySet() {
+ validateTableViewStart();
return tableView.entrySet();
}
@Override
public int size() {
+ validateTableViewStart();
return tableView.size();
}
+ @Override
+ public void closeTableView() throws IOException {
+ if (tableView != null) {
+ tableView.close();
+ tableView = null;
+ }
+ }
+
+ @Override
+ public void startTableView() throws LoadDataStoreException {
+ if (tableView == null) {
+ try {
+ tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+ } catch (PulsarClientException e) {
+ tableView = null;
+ throw new LoadDataStoreException(e);
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
if (producer != null) {
producer.close();
}
- if (tableView != null) {
- tableView.close();
+ closeTableView();
+ }
+
+ private void validateTableViewStart() {
+ if (tableView == null) {
+ throw new IllegalStateException("table view has not been started");
}
}
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 17fcb995261..aa8583c6b57 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -35,6 +35,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
@@ -42,6 +43,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -49,6 +52,8 @@ import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -74,12 +79,14 @@ import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateC
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -100,6 +107,7 @@ import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -143,22 +151,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
pulsar2 = additionalPulsarTestContext.getPulsarService();
- ExtensibleLoadManagerWrapper primaryLoadManagerWrapper =
- (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
- primaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(primaryLoadManagerWrapper, "loadManager",
true));
- FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager",
primaryLoadManager, true);
-
- ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper =
- (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
- secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(secondaryLoadManagerWrapper,
"loadManager", true));
- FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager",
secondaryLoadManager, true);
+ setPrimaryLoadManager();
- channel1 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(primaryLoadManager,
"serviceUnitStateChannel", true);
- channel2 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
+ setSecondaryLoadManager();
admin.clusters().createCluster(this.conf.getClusterName(),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -412,6 +407,130 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(brokerLookupData.get().getWebServiceUrl(),
pulsar2.getWebServiceAddress());
}
+ @Test
+ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws
Exception {
+ var topBundlesLoadDataStorePrimary =
+ (LoadDataStore)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
true);
+ var serviceUnitStateChannelPrimary =
+ (ServiceUnitStateChannelImpl)
FieldUtils.readDeclaredField(primaryLoadManager,
+ "serviceUnitStateChannel", true);
+ var tvPrimary =
+ (TableViewImpl)
FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true);
+
+ var topBundlesLoadDataStoreSecondary =
+ (LoadDataStore)
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
true);
+ var tvSecondary =
+ (TableViewImpl)
FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView",
true);
+
+ if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5,
TimeUnit.SECONDS)) {
+ assertNotNull(tvPrimary);
+ assertNull(tvSecondary);
+ } else {
+ assertNull(tvPrimary);
+ assertNotNull(tvSecondary);
+ }
+
+ restartBroker();
+ pulsar1 = pulsar;
+ setPrimaryLoadManager();
+
+ var serviceUnitStateChannelPrimaryNew =
+ (ServiceUnitStateChannelImpl)
FieldUtils.readDeclaredField(primaryLoadManager,
+ "serviceUnitStateChannel", true);
+ var topBundlesLoadDataStorePrimaryNew =
+ (LoadDataStore)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore"
+ , true);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+
assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5,
TimeUnit.SECONDS));
+
assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary,
"tableView"
+ , true));
+
assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew,
"tableView"
+ , true));
+ }
+ );
+ }
+
+ @Test
+ public void testRoleChange()
+ throws Exception {
+
+
+ var topBundlesLoadDataStorePrimary =
(LoadDataStore<TopBundlesLoadData>)
+ FieldUtils.readDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", true);
+ var topBundlesLoadDataStorePrimarySpy =
spy(topBundlesLoadDataStorePrimary);
+ AtomicInteger countPri = new AtomicInteger(3);
+ AtomicInteger countPri2 = new AtomicInteger(3);
+ doAnswer(invocationOnMock -> {
+ if (countPri.decrementAndGet() > 0) {
+ throw new RuntimeException();
+ }
+ // Call the real method
+ reset();
+ return null;
+ }).when(topBundlesLoadDataStorePrimarySpy).startTableView();
+ doAnswer(invocationOnMock -> {
+ if (countPri2.decrementAndGet() > 0) {
+ throw new RuntimeException();
+ }
+ // Call the real method
+ reset();
+ return null;
+ }).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
+ FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimarySpy, true);
+
+ var topBundlesLoadDataStoreSecondary =
(LoadDataStore<TopBundlesLoadData>)
+ FieldUtils.readDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", true);
+ var topBundlesLoadDataStoreSecondarySpy =
spy(topBundlesLoadDataStoreSecondary);
+ AtomicInteger countSec = new AtomicInteger(3);
+ AtomicInteger countSec2 = new AtomicInteger(3);
+ doAnswer(invocationOnMock -> {
+ if (countSec.decrementAndGet() > 0) {
+ throw new RuntimeException();
+ }
+ // Call the real method
+ reset();
+ return null;
+ }).when(topBundlesLoadDataStoreSecondarySpy).startTableView();
+ doAnswer(invocationOnMock -> {
+ if (countSec2.decrementAndGet() > 0) {
+ throw new RuntimeException();
+ }
+ // Call the real method
+ reset();
+ return null;
+ }).when(topBundlesLoadDataStoreSecondarySpy).closeTableView();
+ FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);
+
+ if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
+ primaryLoadManager.playFollower();
+ primaryLoadManager.playFollower();
+ secondaryLoadManager.playLeader();
+ secondaryLoadManager.playLeader();
+ primaryLoadManager.playLeader();
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playFollower();
+ secondaryLoadManager.playFollower();
+ } else {
+ primaryLoadManager.playLeader();
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playFollower();
+ secondaryLoadManager.playFollower();
+ primaryLoadManager.playFollower();
+ primaryLoadManager.playFollower();
+ secondaryLoadManager.playLeader();
+ secondaryLoadManager.playLeader();
+ }
+
+
+ verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
+ verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
+
+ FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
+ FieldUtils.writeDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
+ }
+
@Test
public void testGetMetrics() throws Exception {
{
@@ -612,6 +731,26 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
cache.clear();
}
+ private void setPrimaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+ primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager,
true);
+ channel1 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(primaryLoadManager,
"serviceUnitStateChannel", true);
+ }
+
+ private void setSecondaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+ secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager,
true);
+ channel2 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
+ }
+
// Delegates to pulsar.getNamespaceService().getBundleAsync(topic) and returns that future unchanged.
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 64a5f63196b..e0bd69fce64 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
@@ -1497,6 +1498,20 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
doReturn(registry).when(channel).getBrokerRegistry();
doReturn(brokerSelector).when(channel).getBrokerSelector();
+
+ var leaderElectionService = new LeaderElectionService(
+ pulsar.getCoordinationService(),
pulsar.getSafeWebServiceAddress(),
+ state -> {
+ if (state == LeaderElectionState.Leading) {
+ channel.scheduleOwnershipMonitor();
+ } else {
+ channel.cancelOwnershipMonitor();
+ }
+ });
+ leaderElectionService.start();
+
+
doReturn(leaderElectionService).when(channel).getLeaderElectionService();
+
return channel;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
index 3de957c3b1a..c521861471c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
@@ -34,6 +34,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
public class BrokerFilterTestBase {
@@ -82,6 +83,16 @@ public class BrokerFilterTestBase {
public int size() {
return map.size();
}
+
+ @Override
+ public void closeTableView() throws IOException {
+ // No-op: this test stub does nothing when asked to close its table view.
+ }
+
+ @Override
+ public void startTableView() throws LoadDataStoreException {
+ // No-op: this test stub does nothing when asked to start its table view.
+ }
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 47bf1ad2e7e..3955f1ed9af 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -68,6 +68,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import
org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -267,6 +268,16 @@ public class TransferShedderTest {
public int size() {
return map.size();
}
+
+ @Override
+ public void closeTableView() throws IOException {
+ // No-op: this test stub does nothing when asked to close its table view.
+ }
+
+ @Override
+ public void startTableView() throws LoadDataStoreException {
+ // No-op: this test stub does nothing when asked to start its table view.
+ }
};
var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
@@ -310,6 +321,16 @@ public class TransferShedderTest {
public int size() {
return map.size();
}
+
+ @Override
+ public void closeTableView() throws IOException {
+ // No-op: this test stub does nothing when asked to close its table view.
+ }
+
+ @Override
+ public void startTableView() throws LoadDataStoreException {
+ // No-op: this test stub does nothing when asked to start its table view.
+ }
};
BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 5e2924cd842..184c337a47c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
import com.google.common.collect.Sets;
@@ -74,6 +75,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<MyClass> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic,
MyClass.class);
+ loadDataStore.startTableView();
MyClass myClass1 = new MyClass("1", 1);
loadDataStore.pushAsync("key1", myClass1).get();
@@ -106,6 +108,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar.getClient(), topic,
Integer.class);
+ loadDataStore.startTableView();
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
@@ -124,4 +127,28 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
assertEquals(loadDataStore.entrySet(), map.entrySet());
}
+ @Test
+ public void testTableViewRestart() throws Exception {
+     String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+     // Close the store when the test exits (pass or fail), as the other tests in this class do,
+     // so whatever it holds open is not leaked into later tests.
+     @Cleanup
+     LoadDataStore<Integer> loadDataStore =
+             LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+
+     // With the table view started, a pushed value becomes readable.
+     loadDataStore.startTableView();
+     loadDataStore.pushAsync("1", 1).get();
+     Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.size(), 1));
+     assertEquals(loadDataStore.get("1").get(), 1);
+     loadDataStore.closeTableView();
+
+     // Pushing still succeeds while the table view is closed, but reading must be rejected.
+     loadDataStore.pushAsync("1", 2).get();
+     Exception ex = null;
+     try {
+         loadDataStore.get("1");
+     } catch (IllegalStateException e) {
+         ex = e;
+     }
+     assertNotNull(ex);
+
+     // After a restart the table view must observe the value pushed while it was closed.
+     loadDataStore.startTableView();
+     Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2));
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index ebc2424cada..0eea1d87513 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
@@ -244,6 +246,16 @@ public class LeastResourceUsageWithWeightTest {
public int size() {
return map.size();
}
+
+ @Override
+ public void closeTableView() throws IOException {
+ // No-op: this test stub does nothing when asked to close its table view.
+ }
+
+ @Override
+ public void startTableView() throws LoadDataStoreException {
+ // No-op: this test stub does nothing when asked to start its table view.
+ }
};
doReturn(conf).when(ctx).brokerConfiguration();