This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 476881fab2da836aa0d593aa679f489a928f8a73 Author: Lari Hotari <[email protected]> AuthorDate: Mon Jun 8 23:41:13 2026 +0300 [fix][test] Fix flaky ExtensibleLoadManagerImplTest.initializeState by recovering wedged channel ownership (#25977) --- .../ExtensibleLoadManagerImplBaseTest.java | 77 +++++++++++++++++----- .../extensions/ExtensibleLoadManagerImplTest.java | 3 +- 2 files changed, 64 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index ea74bd86127..afd8681e24d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.extensions; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import com.google.common.io.Resources; import java.util.ArrayList; @@ -38,7 +39,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateM import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; import org.apache.pulsar.broker.testcontext.PulsarTestContext; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.LookupService; @@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import 
org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -196,18 +197,39 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ } @BeforeMethod(alwaysRun = true) - protected void initializeState() throws PulsarAdminException, IllegalAccessException { - // After a prior test churned leader election, the channel-topic bundle can be left - // unserved ("not served by this instance"), making the unload's channel publish fail - // (HTTP 500) or hang server-side. monitor() only self-heals when there is *no* channel - // owner; it does NOT heal the case where an owner is recorded but the bundle is not - // actually served, so the unload below can never publish. Force-serve the channel topic - // each attempt: an admin lookup re-assigns the pulsar/system bundle and getStats makes - // the owner load the topic (the lookup layer alone can claim an owner that refuses to - // serve). Bound each unload attempt and fail loudly on exhaustion. + protected void initializeState() throws Exception { + // Reset to a clean state before each test: reconcile each broker's role with the channel + // ownership and unload the test namespace so no bundle ownership carries over. The unload + // publishes a state change on the channel system topic. + // + // A prior role-churning test (e.g. the direct playLeader()/playFollower() calls in + // testRoleChangeIdempotency) can leave the channel system topic owned by a broker that no + // longer serves it ("not served by this instance, redo the lookup"), with the channel + // producer stuck in escalating reconnect backoff, so the unload's channel publish keeps + // failing. 
Each unload attempt force-serves the channel topic (an admin lookup re-assigns + // the pulsar/system bundle and getStats makes the owner load it); if that still does not + // recover, force a clean channel owner via leader re-election (which reassigns and + // re-serves the channel topic and makes clients redo their lookups) and retry. + try { + awaitTestNamespaceUnloaded(30); + } catch (ConditionTimeoutException channelWedged) { + recoverChannelOwnership(); + awaitTestNamespaceUnloaded(60); + } + reset(primaryLoadManager, secondaryLoadManager); + FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); + pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + } + + // Drive monitor() to reconcile roles and force-serve the channel topic (monitor() only + // self-heals when there is *no* channel owner, not when an owner is recorded but the bundle is + // not served), then unload. ignoreExceptions() retries transient channel-publish failures; each + // unload attempt is bounded so a synchronous unload cannot block longer than the retry window. 
+ private void awaitTestNamespaceUnloaded(long atMostSeconds) { boolean systemTopicChannel = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()); - Awaitility.await().atMost(120, TimeUnit.SECONDS) + Awaitility.await().atMost(atMostSeconds, TimeUnit.SECONDS) .pollInterval(1, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { @@ -219,10 +241,35 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ } admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS); }); - reset(primaryLoadManager, secondaryLoadManager); - FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); - pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); - pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + } + + /** + * Force a clean channel owner via leader re-election. After heavy direct + * playLeader()/playFollower() churn the channel system topic can be left owned by a broker + * that no longer serves it, leaving the channel producer stuck on a stale lookup in + * escalating reconnect backoff. Closing the current owner's LeaderElectionService moves + * ownership to the other broker; its playLeader() re-creates and re-serves the channel + * topic, and the ownership change makes clients redo their (stale) lookups. + */ + private void recoverChannelOwnership() throws Exception { + boolean pulsar1Owns; + try { + pulsar1Owns = channel1.isChannelOwner(); + } catch (Exception e) { + // Owner can't be determined (e.g. no channel owner now); default to moving to pulsar2. + pulsar1Owns = true; + } + PulsarService currentOwner = pulsar1Owns ? pulsar1 : pulsar2; + ServiceUnitStateChannelImpl newOwnerChannel = pulsar1Owns ? 
channel2 : channel1; + currentOwner.getLeaderElectionService().close(); + try { + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions() + .untilAsserted(() -> assertTrue(newOwnerChannel.isChannelOwner())); + } catch (ConditionTimeoutException ignore) { + // Best effort: the subsequent unload retry is the real backstop. + } finally { + currentOwner.getLeaderElectionService().start(); + } } protected void setPrimaryLoadManager() throws IllegalAccessException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 05e9bfba6ef..24a62650125 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1578,7 +1578,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase // and trip TestNG's ThreadTimeoutException mid-poll, and any failure to settle // is swallowed-and-logged rather than thrown. That matters because callers invoke this from // a finally block — a settling delay here must never replace (mask) the body's exception. - // The next test's initializeState() carries a 60s ignoreExceptions retry as the real backstop. + // The next test's initializeState() is the real backstop: it retries the unload and, if the + // channel stays wedged, forces a clean channel owner via leader re-election before retrying. private void awaitChannelOwnerStable() { try { Awaitility.await().atMost(20, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
