This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 094ebf7 Fix instability in Pulsar Function window integration test (#5337)
094ebf7 is described below
commit 094ebf7af8d0bb27bd9df3bcea2dba4e9ba204fc
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Oct 8 08:48:49 2019 -0700
Fix instability in Pulsar Function window integration test (#5337)
* fix instability with tumbling window test
* cleaning up
* cleaning up
---
.../integration/functions/PulsarFunctionsTest.java | 47 ++++++++++++++--------
.../tests/integration/suites/PulsarTestSuite.java | 24 +++++++++--
2 files changed, 50 insertions(+), 21 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1d35c1b..e92371d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1015,7 +1015,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.build();
@Cleanup
- Reader reader = client.newReader().startMessageId(MessageId.earliest)
+ Reader<byte[]> reader =
client.newReader().startMessageId(MessageId.earliest)
.topic(outputTopicName)
.create();
@@ -1030,12 +1030,18 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
int i = 0;
- while (reader.hasMessageAvailable()) {
- if (i >= expectedResults.length) {
+ while (true) {
+ if (i > expectedResults.length) {
Assertions.fail("More results than expected");
}
- String result = new
String(reader.readNext().getData()).split(":")[0];
- log.info("i: {} result: {}", i, result);
+
+ Message<byte[]> msg = reader.readNext(30, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ String msgStr = new String(msg.getData());
+ log.info("i: {} RECV: {}", i, msgStr);
+ String result = msgStr.split(":")[0];
assertThat(result).contains(expectedResults[i]);
i++;
}
@@ -1706,18 +1712,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
private static void getFunctionInfoNotFound(String functionName) throws
Exception {
- try {
- pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName);
- fail("Command should have exited with non-zero");
- } catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ functionName + " doesn't exist"));
- }
+ retryStrategically(aVoid -> {
+ try {
+ pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName);
+ } catch (ContainerExecException e) {
+ if (e.getResult().getStderr().contains("Reason: Function " +
functionName + " doesn't exist")) {
+ return true;
+ }
+
+ } catch (Exception e) {
+
+ }
+ return false;
+ }, 5, 100, true);
}
private static void checkSubscriptionsCleanup(String topic) throws
Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 7fe4e46..4b8dde7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -44,13 +44,29 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
return "pulsar-test-suite";
}
- public static void retryStrategically(Predicate<Void> predicate, int
retryCount, long intSleepTimeInMillis)
+ public static void retryStrategically(Predicate<Void> predicate, int
retryCount, long intSleepTimeInMillis) throws Exception {
+ retryStrategically(predicate, retryCount, intSleepTimeInMillis, false);
+ }
+
+
+ public static void retryStrategically(Predicate<Void> predicate, int
retryCount, long intSleepTimeInMillis, boolean throwException)
throws Exception {
+
for (int i = 0; i < retryCount; i++) {
- if (predicate.test(null) || i == (retryCount - 1)) {
- break;
+ if (throwException) {
+ if (i == (retryCount - 1)) {
+ throw new RuntimeException("Action was not successful
after " + retryCount + " retries");
+ }
+ if (predicate.test(null)) {
+ break;
+ }
+ } else {
+ if (predicate.test(null) || i == (retryCount - 1)) {
+ break;
+ }
}
- Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+
+ Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}