This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c2a50  [functions][integration tests] Ensure function subscription exists before integration tests start producing messages (#2413)
c5c2a50 is described below

commit c5c2a50235b30f101f14b1a82d7d1d2f5278ae16
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Aug 20 18:11:49 2018 -0700

    [functions][integration tests] Ensure function subscription exists before integration tests start producing messages (#2413)
    
    ### Motivation
    
    There is a race condition in the functions integration tests: the producer used for validation can start before
    the function is running. The function can then miss some of the messages sent by the producer, which causes the
    validation to fail.
    
    ### Changes
    
    Ensure the function subscription exists before the producer starts producing messages to the input topic.
---
 .../tests/integration/functions/PulsarFunctionsTest.java    | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 57a6ba4..67136a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -542,6 +542,19 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+
+        // ensure the function subscription exists before we start producing messages
+        try (PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build()) {
+            try (Consumer<String> ignored = client.newConsumer(Schema.STRING)
+                .topic(inputTopicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(String.format("public/default/%s", functionName))
+                .subscribe()) {
+            }
+        }
     }
 
     private static void getFunctionInfoSuccess(String functionName) throws Exception {
