This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d42dacb2b2f [fix][broker] Fix unloadNamespaceBundlesGracefully can be
stuck with extensible load manager (#23349) (#23496)
d42dacb2b2f is described below
commit d42dacb2b2fab717ab83527ce38d8f5590391532
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Oct 23 04:25:22 2024 -0700
[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with
extensible load manager (#23349) (#23496)
Co-authored-by: Yunze Xu <[email protected]>
---
.../org/apache/pulsar/broker/PulsarService.java | 7 +-
.../extensions/ExtensibleLoadManagerImpl.java | 132 +++++++++++++-----
.../extensions/ExtensibleLoadManagerWrapper.java | 4 +
.../channel/ServiceUnitStateChannel.java | 5 +
.../channel/ServiceUnitStateChannelImpl.java | 54 ++++++--
.../ServiceUnitStateCompactionStrategy.java | 2 +-
.../extensions/channel/ServiceUnitStateData.java | 2 +-
.../filter/BrokerMaxTopicCountFilter.java | 7 +-
.../extensions/store/LoadDataStore.java | 8 +-
.../store/TableViewLoadDataStoreImpl.java | 148 +++++++++++++++------
.../pulsar/broker/service/BrokerService.java | 7 +-
.../SystemTopicBasedTopicPoliciesService.java | 30 +++--
.../extensions/ExtensibleLoadManagerCloseTest.java | 50 +++++--
.../channel/ServiceUnitStateChannelTest.java | 7 +-
.../extensions/store/LoadDataStoreTest.java | 60 +++++++--
.../apache/pulsar/client/impl/TableViewImpl.java | 7 +-
16 files changed, 404 insertions(+), 126 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 559db301158..7698f3d119c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -463,6 +463,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return closeFuture;
}
LOG.info("Closing PulsarService");
+ if (topicPoliciesService != null) {
+ topicPoliciesService.close();
+ }
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
@@ -578,10 +581,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
transactionBufferClient.close();
}
- if (topicPoliciesService != null) {
- topicPoliciesService.close();
- topicPoliciesService = null;
- }
if (client != null) {
client.close();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 8e34f2f697f..bf98df2c5ac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private SplitManager splitManager;
- private volatile boolean started = false;
+ enum State {
+ INIT,
+ RUNNING,
+ // It's removing visibility of the current broker from other brokers.
In this state, it cannot play as a leader
+ // or follower.
+ DISABLED,
+ }
+ private final AtomicReference<State> state = new
AtomicReference<>(State.INIT);
private boolean configuredSystemTopics = false;
@@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
* Get all the bundles that are owned by this broker.
*/
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync()
{
- if (!started) {
+ if (state.get() == State.INIT) {
log.warn("Failed to get owned service units, load manager is not
started.");
return CompletableFuture.completedFuture(Collections.emptySet());
}
@@ -373,7 +380,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Override
public void start() throws PulsarServerException {
- if (this.started) {
+ if (state.get() != State.INIT) {
return;
}
try {
@@ -471,7 +478,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
this.splitScheduler.start();
this.initWaiter.complete(true);
- this.started = true;
+ if (!state.compareAndSet(State.INIT, State.RUNNING)) {
+ failForUnexpectedState("start");
+ }
log.info("Started load manager.");
} catch (Throwable e) {
failStarting(e);
@@ -643,21 +652,17 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
filter.filterAsync(availableBrokerCandidates,
bundle, context);
futures.add(future);
}
- CompletableFuture<Optional<String>> result = new
CompletableFuture<>();
- FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
- if (ex != null) {
- // TODO: We may need to revisit this error case.
- log.error("Failed to filter out brokers when
select bundle: {}", bundle, ex);
- }
+ return FutureUtil.waitForAll(futures).exceptionally(e -> {
+ // TODO: We may need to revisit this error case.
+ log.error("Failed to filter out brokers when select
bundle: {}", bundle, e);
+ return null;
+ }).thenApply(__ -> {
if (availableBrokerCandidates.isEmpty()) {
- result.complete(Optional.empty());
- return;
+ return Optional.empty();
}
Set<String> candidateBrokers =
availableBrokerCandidates.keySet();
-
-
result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle,
context));
+ return
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
});
- return result;
});
}
@@ -695,6 +700,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean force,
long timeout,
TimeUnit
timeoutUnit) {
+ if (state.get() == State.INIT) {
+ return CompletableFuture.completedFuture(null);
+ }
if
(NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString()))
{
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
@@ -783,28 +791,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Override
public void close() throws PulsarServerException {
- if (!this.started) {
+ if (state.get() == State.INIT) {
return;
}
try {
- if (brokerLoadDataReportTask != null) {
- brokerLoadDataReportTask.cancel(true);
- }
-
- if (topBundlesLoadDataReportTask != null) {
- topBundlesLoadDataReportTask.cancel(true);
- }
-
- if (monitorTask != null) {
- monitorTask.cancel(true);
- }
-
- this.brokerLoadDataStore.close();
- this.topBundlesLoadDataStore.close();
+ stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
- } catch (IOException ex) {
- throw new PulsarServerException(ex);
} finally {
try {
this.brokerRegistry.close();
@@ -818,7 +811,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
} catch (Exception e) {
throw new PulsarServerException(e);
} finally {
- this.started = false;
+ state.set(State.INIT);
}
}
@@ -826,6 +819,28 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
+ private void stopLoadDataReportTasks() {
+ if (brokerLoadDataReportTask != null) {
+ brokerLoadDataReportTask.cancel(true);
+ }
+ if (topBundlesLoadDataReportTask != null) {
+ topBundlesLoadDataReportTask.cancel(true);
+ }
+ if (monitorTask != null) {
+ monitorTask.cancel(true);
+ }
+ try {
+ brokerLoadDataStore.shutdown();
+ } catch (IOException e) {
+ log.warn("Failed to shutdown brokerLoadDataStore", e);
+ }
+ try {
+ topBundlesLoadDataStore.shutdown();
+ } catch (IOException e) {
+ log.warn("Failed to shutdown topBundlesLoadDataStore", e);
+ }
+ }
+
public static boolean isInternalTopic(String topic) {
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
@@ -841,13 +856,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- if (!initWaiter.get()) {
+ if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
+ if (disabled()) {
+ return;
+ }
// Confirm the system topics have been created or create them
if they do not exist.
// If the leader has changed, the new leader need to reset
// the local brokerService.topics (by this topic creations).
@@ -859,6 +877,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
+ if (disabled()) {
+ log.warn("The broker:{} failed to set the role but exit
because it's disabled",
+ pulsar.getBrokerId(), e);
+ return;
+ }
log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
@@ -870,6 +893,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
}
+ if (disabled()) {
+ return;
+ }
if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader
role.", pulsar.getBrokerId());
@@ -893,13 +919,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
- if (!initWaiter.get()) {
+ if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
+ if (disabled()) {
+ return;
+ }
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
@@ -908,6 +937,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
+ if (disabled()) {
+ log.warn("The broker:{} failed to set the role but exit
because it's disabled",
+ pulsar.getBrokerId(), e);
+ return;
+ }
log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
@@ -919,6 +953,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
}
}
+ if (disabled()) {
+ return;
+ }
if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower
role.", pulsar.getBrokerId());
@@ -997,9 +1034,20 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
public void disableBroker() throws Exception {
+ // TopicDoesNotExistException might be thrown and it's not
recoverable. Enable this flag to exit playFollower()
+ // or playLeader() quickly.
+ if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
+ failForUnexpectedState("disableBroker");
+ }
+ stopLoadDataReportTasks();
serviceUnitStateChannel.cleanOwnerships();
- leaderElectionService.close();
brokerRegistry.unregister();
+ leaderElectionService.close();
+ final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
+ .get(conf.getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ if (availableBrokers.isEmpty()) {
+ close();
+ }
// Close the internal topics (if owned any) after giving up the
possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
@@ -1033,4 +1081,16 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
protected ServiceUnitStateChannel
createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}
+
+ private void failForUnexpectedState(String msg) {
+ throw new IllegalStateException("Failed to " + msg + ", state: " +
state.get());
+ }
+
+ boolean running() {
+ return state.get() == State.RUNNING;
+ }
+
+ private boolean disabled() {
+ return state.get() == State.DISABLED;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
index 25eb27bc58d..35f6cfcbcf5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -50,6 +50,10 @@ public class ExtensibleLoadManagerWrapper implements
LoadManager {
loadManager.start();
}
+ public boolean started() {
+ return loadManager.running() &&
loadManager.getServiceUnitStateChannel().started();
+ }
+
@Override
public void initialize(PulsarService pulsar) {
loadManager.initialize(pulsar);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 9be76e1b0f4..6319fc332a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
void start() throws PulsarServerException;
+ /**
+ * Whether the channel started.
+ */
+ boolean started();
+
/**
* Closes the ServiceUnitStateChannel.
* @throws PulsarServerException if it fails to close the channel.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1a4b6eb3dfb..1f2715a00ac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -32,6 +32,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isInFlightState;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Closed;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Disabled;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
@@ -181,7 +182,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
Closed(0),
Constructed(1),
LeaderElectionServiceStarted(2),
- Started(3);
+ Started(3),
+ Disabled(4);
ChannelState(int id) {
this.id = id;
@@ -260,11 +262,19 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+
+
@Override
public void cleanOwnerships() {
+ disable();
doCleanup(brokerId);
}
+ @Override
+ public synchronized boolean started() {
+ return validateChannelState(Started, true);
+ }
+
public synchronized void start() throws PulsarServerException {
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" +
channelState.name());
@@ -442,9 +452,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (owner.isPresent()) {
return isTargetBroker(owner.get());
} else {
- String msg = "There is no channel owner now.";
- log.error(msg);
- throw new IllegalStateException(msg);
+ throw new IllegalStateException("There is no channel owner
now.");
}
});
}
@@ -618,7 +626,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
public CompletableFuture<String> publishAssignEventAsync(String
serviceUnit, String broker) {
- if (!validateChannelState(Started, true)) {
+ if (!validateChannelState(Started, true) || channelState == Disabled) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" +
channelState.name()));
}
@@ -699,6 +707,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
ServiceUnitState state = state(data);
+ if (channelState == Disabled && (data == null || !data.force())) {
+ final var request = getOwnerRequests.remove(serviceUnit);
+ if (request != null) {
+ request.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "cancel the lookup request for " + serviceUnit + "
when receiving " + state));
+ }
+ return;
+ }
try {
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
@@ -866,7 +882,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
- private void handleFreeEvent(String serviceUnit, ServiceUnitStateData
data) {
+ private CompletableFuture<Void> handleFreeEvent(String serviceUnit,
ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
@@ -880,8 +896,10 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
:
CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
+ return future;
} else {
stateChangeListeners.notify(serviceUnit, data, null);
+ return CompletableFuture.completedFuture(null);
}
}
@@ -1258,8 +1276,17 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private void handleBrokerDeletionEvent(String broker) {
- if (!isChannelOwner()) {
- log.warn("This broker is not the leader now. Ignoring
BrokerDeletionEvent for broker {}.", broker);
+ try {
+ if (!isChannelOwner()) {
+ log.warn("This broker is not the leader now. Ignoring
BrokerDeletionEvent for broker {}.", broker);
+ return;
+ }
+ } catch (Exception e) {
+ if (e instanceof ExecutionException && e.getCause() instanceof
IllegalStateException) {
+ log.warn("Failed to handle broker deletion event due to {}",
e.getMessage());
+ } else {
+ log.error("Failed to handle broker deletion event.", e);
+ }
return;
}
MetadataState state = getMetadataState();
@@ -1279,6 +1306,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
+ final var channelState = this.channelState;
+ if (channelState == Disabled || channelState == Closed) {
+ log.warn("[{}] Skip scheduleCleanup because the state is {}
now", brokerId, channelState);
+ return;
+ }
cleanupJobs.computeIfAbsent(broker, k -> {
Executor delayed = CompletableFuture
.delayedExecutor(delayInSecs, TimeUnit.SECONDS,
pulsar.getLoadManagerExecutor());
@@ -1371,6 +1403,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (data.state() == Owned && broker.equals(data.dstBroker())) {
cleaned = false;
+ log.info("[{}] bundle {} is still owned by this, data:
{}", broker, serviceUnit, data);
break;
}
}
@@ -1784,4 +1817,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
public static ServiceUnitStateChannel get(PulsarService pulsar) {
return
ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get()).getServiceUnitStateChannel();
}
+
+ @VisibleForTesting
+ protected void disable() {
+ channelState = Disabled;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
index 6a98b79be81..7dd1035b220 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java
@@ -131,4 +131,4 @@ public class ServiceUnitStateCompactionStrategy implements
TopicCompactionStrate
|| !from.dstBroker().equals(to.sourceBroker())
|| from.dstBroker().equals(to.dstBroker());
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
index 307d3a4acb1..61a1606ba6e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java
@@ -34,7 +34,7 @@ public record ServiceUnitStateData(
public ServiceUnitStateData {
Objects.requireNonNull(state);
- if (StringUtils.isBlank(dstBroker) &&
StringUtils.isBlank(sourceBroker)) {
+ if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker)
&& StringUtils.isBlank(sourceBroker)) {
throw new IllegalArgumentException("Empty broker");
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 48213c18e63..9863d05ee75 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -41,7 +41,12 @@ public class BrokerMaxTopicCountFilter implements
BrokerFilter {
LoadManagerContext context) {
int loadBalancerBrokerMaxTopics =
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
- Optional<BrokerLoadData> brokerLoadDataOpt =
context.brokerLoadDataStore().get(broker);
+ final Optional<BrokerLoadData> brokerLoadDataOpt;
+ try {
+ brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
+ } catch (IllegalStateException ignored) {
+ return false;
+ }
long topics =
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
// TODO: The broker load data might be delayed, so the max topic
check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index a7deeeaad8a..157d6a107c5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -103,4 +103,10 @@ public interface LoadDataStore<T> extends Closeable {
*/
void startProducer() throws LoadDataStoreException;
-}
+ /**
+ * Shutdowns the data store.
+ */
+ default void shutdown() throws IOException {
+ close();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index e9289d3ccda..672a9a66af0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -43,20 +43,17 @@ import org.apache.pulsar.client.api.TableView;
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
private static final long
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+ private static final String SHUTDOWN_ERR_MSG = "This load store tableview
has been shutdown";
private static final long INIT_TIMEOUT_IN_SECS = 5;
-
private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
-
+ private volatile long producerLastPublishTimestamp;
private volatile Producer<T> producer;
-
private final ServiceConfiguration conf;
-
private final PulsarClient client;
-
private final String topic;
-
private final Class<T> clazz;
+ private volatile boolean isShutdown;
public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic,
Class<T> clazz)
throws LoadDataStoreException {
@@ -65,6 +62,7 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
this.client = pulsar.getClient();
this.topic = topic;
this.clazz = clazz;
+ this.isShutdown = false;
} catch (Exception e) {
throw new LoadDataStoreException(e);
}
@@ -72,41 +70,80 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public synchronized CompletableFuture<Void> pushAsync(String key, T
loadData) {
- validateProducer();
- return
producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
+ String msg = validateProducer();
+ if (StringUtils.isNotBlank(msg)) {
+ return CompletableFuture.failedFuture(new
IllegalStateException(msg));
+ }
+ return producer.newMessage().key(key).value(loadData).sendAsync()
+ .thenAccept(__ -> producerLastPublishTimestamp =
System.currentTimeMillis());
}
@Override
public synchronized CompletableFuture<Void> removeAsync(String key) {
- validateProducer();
- return
producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
+ String msg = validateProducer();
+ if (StringUtils.isNotBlank(msg)) {
+ return CompletableFuture.failedFuture(new
IllegalStateException(msg));
+ }
+ return producer.newMessage().key(key).value(null).sendAsync()
+ .thenAccept(__ -> producerLastPublishTimestamp =
System.currentTimeMillis());
}
@Override
public synchronized Optional<T> get(String key) {
- validateTableView();
+ String msg = validateTableView();
+ if (StringUtils.isNotBlank(msg)) {
+ if (msg.equals(SHUTDOWN_ERR_MSG)) {
+ return Optional.empty();
+ } else {
+ throw new IllegalStateException(msg);
+ }
+ }
return Optional.ofNullable(tableView.get(key));
}
@Override
public synchronized void forEach(BiConsumer<String, T> action) {
- validateTableView();
+ String msg = validateTableView();
+ if (StringUtils.isNotBlank(msg)) {
+ throw new IllegalStateException(msg);
+ }
tableView.forEach(action);
}
public synchronized Set<Map.Entry<String, T>> entrySet() {
- validateTableView();
+ String msg = validateTableView();
+ if (StringUtils.isNotBlank(msg)) {
+ throw new IllegalStateException(msg);
+ }
return tableView.entrySet();
}
@Override
public synchronized int size() {
- validateTableView();
+ String msg = validateTableView();
+ if (StringUtils.isNotBlank(msg)) {
+ throw new IllegalStateException(msg);
+ }
return tableView.size();
}
+ private void validateState() {
+ if (isShutdown) {
+ throw new IllegalStateException(SHUTDOWN_ERR_MSG);
+ }
+ }
+
+
+ @Override
+ public synchronized void init() throws IOException {
+ validateState();
+ close();
+ start();
+ }
+
@Override
public synchronized void closeTableView() throws IOException {
+ validateState();
if (tableView != null) {
tableView.close();
tableView = null;
@@ -115,16 +152,26 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public synchronized void start() throws LoadDataStoreException {
+ validateState();
startProducer();
startTableView();
}
+ private synchronized void closeProducer() throws IOException {
+ validateState();
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+ }
@Override
public synchronized void startTableView() throws LoadDataStoreException {
+ validateState();
if (tableView == null) {
try {
tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+ tableViewLastUpdateTimestamp = System.currentTimeMillis();
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp =
System.currentTimeMillis());
} catch (Exception e) {
@@ -133,13 +180,14 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
}
}
}
-
@Override
public synchronized void startProducer() throws LoadDataStoreException {
+ validateState();
if (producer == null) {
try {
producer =
client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+ producerLastPublishTimestamp = System.currentTimeMillis();
} catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
@@ -149,38 +197,65 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public synchronized void close() throws IOException {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (isShutdown) {
+ return;
}
+ closeProducer();
closeTableView();
}
@Override
- public synchronized void init() throws IOException {
+ public synchronized void shutdown() throws IOException {
close();
- start();
+ isShutdown = true;
}
- private void validateProducer() {
- if (producer == null) {
+ private String validateProducer() {
+ if (isShutdown) {
+ return SHUTDOWN_ERR_MSG;
+ }
+ String restartReason = getRestartReason(producer,
producerLastPublishTimestamp);
+ if (StringUtils.isNotBlank(restartReason)) {
try {
+ closeProducer();
startProducer();
- log.info("Restarted producer on {}", topic);
+ log.info("Restarted producer on {}, {}", topic, restartReason);
} catch (Exception e) {
- log.error("Failed to restart producer on {}", topic, e);
- throw new RuntimeException(e);
+ String msg = "Failed to restart producer on " + topic + ",
restart reason: " + restartReason;
+ log.error(msg, e);
+ return msg;
}
}
+ return null;
}
- private void validateTableView() {
+ private String validateTableView() {
+ if (isShutdown) {
+ return SHUTDOWN_ERR_MSG;
+ }
+ String restartReason = getRestartReason(tableView,
tableViewLastUpdateTimestamp);
+ if (StringUtils.isNotBlank(restartReason)) {
+ try {
+ closeTableView();
+ startTableView();
+ log.info("Restarted tableview on {}, {}", topic,
restartReason);
+ } catch (Exception e) {
+ String msg = "Failed to tableview on " + topic + ", restart
reason: " + restartReason;
+ log.error(msg, e);
+ return msg;
+ }
+ }
+ return null;
+ }
+
+ private String getRestartReason(Object obj, long lastUpdateTimestamp) {
+
String restartReason = null;
- if (tableView == null) {
- restartReason = "table view is null";
+ if (obj == null) {
+ restartReason = "object is null";
} else {
- long inactiveDuration = System.currentTimeMillis() -
tableViewLastUpdateTimestamp;
+ long inactiveDuration = System.currentTimeMillis() -
lastUpdateTimestamp;
long threshold =
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes())
*
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART;
if (inactiveDuration > threshold) {
@@ -189,17 +264,6 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
TimeUnit.MILLISECONDS.toSeconds(threshold));
}
}
-
- if (StringUtils.isNotBlank(restartReason)) {
- tableViewLastUpdateTimestamp = 0;
- try {
- closeTableView();
- startTableView();
- log.info("Restarted tableview on {}, {}", topic,
restartReason);
- } catch (Exception e) {
- log.error("Failed to restart tableview on {}", topic, e);
- throw new RuntimeException(e);
- }
- }
+ return restartReason;
}
-}
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b4885ffcb6f..cb121b66356 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -978,7 +978,12 @@ public class BrokerService implements Closeable {
pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
} catch (Exception e) {
- log.warn("Failed to unload namespace bundle {}",
su, e);
+ if (e instanceof ExecutionException
+ && e.getCause() instanceof
ServiceUnitNotReadyException) {
+ log.warn("Failed to unload namespace bundle
{}: {}", su, e.getMessage());
+ } else {
+ log.warn("Failed to unload namespace bundle
{}", su, e);
+ }
}
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 5156246bb5e..8054d781976 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -349,6 +349,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@VisibleForTesting
@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull
NamespaceName namespace) {
requireNonNull(namespace);
+ if (closed.get()) {
+ return CompletableFuture.completedFuture(null);
+ }
return
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(namespacePolicies -> {
if (namespacePolicies.isEmpty() ||
namespacePolicies.get().deleted) {
@@ -372,6 +375,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
initFuture.exceptionally(ex -> {
try {
+ if (closed.get()) {
+ return null;
+ }
log.error("[{}] Failed to create
reader on __change_events topic",
namespace, ex);
cleanCacheAndCloseReader(namespace,
false);
@@ -779,14 +785,22 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (closed.compareAndSet(false, true)) {
writerCaches.synchronous().invalidateAll();
readerCaches.values().forEach(future -> {
- if (future != null && !future.isCompletedExceptionally()) {
- future.thenAccept(reader -> {
- try {
- reader.close();
- } catch (Exception e) {
- log.error("Failed to close reader.", e);
- }
- });
+ try {
+ final var reader = future.getNow(null);
+ if (reader != null) {
+ reader.close();
+ log.info("Closed the reader for topic policies");
+ } else {
+ // Avoid blocking the thread that the reader is created
+
future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) ->
{
+ if (e == null) {
+ log.info("Closed the reader for topic
policies");
+ } else {
+ log.error("Failed to close the reader for
topic policies", e);
+ }
+ });
+ }
+ } catch (Throwable ignored) {
}
});
readerCaches.clear();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 41413f3e3a9..fa63ce566c6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -22,13 +22,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -36,26 +38,33 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
+@Test(groups = "broker")
public class ExtensibleLoadManagerCloseTest {
private static final String clusterName = "test";
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1,
0, () -> 0);
private final List<PulsarService> brokers = new ArrayList<>();
- private PulsarAdmin admin;
+ private LocalBookkeeperEnsemble bk;
@BeforeClass(alwaysRun = true)
public void setup() throws Exception {
+ bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
bk.start();
- for (int i = 0; i < 3; i++) {
+ }
+
+ private void setupBrokers(int numBrokers) throws Exception {
+ brokers.clear();
+ for (int i = 0; i < numBrokers; i++) {
final var broker = new PulsarService(brokerConfig());
broker.start();
brokers.add(broker);
}
- admin = brokers.get(0).getAdminClient();
- admin.clusters().createCluster(clusterName,
ClusterData.builder().build());
- admin.tenants().createTenant("public", TenantInfo.builder()
- .allowedClusters(Collections.singleton(clusterName)).build());
- admin.namespaces().createNamespace("public/default");
+ final var admin = brokers.get(0).getAdminClient();
+ if (!admin.clusters().getClusters().contains(clusterName)) {
+ admin.clusters().createCluster(clusterName,
ClusterData.builder().build());
+ admin.tenants().createTenant("public", TenantInfo.builder()
+
.allowedClusters(Collections.singleton(clusterName)).build());
+ admin.namespaces().createNamespace("public/default");
+ }
}
@@ -85,7 +94,9 @@ public class ExtensibleLoadManagerCloseTest {
@Test
public void testCloseAfterLoadingBundles() throws Exception {
+ setupBrokers(3);
final var topic = "test";
+ final var admin = brokers.get(0).getAdminClient();
admin.topics().createPartitionedTopic(topic, 20);
admin.lookups().lookupPartitionedTopic(topic);
final var client =
PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build();
@@ -104,4 +115,25 @@ public class ExtensibleLoadManagerCloseTest {
Assert.assertTrue(closeTimeMs < 5000L);
}
}
+
+ @Test
+ public void testLookup() throws Exception {
+ setupBrokers(1);
+ final var topic = "test-lookup";
+ final var numPartitions = 16;
+ final var admin = brokers.get(0).getAdminClient();
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+
+ final var futures = new ArrayList<CompletableFuture<String>>();
+ for (int i = 0; i < numPartitions; i++) {
+ futures.add(admin.lookups().lookupTopicAsync(topic +
TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ final var start = System.currentTimeMillis();
+ brokers.get(0).close();
+ final var closeTimeMs = System.currentTimeMillis() - start;
+ log.info("Broker close time: {}", closeTimeMs);
+ Assert.assertTrue(closeTimeMs < 5000L);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index e569f0d32d5..af42649436b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -1551,6 +1551,9 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(Optional.empty(),
channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
+ ServiceUnitStateChannel finalLeaderChannel = leaderChannel;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty());
// clean-up
FieldUtils.writeDeclaredField(leaderChannel,
"maxCleanupDelayTimeInSecs", 3 * 60, true);
@@ -1621,8 +1624,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
.monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3"));
ServiceUnitStateChannel finalLeaderChannel = leaderChannel;
- Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
getCleanupJobs(finalLeaderChannel).isEmpty());
-
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> getCleanupJobs(finalLeaderChannel).isEmpty());
waitUntilNewOwner(channel2, releasingBundle1, brokerId2);
waitUntilNewOwner(channel2, releasingBundle2, brokerId2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index d25cba2bd1b..6d4e0f00c93 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -18,8 +18,12 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.store;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
import static org.testng.AssertJUnit.assertTrue;
import com.google.common.collect.Sets;
@@ -33,6 +37,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -75,8 +80,6 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<MyClass> loadDataStore =
LoadDataStoreFactory.create(pulsar, topic, MyClass.class);
- loadDataStore.startProducer();
- loadDataStore.startTableView();
MyClass myClass1 = new MyClass("1", 1);
loadDataStore.pushAsync("key1", myClass1).get();
@@ -109,8 +112,6 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar, topic, Integer.class);
- loadDataStore.startProducer();
- loadDataStore.startTableView();
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
@@ -134,9 +135,6 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
LoadDataStore<Integer> loadDataStore =
LoadDataStoreFactory.create(pulsar, topic, Integer.class);
- loadDataStore.startProducer();
-
- loadDataStore.startTableView();
loadDataStore.pushAsync("1", 1).get();
Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.size(), 1));
assertEquals(loadDataStore.get("1").get(), 1);
@@ -150,6 +148,31 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.get("1").get(), 3));
}
+ @Test
+ public void testProducerRestart() throws Exception {
+ String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+ var loadDataStore =
+ (TableViewLoadDataStoreImpl)
spy(LoadDataStoreFactory.create(pulsar, topic, Integer.class));
+
+ // happy case
+ loadDataStore.pushAsync("1", 1).get();
+ Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.size(), 1));
+ assertEquals(loadDataStore.get("1").get(), 1);
+ verify(loadDataStore, times(1)).startProducer();
+
+ // loadDataStore will restart producer if null.
+ FieldUtils.writeField(loadDataStore, "producer", null, true);
+ loadDataStore.pushAsync("1", 2).get();
+ Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.get("1").get(), 2));
+ verify(loadDataStore, times(2)).startProducer();
+
+ // loadDataStore will restart producer if too slow.
+ FieldUtils.writeField(loadDataStore, "producerLastPublishTimestamp", 0
, true);
+ loadDataStore.pushAsync("1", 3).get();
+ Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.get("1").get(), 3));
+ verify(loadDataStore, times(3)).startProducer();
+ }
+
@Test
public void testProducerStop() throws Exception {
String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
@@ -165,4 +188,25 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
loadDataStore.removeAsync("2").get();
}
-}
+ @Test
+ public void testShutdown() throws Exception {
+ String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+ LoadDataStore<Integer> loadDataStore =
+ LoadDataStoreFactory.create(pulsar, topic, Integer.class);
+ loadDataStore.start();
+ loadDataStore.shutdown();
+
+ Assert.assertTrue(loadDataStore.pushAsync("2",
2).isCompletedExceptionally());
+
Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally());
+ assertTrue(loadDataStore.get("2").isEmpty());
+ assertThrows(IllegalStateException.class, loadDataStore::size);
+ assertThrows(IllegalStateException.class, loadDataStore::entrySet);
+ assertThrows(IllegalStateException.class, () ->
loadDataStore.forEach((k, v) -> {}));
+ assertThrows(IllegalStateException.class, loadDataStore::init);
+ assertThrows(IllegalStateException.class, loadDataStore::start);
+ assertThrows(IllegalStateException.class,
loadDataStore::startProducer);
+ assertThrows(IllegalStateException.class,
loadDataStore::startTableView);
+ assertThrows(IllegalStateException.class,
loadDataStore::closeTableView);
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index d5d4174ee10..4f520604978 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -364,8 +364,8 @@ public class TableViewImpl<T> implements TableView<T> {
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
- log.error("Reader {} was closed while
reading existing messages.",
- reader.getTopic(), ex);
+ log.info("Reader {} was closed while
reading existing messages.",
+ reader.getTopic());
} else {
log.warn("Reader {} was interrupted
while reading existing messages. ",
reader.getTopic(), ex);
@@ -393,8 +393,7 @@ public class TableViewImpl<T> implements TableView<T> {
readTailMessages(reader);
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarClientException.AlreadyClosedException) {
- log.error("Reader {} was closed while reading tail
messages.",
- reader.getTopic(), ex);
+ log.info("Reader {} was closed while reading tail
messages.", reader.getTopic());
// Fail all refresh request when no more messages can
be read.
pendingRefreshRequests.keySet().forEach(future -> {
pendingRefreshRequests.remove(future);