This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3c3c403c60745d5af7e709b13c5761b7d788e82b Author: Oneby Wang <[email protected]> AuthorDate: Sat May 30 16:16:16 2026 +0800 [fix][test] Fix flaky PulsarFunctionTlsTest.testFunctionsCreation() test (#25889) (cherry picked from commit a6af80198f26d31e991e880db3f9b7d601a579c2) --- .../functions/worker/PulsarFunctionTlsTest.java | 35 ++++++++++++---------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 50d81c6efbf..f2e49c4d7b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -20,8 +20,6 @@ package org.apache.pulsar.functions.worker; import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; @@ -42,10 +40,10 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -260,19 +258,16 @@ public class PulsarFunctionTlsTest { log.info(" -------- Start test function : {}", functionName); - int finalI = i; - Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { - final PulsarWorkerService workerService = ((PulsarWorkerService) fnWorkerServices[finalI]); - final LeaderService leaderService = workerService.getLeaderService(); - assertNotNull(leaderService); - if (leaderService.isLeader()) { - assertTrue(true); - } else { - final WorkerInfo workerInfo = workerService.getMembershipManager().getLeader(); - assertTrue(workerInfo != null - && !workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId())); - } - }); + final PulsarAdmin createAdmin = pulsarAdmins[i]; + // During function-worker leadership election/switchover, the coordination topic can already point to + // the new leader while that worker is still finishing its leader initialization. In that short window + // the internal /functions/leader request returns a transient 503 "Leader not yet ready", so retry only + // that condition and let all other failures surface immediately. + Awaitility.await().atMost(1, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .ignoreExceptionsMatching(PulsarFunctionTlsTest::isLeaderNotReady) + .untilAsserted(() -> createAdmin.functions() + .createFunctionWithUrl(functionConfig, jarFilePathUrl)); pulsarAdmins[i].functions().createFunctionWithUrl( functionConfig, jarFilePathUrl ); @@ -292,6 +287,14 @@ public class PulsarFunctionTlsTest { } } + private static boolean isLeaderNotReady(Throwable e) { + return e instanceof PulsarAdminException + && ((PulsarAdminException) e).getStatusCode() == 503 + && String.valueOf(((PulsarAdminException) e).getHttpError()) + .contains("Leader not yet ready"); + } + + protected static FunctionConfig createFunctionConfig( String jarFile, String tenant,
