This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3c7f394c86b43bcd86fc2172a58b90e622b394a
Author: Oneby Wang <[email protected]>
AuthorDate: Sat May 30 16:16:16 2026 +0800

    [fix][test] Fix flaky PulsarFunctionTlsTest.testFunctionsCreation() test (#25889)
    
    (cherry picked from commit a6af80198f26d31e991e880db3f9b7d601a579c2)
---
 .../functions/worker/PulsarFunctionTlsTest.java    | 37 +++++++++++-----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index e23dd51dc4a..7872b795667 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
@@ -42,10 +40,10 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -260,23 +258,16 @@ public class PulsarFunctionTlsTest {
 
             log.info(" -------- Start test function : {}", functionName);
 
-            int finalI = i;
-            // Wait for a leader to be ready and create the function.
-            // The createFunctionWithUrl call is included in the retry loop because a leadership
-            // transition can happen between the leader check and the actual API call, causing
-            // a 503 "Leader not yet ready" error.
             final PulsarAdmin createAdmin = pulsarAdmins[i];
-            Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
-                final PulsarWorkerService workerService = ((PulsarWorkerService) fnWorkerServices[finalI]);
-                final LeaderService leaderService = workerService.getLeaderService();
-                assertNotNull(leaderService);
-                if (!leaderService.isLeader()) {
-                    final WorkerInfo workerInfo = workerService.getMembershipManager().getLeader();
-                    assertTrue(workerInfo != null
-                            && !workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
-                }
-                createAdmin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
-            });
+            // During function-worker leadership election/switchover, the coordination topic can already point to
+            // the new leader while that worker is still finishing its leader initialization. In that short window
+            // the internal /functions/leader request returns a transient 503 "Leader not yet ready", so retry only
+            // that condition and let all other failures surface immediately.
+            Awaitility.await().atMost(1, TimeUnit.MINUTES)
+                    .pollInterval(1, TimeUnit.SECONDS)
+                    .ignoreExceptionsMatching(PulsarFunctionTlsTest::isLeaderNotReady)
+                    .untilAsserted(() -> createAdmin.functions()
+                            .createFunctionWithUrl(functionConfig, jarFilePathUrl));
 
             // Function creation is not strongly consistent, so this test can fail with a get that is too eager and
             // does not have retries.
@@ -293,6 +284,14 @@ public class PulsarFunctionTlsTest {
         }
     }
 
+    private static boolean isLeaderNotReady(Throwable e) {
+        return e instanceof PulsarAdminException
+                && ((PulsarAdminException) e).getStatusCode() == 503
+                && String.valueOf(((PulsarAdminException) e).getHttpError())
+                        .contains("Leader not yet ready");
+    }
+
+
     protected static FunctionConfig createFunctionConfig(
         String jarFile,
         String tenant,

Reply via email to