Author: tabish
Date: Fri Jun 15 15:35:04 2012
New Revision: 1350657
URL: http://svn.apache.org/viewvc?rev=1350657&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3873
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1350657&r1=1350656&r2=1350657&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Fri Jun 15 15:35:04 2012
@@ -106,7 +106,9 @@ public class VMTransport implements Tran
public void dispatch(VMTransport transport, BlockingQueue<Object> pending,
Object command) {
TransportListener transportListener = transport.getTransportListener();
if (transportListener != null) {
- synchronized (started) {
+ // Lock here on the target transport's started since we want to
wait for its start()
+ // method to finish dispatching out of the queue before we do our
own.
+ synchronized (transport.started) {
// Ensure that no additional commands entered the queue in the
small time window
// before the start method locks the dispatch lock and the
oneway method was in
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java?rev=1350657&r1=1350656&r2=1350657&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
Fri Jun 15 15:35:04 2012
@@ -32,6 +32,8 @@ import org.apache.activemq.command.BaseC
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.Wait;
@@ -107,6 +109,47 @@ public class VMTransportThreadSafeTest {
}
}
+ private class VMResponderTransportListener implements TransportListener {
+
+ protected final Queue<DummyCommand> received;
+
+ private final Transport peer;
+
+ public VMResponderTransportListener(Queue<DummyCommand> receiveQueue,
Transport peer) {
+ this.received = receiveQueue;
+ this.peer = peer;
+ }
+
+ @Override
+ public void onCommand(Object command) {
+
+ if (command instanceof ShutdownInfo) {
+ return;
+ } else {
+ received.add((DummyCommand) command);
+
+ if (peer != null) {
+ try {
+ peer.oneway(command);
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ }
+
private class SlowVMTestTransportListener extends VMTestTransportListener {
private final TimeUnit delayUnit;
@@ -714,4 +757,118 @@ public class VMTransportThreadSafeTest {
return endTime - startTime;
}
+ @Test(timeout=120000)
+ public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
+
+ for (int i = 0; i < 20; ++i) {
+ doTestTwoWayTrafficWithMutexTransport(false, false);
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
+
+ for (int i = 0; i < 20; ++i) {
+ doTestTwoWayTrafficWithMutexTransport(true, false);
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
+
+ for (int i = 0; i < 20; ++i) {
+ doTestTwoWayTrafficWithMutexTransport(false, true);
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
+
+ for (int i = 0; i < 20; ++i) {
+ doTestTwoWayTrafficWithMutexTransport(false, false);
+ }
+ }
+
+ public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync,
boolean remoteAsync) throws Exception {
+
+ final VMTransport vmlocal = new VMTransport(new URI(location1));
+ final VMTransport vmremote = new VMTransport(new URI(location2));
+
+ final MutexTransport local = new MutexTransport(vmlocal);
+ final MutexTransport remote = new MutexTransport(vmremote);
+
+ final AtomicInteger sequenceId = new AtomicInteger();
+
+ vmlocal.setAsync(localAsync);
+ vmremote.setAsync(remoteAsync);
+
+ vmlocal.setPeer(vmremote);
+ vmremote.setPeer(vmlocal);
+
+ local.setTransportListener(new VMTestTransportListener(localReceived));
+ remote.setTransportListener(new
VMResponderTransportListener(remoteReceived, remote));
+
+ final int messageCount = 200000;
+
+ Thread localSend = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < messageCount; ++i) {
+ try {
+ local.oneway(new
DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+ }
+ });
+
+ Thread remoteSend = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ for(int i = 0; i < messageCount; ++i) {
+ try {
+ remote.oneway(new
DummyCommand(sequenceId.incrementAndGet()));
+ } catch (Exception e) {
+ }
+ }
+ }
+ });
+
+ localSend.start();
+ remoteSend.start();
+
+ Thread.sleep(10);
+
+ local.start();
+ remote.start();
+
+ // Wait for both to finish and then check that each side go the
correct amount
+ localSend.join();
+ remoteSend.join();
+
+ assertTrue("Remote should have received ("+messageCount+") but got ()"
+ remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remoteReceived.size() == messageCount;
+ }
+ }));
+
+ assertTrue("Local should have received ("+messageCount*2+") but got
()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localReceived.size() == messageCount*2;
+ }
+ }));
+
+ LOG.debug("All messages sent,stop all");
+
+ local.stop();
+ remote.stop();
+
+ localReceived.clear();
+ remoteReceived.clear();
+ }
+
}