This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 70cc682  Using Auto Consume consumer on a topic that doesn't have a schema doesn't work (#3909)
70cc682 is described below

commit 70cc68229de6c3e6620b60a20f2f4faa4b93ce6a
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Mar 28 09:32:17 2019 -0700

    Using Auto Consume consumer on a topic that doesn't have a schema doesn't work (#3909)
---
 .../tests/integration/functions/PulsarFunctionsTest.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 9c44a05..d8e5f50 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -743,7 +743,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         }
 
         // get function status
-        getFunctionStatus(functionName, numMessages);
+        getFunctionStatus(functionName, numMessages, true);
 
         // get function stats
         getFunctionStats(functionName, numMessages);
@@ -964,7 +964,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         }
     }
 
-    private static void getFunctionStatus(String functionName, int 
numMessages) throws Exception {
+    private static void getFunctionStatus(String functionName, int 
numMessages, boolean checkRestarts) throws Exception {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
@@ -985,7 +985,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         
assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime()
 > 0);
         
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), 
numMessages);
         
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(),
 numMessages);
-        
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 
0);
+        if (checkRestarts) {
+            
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 
0);
+        }
         
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(),
 0);
         
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
 0);
     }
@@ -1121,8 +1123,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // publish and consume result
         publishAndConsumeAvroMessages(inputTopicName, outputTopicName, 
numMessages);
 
-        // get function status
-        getFunctionStatus(functionName, numMessages);
+        // get function status. Note that this function might restart a few 
times until
+        // the producer above writes the messages.
+        getFunctionStatus(functionName, numMessages, false);
 
         // delete function
         deleteFunction(functionName);

Reply via email to