This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ff0d0eb2ba9 [fix][test] Fix flaky PulsarFunctionsK8STest (#25108)
ff0d0eb2ba9 is described below
commit ff0d0eb2ba97d96c8bc760a1dfe8255efa71d5c1
Author: Philipp Dolif <[email protected]>
AuthorDate: Sun Dec 28 09:32:36 2025 +0100
[fix][test] Fix flaky PulsarFunctionsK8STest (#25108)
---
.../integration/functions/k8s/PulsarFunctionsK8STest.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
index 4671c82e4ea..307e503f0c0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
@@ -119,6 +119,14 @@ public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest {
});
log.info("Function created successfully");
+ log.info("Waiting for function to subscribe to input topic");
+ Awaitility.await().ignoreExceptions().atMost(Duration.ofSeconds(30))
+ .until(() -> {
+ admin.topics().getSubscriptions(inputTopicName);
+ return true;
+ });
+ log.info("Function subscribed to input topic");
+
// Validate that k8s secrets were provided as environment variables to the function pod
String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName);
Exec exec = new Exec(getApiClient());
@@ -157,12 +165,8 @@ public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest {
});
log.info("Starting function");
- // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again.
- // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay
- // from the await after stopFunction
- Thread.sleep(2000);
admin.functions().startFunction(fnTenant, fnNamespace, fnName);
- Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30))
+ Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(40))
.untilAsserted(() -> {
FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName);
assertThat(functionStatus.getNumInstances()).isEqualTo(1);