This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0d0eb2ba9 [fix][test] Fix flaky PulsarFunctionsK8STest (#25108)
ff0d0eb2ba9 is described below

commit ff0d0eb2ba97d96c8bc760a1dfe8255efa71d5c1
Author: Philipp Dolif <[email protected]>
AuthorDate: Sun Dec 28 09:32:36 2025 +0100

    [fix][test] Fix flaky PulsarFunctionsK8STest (#25108)
---
 .../integration/functions/k8s/PulsarFunctionsK8STest.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
index 4671c82e4ea..307e503f0c0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
@@ -119,6 +119,14 @@ public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest {
         });
         log.info("Function created successfully");
 
+        log.info("Waiting for function to subscribe to input topic");
+        Awaitility.await().ignoreExceptions().atMost(Duration.ofSeconds(30))
+                .until(() -> {
+                    admin.topics().getSubscriptions(inputTopicName);
+                    return true;
+                });
+        log.info("Function subscribed to input topic");
+
         // Validate that k8s secrets were provided as environment variables to the function pod
         String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName);
         Exec exec = new Exec(getApiClient());
@@ -157,12 +165,8 @@ public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest {
                 });
 
         log.info("Starting function");
-        // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again.
-        // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay
-        // from the await after stopFunction
-        Thread.sleep(2000);
         admin.functions().startFunction(fnTenant, fnNamespace, fnName);
-        Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30))
+        Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(40))
                 .untilAsserted(() -> {
                     FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName);
                     assertThat(functionStatus.getNumInstances()).isEqualTo(1);

Reply via email to