This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f4587  [tests] Make BrokerClientIntegrationTest testing behavior 
deterministic (#2585)
15f4587 is described below

commit 15f45871d930d3eae6792e9d1e93bded88c5df2d
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Sep 21 08:45:46 2018 -0700

    [tests] Make BrokerClientIntegrationTest testing behavior deterministic 
(#2585)
    
    
    [tests] Make BrokerClientIntegrationTest testing behavior deterministic
    
    *Motivation*
    
    The test is flaky.
    
    ```
    2018-09-14T17:46:29.848 [ERROR] 
testUnsupportedBatchMessageConsumer(org.apache.pulsar.client.impl.BrokerClientIntegrationTest)
  Time elapsed: 3.161 s  <<< FAILURE!
    java.lang.AssertionError: Received message my-message-5 did not match the 
expected message my-message-0 expected [my-message-0] but found [my-message-5]
            at org.testng.Assert.fail(Assert.java:96)
            at org.testng.Assert.failNotEquals(Assert.java:776)
            at org.testng.Assert.assertEqualsImpl(Assert.java:137)
            at org.testng.Assert.assertEquals(Assert.java:118)
            at 
org.apache.pulsar.client.api.ProducerConsumerBase.testMessageOrderAndDuplicates(ProducerConsumerBase.java:51)
            at 
org.apache.pulsar.client.impl.BrokerClientIntegrationTest.testUnsupportedBatchMessageConsumer(BrokerClientIntegrationTest.java:357)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at 
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
            at 
org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
            at 
org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    The problem is that the test used a `time`-based batching policy. 
There is no guarantee of how messages will be batched, hence
    the ordering and duplication check can fail on a shared subscription.
    
    *Changes*
    
    Set the batching delay to a very large value, cap the batch size at the 
number of messages sent, and flush the producer explicitly, so that all the 
messages are sent in one batch.
---
 .../client/impl/BrokerClientIntegrationTest.java       | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 0e47dfe..314ca69 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -292,16 +293,18 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
     public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final int batchMessageDelayMs = 1000;
         final String topicName = "persistent://my-property/my-ns/my-topic1";
         final String subscriptionName = "my-subscriber-name" + subType;
 
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).subscriptionType(subType).subscribe();
 
+        final int numMessagesPerBatch = 10;
+
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         Producer<byte[]> batchProducer = 
pulsarClient.newProducer().topic(topicName).enableBatching(true)
-                .batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS).batchingMaxMessages(20).create();
+                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
+                .batchingMaxMessages(numMessagesPerBatch).create();
 
         // update consumer's version to incompatible batch-message version = 
Version.V3
         Topic topic = 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -315,13 +318,13 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         versionField.set(cnx, 3);
 
         // (1) send non-batch message: consumer should be able to consume
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
         Set<String> messageSet = Sets.newHashSet();
         Message<byte[]> msg = null;
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             String expectedMessage = "my-message-" + i;
@@ -333,12 +336,11 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         // verification
         consumer1.setClientCnx(null);
         // (2) send batch-message which should not be able to consume: as 
broker will disconnect the consumer
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             String message = "my-message-" + i;
             batchProducer.sendAsync(message.getBytes());
         }
-
-        Thread.sleep(batchMessageDelayMs);
+        batchProducer.flush();
 
         // consumer should have not received any message as it should have 
been disconnected
         msg = consumer1.receive(2, TimeUnit.SECONDS);
@@ -349,7 +351,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         messageSet.clear();
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < numMessagesPerBatch; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);

Reply via email to