This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c5c2a50 [functions][integration tests] Ensure function subscription
exists before integration tests start producing messages (#2413)
c5c2a50 is described below
commit c5c2a50235b30f101f14b1a82d7d1d2f5278ae16
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Aug 20 18:11:49 2018 -0700
[functions][integration tests] Ensure function subscription exists before
integration tests start producing messages (#2413)
### Motivation
There is a race condition in functions integration tests, where the
producer used for validation can start before
function runs. So a function might be missing some messages produced by the
producer. Hence it will fail the validation.
### Changes
Ensure function subscription exists before producer starts producing
messages to input topic.
---
.../tests/integration/functions/PulsarFunctionsTest.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 57a6ba4..67136a6 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -542,6 +542,19 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+
+ // ensure the function subscription exists before we start producing
messages
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build()) {
+ try (Consumer<String> ignored = client.newConsumer(Schema.STRING)
+ .topic(inputTopicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(String.format("public/default/%s",
functionName))
+ .subscribe()) {
+ }
+ }
}
private static void getFunctionInfoSuccess(String functionName) throws
Exception {