Repository: activemq
Updated Branches:
  refs/heads/master ccbbecb4a -> b8a20e9ef


The skipped first dispatch on VMTransport needed a better test. Reworked the
enqueue to avoid a busy loop when the pending queue is full, and ensured the
offer is synchronized on the peer's started flag. Sorts the intermittent
failures in FailoverStaticNetworkTest and NetworkOfTwentyBrokersTest.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b8a20e9e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b8a20e9e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b8a20e9e

Branch: refs/heads/master
Commit: b8a20e9ef655f184f80d06ad5bdb35fece297eca
Parents: ccbbecb
Author: gtully <[email protected]>
Authored: Wed May 13 13:56:24 2015 +0100
Committer: gtully <[email protected]>
Committed: Wed May 13 13:56:24 2015 +0100

----------------------------------------------------------------------
 .../activemq/transport/vm/VMTransport.java      | 14 +++-
 .../transport/vm/VMTransportBrokerNameTest.java | 85 +++++++++++++-------
 2 files changed, 68 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b8a20e9e/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index e7aa729..d90473b 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -100,11 +100,23 @@ public class VMTransport implements Transport, Task {
 
             if (!peer.started.get()) {
                 LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
+                int sleepTimeMillis;
                 boolean accepted = false;
                 do {
+                    sleepTimeMillis = 0;
+                    // the pending queue is drained on start so we need to 
ensure we add before
+                    // the drain commences, otherwise we never get the command 
dispatched!
                     synchronized (peer.started) {
-                        accepted = pending.offer(command);
+                        if (!peer.started.get()) {
+                            accepted = pending.offer(command);
+                            if (!accepted) {
+                                sleepTimeMillis = 500;
+                            }
+                        }
                     }
+                    // give start thread a chance if we will loop
+                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillis);
+
                 } while (!accepted && !peer.started.get());
                 if (accepted) {
                     return;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b8a20e9e/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
index 5183db8..50a81f6 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
@@ -19,12 +19,11 @@ package org.apache.activemq.transport.vm;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
+import org.junit.Test;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerRegistry;
@@ -33,66 +32,92 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 
-public class VMTransportBrokerNameTest extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class VMTransportBrokerNameTest {
 
     private static final String MY_BROKER = "myBroker";
     final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + 
"?persistent=false)";
 
+    @Test
     public void testBrokerName() throws Exception {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new 
URI(vmUrl));
         ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
         assertTrue("Transport has name in it: " + c1.getTransport(), 
c1.getTransport().toString().contains(MY_BROKER));
-        
+
         // verify Broker is there with name
         ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new 
URI("vm://" + MY_BROKER + "?create=false"));
         Connection c2 = cfbyName.createConnection();
-        
+
         assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER));
         assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), 
MY_BROKER);
         assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1);
-        
+
         c1.close();
         c2.close();
     }
 
-    public void testBrokerInfoClientAsync() throws Exception {
+    @Test
+    public void testBrokerInfoReceiptClientAsync() throws Exception {
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new 
URI(vmUrl));
         ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
-        assertTrue("Transport has name in it: " + c1.getTransport(), 
c1.getTransport().toString().contains(MY_BROKER));
 
-        for (int i=0;i<20; i++) {
-            final CountDownLatch gotBrokerInfo = new CountDownLatch(1);
-            Transport transport = TransportFactory.connect(new URI("vm://" + 
MY_BROKER + "?async=false"));
-            transport.setTransportListener(new TransportListener() {
+        final int numIterations = 400;
+        final CountDownLatch successLatch = new CountDownLatch(numIterations);
+        ExecutorService executor = Executors.newFixedThreadPool(100);
+        for (int i = 0; i < numIterations; i++) {
+            executor.submit(new Runnable() {
                 @Override
-                public void onCommand(Object command) {
-                    if (command instanceof BrokerInfo) {
-                        gotBrokerInfo.countDown();
+                public void run() {
+                    try {
+                        verifyBrokerInfo(successLatch);
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
                     }
                 }
+            });
+        }
 
-                @Override
-                public void onException(IOException error) {
+        executor.shutdown();
+        executor.awaitTermination(20, TimeUnit.SECONDS);
+        c1.close();
 
+        assertTrue("all success: " + successLatch.getCount(), 
successLatch.await(1, TimeUnit.SECONDS));
+    }
+
+    public void verifyBrokerInfo(CountDownLatch success) throws Exception {
+        final CountDownLatch gotBrokerInfo = new CountDownLatch(1);
+        Transport transport = TransportFactory.connect(new URI("vm://" + 
MY_BROKER + "?async=false"));
+        transport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                if (command instanceof BrokerInfo) {
+                    gotBrokerInfo.countDown();
                 }
+            }
 
-                @Override
-                public void transportInterupted() {
+            @Override
+            public void onException(IOException error) {
 
-                }
+            }
 
-                @Override
-                public void transportResumed() {
+            @Override
+            public void transportInterupted() {
 
-                }
-            });
-            transport.start();
+            }
 
-            assertTrue("got broker info on iteration:" + i, 
gotBrokerInfo.await(5, TimeUnit.SECONDS));
+            @Override
+            public void transportResumed() {
 
-            transport.stop();
+            }
+        });
+        transport.start();
+        if (gotBrokerInfo.await(5, TimeUnit.SECONDS)) {
+            success.countDown();
         }
-        c1.close();
+        transport.stop();
     }
 }

Reply via email to