This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15f4587 [tests] Make BrokerClientIntegrationTest testing behavior
deterministic (#2585)
15f4587 is described below
commit 15f45871d930d3eae6792e9d1e93bded88c5df2d
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Sep 21 08:45:46 2018 -0700
[tests] Make BrokerClientIntegrationTest testing behavior deterministic
(#2585)
[tests] Make BrokerClientIntegrationTest testing behavior deterministic
*Motivation*
The test is flaky.
```
2018-09-14T17:46:29.848 [ERROR]
testUnsupportedBatchMessageConsumer(org.apache.pulsar.client.impl.BrokerClientIntegrationTest)
Time elapsed: 3.161 s <<< FAILURE!
java.lang.AssertionError: Received message my-message-5 did not match the
expected message my-message-0 expected [my-message-0] but found [my-message-5]
at org.testng.Assert.fail(Assert.java:96)
at org.testng.Assert.failNotEquals(Assert.java:776)
at org.testng.Assert.assertEqualsImpl(Assert.java:137)
at org.testng.Assert.assertEquals(Assert.java:118)
at
org.apache.pulsar.client.api.ProducerConsumerBase.testMessageOrderAndDuplicates(ProducerConsumerBase.java:51)
at
org.apache.pulsar.client.impl.BrokerClientIntegrationTest.testUnsupportedBatchMessageConsumer(BrokerClientIntegrationTest.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at
org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
at
org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
The problem is that we used a `time`-based batching policy in the test.
There is no guarantee on how messages will be batched, hence
the ordering and duplication check can fail on a shared subscription.
*Changes*
Set the batching delay to a very large value so that time-based flushing never
triggers, and make sure the messages are in one batch.
---
.../client/impl/BrokerClientIntegrationTest.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 0e47dfe..314ca69 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
@@ -292,16 +293,18 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
public void testUnsupportedBatchMessageConsumer(SubscriptionType subType)
throws Exception {
log.info("-- Starting {} test --", methodName);
- final int batchMessageDelayMs = 1000;
final String topicName = "persistent://my-property/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name" + subType;
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).subscriptionType(subType).subscribe();
+ final int numMessagesPerBatch = 10;
+
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> batchProducer =
pulsarClient.newProducer().topic(topicName).enableBatching(true)
- .batchingMaxPublishDelay(batchMessageDelayMs,
TimeUnit.MILLISECONDS).batchingMaxMessages(20).create();
+ .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
+ .batchingMaxMessages(numMessagesPerBatch).create();
// update consumer's version to incompatible batch-message version =
Version.V3
Topic topic =
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -315,13 +318,13 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
versionField.set(cnx, 3);
// (1) send non-batch message: consumer should be able to consume
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < numMessagesPerBatch; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Set<String> messageSet = Sets.newHashSet();
Message<byte[]> msg = null;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < numMessagesPerBatch; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + i;
@@ -333,12 +336,11 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
// verification
consumer1.setClientCnx(null);
// (2) send batch-message which should not be able to consume: as
broker will disconnect the consumer
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < numMessagesPerBatch; i++) {
String message = "my-message-" + i;
batchProducer.sendAsync(message.getBytes());
}
-
- Thread.sleep(batchMessageDelayMs);
+ batchProducer.flush();
// consumer should have not received any message as it should have
been disconnected
msg = consumer1.receive(2, TimeUnit.SECONDS);
@@ -349,7 +351,7 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
.subscribe();
messageSet.clear();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < numMessagesPerBatch; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);