This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 10190242023 [fix][broker] Fix tableview divergence in
ServiceUnitStateTableViewSyncer causing flaky tests (#25946)
10190242023 is described below
commit 101902420230180f32efef387ed840885e3672ff
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jun 7 23:10:15 2026 +0300
[fix][broker] Fix tableview divergence in ServiceUnitStateTableViewSyncer
causing flaky tests (#25946)
---
.../channel/ServiceUnitStateTableViewSyncer.java | 186 +++++++++---
.../ExtensibleLoadManagerImplBaseTest.java | 16 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 324 ++++++++++++++-------
3 files changed, 376 insertions(+), 150 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
index bab2b989b2c..f6bf312f4de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
@@ -23,20 +23,26 @@ import static
org.apache.pulsar.broker.ServiceConfiguration.ServiceUnitTableView
import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD;
import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.configureSystemTopics;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.jspecify.annotations.NonNull;
/**
* Defines ServiceUnitTableViewSyncer.
@@ -47,10 +53,15 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
public class ServiceUnitStateTableViewSyncer implements Closeable {
private static final int MAX_CONCURRENT_SYNC_COUNT = 100;
private static final int SYNC_WAIT_TIME_IN_SECS = 300;
+ private static final long RECONCILE_INTERVAL_IN_MILLIS = 5_000;
+ private static final BiConsumer<String, ServiceUnitStateData>
NOOP_CONSUMER = (__, ___) -> {
+ };
+ private volatile int syncWaitTimeInSecs = SYNC_WAIT_TIME_IN_SECS;
private PulsarService pulsar;
private volatile ServiceUnitStateTableView systemTopicTableView;
private volatile ServiceUnitStateTableView metadataStoreTableView;
private volatile boolean isActive = false;
+ private final ObjectWriter jsonWriter =
ObjectMapperFactory.getMapper().writer();
public void start(PulsarService pulsar)
@@ -82,53 +93,77 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
}
private CompletableFuture<Void> syncToSystemTopic(String key,
ServiceUnitStateData data) {
- return systemTopicTableView.put(key, data);
+ return logIfFailed(sync(systemTopicTableView, key, data), key, data,
"systemTopic");
}
private CompletableFuture<Void> syncToMetadataStore(String key,
ServiceUnitStateData data) {
- return metadataStoreTableView.put(key, data);
+ return logIfFailed(sync(metadataStoreTableView, key, data), key, data,
"metadataStore");
}
- private void dummy(String key, ServiceUnitStateData data) {
+ private CompletableFuture<Void> sync(ServiceUnitStateTableView dst, String
key, ServiceUnitStateData data) {
+ // A null tail item is a tombstone: the source view removed the key.
Route it to
+ // delete() rather than put(): the metadata-store view's put() rejects
null
+ // (@NonNull) and the system-topic view's delete() is itself a
null-valued put(),
+ // so a uniform delete keeps both sync directions symmetric and
prevents a missed
+ // deletion from leaving the two views with different sizes (which
would make
+ // waitUntilSynced spin until the timeout budget).
+ return data == null ? dst.delete(key) : dst.put(key, data);
+ }
+
+ private CompletableFuture<Void> logIfFailed(CompletableFuture<Void>
future, String key,
+ ServiceUnitStateData data,
String dst) {
+ return future.whenComplete((__, e) -> {
+ if (e != null && !(e instanceof
PulsarClientException.AlreadyClosedException)) {
+ log.warn().attr("dst", dst).attr("serviceUnit",
key).attr("data", data).exception(e)
+ .log("Failed to sync tableview item; sizes may diverge
until the next update");
+ }
+ });
}
private void syncExistingItems()
throws IOException, ExecutionException, InterruptedException,
TimeoutException {
long started = System.currentTimeMillis();
+
@Cleanup
ServiceUnitStateTableView metadataStoreTableView = new
ServiceUnitStateMetadataStoreTableViewImpl();
metadataStoreTableView.start(
pulsar,
- this::dummy,
- this::dummy,
- this::dummy
+ NOOP_CONSUMER,
+ NOOP_CONSUMER,
+ NOOP_CONSUMER
);
@Cleanup
ServiceUnitStateTableView systemTopicTableView = new
ServiceUnitStateTableViewImpl();
systemTopicTableView.start(
pulsar,
- this::dummy,
- this::dummy,
- this::dummy
+ NOOP_CONSUMER,
+ NOOP_CONSUMER,
+ NOOP_CONSUMER
);
var syncer =
pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
+ ServiceUnitStateTableView src;
+ ServiceUnitStateTableView dst;
if (syncer == SystemTopicToMetadataStoreSyncer) {
clean(metadataStoreTableView);
syncExistingItemsToMetadataStore(systemTopicTableView);
+ src = systemTopicTableView;
+ dst = metadataStoreTableView;
} else {
clean(systemTopicTableView);
syncExistingItemsToSystemTopic(metadataStoreTableView,
systemTopicTableView);
+ src = metadataStoreTableView;
+ dst = systemTopicTableView;
}
- if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView,
started)) {
+ if (!waitUntilSynced(src, dst, started)) {
throw new TimeoutException(
syncer + " failed to sync existing items in tableviews.
MetadataStoreTableView.size: "
+ metadataStoreTableView.entrySet().size()
+ ", SystemTopicTableView.size: " +
systemTopicTableView.entrySet().size() + " in "
- + SYNC_WAIT_TIME_IN_SECS + " secs");
+ + syncWaitTimeInSecs + " secs");
}
log.info().attr("metadataStoreTableViewSize",
metadataStoreTableView.entrySet().size())
@@ -154,8 +189,8 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
this.metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
- this::dummy,
- this::dummy
+ NOOP_CONSUMER,
+ NOOP_CONSUMER
);
log.info("Started MetadataStoreTableView");
@@ -163,18 +198,20 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
this.systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
- this::dummy,
- this::dummy
+ NOOP_CONSUMER,
+ NOOP_CONSUMER
);
log.info("Started SystemTopicTableView");
var syncer =
pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
- if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView,
started)) {
+ var src = syncer == SystemTopicToMetadataStoreSyncer ?
systemTopicTableView : metadataStoreTableView;
+ var dst = syncer == SystemTopicToMetadataStoreSyncer ?
metadataStoreTableView : systemTopicTableView;
+ if (!waitUntilSynced(src, dst, started)) {
throw new TimeoutException(
syncer + " failed to sync tableviews.
MetadataStoreTableView.size: "
+ metadataStoreTableView.entrySet().size()
+ ", SystemTopicTableView.size: " +
systemTopicTableView.entrySet().size() + " in "
- + SYNC_WAIT_TIME_IN_SECS + " secs");
+ + syncWaitTimeInSecs + " secs");
}
@@ -187,62 +224,132 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView
src)
throws JsonProcessingException, ExecutionException,
InterruptedException, TimeoutException {
// Directly use store to sync existing items to
metadataStoreTableView(otherwise, they are conflicted out)
- var store = pulsar.getLocalMetadataStore();
- var writer = ObjectMapperFactory.getMapper().writer();
- var opTimeout =
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
List<CompletableFuture<Void>> futures = new ArrayList<>();
var srcIter = src.entrySet().iterator();
while (srcIter.hasNext()) {
var e = srcIter.next();
-
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX +
"/" + e.getKey(),
- writer.writeValueAsBytes(e.getValue()),
Optional.empty()).thenApply(__ -> null));
- if (futures.size() == MAX_CONCURRENT_SYNC_COUNT ||
!srcIter.hasNext()) {
- FutureUtil.waitForAll(futures).get(opTimeout,
TimeUnit.SECONDS);
- }
+ futures.add(writeToMetadataStore(e.getKey(), e.getValue()));
+ maybeWaitCompletion(futures, !srcIter.hasNext());
+ }
+ }
+
+ private void maybeWaitCompletion(List<CompletableFuture<Void>> futures,
boolean forceWait)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (!futures.isEmpty() && (futures.size() == MAX_CONCURRENT_SYNC_COUNT
|| forceWait)) {
+ FutureUtil.waitForAll(futures)
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ futures.clear();
}
}
+ private @NonNull CompletableFuture<Void> writeToMetadataStore(String key,
ServiceUnitStateData value)
+ throws JsonProcessingException {
+ return
pulsar.getLocalMetadataStore().put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX
+ "/" + key,
+ jsonWriter.writeValueAsBytes(value),
Optional.empty()).thenApply(__ -> null);
+ }
+
private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src,
ServiceUnitStateTableView dst)
throws ExecutionException, InterruptedException, TimeoutException {
- var opTimeout =
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
List<CompletableFuture<Void>> futures = new ArrayList<>();
var srcIter = src.entrySet().iterator();
while (srcIter.hasNext()) {
var e = srcIter.next();
futures.add(dst.put(e.getKey(), e.getValue()));
- if (futures.size() == MAX_CONCURRENT_SYNC_COUNT ||
!srcIter.hasNext()) {
- FutureUtil.waitForAll(futures).get(opTimeout,
TimeUnit.SECONDS);
- }
+ maybeWaitCompletion(futures, !srcIter.hasNext());
}
}
private void clean(ServiceUnitStateTableView dst)
throws ExecutionException, InterruptedException, TimeoutException {
- var opTimeout =
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
var dstIter = dst.entrySet().iterator();
List<CompletableFuture<Void>> futures = new ArrayList<>();
while (dstIter.hasNext()) {
var e = dstIter.next();
futures.add(dst.delete(e.getKey()));
- if (futures.size() == MAX_CONCURRENT_SYNC_COUNT ||
!dstIter.hasNext()) {
- FutureUtil.waitForAll(futures).get(opTimeout,
TimeUnit.SECONDS);
- }
+ maybeWaitCompletion(futures, !dstIter.hasNext());
}
}
- private boolean waitUntilSynced(ServiceUnitStateTableView srt,
ServiceUnitStateTableView dst, long started)
+ private boolean waitUntilSynced(ServiceUnitStateTableView src,
ServiceUnitStateTableView dst, long started)
throws InterruptedException {
- while (srt.entrySet().size() != dst.entrySet().size()) {
- if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
started)
- > SYNC_WAIT_TIME_IN_SECS) {
+ long lastReconciled = started;
+ while (src.entrySet().size() != dst.entrySet().size()) {
+ long now = System.currentTimeMillis();
+ if (TimeUnit.MILLISECONDS.toSeconds(now - started) >
syncWaitTimeInSecs) {
return false;
}
+ // Give in-flight syncs a grace period to settle on their own,
then reconcile
+ // periodically: updates that raced with the table views'
(re)start were replayed
+ // to the fresh views as existing items — which are deliberately
not wired to
+ // sync — so without reconciliation the views would never converge.
+ if (now - lastReconciled >= RECONCILE_INTERVAL_IN_MILLIS) {
+ log.debug().attr("srcSize",
src.entrySet().size()).attr("dstSize", dst.entrySet().size())
+ .attr("elapsedSecs",
TimeUnit.MILLISECONDS.toSeconds(now - started))
+ .log("Tableviews not synced yet; reconciling");
+ reconcile(src, dst, started);
+ lastReconciled = now;
+ }
Thread.sleep(100);
}
return true;
}
+ /**
+ * Copies items the destination table view is missing and removes stale
items that no longer
+ * exist in the source. Channel updates that land between the
existing-items copy and the
+ * registration of the tail listeners are only visible as existing items
of the freshly
+ * started views, so the tail listeners never see them. Writes flow to the
migration source
+ * while the syncer starts, making the source view authoritative;
destination-only items are
+ * removed only when they predate this sync phase and are still absent
from the source, so a
+ * concurrent fresh write to the destination is never discarded. Failures
are logged and left
+ * for the next reconcile pass. Runs on the caller's (load manager) thread
with each batch
+ * bounded by the metadata store operation timeout.
+ */
+ private void reconcile(ServiceUnitStateTableView src,
ServiceUnitStateTableView dst, long started)
+ throws InterruptedException {
+ // Snapshot the destination entries before iterating the source so
that a key arriving
+ // in the destination through a concurrent tail sync cannot be
misclassified as stale.
+ var staleDstItems = new HashMap<String, ServiceUnitStateData>();
+ for (var e : dst.entrySet()) {
+ staleDstItems.put(e.getKey(), e.getValue());
+ }
+ try {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (var e : src.entrySet()) {
+ if (staleDstItems.remove(e.getKey()) == null) {
+ log.info().attr("serviceUnit", e.getKey())
+ .log("Reconciling item missing from the
destination tableview");
+ if (dst.isMetadataStoreBased()) {
+ // Write directly to the store like
syncExistingItemsToMetadataStore
+ // does; the view's put() would conflict the item out.
+ futures.add(writeToMetadataStore(e.getKey(),
e.getValue()));
+ } else {
+ futures.add(dst.put(e.getKey(), e.getValue()));
+ }
+ maybeWaitCompletion(futures, false);
+ }
+ }
+ for (var e : staleDstItems.entrySet()) {
+ // Only remove items written before this sync phase began and
re-confirmed absent
+ // from the source: a fresh destination write (e.g. from a
broker already switched
+ // to the destination implementation) is propagated to the
source by the tail
+ // listener instead of being deleted here.
+ if (e.getValue().timestamp() < started && src.get(e.getKey())
== null) {
+ log.info().attr("serviceUnit", e.getKey())
+ .log("Reconciling stale item in the destination
tableview");
+ futures.add(dst.delete(e.getKey()));
+ maybeWaitCompletion(futures, false);
+ }
+ }
+ maybeWaitCompletion(futures, true);
+ } catch (IOException | ExecutionException | TimeoutException e) {
+ // Transient write failures leave a size divergence behind; the
next reconcile pass
+ // (or the sync-wait timeout) handles it.
+ log.warn().exception(e).log("Failed to reconcile tableview items");
+ }
+ }
+
@Override
public void close() throws IOException {
if (!isActive) {
@@ -282,4 +389,9 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
public boolean isActive() {
return isActive;
}
+
+ @VisibleForTesting
+ public void setSyncWaitTimeInSecs(int syncWaitTimeInSecs) {
+ this.syncWaitTimeInSecs = syncWaitTimeInSecs;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index f31f932e8a3..1a6b7e6ddb4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -197,7 +198,20 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
@BeforeMethod(alwaysRun = true)
protected void initializeState() throws PulsarAdminException,
IllegalAccessException {
- admin.namespaces().unload(defaultTestNamespace);
+ // After a prior test churned leader election, the channel-topic
bundle can be left
+ // unserved ("not served by this instance"), making the unload's
channel publish fail
+ // (HTTP 500) or hang server-side until the background monitor task
(120s interval)
+ // reconciles the brokers' roles with the channel ownership. Drive
monitor() eagerly to
+ // heal that state, bound each unload attempt (a synchronous unload()
can block longer
+ // than the whole retry window), and fail loudly on exhaustion.
+ Awaitility.await().atMost(120, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ primaryLoadManager.monitor();
+ secondaryLoadManager.monitor();
+
admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS);
+ });
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index dc111161bc3..58a9b56e79b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1346,7 +1346,11 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
}
}
- @Test(priority = 200)
+ // Cap below the 300s suite default (AnnotationListener) so a hung
ServiceUnitStateTableViewSyncer
+ // start() fails fast and is retried instead of consuming the full 300s
slot and corrupting the
+ // next @BeforeMethod. Combined with the shortened sync-wait budget set
below, a real divergence
+ // surfaces within the shortened budget instead of a 5-minute
ThreadTimeoutException.
+ @Test(priority = 200, timeOut = 240 * 1000)
public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
// Make pulsar1 the leader so primaryLoadManager is the syncer-running
broker.
makePrimaryAsLeader();
@@ -1376,122 +1380,144 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
String syncerType =
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
? "SystemTopicToMetadataStoreSyncer" :
"MetadataStoreToSystemTopicSyncer";
+ // Shrink the sync-wait budget on both brokers' live syncers BEFORE
the first start()
+ // (driven by monitor() below) so any tableview-size divergence fails
in ~30s with the
+ // syncer's own TimeoutException instead of spinning for the full 300s
default — the
+ // exact hang observed in CI happened inside that first start().
+
primaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30);
+
secondaryLoadManager.getServiceUnitStateTableViewSyncer().setSyncWaitTimeInSecs(30);
+
pulsar.getAdminClient().brokers()
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer",
syncerType);
Awaitility.await().untilAsserted(() ->
assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- primaryLoadManager.monitor();
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
- .isActive()));
-
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
-
- // === Phase 2: add a 3rd broker using the OTHER table view impl ===
- // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3
deliberately
- // uses the other one so the test exercises cross-impl lookups
regardless of
- // which parametrization we're running.
- String otherClassName =
-
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
- ?
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
- : ServiceUnitStateTableViewImpl.class.getName();
-
- ServiceConfiguration crossImplConf = getDefaultConf();
- crossImplConf.setAllowAutoTopicCreation(true);
- crossImplConf.setForceDeleteNamespaceAllowed(true);
-
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
-
- try (var crossImplCtx =
createAdditionalPulsarTestContext(crossImplConf)) {
- var pulsar3 = crossImplCtx.getPulsarService();
-
- // All three brokers (across both impls) must agree on topic
ownership.
-
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
-
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
- Optional<LookupResult> webUrlPulsar3 =
-
pulsar3.getNamespaceService().getLookupResultForWebRequest(bundle, options);
- assertTrue(webUrlPulsar3.isPresent());
- assertEquals(webUrlPulsar3.get().getLookupData().getHttpUrl(),
- webUrlBefore.get().getLookupData().getHttpUrl());
-
- // SLA monitor and heartbeat lookups must agree across impls in
every direction.
- List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3);
- for (PulsarService viewer : brokers) {
- for (PulsarService owner : brokers) {
- assertLookupHeartbeatOwner(viewer, owner.getBrokerId(),
owner.getBrokerServiceUrl());
- assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(),
owner.getBrokerServiceUrl());
+ // Drive monitor() inside the await so a transient start() failure
(swallowed by
+ // monitor()'s catch) is retried instead of waiting for the 120s
background monitor task.
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
+ primaryLoadManager.monitor();
+
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+ });
+
+ try {
+
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+
+ // === Phase 2: add a 3rd broker using the OTHER table view impl
===
+ // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3
deliberately
+ // uses the other one so the test exercises cross-impl lookups
regardless of
+ // which parametrization we're running.
+ String otherClassName =
+
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+ ?
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
+ : ServiceUnitStateTableViewImpl.class.getName();
+
+ ServiceConfiguration crossImplConf = getDefaultConf();
+ crossImplConf.setAllowAutoTopicCreation(true);
+ crossImplConf.setForceDeleteNamespaceAllowed(true);
+
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
+
+ try (var crossImplCtx =
createAdditionalPulsarTestContext(crossImplConf)) {
+ var pulsar3 = crossImplCtx.getPulsarService();
+
+ // All three brokers (across both impls) must agree on topic
ownership.
+
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
+
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
+ Optional<LookupResult> webUrlPulsar3 =
+
pulsar3.getNamespaceService().getLookupResultForWebRequest(bundle, options);
+ assertTrue(webUrlPulsar3.isPresent());
+ assertEquals(webUrlPulsar3.get().getLookupData().getHttpUrl(),
+ webUrlBefore.get().getLookupData().getHttpUrl());
+
+ // SLA monitor and heartbeat lookups must agree across impls
in every direction.
+ List<PulsarService> brokers = List.of(pulsar1, pulsar2,
pulsar3);
+ for (PulsarService viewer : brokers) {
+ for (PulsarService owner : brokers) {
+ assertLookupHeartbeatOwner(viewer,
owner.getBrokerId(), owner.getBrokerServiceUrl());
+ assertLookupSLANamespaceOwner(viewer,
owner.getBrokerId(), owner.getBrokerServiceUrl());
+ }
}
- }
- // === Phase 3: simulate the cross-impl broker going offline ===
- // Its SLA namespace must reassign to a remaining broker, and the
ownership
- // change must propagate through the syncer to brokers using the
other impl.
- var wrapper3 = (ExtensibleLoadManagerWrapper)
pulsar3.getLoadManager().get();
- var loadManager3 = (ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper3, "loadManager", true);
- ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
- FieldUtils.readField(loadManager3,
"serviceUnitStateChannel", true);
- channel3.cleanOwnerships();
- // Set state to Closed BEFORE deleting the ZK node to prevent the
notification
- // handler's session-expiry recovery from auto-re-registering
broker3. In
- // production the PulsarService shuts down after unregister(), so
the handler
- // never fires; in tests the service stays running and creates a
race.
- var registry3 = (BrokerRegistryImpl)
loadManager3.getBrokerRegistry();
- registry3.state.set(BrokerRegistryImpl.State.Closed);
- pulsar3.getLocalMetadataStore()
- .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(),
Optional.empty()).get();
-
- String slaMonitorTopic =
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
- .getPersistentTopicName("test");
- String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
- String reassigned =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(reassigned);
- assertNotEquals(reassigned, pulsar3BrokerUrl);
- });
+ // === Phase 3: simulate the cross-impl broker going offline
===
+ // Its SLA namespace must reassign to a remaining broker, and
the ownership
+ // change must propagate through the syncer to brokers using
the other impl.
+ var wrapper3 = (ExtensibleLoadManagerWrapper)
pulsar3.getLoadManager().get();
+ var loadManager3 = (ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper3, "loadManager", true);
+ ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
+ FieldUtils.readField(loadManager3,
"serviceUnitStateChannel", true);
+ channel3.cleanOwnerships();
+ // Set state to Closed BEFORE deleting the ZK node to prevent
the notification
+ // handler's session-expiry recovery from auto-re-registering
broker3. In
+ // production the PulsarService shuts down after unregister(),
so the handler
+ // never fires; in tests the service stays running and creates
a race.
+ var registry3 = (BrokerRegistryImpl)
loadManager3.getBrokerRegistry();
+ registry3.state.set(BrokerRegistryImpl.State.Closed);
+ pulsar3.getLocalMetadataStore()
+ .delete("/loadbalance/brokers/" +
pulsar3.getBrokerId(), Optional.empty()).get();
+
+ String slaMonitorTopic =
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
+ .getPersistentTopicName("test");
+ String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String reassigned =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(reassigned);
+ assertNotEquals(reassigned, pulsar3BrokerUrl);
+ });
- // Send a message while the topic is owned by the reassigned
broker; this must
- // remain durable when ownership migrates back below.
- @Cleanup
- Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
- .topic(slaMonitorTopic).create();
- producer.send("offline");
-
- // === Phase 4: re-register the broker and verify ownership
returns ===
- registry3.state.set(BrokerRegistryImpl.State.Started);
- registry3.registerAsync().get();
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
-
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
- pulsar3.getBrokerServiceUrl()));
-
- // Same producer reconnects to the new owner; a fresh producer
also works.
- producer.send("after-reconnect");
- @Cleanup
- Producer<String> producer2 =
pulsar.getClient().newProducer(Schema.STRING)
- .topic(slaMonitorTopic).create();
- producer2.send("from-new-producer");
+ // Send a message while the topic is owned by the reassigned
broker; this must
+ // remain durable when ownership migrates back below.
+ @Cleanup
+ Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
+ .topic(slaMonitorTopic).create();
+ producer.send("offline");
- @Cleanup
- Consumer<String> consumer =
pulsar.getClient().newConsumer(Schema.STRING)
- .topic(slaMonitorTopic)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionName("test")
- .subscribe();
- assertEquals(consumer.receive().getValue(), "offline");
- assertEquals(consumer.receive().getValue(), "after-reconnect");
- assertEquals(consumer.receive().getValue(), "from-new-producer");
- }
+ // === Phase 4: re-register the broker and verify ownership
returns ===
+ registry3.state.set(BrokerRegistryImpl.State.Started);
+ registry3.registerAsync().get();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
+
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
+ pulsar3.getBrokerServiceUrl()));
- // === Phase 5: disable the syncer and verify it deactivates ===
- pulsar.getAdminClient().brokers()
-
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
- Awaitility.await().untilAsserted(() ->
-
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- primaryLoadManager.monitor();
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
- .isActive()));
-
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+ // Same producer reconnects to the new owner; a fresh producer
also works.
+ producer.send("after-reconnect");
+ @Cleanup
+ Producer<String> producer2 =
pulsar.getClient().newProducer(Schema.STRING)
+ .topic(slaMonitorTopic).create();
+ producer2.send("from-new-producer");
+
+ @Cleanup
+ Consumer<String> consumer =
pulsar.getClient().newConsumer(Schema.STRING)
+ .topic(slaMonitorTopic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .subscribe();
+ assertEquals(consumer.receive().getValue(), "offline");
+ assertEquals(consumer.receive().getValue(), "after-reconnect");
+ assertEquals(consumer.receive().getValue(),
"from-new-producer");
+ }
+ } finally {
+ // === Phase 5: disable the syncer and verify it deactivates ===
+ // Guarantee the dynamic config is removed and the syncer is
driven inactive even if
+ // the body threw, so the syncer cannot stay enabled and poison
later tests. Note this
+ // cannot tear down a start() that failed before isActive=true
(close() short-circuits
+ // on !isActive); leftover tail views from such a partial start
are recovered by the
+ // next successful start(), and the next test's initializeState()
retry absorbs any
+ // residual channel disruption.
+ try {
+ pulsar.getAdminClient().brokers()
+
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
+ } catch (Exception e) {
+ log.warn().exception(e).log("Failed to delete syncer dynamic
config in cleanup");
+ }
+ Awaitility.await().atMost(60,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled());
+ primaryLoadManager.monitor();
+ secondaryLoadManager.monitor();
+
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+ });
+ }
}
private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1552,7 +1578,52 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
});
}
- @Test(timeOut = 30 * 1000, priority = 2100)
+ // After a test churns leader election, the channel-topic bundle can be
transiently
+ // unowned and the channel producer can be in reconnect backoff. The next
@BeforeMethod
+ // (initializeState -> namespaces().unload(...)) publishes a state change
on the channel
+ // topic; if it runs in that window the producer send times out (HTTP
500). Give the
+ // re-election a best-effort chance to settle before yielding to the next
test.
+ //
+ // This is a best-effort smoothing wait, not an assertion: the budget is
deliberately a
+ // fraction (20s) of the callers' 60s method timeout so it cannot consume
the whole slot
+ // and trip TestNG's ThreadTimeoutException mid-poll, and any failure to
settle
+ // is swallowed-and-logged rather than thrown. That matters because
callers invoke this from
+ // a finally block — a settling delay here must never replace (mask) the
body's exception.
+ // The next test's initializeState() carries a 60s ignoreExceptions retry
as the real backstop.
+ private void awaitChannelOwnerStable() {
+ try {
+ Awaitility.await().atMost(20,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ // monitor() reconciles each broker's role with the channel
ownership and
+ // re-serves the channel-topic bundle if the leadership churn
left it unserved
+ // ("not served by this instance") — the same self-healing the
120s background
+ // monitor task provides, driven eagerly so the next test does
not start inside
+ // the broken window.
+ primaryLoadManager.monitor();
+ secondaryLoadManager.monitor();
+ Optional<String> owner1 =
channel1.getChannelOwnerAsync().get(5, TimeUnit.SECONDS);
+ Optional<String> owner2 =
channel2.getChannelOwnerAsync().get(5, TimeUnit.SECONDS);
+ assertTrue(owner1.isPresent());
+ assertEquals(owner1, owner2);
+ assertTrue(channel1.isChannelOwner() ^
channel2.isChannelOwner());
+ // Probe that the channel topic is actually served: the lookup
re-assigns the
+ // pulsar/system bundle if it is unowned, and getStats proves
the owner loads
+ // the topic (the lookup layer alone can claim an owner that
refuses to serve).
+ String channelTopic = ServiceUnitStateTableViewImpl.TOPIC;
+
assertNotNull(pulsar.getAdminClient().lookups().lookupTopic(channelTopic));
+ if (serviceUnitStateTableViewClassName.equals(
+ ServiceUnitStateTableViewImpl.class.getName())) {
+
assertNotNull(pulsar.getAdminClient().topics().getStats(channelTopic));
+ }
+ });
+ } catch (Throwable t) {
+ log.warn().exception(t).log("Channel owner did not stabilize
within the best-effort window; "
+ + "relying on the next test's initializeState() retry");
+ }
+ }
+
+ // 60s: the body's repeated role transitions plus the trailing
awaitChannelOwnerStable() can
+ // exceed 30s under load.
+ @Test(timeOut = 60 * 1000, priority = 2100)
public void testRoleChangeIdempotency() throws Exception {
makePrimaryAsLeader();
@@ -1632,7 +1703,8 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
secondaryLoadManager.getRole());
-
+ // Confirm a stable channel owner before yielding to the next
test's @BeforeMethod.
+ awaitChannelOwnerStable();
}
@DataProvider(name = "noChannelOwnerMonitorHandler")
@@ -1640,7 +1712,9 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
return new Object[][] { { true }, { false } };
}
- @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000,
priority = 2101)
+ // 60s: the body's leader-election churn plus the trailing
awaitChannelOwnerStable() can
+ // exceed 30s under load.
+ @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 60 * 1000,
priority = 2101)
public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler)
throws Exception {
makePrimaryAsLeader();
@@ -1707,10 +1781,25 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
// clean up for monitor test
pulsar1.getLeaderElectionService().start();
pulsar2.getLeaderElectionService().start();
+ // If the body failed mid-churn, both restarted elections can keep
flapping the
+ // leadership between the brokers, leaving the channel-topic
bundle unserved
+ // beyond what the next test's initializeState() retry can absorb.
Force a
+ // deterministic single leader before stabilizing (best-effort:
must not mask
+ // the body's exception).
+ try {
+ makePrimaryAsLeader();
+ } catch (Throwable t) {
+ log.warn().exception(t).log("Failed to re-establish primary as
leader in cleanup");
+ }
+ // Re-establish a stable channel owner before yielding to the next
test's
+ // @BeforeMethod, which publishes to the channel topic via
namespace unload.
+ awaitChannelOwnerStable();
}
}
- @Test(timeOut = 30 * 1000, priority = 2000)
+ // 60s: the body's role transitions plus the trailing
awaitChannelOwnerStable() can exceed
+ // 30s under load (observed locally at 30.017s).
+ @Test(timeOut = 60 * 1000, priority = 2000)
public void testRoleChange() throws Exception {
makePrimaryAsLeader();
@@ -1730,6 +1819,11 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
new NamespaceBundleStats()));
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ // The internal topics live in the pulsar/system bundle; if the
leadership churn
+ // left it unserved, only a monitor() role reconciliation
re-serves it (the
+ // background monitor task would take up to 120s) — drive it while
waiting.
+ leader.monitor();
+ follower.monitor();
assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(),
"tableView", true));
@@ -1767,6 +1861,9 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn =
1;
Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ // Same monitor()-driven healing as above for the post-transfer
assertions.
+ leader2.monitor();
+ follower2.monitor();
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(),
"tableView", true));
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(),
"tableView", true));
@@ -1791,6 +1888,9 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
follower2.getBrokerLoadDataStore().pushAsync(key,
brokerLoadExpected).get(3, TimeUnit.SECONDS);
follower2.getTopBundlesLoadDataStore().pushAsync(bundle,
topBundlesExpected)
.get(3, TimeUnit.SECONDS);
+
+ // Confirm a stable channel owner before yielding to the next test's
@BeforeMethod.
+ awaitChannelOwnerStable();
}
@SuppressWarnings("deprecation")