eolivelli commented on a change in pull request #10391:
URL: https://github.com/apache/pulsar/pull/10391#discussion_r622768899
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
##########
@@ -63,33 +53,17 @@ public void start() throws PulsarServerException {
+ pulsar.getConfiguration().getWebServicePort().get();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
- zkClient = pulsar.getZkClient();
- localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+ pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
- String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
try {
- // When running in standalone, this error can happen when killing the "standalone" process
- // ungracefully since the ZK session will not be closed and it will take some time for ZK server
- // to prune the expired sessions after startup.
- // Since there's a single broker instance running, it's safe, in this mode, to remove the old lock
-
- // Delete and recreate z-node
- try {
- if (zkClient.exists(brokerZnodePath, null) != null) {
- zkClient.delete(brokerZnodePath, -1);
- }
- } catch (NoNodeException nne) {
- // Ignore if z-node was just expired
- }
-
- ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
- } catch (Exception e) {
- throw new PulsarServerException(e);
+ lockManager.acquireLock(brokerReportPath, localData).join();
+ } catch (CompletionException ce) {
Review comment:
Catching CompletionException sounds unusual to me; is it because of `.join()`?
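That is the likely reason: `CompletableFuture.join()` reports a failure as an unchecked `CompletionException` that wraps the original exception, whereas `get()` throws the checked `ExecutionException`. A minimal, self-contained sketch of unwrapping the cause; `failingLock()` is a hypothetical stand-in for `lockManager.acquireLock(...)`, not Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinUnwrapSketch {

    // Hypothetical stand-in for an async call such as lockManager.acquireLock(path, data).
    static CompletableFuture<Void> failingLock() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("lock already held"));
        return f;
    }

    // join() rethrows the failure wrapped in a CompletionException;
    // unwrap it so callers see the original exception type.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        try {
            failingLock().join();
        } catch (CompletionException ce) {
            Throwable cause = unwrap(ce);
            // prints: IllegalStateException: lock already held
            System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
        }
    }
}
```

Using `get()` instead would force the caller to handle `ExecutionException` and `InterruptedException` explicitly, which is the other common choice.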
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -296,6 +272,22 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
loadSheddingPipeline.add(createLoadSheddingStrategy());
}
+ @Override
+ public void accept(Notification t) {
+ if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
Review comment:
Thinking out loud:
we should have a simpler way to filter notifications. Right now, a registered listener receives all of the notifications.
A better API would be:
`pulsar.getLocalMetadataStore().registerListener(this, (Notification t) -> t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT));`
with a utility for the "startsWith" case:
`pulsar.getLocalMetadataStore().registerListener(this, LoadManager.LOADBALANCE_BROKERS_ROOT);`
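Until such an overload exists, a similar effect could be had by wrapping the listener before registering it. A minimal sketch, assuming a listener is a `Consumer<Notification>` (as the `accept(Notification t)` override above suggests). `Notification` here is a simplified stand-in class, and `filtered(...)` / `pathPrefix(...)` are hypothetical helpers, not existing Pulsar API; the paths are example values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class FilteredListenerSketch {

    // Simplified stand-in for a metadata-store notification; only the path matters here.
    static final class Notification {
        private final String path;
        Notification(String path) { this.path = path; }
        String getPath() { return path; }
    }

    // Hypothetical helper: forward only the notifications accepted by the filter.
    static Consumer<Notification> filtered(Consumer<Notification> listener,
                                           Predicate<Notification> filter) {
        return n -> {
            if (filter.test(n)) {
                listener.accept(n);
            }
        };
    }

    // Hypothetical "startsWith" utility built on top of the predicate version.
    static Predicate<Notification> pathPrefix(String prefix) {
        return n -> n.getPath().startsWith(prefix);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Consumer<Notification> listener =
                filtered(n -> seen.add(n.getPath()), pathPrefix("/loadbalance/brokers"));

        listener.accept(new Notification("/loadbalance/brokers/broker-1:8080"));
        listener.accept(new Notification("/admin/policies/tenant/ns"));

        System.out.println(seen); // [/loadbalance/brokers/broker-1:8080]
    }
}
```

Putting the filter in the store itself, as proposed, would additionally let the store skip dispatching to listeners that are not interested.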
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -363,24 +343,27 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
@Override
public Set<String> getAvailableBrokers() {
try {
- return availableActiveBrokers.get();
+ return new TreeSet<>(brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).get());
} catch (Exception e) {
log.warn("Error when trying to get active brokers", e);
Review comment:
In case of a ZK problem, will we see lots of stack traces written to the logs?
Does it make sense to strip out the stack trace here?
We should also deal with InterruptedException, as this method can be called from anywhere.
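One way to address both points: restore the thread's interrupt flag when `get()` is interrupted, and log only the cause for metadata-store failures instead of the full stack trace. A minimal sketch; `brokersOrEmpty` is a hypothetical helper, the `Future` stands in for the result of `listLocks(...)`, returning an empty set on failure is an assumption, and `System.err` stands in for the SLF4J logger used in the broker.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class InterruptAwareSketch {

    // Hypothetical helper: a sorted copy of the brokers, or an empty set on failure.
    static Set<String> brokersOrEmpty(Future<? extends Collection<String>> brokers) {
        try {
            return new TreeSet<>(brokers.get());
        } catch (InterruptedException e) {
            // Re-assert the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while getting active brokers");
        } catch (ExecutionException e) {
            // Log only the cause (class and message), not the stack trace,
            // to keep logs short while the metadata store is unavailable.
            System.err.println("Error when trying to get active brokers: " + e.getCause());
        }
        return Collections.emptySet();
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> ok = CompletableFuture.completedFuture(Arrays.asList("b", "a"));
        System.out.println(brokersOrEmpty(ok)); // [a, b]

        CompletableFuture<List<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection loss"));
        System.out.println(brokersOrEmpty(failed)); // []
    }
}
```

With SLF4J, passing `e` as the last argument prints the stack trace, while formatting `e.getCause()` into a `{}` placeholder prints only its string form.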
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
##########
@@ -401,29 +384,24 @@ private BundleData getBundleDataOrDefault(final String bundle) {
longTermData.setNumSamples(NUM_LONG_SAMPLES);
}
} catch (Exception e) {
- log.warn("Error when trying to find bundle {} on zookeeper: {}", bundle, e);
+ log.warn("Error when trying to find bundle {} on metadata store: {}", bundle, e);
Review comment:
Should we handle InterruptedException here?
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
##########
@@ -236,19 +228,12 @@ public void initialize(final PulsarService pulsar) {
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
- loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
- @Override
- public LoadReport deserialize(String key, byte[] content) throws Exception {
- return ObjectMapperFactory.getThreadLocal().readValue(content, LoadReport.class);
- }
- };
- loadReportCacheZk.registerListener(this);
- this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar.getLocalZkCache()) {
- @Override
- public Map<String, String> deserialize(String key, byte[] content) throws Exception {
- return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
- }
- };
+ loadReports = pulsar.getCoordinationService().getLockManager(LoadReport.class);
+ pulsar.getLocalMetadataStore().registerListener(this);
+ this.dynamicConfigurationCache = pulsar.getLocalMetadataStore().getMetadataCache(
+ new TypeReference<Map<String, String>>() {
Review comment:
For the future:
here we are subclassing TypeReference with an anonymous class. Can we provide a better API for this case?
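The anonymous subclass exists because generic type arguments are erased at runtime; subclassing records `Map<String, String>` in the subclass's generic superclass, where Jackson's `TypeReference` reads it back (the "super type token" pattern). A minimal self-contained sketch of that mechanism, plus one possible mitigation: declaring the token once as a shared constant so call sites don't each create their own anonymous class. `TypeToken` and `STRING_MAP` are hypothetical names, not Jackson or Pulsar API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeTokenSketch {

    // Minimal "super type token": the subclass's generic superclass keeps T after erasure.
    abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            Type superclass = getClass().getGenericSuperclass();
            this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }

        Type getType() {
            return type;
        }
    }

    // Hypothetical shared constant: one anonymous subclass, reused by every caller.
    static final TypeToken<Map<String, String>> STRING_MAP = new TypeToken<Map<String, String>>() {};

    public static void main(String[] args) {
        // prints: java.util.Map<java.lang.String, java.lang.String>
        System.out.println(STRING_MAP.getType().getTypeName());
    }
}
```

The same approach would work with Jackson's `TypeReference` directly: a `static final TypeReference<Map<String, String>>` field declared once and passed to every `getMetadataCache(...)` call that needs it.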
##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
##########
@@ -341,88 +281,69 @@ public void start() throws PulsarServerException {
LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH,
SETTING_NAME_LOAD_FACTOR_MEM,
this.realtimeMemoryLoadFactor);
} catch (Exception e) {
- log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
+ log.error("Unable to create node - [{}] for load balance on metadata store", brokerLockPath, e);
throw new PulsarServerException(e);
}
}
@Override
public void disableBroker() throws Exception {
- if (isNotEmpty(brokerZnodePath)) {
- pulsar.getZkClient().delete(brokerZnodePath, -1);
+ if (brokerLock != null) {
+ brokerLock.release().join();
}
}
- @Override
- public Deserializer<LoadReport> getLoadReportDeserializer() {
- return loadReportDeserializer;
- }
-
- public ZooKeeperChildrenCache getActiveBrokersCache() {
- return this.availableActiveBrokers;
- }
-
@Override
public Set<String> getAvailableBrokers() throws Exception {
- return this.availableActiveBrokers.get();
- }
-
- public ZooKeeperDataCache<LoadReport> getLoadReportCache() {
- return this.loadReportCacheZk;
+ return new HashSet<>(loadReports.listLocks(LOADBALANCE_BROKERS_ROOT).join());
Review comment:
I am seeing this `listLocks(SOMETHING).join()` pattern quite often; perhaps we could add a dedicated method so that we don't always have to write this complicated statement.
Also, it is common to need a copy, as in this case and in other places.
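A minimal sketch of such a helper, assuming `listLocks(path)` returns a `CompletableFuture` of a list of lock names (as the `.join()` and `new HashSet<>(...)` calls above suggest). `LockLister` is a simplified stand-in for the lock manager, and `listLocksCopy(...)` is a hypothetical name, not existing Pulsar API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

public class ListLocksHelperSketch {

    // Simplified stand-in for the lock manager's asynchronous listing method.
    interface LockLister {
        CompletableFuture<List<String>> listLocks(String path);
    }

    // Hypothetical helper: block for the result and return a sorted, mutable copy,
    // so call sites no longer repeat "new TreeSet<>(x.listLocks(path).join())".
    static Set<String> listLocksCopy(LockLister lister, String path) {
        return new TreeSet<>(lister.listLocks(path).join());
    }

    public static void main(String[] args) {
        LockLister fake = path -> CompletableFuture.completedFuture(
                Arrays.asList("broker-2:8080", "broker-1:8080"));
        Set<String> brokers = listLocksCopy(fake, "/loadbalance/brokers");
        System.out.println(brokers); // [broker-1:8080, broker-2:8080]
    }
}
```

Returning a sorted copy covers both call sites quoted here (the `TreeSet` and the `HashSet` one); a real helper would also need to decide whether to keep `join()` or to surface checked exceptions via `get()`.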
--
This is an automated message from the Apache Git Service.