This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3e8bdf10361db19e70e339d6704b33bdcdc1181
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 8 23:41:13 2026 +0300

    [fix][test] Fix flaky ExtensibleLoadManagerImplTest.initializeState by recovering wedged channel ownership (#25977)
---
 .../ExtensibleLoadManagerImplBaseTest.java         | 77 +++++++++++++++++-----
 .../extensions/ExtensibleLoadManagerImplTest.java  |  3 +-
 2 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index d483938c621..e181fff8ace 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.extensions;
 
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.io.Resources;
 import java.util.ArrayList;
@@ -38,7 +39,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateM
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
@@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -196,18 +197,39 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
     }
 
     @BeforeMethod(alwaysRun = true)
-    protected void initializeState() throws PulsarAdminException, IllegalAccessException {
-        // After a prior test churned leader election, the channel-topic bundle can be left
-        // unserved ("not served by this instance"), making the unload's channel publish fail
-        // (HTTP 500) or hang server-side. monitor() only self-heals when there is *no* channel
-        // owner; it does NOT heal the case where an owner is recorded but the bundle is not
-        // actually served, so the unload below can never publish. Force-serve the channel topic
-        // each attempt: an admin lookup re-assigns the pulsar/system bundle and getStats makes
-        // the owner load the topic (the lookup layer alone can claim an owner that refuses to
-        // serve). Bound each unload attempt and fail loudly on exhaustion.
+    protected void initializeState() throws Exception {
+        // Reset to a clean state before each test: reconcile each broker's role with the channel
+        // ownership and unload the test namespace so no bundle ownership carries over. The unload
+        // publishes a state change on the channel system topic.
+        //
+        // A prior role-churning test (e.g. the direct playLeader()/playFollower() calls in
+        // testRoleChangeIdempotency) can leave the channel system topic owned by a broker that no
+        // longer serves it ("not served by this instance, redo the lookup"), with the channel
+        // producer stuck in escalating reconnect backoff, so the unload's channel publish keeps
+        // failing. Each unload attempt force-serves the channel topic (an admin lookup re-assigns
+        // the pulsar/system bundle and getStats makes the owner load it); if that still does not
+        // recover, force a clean channel owner via leader re-election (which reassigns and
+        // re-serves the channel topic and makes clients redo their lookups) and retry.
+        try {
+            awaitTestNamespaceUnloaded(30);
+        } catch (ConditionTimeoutException channelWedged) {
+            recoverChannelOwnership();
+            awaitTestNamespaceUnloaded(60);
+        }
+        reset(primaryLoadManager, secondaryLoadManager);
+        FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
+        pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+        pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+    }
+
+    // Drive monitor() to reconcile roles and force-serve the channel topic (monitor() only
+    // self-heals when there is *no* channel owner, not when an owner is recorded but the bundle is
+    // not served), then unload. ignoreExceptions() retries transient channel-publish failures; each
+    // unload attempt is bounded so a synchronous unload cannot block longer than the retry window.
+    private void awaitTestNamespaceUnloaded(long atMostSeconds) {
         boolean systemTopicChannel =
                 serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName());
-        Awaitility.await().atMost(120, TimeUnit.SECONDS)
+        Awaitility.await().atMost(atMostSeconds, TimeUnit.SECONDS)
                 .pollInterval(1, TimeUnit.SECONDS)
                 .ignoreExceptions()
                 .untilAsserted(() -> {
@@ -219,10 +241,35 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
                     }
                     admin.namespaces().unloadAsync(defaultTestNamespace).get(15, TimeUnit.SECONDS);
                 });
-        reset(primaryLoadManager, secondaryLoadManager);
-        FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
-        pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
-        pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
+    }
+
+    /**
+     * Force a clean channel owner via leader re-election. After heavy direct
+     * playLeader()/playFollower() churn the channel system topic can be left owned by a broker
+     * that no longer serves it, leaving the channel producer stuck on a stale lookup in
+     * escalating reconnect backoff. Closing the current owner's LeaderElectionService moves
+     * ownership to the other broker; its playLeader() re-creates and re-serves the channel
+     * topic, and the ownership change makes clients redo their (stale) lookups.
+     */
+    private void recoverChannelOwnership() throws Exception {
+        boolean pulsar1Owns;
+        try {
+            pulsar1Owns = channel1.isChannelOwner();
+        } catch (Exception e) {
+            // Owner can't be determined (e.g. no channel owner now); default to moving to pulsar2.
+            pulsar1Owns = true;
+        }
+        PulsarService currentOwner = pulsar1Owns ? pulsar1 : pulsar2;
+        ServiceUnitStateChannelImpl newOwnerChannel = pulsar1Owns ? channel2 : channel1;
+        currentOwner.getLeaderElectionService().close();
+        try {
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions()
+                    .untilAsserted(() -> assertTrue(newOwnerChannel.isChannelOwner()));
+        } catch (ConditionTimeoutException ignore) {
+            // Best effort: the subsequent unload retry is the real backstop.
+        } finally {
+            currentOwner.getLeaderElectionService().start();
+        }
     }
 
     protected void setPrimaryLoadManager() throws IllegalAccessException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index dc518316159..4e1a70e9744 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1570,7 +1570,8 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
     // and trip TestNG's ThreadTimeoutException mid-poll, and any failure to settle
     // is swallowed-and-logged rather than thrown. That matters because callers invoke this from
     // a finally block — a settling delay here must never replace (mask) the body's exception.
-    // The next test's initializeState() carries a 60s ignoreExceptions retry as the real backstop.
+    // The next test's initializeState() is the real backstop: it retries the unload and, if the
+    // channel stays wedged, forces a clean channel owner via leader re-election before retrying.
     private void awaitChannelOwnerStable() {
         try {
             Awaitility.await().atMost(20, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {

Reply via email to