This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e91574ac7b4 [fix][broker] Fix unloadNamespaceBundlesGracefully can be
stuck with extensible load manager (#23349)
e91574ac7b4 is described below
commit e91574ac7b44348a05f1ae812c5aae3abb26fe64
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 27 17:33:28 2024 +0800
[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with
extensible load manager (#23349)
---
.../org/apache/pulsar/broker/PulsarService.java | 7 +-
.../extensions/ExtensibleLoadManagerImpl.java | 130 +++++++++++++++------
.../extensions/ExtensibleLoadManagerWrapper.java | 2 +-
.../channel/ServiceUnitStateChannelImpl.java | 43 +++++--
.../extensions/channel/ServiceUnitStateData.java | 2 +-
.../ServiceUnitStateDataConflictResolver.java | 2 +-
.../channel/ServiceUnitStateTableViewImpl.java | 19 ++-
.../filter/BrokerMaxTopicCountFilter.java | 7 +-
.../store/TableViewLoadDataStoreImpl.java | 10 +-
.../pulsar/broker/service/BrokerService.java | 7 +-
.../SystemTopicBasedTopicPoliciesService.java | 32 +++--
.../extensions/ExtensibleLoadManagerCloseTest.java | 50 ++++++--
.../extensions/store/LoadDataStoreTest.java | 3 +-
.../apache/pulsar/client/impl/TableViewImpl.java | 7 +-
14 files changed, 240 insertions(+), 81 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a2f6fb9e977..6c768a07897 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -513,6 +513,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return closeFuture;
}
LOG.info("Closing PulsarService");
+ if (topicPoliciesService != null) {
+ topicPoliciesService.close();
+ }
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
@@ -633,10 +636,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
transactionBufferClient.close();
}
- if (topicPoliciesService != null) {
- topicPoliciesService.close();
- topicPoliciesService = null;
- }
if (client != null) {
client.close();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 98ef6bf36ed..841f9bfb669 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -181,7 +181,14 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private SplitManager splitManager;
- volatile boolean started = false;
+ enum State {
+ INIT,
+ RUNNING,
+ // It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader
+ // or follower.
+ DISABLED,
+ }
+ private final AtomicReference<State> state = new
AtomicReference<>(State.INIT);
private boolean configuredSystemTopics = false;
@@ -214,7 +221,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
public Set<NamespaceBundle> getOwnedServiceUnits() {
- if (!started) {
+ if (state.get() == State.INIT) {
log.warn("Failed to get owned service units, load manager is not
started.");
return Collections.emptySet();
}
@@ -344,7 +351,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Override
public void start() throws PulsarServerException {
- if (this.started) {
+ if (state.get() != State.INIT) {
return;
}
try {
@@ -443,7 +450,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
this.splitScheduler.start();
this.initWaiter.complete(true);
- this.started = true;
+ if (!state.compareAndSet(State.INIT, State.RUNNING)) {
+ failForUnexpectedState("start");
+ }
log.info("Started load manager.");
} catch (Throwable e) {
failStarting(e);
@@ -615,21 +624,17 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
filter.filterAsync(availableBrokerCandidates,
bundle, context);
futures.add(future);
}
- CompletableFuture<Optional<String>> result = new
CompletableFuture<>();
- FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
- if (ex != null) {
- // TODO: We may need to revisit this error case.
- log.error("Failed to filter out brokers when
select bundle: {}", bundle, ex);
- }
+ return FutureUtil.waitForAll(futures).exceptionally(e -> {
+ // TODO: We may need to revisit this error case.
+ log.error("Failed to filter out brokers when select
bundle: {}", bundle, e);
+ return null;
+ }).thenApply(__ -> {
if (availableBrokerCandidates.isEmpty()) {
- result.complete(Optional.empty());
- return;
+ return Optional.empty();
}
Set<String> candidateBrokers =
availableBrokerCandidates.keySet();
-
-
result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle,
context));
+ return
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
});
- return result;
});
}
@@ -667,6 +672,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean force,
long timeout,
TimeUnit
timeoutUnit) {
+ if (state.get() == State.INIT) {
+ return CompletableFuture.completedFuture(null);
+ }
if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
@@ -755,24 +763,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Override
public void close() throws PulsarServerException {
- if (!this.started) {
+ if (state.get() == State.INIT) {
return;
}
try {
- if (brokerLoadDataReportTask != null) {
- brokerLoadDataReportTask.cancel(true);
- }
-
- if (topBundlesLoadDataReportTask != null) {
- topBundlesLoadDataReportTask.cancel(true);
- }
-
- if (monitorTask != null) {
- monitorTask.cancel(true);
- }
-
- this.brokerLoadDataStore.shutdown();
- this.topBundlesLoadDataStore.shutdown();
+ stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
this.serviceUnitStateTableViewSyncer.close();
@@ -791,7 +786,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
} catch (Exception e) {
throw new PulsarServerException(e);
} finally {
- this.started = false;
+ state.set(State.INIT);
}
}
@@ -799,6 +794,28 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
+ private void stopLoadDataReportTasks() {
+ if (brokerLoadDataReportTask != null) {
+ brokerLoadDataReportTask.cancel(true);
+ }
+ if (topBundlesLoadDataReportTask != null) {
+ topBundlesLoadDataReportTask.cancel(true);
+ }
+ if (monitorTask != null) {
+ monitorTask.cancel(true);
+ }
+ try {
+ brokerLoadDataStore.shutdown();
+ } catch (IOException e) {
+ log.warn("Failed to shutdown brokerLoadDataStore", e);
+ }
+ try {
+ topBundlesLoadDataStore.shutdown();
+ } catch (IOException e) {
+ log.warn("Failed to shutdown topBundlesLoadDataStore", e);
+ }
+ }
+
public static boolean isInternalTopic(String topic) {
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
@@ -814,13 +831,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- if (!initWaiter.get()) {
+ if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
+ if (disabled()) {
+ return;
+ }
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader need to reset
// the local brokerService.topics (by this topic creations).
@@ -835,6 +855,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
break;
} catch (Throwable e) {
+ if (disabled()) {
+ log.warn("The broker:{} failed to set the role but exit
because it's disabled",
+ pulsar.getBrokerId(), e);
+ return;
+ }
log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
@@ -846,6 +871,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
}
+ if (disabled()) {
+ return;
+ }
if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader
role.", pulsar.getBrokerId());
@@ -869,13 +897,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- if (!initWaiter.get()) {
+ if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
+ if (disabled()) {
+ return;
+ }
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
@@ -885,6 +916,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
+ if (disabled()) {
+ log.warn("The broker:{} failed to set the role but exit
because it's disabled",
+ pulsar.getBrokerId(), e);
+ return;
+ }
log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
@@ -896,6 +932,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
}
+ if (disabled()) {
+ return;
+ }
if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower
role.", pulsar.getBrokerId());
@@ -982,9 +1021,20 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
public void disableBroker() throws Exception {
+ // TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
+ // or playLeader() quickly.
+ if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
+ failForUnexpectedState("disableBroker");
+ }
+ stopLoadDataReportTasks();
serviceUnitStateChannel.cleanOwnerships();
- leaderElectionService.close();
brokerRegistry.unregister();
+ leaderElectionService.close();
+ final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
+ .get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ if (availableBrokers.isEmpty()) {
+ close();
+ }
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
@@ -1018,4 +1068,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
protected ServiceUnitStateChannel
createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}
+
+ private void failForUnexpectedState(String msg) {
+ throw new IllegalStateException("Failed to " + msg + ", state: " +
state.get());
+ }
+
+ boolean running() {
+ return state.get() == State.RUNNING;
+ }
+
+ private boolean disabled() {
+ return state.get() == State.DISABLED;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 6a48607977b..35f6cfcbcf5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -51,7 +51,7 @@ public class ExtensibleLoadManagerWrapper implements
LoadManager {
}
public boolean started() {
- return loadManager.started &&
loadManager.getServiceUnitStateChannel().started();
+ return loadManager.running() &&
loadManager.getServiceUnitStateChannel().started();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index ddbc9eacac9..ce975495feb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -255,6 +255,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
@Override
public void cleanOwnerships() {
+ disable();
doCleanup(brokerId, true);
}
@@ -412,9 +413,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (owner.isPresent()) {
return isTargetBroker(owner.get());
} else {
- String msg = "There is no channel owner now.";
- log.error(msg);
- throw new IllegalStateException(msg);
+ throw new IllegalStateException("There is no channel owner
now.");
}
});
}
@@ -679,11 +678,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
brokerId, serviceUnit, data, totalHandledRequests);
}
- if (channelState == Disabled) {
+ ServiceUnitState state = state(data);
+ if (channelState == Disabled && (data == null || !data.force())) {
+ final var request = getOwnerRequests.remove(serviceUnit);
+ if (request != null) {
+ request.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "cancel the lookup request for " + serviceUnit + "
when receiving " + state));
+ }
return;
}
-
- ServiceUnitState state = state(data);
try {
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
@@ -851,7 +854,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
- private void handleFreeEvent(String serviceUnit, ServiceUnitStateData
data) {
+ private CompletableFuture<Void> handleFreeEvent(String serviceUnit,
ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
@@ -865,8 +868,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
:
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ return future;
} else {
stateChangeListeners.notify(serviceUnit, data, null);
+ return CompletableFuture.completedFuture(null);
}
}
@@ -1273,7 +1278,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
return;
}
} catch (Exception e) {
- log.error("Failed to handle broker deletion event.", e);
+ if (e instanceof ExecutionException && e.getCause() instanceof
IllegalStateException) {
+ log.warn("Failed to handle broker deletion event due to {}",
e.getMessage());
+ } else {
+ log.error("Failed to handle broker deletion event.", e);
+ }
return;
}
MetadataState state = getMetadataState();
@@ -1293,6 +1302,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
+ final var channelState = this.channelState;
+ if (channelState == Disabled || channelState == Closed) {
+ log.warn("[{}] Skip scheduleCleanup because the state is {}
now", brokerId, channelState);
+ return;
+ }
cleanupJobs.computeIfAbsent(broker, k -> {
Executor delayed = CompletableFuture
.delayedExecutor(delayInSecs, TimeUnit.SECONDS,
pulsar.getLoadManagerExecutor());
@@ -1393,6 +1407,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (data.state() == Owned && broker.equals(data.dstBroker())) {
cleaned = false;
+ log.info("[{}] bundle {} is still owned by this, data:
{}", broker, serviceUnit, data);
break;
}
}
@@ -1400,10 +1415,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
break;
} else {
try {
-
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
+
tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS / 2);
+ Thread.sleep(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS /
2);
} catch (InterruptedException e) {
log.warn("Interrupted while delaying the next service unit
clean-up. Cleaning broker:{}",
brokerId);
+ } catch (ExecutionException e) {
+ log.error("Failed to flush table view", e.getCause());
+ } catch (TimeoutException e) {
+ log.warn("Failed to flush the table view in {} ms",
OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
}
}
}
@@ -1428,6 +1448,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
+ try {
+ tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
+ } catch (Exception e) {
+ log.error("Failed to flush", e);
+ }
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new
HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index e85134e6116..4a990ddbc9b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -34,7 +34,7 @@ public record ServiceUnitStateData(
public ServiceUnitStateData {
Objects.requireNonNull(state);
- if (StringUtils.isBlank(dstBroker) &&
StringUtils.isBlank(sourceBroker)) {
+ if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker)
&& StringUtils.isBlank(sourceBroker)) {
throw new IllegalArgumentException("Empty broker");
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
index b1dbb6fac87..3e43237f4c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java
@@ -145,4 +145,4 @@ public class ServiceUnitStateDataConflictResolver
implements TopicCompactionStra
|| !from.dstBroker().equals(to.sourceBroker())
|| from.dstBroker().equals(to.dstBroker());
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
index 8dfaddcdabc..12cf87445a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -144,8 +145,13 @@ public class ServiceUnitStateTableViewImpl extends
ServiceUnitStateTableViewBase
.sendAsync()
.whenComplete((messageId, e) -> {
if (e != null) {
- log.error("Failed to publish the message:
serviceUnit:{}, data:{}",
- key, value, e);
+ if (e instanceof
PulsarClientException.AlreadyClosedException) {
+ log.info("Skip publishing the message since the
producer is closed, serviceUnit: {}, data: "
+ + "{}", key, value);
+ } else {
+ log.error("Failed to publish the message:
serviceUnit:{}, data:{}",
+ key, value, e);
+ }
future.completeExceptionally(e);
} else {
future.complete(null);
@@ -159,7 +165,14 @@ public class ServiceUnitStateTableViewImpl extends
ServiceUnitStateTableViewBase
if (!isValidState()) {
throw new IllegalStateException(INVALID_STATE_ERROR_MSG);
}
- producer.flushAsync().get(waitDurationInMillis, MILLISECONDS);
+ final var deadline = System.currentTimeMillis() + waitDurationInMillis;
+ var waitTimeMs = waitDurationInMillis;
+ producer.flushAsync().get(waitTimeMs, MILLISECONDS);
+ waitTimeMs = deadline - System.currentTimeMillis();
+ if (waitTimeMs < 0) {
+ waitTimeMs = 0;
+ }
+ tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 48213c18e63..9863d05ee75 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -41,7 +41,12 @@ public class BrokerMaxTopicCountFilter implements
BrokerFilter {
LoadManagerContext context) {
int loadBalancerBrokerMaxTopics =
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
- Optional<BrokerLoadData> brokerLoadDataOpt =
context.brokerLoadDataStore().get(broker);
+ final Optional<BrokerLoadData> brokerLoadDataOpt;
+ try {
+ brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
+ } catch (IllegalStateException ignored) {
+ return false;
+ }
long topics =
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
// TODO: The broker load data might be delayed, so the max topic
check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index c9d18676cfa..3ce44a1e65a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -92,7 +92,11 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
public synchronized Optional<T> get(String key) {
String msg = validateTableView();
if (StringUtils.isNotBlank(msg)) {
- throw new IllegalStateException(msg);
+ if (msg.equals(SHUTDOWN_ERR_MSG)) {
+ return Optional.empty();
+ } else {
+ throw new IllegalStateException(msg);
+ }
}
return Optional.ofNullable(tableView.get(key));
}
@@ -193,7 +197,9 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public synchronized void close() throws IOException {
- validateState();
+ if (isShutdown) {
+ return;
+ }
closeProducer();
closeTableView();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 09f04d878c4..bfa99eedcad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -973,7 +973,12 @@ public class BrokerService implements Closeable {
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
} catch (Exception e) {
- log.warn("Failed to unload namespace bundle {}",
su, e);
+ if (e instanceof ExecutionException
+ && e.getCause() instanceof
ServiceUnitNotReadyException) {
+ log.warn("Failed to unload namespace bundle
{}: {}", su, e.getMessage());
+ } else {
+ log.warn("Failed to unload namespace bundle
{}", su, e);
+ }
}
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 18b4c610a5c..6ff6408916b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -254,7 +254,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
// initialization by calling this method. At the moment, the load
manager does not start so the lookup
// for "__change_events" will fail. In this case, just return an empty
policies to avoid deadlock.
final var loadManager = pulsarService.getLoadManager().get();
- if (loadManager == null || !loadManager.started()) {
+ if (loadManager == null || !loadManager.started() || closed.get()) {
return CompletableFuture.completedFuture(Optional.empty());
}
final CompletableFuture<Boolean> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
@@ -308,6 +308,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@VisibleForTesting
@Nonnull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@Nonnull
NamespaceName namespace) {
requireNonNull(namespace);
+ if (closed.get()) {
+ return CompletableFuture.completedFuture(false);
+ }
return
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(namespacePolicies -> {
if (namespacePolicies.isEmpty() ||
namespacePolicies.get().deleted) {
@@ -331,6 +334,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
initFuture.exceptionally(ex -> {
try {
+ if (closed.get()) {
+ return null;
+ }
log.error("[{}] Failed to create
reader on __change_events topic",
namespace, ex);
cleanCacheAndCloseReader(namespace,
false);
@@ -681,14 +687,22 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (closed.compareAndSet(false, true)) {
writerCaches.synchronous().invalidateAll();
readerCaches.values().forEach(future -> {
- if (future != null && !future.isCompletedExceptionally()) {
- future.thenAccept(reader -> {
- try {
- reader.close();
- } catch (Exception e) {
- log.error("Failed to close reader.", e);
- }
- });
+ try {
+ final var reader = future.getNow(null);
+ if (reader != null) {
+ reader.close();
+ log.info("Closed the reader for topic policies");
+ } else {
+ // Avoid blocking the thread that the reader is created
+
future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) ->
{
+ if (e == null) {
+ log.info("Closed the reader for topic
policies");
+ } else {
+ log.error("Failed to close the reader for
topic policies", e);
+ }
+ });
+ }
+ } catch (Throwable ignored) {
}
});
readerCaches.clear();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 41413f3e3a9..fa63ce566c6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -22,13 +22,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -36,26 +38,33 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
+@Test(groups = "broker")
public class ExtensibleLoadManagerCloseTest {
private static final String clusterName = "test";
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1,
0, () -> 0);
private final List<PulsarService> brokers = new ArrayList<>();
- private PulsarAdmin admin;
+ private LocalBookkeeperEnsemble bk;
@BeforeClass(alwaysRun = true)
public void setup() throws Exception {
+ bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
bk.start();
- for (int i = 0; i < 3; i++) {
+ }
+
+ private void setupBrokers(int numBrokers) throws Exception {
+ brokers.clear();
+ for (int i = 0; i < numBrokers; i++) {
final var broker = new PulsarService(brokerConfig());
broker.start();
brokers.add(broker);
}
- admin = brokers.get(0).getAdminClient();
- admin.clusters().createCluster(clusterName,
ClusterData.builder().build());
- admin.tenants().createTenant("public", TenantInfo.builder()
- .allowedClusters(Collections.singleton(clusterName)).build());
- admin.namespaces().createNamespace("public/default");
+ final var admin = brokers.get(0).getAdminClient();
+ if (!admin.clusters().getClusters().contains(clusterName)) {
+ admin.clusters().createCluster(clusterName,
ClusterData.builder().build());
+ admin.tenants().createTenant("public", TenantInfo.builder()
+
.allowedClusters(Collections.singleton(clusterName)).build());
+ admin.namespaces().createNamespace("public/default");
+ }
}
@@ -85,7 +94,9 @@ public class ExtensibleLoadManagerCloseTest {
@Test
public void testCloseAfterLoadingBundles() throws Exception {
+ setupBrokers(3);
final var topic = "test";
+ final var admin = brokers.get(0).getAdminClient();
admin.topics().createPartitionedTopic(topic, 20);
admin.lookups().lookupPartitionedTopic(topic);
final var client =
PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build();
@@ -104,4 +115,25 @@ public class ExtensibleLoadManagerCloseTest {
Assert.assertTrue(closeTimeMs < 5000L);
}
}
+
+ @Test
+ public void testLookup() throws Exception {
+ setupBrokers(1);
+ final var topic = "test-lookup";
+ final var numPartitions = 16;
+ final var admin = brokers.get(0).getAdminClient();
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+
+ final var futures = new ArrayList<CompletableFuture<String>>();
+ for (int i = 0; i < numPartitions; i++) {
+ futures.add(admin.lookups().lookupTopicAsync(topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ final var start = System.currentTimeMillis();
+ brokers.get(0).close();
+ final var closeTimeMs = System.currentTimeMillis() - start;
+ log.info("Broker close time: {}", closeTimeMs);
+ Assert.assertTrue(closeTimeMs < 5000L);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 3267e67ad2c..820307637be 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -198,7 +198,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(loadDataStore.pushAsync("2",
2).isCompletedExceptionally());
Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally());
- assertThrows(IllegalStateException.class, () ->
loadDataStore.get("2"));
+ assertTrue(loadDataStore.get("2").isEmpty());
assertThrows(IllegalStateException.class, loadDataStore::size);
assertThrows(IllegalStateException.class, loadDataStore::entrySet);
assertThrows(IllegalStateException.class, () ->
loadDataStore.forEach((k, v) -> {}));
@@ -206,7 +206,6 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
assertThrows(IllegalStateException.class, loadDataStore::start);
assertThrows(IllegalStateException.class,
loadDataStore::startProducer);
assertThrows(IllegalStateException.class,
loadDataStore::startTableView);
- assertThrows(IllegalStateException.class, loadDataStore::close);
assertThrows(IllegalStateException.class,
loadDataStore::closeTableView);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index d5d4174ee10..4f520604978 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -364,8 +364,8 @@ public class TableViewImpl<T> implements TableView<T> {
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
- log.error("Reader {} was closed while
reading existing messages.",
- reader.getTopic(), ex);
+ log.info("Reader {} was closed while
reading existing messages.",
+ reader.getTopic());
} else {
log.warn("Reader {} was interrupted
while reading existing messages. ",
reader.getTopic(), ex);
@@ -393,8 +393,7 @@ public class TableViewImpl<T> implements TableView<T> {
readTailMessages(reader);
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
- log.error("Reader {} was closed while reading tail
messages.",
- reader.getTopic(), ex);
+ log.info("Reader {} was closed while reading tail
messages.", reader.getTopic());
// Fail all refresh request when no more messages can
be read.
pendingRefreshRequests.keySet().forEach(future -> {
pendingRefreshRequests.remove(future);