This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c3c403c60745d5af7e709b13c5761b7d788e82b
Author: Oneby Wang <[email protected]>
AuthorDate: Sat May 30 16:16:16 2026 +0800

    [fix][test] Fix flaky PulsarFunctionTlsTest.testFunctionsCreation() test 
(#25889)
    
    (cherry picked from commit a6af80198f26d31e991e880db3f9b7d601a579c2)
---
 .../functions/worker/PulsarFunctionTlsTest.java    | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 50d81c6efbf..f2e49c4d7b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
@@ -42,10 +40,10 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -260,19 +258,16 @@ public class PulsarFunctionTlsTest {
 
             log.info(" -------- Start test function : {}", functionName);
 
-            int finalI = i;
-            Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, 
TimeUnit.SECONDS).untilAsserted(() -> {
-                final PulsarWorkerService workerService = 
((PulsarWorkerService) fnWorkerServices[finalI]);
-                final LeaderService leaderService = 
workerService.getLeaderService();
-                assertNotNull(leaderService);
-                if (leaderService.isLeader()) {
-                    assertTrue(true);
-                } else {
-                    final WorkerInfo workerInfo = 
workerService.getMembershipManager().getLeader();
-                    assertTrue(workerInfo != null
-                            && 
!workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
-                }
-            });
+            final PulsarAdmin createAdmin = pulsarAdmins[i];
+            // During function-worker leadership election/switchover, the 
coordination topic can already point to
+            // the new leader while that worker is still finishing its leader 
initialization. In that short window
+            // the internal /functions/leader request returns a transient 503 
"Leader not yet ready", so retry only
+            // that condition and let all other failures surface immediately.
+            Awaitility.await().atMost(1, TimeUnit.MINUTES)
+                    .pollInterval(1, TimeUnit.SECONDS)
+                    
.ignoreExceptionsMatching(PulsarFunctionTlsTest::isLeaderNotReady)
+                    .untilAsserted(() -> createAdmin.functions()
+                            .createFunctionWithUrl(functionConfig, 
jarFilePathUrl));
             pulsarAdmins[i].functions().createFunctionWithUrl(
                 functionConfig, jarFilePathUrl
             );
@@ -292,6 +287,14 @@ public class PulsarFunctionTlsTest {
         }
     }
 
+    private static boolean isLeaderNotReady(Throwable e) {
+        return e instanceof PulsarAdminException
+                && ((PulsarAdminException) e).getStatusCode() == 503
+                && String.valueOf(((PulsarAdminException) e).getHttpError())
+                        .contains("Leader not yet ready");
+    }
+
+
     protected static FunctionConfig createFunctionConfig(
         String jarFile,
         String tenant,

Reply via email to