This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6df02655a3c [fix][broker][branch-3.0] Set ServiceUnitStateChannel
topic compaction threshold explicitly, improve getOwnerAsync, and fix other
bugs (#22064) (#22154)
6df02655a3c is described below
commit 6df02655a3c307d29a86b7f3e0ea46c16ad18aea
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Feb 28 17:52:37 2024 -0800
[fix][broker][branch-3.0] Set ServiceUnitStateChannel topic compaction
threshold explicitly, improve getOwnerAsync, and fix other bugs (#22064)
(#22154)
---
.../extensions/ExtensibleLoadManagerImpl.java | 56 +++++++--
.../channel/ServiceUnitStateChannelImpl.java | 101 +++++++++++----
.../extensions/store/LoadDataStoreFactory.java | 7 +-
.../store/TableViewLoadDataStoreImpl.java | 79 +++++++++---
.../client/impl/RawBatchMessageContainerImpl.java | 1 +
.../extensions/ExtensibleLoadManagerImplTest.java | 136 ++++++++++++---------
.../channel/ServiceUnitStateChannelTest.java | 77 ++++++------
.../extensions/store/LoadDataStoreTest.java | 41 ++-----
.../apache/pulsar/client/impl/TableViewImpl.java | 8 +-
.../loadbalance/ExtensibleLoadManagerTest.java | 1 -
10 files changed, 325 insertions(+), 182 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index baf3e7b1563..409bb55075b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions;
import static java.lang.String.format;
import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
@@ -117,6 +118,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
+ public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
+
private static final String ELECTION_ROOT =
"/loadbalance/extension/leader";
private PulsarService pulsar;
@@ -173,6 +176,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
private volatile boolean started = false;
+ private boolean configuredSystemTopics = false;
+
private final AssignCounter assignCounter = new AssignCounter();
@Getter
private final UnloadCounter unloadCounter = new UnloadCounter();
@@ -262,6 +267,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
+ public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
+ return pulsar.getLoadManager().get() instanceof
ExtensibleLoadManagerWrapper;
+ }
+
public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper
loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be
'ExtensibleLoadManagerWrapper'.");
@@ -291,6 +300,27 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
+ private static boolean configureSystemTopics(PulsarService pulsar) {
+ try {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
+ && (pulsar.getConfiguration().isSystemTopicEnabled()
+ &&
pulsar.getConfiguration().isTopicLevelPoliciesEnabled())) {
+ Long threshold =
pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
+ if (threshold == null || COMPACTION_THRESHOLD !=
threshold.longValue()) {
+
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC,
COMPACTION_THRESHOLD);
+ log.info("Set compaction threshold: {} bytes for system
topic {}.", COMPACTION_THRESHOLD, TOPIC);
+ }
+ } else {
+ log.warn("System topic or topic level policies is disabled. "
+ + "{} compaction threshold follows the broker or
namespace policies.", TOPIC);
+ }
+ return true;
+ } catch (Exception e) {
+ log.error("Failed to set compaction threshold for system
topic:{}", TOPIC, e);
+ }
+ return false;
+ }
+
@Override
public void start() throws PulsarServerException {
if (this.started) {
@@ -329,9 +359,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
try {
this.brokerLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(),
BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+ .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
- .create(pulsar.getClient(),
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC,
TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
@@ -389,6 +419,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
+ log.info("Started load manager.");
} catch (Exception ex) {
log.error("Failed to start the extensible load balance and close
broker registry {}.",
this.brokerRegistry, ex);
@@ -523,7 +554,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
if (ex != null) {
assignCounter.incrementFailure();
}
- lookupRequests.remove(key, newFutureCreated.getValue());
+ lookupRequests.remove(key);
});
}
}
@@ -736,13 +767,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
public static boolean isInternalTopic(String topic) {
- return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
+ return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
@VisibleForTesting
- void playLeader() {
+ synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
@@ -760,7 +791,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
- log.error("The broker:{} failed to set the role. Retrying {}
th ...",
+ log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10,
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -780,7 +811,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
@VisibleForTesting
- void playFollower() {
+ synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
@@ -794,7 +825,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
- log.error("The broker:{} failed to set the role. Retrying {}
th ...",
+ log.warn("The broker:{} failed to set the role. Retrying {} th
...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10,
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -834,7 +865,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return metricsCollection;
}
- private void monitor() {
+
+ @VisibleForTesting
+ protected void monitor() {
try {
initWaiter.await();
@@ -842,6 +875,11 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
+ // System topic config might fail due to the race condition
+ // with topic policy init(Topic policies cache have not init).
+ if (!configuredSystemTopics) {
+ configuredSystemTopics = configureSystemTopics(pulsar);
+ }
if (role != Leader) {
log.warn("Current role:{} does not match with the channel
ownership:{}. "
+ "Playing the leader role.", role,
isChannelOwner);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 8efa4e2e21a..220ce02ba5a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -492,21 +492,28 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
- CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
- ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData
-> lookupData.flatMap(__ -> owner))
- : CompletableFuture.completedFuture(Optional.empty());
-
- return activeOwner
- .thenCompose(broker -> broker
- .map(__ -> activeOwner)
- .orElseGet(() ->
deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
- .whenComplete((__, e) -> {
+ return deferGetOwnerRequest(serviceUnit)
+ .thenCompose(newOwner -> {
+ if (newOwner == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return brokerRegistry.lookupAsync(newOwner)
+ .thenApply(lookupData -> {
+ if (lookupData.isPresent()) {
+ return newOwner;
+ } else {
+ throw new IllegalStateException(
+ "The new owner " + newOwner + " is
inactive.");
+ }
+ });
+ }).whenComplete((__, e) -> {
if (e != null) {
- log.error("Failed to get active owner broker.
serviceUnit:{}, state:{}, owner:{}",
- serviceUnit, state, owner, e);
+ log.error("{} failed to get active owner broker.
serviceUnit:{}, state:{}, owner:{}",
+ brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
- });
+ }).thenApply(Optional::ofNullable);
}
public CompletableFuture<Optional<String>> getOwnerAsync(String
serviceUnit) {
@@ -544,6 +551,25 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ private Optional<String> getOwner(String serviceUnit) {
+ ServiceUnitStateData data = tableview.get(serviceUnit);
+ ServiceUnitState state = state(data);
+ switch (state) {
+ case Owned -> {
+ return Optional.of(data.dstBroker());
+ }
+ case Splitting -> {
+ return Optional.of(data.sourceBroker());
+ }
+ case Init, Free -> {
+ return Optional.empty();
+ }
+ default -> {
+ return null;
+ }
+ }
+ }
+
private long getNextVersionId(String serviceUnit) {
var data = tableview.get(serviceUnit);
return getNextVersionId(data);
@@ -697,7 +723,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void log(Throwable e, String serviceUnit, ServiceUnitStateData
data, ServiceUnitStateData next) {
if (e == null) {
- if (log.isDebugEnabled() || isTransferCommand(data)) {
+ if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount =
getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{},
next:{}, "
@@ -736,6 +762,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data)
{
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
+ if (debug()) {
+ log.info("Returned owner request for serviceUnit:{}",
serviceUnit);
+ }
getOwnerRequest.complete(data.dstBroker());
}
stateChangeListeners.notify(serviceUnit, data, null);
@@ -848,26 +877,52 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit)
{
+
var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
- CompletableFuture<String> future = new
CompletableFuture<>();
+ var ownerBefore = getOwner(serviceUnit);
+ if (ownerBefore != null && ownerBefore.isPresent()) {
+ // Here, we do a quick active check first with the
computeIfAbsent lock
+
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
+ .ifPresent(__ -> requested.setValue(
+
CompletableFuture.completedFuture(ownerBefore.get())));
+
+ if (requested.getValue() != null) {
+ return requested.getValue();
+ }
+ }
+
+
+ CompletableFuture<String> future =
+ new
CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
+ TimeUnit.MILLISECONDS)
+ .exceptionally(e -> {
+ var ownerAfter =
getOwner(serviceUnit);
+ log.warn("{} failed to wait for
owner for serviceUnit:{}; Trying to "
+ + "return the
current owner:{}",
+ brokerId, serviceUnit,
ownerAfter, e);
+ if (ownerAfter == null) {
+ throw new
IllegalStateException(e);
+ }
+ return ownerAfter.orElse(null);
+ });
+ if (debug()) {
+ log.info("{} is waiting for owner for
serviceUnit:{}", brokerId, serviceUnit);
+ }
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
- future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000,
TimeUnit.MILLISECONDS)
- .whenComplete((v, e) -> {
- if (e != null) {
- getOwnerRequests.remove(serviceUnit,
future);
- log.warn("Failed to getOwner for
serviceUnit:{}",
- serviceUnit, e);
- }
- }
- );
+ future.whenComplete((__, e) -> {
+ getOwnerRequests.remove(serviceUnit);
+ if (e != null) {
+ log.warn("{} failed to getOwner for serviceUnit:{}",
brokerId, serviceUnit, e);
+ }
+ });
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
index 18f39abd76b..bcb2657c67f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
@@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.store;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarService;
/**
* The load data store factory, use to create the load data store.
*/
public class LoadDataStoreFactory {
- public static <T> LoadDataStore<T> create(PulsarClient client, String
name, Class<T> clazz)
+ public static <T> LoadDataStore<T> create(PulsarService pulsar, String
name,
+ Class<T> clazz)
throws LoadDataStoreException {
- return new TableViewLoadDataStoreImpl<>(client, name, clazz);
+ return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 56afbef0456..d916e917162 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -23,34 +23,46 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
-import org.apache.pulsar.common.util.FutureUtil;
/**
* The load data store, base on {@link TableView <T>}.
*
* @param <T> Load data type.
*/
+@Slf4j
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
+ private static final long
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+
private volatile TableView<T> tableView;
+ private volatile long tableViewLastUpdateTimestamp;
private volatile Producer<T> producer;
+ private final ServiceConfiguration conf;
+
private final PulsarClient client;
private final String topic;
private final Class<T> clazz;
- public TableViewLoadDataStoreImpl(PulsarClient client, String topic,
Class<T> clazz) throws LoadDataStoreException {
+ public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic,
Class<T> clazz)
+ throws LoadDataStoreException {
try {
- this.client = client;
+ this.conf = pulsar.getConfiguration();
+ this.client = pulsar.getClient();
this.topic = topic;
this.clazz = clazz;
} catch (Exception e) {
@@ -60,40 +72,36 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
@Override
public synchronized CompletableFuture<Void> pushAsync(String key, T
loadData) {
- if (producer == null) {
- return FutureUtil.failedFuture(new IllegalStateException("producer
has not been started"));
- }
+ validateProducer();
return
producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
}
@Override
public synchronized CompletableFuture<Void> removeAsync(String key) {
- if (producer == null) {
- return FutureUtil.failedFuture(new IllegalStateException("producer
has not been started"));
- }
+ validateProducer();
return
producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
}
@Override
public synchronized Optional<T> get(String key) {
- validateTableViewStart();
+ validateTableView();
return Optional.ofNullable(tableView.get(key));
}
@Override
public synchronized void forEach(BiConsumer<String, T> action) {
- validateTableViewStart();
+ validateTableView();
tableView.forEach(action);
}
public synchronized Set<Map.Entry<String, T>> entrySet() {
- validateTableViewStart();
+ validateTableView();
return tableView.entrySet();
}
@Override
public synchronized int size() {
- validateTableViewStart();
+ validateTableView();
return tableView.size();
}
@@ -116,6 +124,8 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
if (tableView == null) {
try {
tableView =
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+ tableView.forEachAndListen((k, v) ->
+ tableViewLastUpdateTimestamp =
System.currentTimeMillis());
} catch (PulsarClientException e) {
tableView = null;
throw new LoadDataStoreException(e);
@@ -150,9 +160,48 @@ public class TableViewLoadDataStoreImpl<T> implements
LoadDataStore<T> {
start();
}
- private synchronized void validateTableViewStart() {
+ private void validateProducer() {
+ if (producer == null || !producer.isConnected()) {
+ try {
+ if (producer != null) {
+ producer.close();
+ }
+ producer = null;
+ startProducer();
+ log.info("Restarted producer on {}", topic);
+ } catch (Exception e) {
+ log.error("Failed to restart producer on {}", topic, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void validateTableView() {
+ String restartReason = null;
+
if (tableView == null) {
- throw new IllegalStateException("table view has not been started");
+ restartReason = "table view is null";
+ } else {
+ long inactiveDuration = System.currentTimeMillis() -
tableViewLastUpdateTimestamp;
+ long threshold =
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes())
+ *
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART;
+ if (inactiveDuration > threshold) {
+ restartReason = String.format("inactiveDuration=%d secs >
threshold = %d secs",
+ TimeUnit.MILLISECONDS.toSeconds(inactiveDuration),
+ TimeUnit.MILLISECONDS.toSeconds(threshold));
+ }
+ }
+
+ if (StringUtils.isNotBlank(restartReason)) {
+ tableViewLastUpdateTimestamp = 0;
+ try {
+ closeTableView();
+ startTableView();
+ log.info("Restarted tableview on {}, {}", topic,
restartReason);
+ } catch (Exception e) {
+ log.error("Failed to restart tableview on {}", topic, e);
+ throw new RuntimeException(e);
+ }
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index ba8d3db7178..374f1e30c0a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -187,6 +187,7 @@ public class RawBatchMessageContainerImpl extends
BatchMessageContainerImpl {
idData.writeTo(buf);
buf.writeInt(metadataAndPayload.readableBytes());
buf.writeBytes(metadataAndPayload);
+ metadataAndPayload.release();
encryptedPayload.release();
clear();
return buf;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 4ec884a624e..e87532cdc6c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -115,6 +115,7 @@ import
org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.mockito.MockedStatic;
+import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -141,64 +142,53 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
private final String defaultTestNamespace = "public/test";
+ private static void initConfig(ServiceConfiguration conf){
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ conf.setAllowAutoTopicCreation(true);
+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ }
+
@BeforeClass
@Override
public void setup() throws Exception {
- try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
- mockStatic(ServiceUnitStateChannelImpl.class)) {
- channelMockedStatic.when(() ->
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
- .thenAnswer(invocation -> {
- PulsarService pulsarService =
invocation.getArgument(0);
- // Set the inflight state waiting time and ownership
monitor delay time to 5 seconds to avoid
- // stuck when doing unload.
- return new ServiceUnitStateChannelImpl(pulsarService,
5 * 1000, 1);
- });
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(true);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
-
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
- defaultConf.setTopicLevelPoliciesEnabled(true);
- additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
-
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
-
admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace);
-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
- }
+ // Set the inflight state waiting time and ownership monitor delay
time to 5 seconds to avoid
+ // stuck when doing unload.
+ initConfig(conf);
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ ServiceConfiguration defaultConf = getDefaultConf();
+ initConfig(defaultConf);
+ additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ setPrimaryLoadManager();
+
+ setSecondaryLoadManager();
+
+ admin.clusters().createCluster(this.conf.getClusterName(),
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
+
+ admin.namespaces().createNamespace(defaultTestNamespace);
+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+ Sets.newHashSet(this.conf.getClusterName()));
}
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
- pulsar1 = null;
- pulsar2.close();
- super.internalCleanup();
this.additionalPulsarTestContext.close();
+ super.internalCleanup();
}
@BeforeMethod(alwaysRun = true)
@@ -236,9 +226,6 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
Optional<BrokerLookupData> brokerLookupData1 =
secondaryLoadManager.assign(Optional.empty(), bundle).get();
assertEquals(brokerLookupData, brokerLookupData1);
- verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy();
- verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy();
-
Optional<LookupResult> lookupResult = pulsar2.getNamespaceService()
.getBrokerServiceUrlAsync(topicName, null).get();
assertTrue(lookupResult.isPresent());
@@ -462,7 +449,8 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
"specified_positions_divide", List.of(bundleRanges.get(0),
bundleRanges.get(1), splitPosition));
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
- assertEquals(bundlesData.getNumBundles(), numBundles + 1);
+ Awaitility.waitAtMost(15, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(bundlesData.getNumBundles(),
numBundles + 1));
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", splitPosition);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
@@ -475,15 +463,26 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
final String namespace = "public/testDeleteNamespaceBundle";
admin.namespaces().createNamespace(namespace, 3);
TopicName topicName = TopicName.get(namespace +
"/test-delete-namespace-bundle");
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
- String broker = admin.lookups().lookupTopic(topicName.toString());
- log.info("Assign the bundle {} to {}", bundle, broker);
-
- checkOwnershipState(broker, bundle);
- admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange());
- assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ Awaitility.await()
+ .atMost(15, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
topicName).get();
+ String broker =
admin.lookups().lookupTopic(topicName.toString());
+ log.info("Assign the bundle {} to {}", bundle, broker);
+ checkOwnershipState(broker, bundle);
+
admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange(), true);
+ // this could fail if the system topic lookup
asynchronously happens before this.
+ // we will retry if it fails.
+
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ });
+
+ Awaitility.await()
+ .atMost(15, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() ->
admin.namespaces().deleteNamespace(namespace, true));
}
@Test(timeOut = 30 * 1000)
@@ -708,7 +707,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(result, expectedBrokerServiceUrl);
}
- @Test
+ @Test(priority = 10)
public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws
Exception {
var topBundlesLoadDataStorePrimary =
(LoadDataStore)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
true);
@@ -1219,6 +1218,21 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
admin.brokers().healthcheck(TopicVersion.V2);
}
+ @Test(timeOut = 30 * 1000)
+ public void compactionScheduleTest() {
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> { // wait until true
+ primaryLoadManager.monitor();
+ secondaryLoadManager.monitor();
+ var threshold = admin.topicPolicies()
+
.getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false);
+ AssertJUnit.assertEquals(5 * 1024 * 1024, threshold ==
null ? 0 : threshold.longValue());
+ });
+ }
+
private static abstract class MockBrokerFilter implements BrokerFilter {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 158b91fd277..84988a3b3bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -86,7 +86,6 @@ import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.policies.data.TopicType;
@@ -331,23 +330,6 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
return errorCnt;
}
- @Test(priority = 1)
- public void compactionScheduleTest() {
-
- Awaitility.await()
- .pollInterval(200, TimeUnit.MILLISECONDS)
- .atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> { // wait until true
- try {
- var threshold = admin.topicPolicies()
-
.getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false).longValue();
- assertEquals(5 * 1024 * 1024, threshold);
- } catch (Exception e) {
- ;
- }
- });
- }
-
@Test(priority = 2)
public void assignmentTest()
throws ExecutionException, InterruptedException,
IllegalAccessException, TimeoutException {
@@ -927,8 +909,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
@Test(priority = 10)
- public void conflictAndCompactionTest() throws ExecutionException,
InterruptedException, TimeoutException,
- IllegalAccessException, PulsarClientException,
PulsarServerException {
+ public void conflictAndCompactionTest() throws Exception {
String bundle = String.format("%s/%s", "public/default",
"0x0000000a_0xffffffff");
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
@@ -961,26 +942,41 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
Field strategicCompactorField =
FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
FieldUtils.writeField(strategicCompactorField, pulsar1, compactor,
true);
FieldUtils.writeField(strategicCompactorField, pulsar2, compactor,
true);
- Awaitility.await()
- .pollInterval(200, TimeUnit.MILLISECONDS)
- .atMost(140, TimeUnit.SECONDS)
- .untilAsserted(() -> {
- channel1.publishAssignEventAsync(bundle, brokerId1);
- verify(compactor, times(1))
- .compact(eq(ServiceUnitStateChannelImpl.TOPIC),
any());
- });
+
+ var threshold = admin.topicPolicies()
+ .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC);
+ admin.topicPolicies()
+ .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, 0);
+
+ try {
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(140, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ channel1.publishAssignEventAsync(bundle, brokerId1);
+ verify(compactor, times(1))
+
.compact(eq(ServiceUnitStateChannelImpl.TOPIC), any());
+ });
+
+
+ var channel3 = createChannel(pulsar);
+ channel3.start();
+ Awaitility.await()
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(
+ channel3.getOwnerAsync(bundle).get(),
Optional.of(brokerId1)));
+ channel3.close();
+ } finally {
+ FieldUtils.writeDeclaredField(channel2,
+ "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+ if (threshold != null) {
+ admin.topicPolicies()
+
.setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold);
+ }
+ }
- var channel3 = createChannel(pulsar);
- channel3.start();
- Awaitility.await()
- .pollInterval(200, TimeUnit.MILLISECONDS)
- .atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(
- channel3.getOwnerAsync(bundle).get(),
Optional.of(brokerId1)));
- channel3.close();
- FieldUtils.writeDeclaredField(channel2,
- "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
}
@Test(priority = 11)
@@ -1583,7 +1579,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
// verify getOwnerAsync times out because the owner is inactive now.
long start = System.currentTimeMillis();
var ex = expectThrows(ExecutionException.class, () ->
channel1.getOwnerAsync(bundle).get());
- assertTrue(ex.getCause() instanceof TimeoutException);
+ assertTrue(ex.getCause() instanceof IllegalStateException);
assertTrue(System.currentTimeMillis() - start >= 1000);
// simulate ownership cleanup(no selected owner) by the leader channel
@@ -1783,6 +1779,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
FieldUtils.readField(channel, "tableview", true);
+ var getOwnerRequests = (Map<String, CompletableFuture<String>>)
+ FieldUtils.readField(channel, "getOwnerRequests", true);
var cache = (ConcurrentMap<String, ServiceUnitStateData>)
FieldUtils.readField(tv, "data", true);
if(val == null){
@@ -1790,6 +1788,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
} else {
cache.put(serviceUnit, val);
}
+ getOwnerRequests.clear();
}
private static void cleanOpsCounters(ServiceUnitStateChannel channel)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index f486370400c..d25cba2bd1b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertTrue;
import com.google.common.collect.Sets;
@@ -29,6 +27,7 @@ import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -40,7 +39,6 @@ import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
@Test(groups = "broker")
public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@@ -76,7 +74,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<MyClass> loadDataStore =
- LoadDataStoreFactory.create(pulsar.getClient(), topic,
MyClass.class);
+ LoadDataStoreFactory.create(pulsar, topic, MyClass.class);
loadDataStore.startProducer();
loadDataStore.startTableView();
MyClass myClass1 = new MyClass("1", 1);
@@ -110,7 +108,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
@Cleanup
LoadDataStore<Integer> loadDataStore =
- LoadDataStoreFactory.create(pulsar.getClient(), topic,
Integer.class);
+ LoadDataStoreFactory.create(pulsar, topic, Integer.class);
loadDataStore.startProducer();
loadDataStore.startTableView();
@@ -135,7 +133,7 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
public void testTableViewRestart() throws Exception {
String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
LoadDataStore<Integer> loadDataStore =
- LoadDataStoreFactory.create(pulsar.getClient(), topic,
Integer.class);
+ LoadDataStoreFactory.create(pulsar, topic, Integer.class);
loadDataStore.startProducer();
loadDataStore.startTableView();
@@ -145,43 +143,26 @@ public class LoadDataStoreTest extends
MockedPulsarServiceBaseTest {
loadDataStore.closeTableView();
loadDataStore.pushAsync("1", 2).get();
- Exception ex = null;
- try {
- loadDataStore.get("1");
- } catch (IllegalStateException e) {
- ex = e;
- }
- assertNotNull(ex);
- loadDataStore.startTableView();
Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.get("1").get(), 2));
+
+ loadDataStore.pushAsync("1", 3).get();
+ FieldUtils.writeField(loadDataStore, "tableViewLastUpdateTimestamp", 0
, true);
+ Awaitility.await().untilAsserted(() ->
assertEquals(loadDataStore.get("1").get(), 3));
}
@Test
public void testProducerStop() throws Exception {
String topic = TopicDomain.persistent + "://" +
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
LoadDataStore<Integer> loadDataStore =
- LoadDataStoreFactory.create(pulsar.getClient(), topic,
Integer.class);
+ LoadDataStoreFactory.create(pulsar, topic, Integer.class);
loadDataStore.startProducer();
loadDataStore.pushAsync("1", 1).get();
loadDataStore.removeAsync("1").get();
loadDataStore.close();
- try {
- loadDataStore.pushAsync("2", 2).get();
- fail();
- } catch (ExecutionException ex) {
- assertTrue(ex.getCause() instanceof IllegalStateException);
- }
- try {
- loadDataStore.removeAsync("2").get();
- fail();
- } catch (ExecutionException ex) {
- assertTrue(ex.getCause() instanceof IllegalStateException);
- }
- loadDataStore.startProducer();
- loadDataStore.pushAsync("3", 3).get();
- loadDataStore.removeAsync("3").get();
+ loadDataStore.pushAsync("2", 2).get();
+ loadDataStore.removeAsync("2").get();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 560636f9462..d46f7fa1408 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -284,8 +284,14 @@ public class TableViewImpl<T> implements TableView<T> {
log.error("Reader {} was closed while reading tail
messages.",
reader.getTopic(), ex);
} else {
+ // Retrying on the other exceptions such as
NotConnectedException
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
log.warn("Reader {} was interrupted while reading tail
messages. "
- + "Retrying..", reader.getTopic(), ex);
+ + "Retrying..", reader.getTopic(), ex);
readTailMessages(reader);
}
return null;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index e262b27fe23..b9707ea76c3 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -91,7 +91,6 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
- brokerEnvs.put("topicLevelPoliciesEnabled", "false");
brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
spec.brokerEnvs(brokerEnvs);
pulsarCluster = PulsarCluster.forSpec(spec);