This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 259275b  [tests] Fix the synchronization problem at 
BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
259275b is described below

commit 259275ba8df5750e64fa15781dc2a968959ce71f
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Sep 17 13:36:50 2018 -0700

    [tests] Fix the synchronization problem at 
BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
    
    *Motivation*
    
    The following exception was observed in one of the CI jobs.
    
    ```
    java.lang.NullPointerException
        at 
org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:44)
        at 
org.apache.pulsar.client.impl.BrokerClientIntegrationTest.testMaxConcurrentTopicLoading(BrokerClientIntegrationTest.java:601)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
        at 
org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
        at 
org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
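    The problem appears to come from improper synchronization in the test:
    the shared `futures` list is populated from several executor threads
    without any locking, which presumably leaves a null entry in the list
    that `FutureUtil.waitForAll` then dereferences.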
    
    *Changes*
    
    Fix the synchronization problem in 
BrokerClientIntegrationTest.testMaxConcurrentTopicLoading by guarding every
    access to the shared `futures` list with a `synchronized (futures)` block.
---
 .../pulsar/client/impl/BrokerClientIntegrationTest.java      | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index bf65fdd..0e47dfe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -583,7 +583,7 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
             ClientCnx cnx = producer.cnx();
             assertTrue(cnx.channel().isActive());
             ExecutorService executor = 
Executors.newFixedThreadPool(concurrentLookupRequests);
-            List<CompletableFuture<Producer<byte[]>>> futures = 
Lists.newArrayList();
+            final List<CompletableFuture<Producer<byte[]>>> futures = 
Lists.newArrayList();
             final int totalProducers = 10;
             CountDownLatch latch = new CountDownLatch(totalProducers);
             for (int i = 0; i < totalProducers; i++) {
@@ -591,14 +591,18 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
                     final String randomTopicName1 = topicName + 
randomUUID().toString();
                     final String randomTopicName2 = topicName + 
randomUUID().toString();
                     // pass producer-name to avoid exception: producer is 
already connected to topic
-                    
futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
-                    
futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+                    synchronized (futures) {
+                        
futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
+                        
futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+                    }
                     latch.countDown();
                 });
             }
 
             latch.await();
-            FutureUtil.waitForAll(futures).get();
+            synchronized (futures) {
+                FutureUtil.waitForAll(futures).get();
+            }
             pulsarClient.close();
             pulsarClient2.close();
         } finally {

Reply via email to