This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 70cc682 Using Auto Consume consumer on a topic that doesn't have a
schema doesn't work (#3909)
70cc682 is described below
commit 70cc68229de6c3e6620b60a20f2f4faa4b93ce6a
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Mar 28 09:32:17 2019 -0700
Using Auto Consume consumer on a topic that doesn't have a schema doesn't
work (#3909)
---
.../tests/integration/functions/PulsarFunctionsTest.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 9c44a05..d8e5f50 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -743,7 +743,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
// get function status
- getFunctionStatus(functionName, numMessages);
+ getFunctionStatus(functionName, numMessages, true);
// get function stats
getFunctionStats(functionName, numMessages);
@@ -964,7 +964,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
}
- private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
+ private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
@@ -985,7 +985,9 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime()
> 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(),
numMessages);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(),
numMessages);
- assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ if (checkRestarts) {
+ assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
+ }
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(),
0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
0);
}
@@ -1121,8 +1123,9 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// publish and consume result
publishAndConsumeAvroMessages(inputTopicName, outputTopicName,
numMessages);
- // get function status
- getFunctionStatus(functionName, numMessages);
+ // get function status. Note that this function might restart a few times until
+ // the producer above writes the messages.
+ getFunctionStatus(functionName, numMessages, false);
// delete function
deleteFunction(functionName);