Author: gtully
Date: Fri Feb 10 20:19:44 2012
New Revision: 1242912

URL: http://svn.apache.org/viewvc?rev=1242912&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3684 - resolve deadlock on blocked 
oneway, revise sync and lazy init, remove use of valve

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
 Fri Feb 10 20:19:44 2012
@@ -26,20 +26,14 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.IOExceptionSupport;
-
 
 /**
  * A Transport implementation that uses direct method invocations.
- * 
- * 
  */
 public class VMTransport implements Transport, Task {
 
@@ -47,21 +41,23 @@ public class VMTransport implements Tran
     private static final AtomicLong NEXT_ID = new AtomicLong(0);
     protected VMTransport peer;
     protected TransportListener transportListener;
-    protected boolean disposed;
     protected boolean marshal;
     protected boolean network;
     protected boolean async = true;
     protected int asyncQueueDepth = 2000;
-    protected LinkedBlockingQueue<Object> messageQueue;
-    protected boolean started;
     protected final URI location;
     protected final long id;
-    private TaskRunner taskRunner;
-    private final Object lazyInitMutext = new Object();
-    private final Valve enqueueValve = new Valve(true);
-    protected final AtomicBoolean stopping = new AtomicBoolean();
+    protected LinkedBlockingQueue<Object> messageQueue = new 
LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+    private TaskRunner taskRunner = 
DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, 
"VMTransport: " + toString());
+
     private volatile int receiveCounter;
-    
+
+    // Managed State access protected by locks.
+    protected final AtomicBoolean stopping = new AtomicBoolean();
+    protected final AtomicBoolean started = new AtomicBoolean();
+    protected final AtomicBoolean starting = new AtomicBoolean();
+    protected final AtomicBoolean disposed = new AtomicBoolean();
+
     public VMTransport(URI location) {
         this.location = location;
         this.id = NEXT_ID.getAndIncrement();
@@ -72,50 +68,52 @@ public class VMTransport implements Tran
     }
 
     public void oneway(Object command) throws IOException {
-        if (disposed) {
+        if (disposed.get()) {
             throw new TransportDisposedIOException("Transport disposed.");
         }
         if (peer == null) {
             throw new IOException("Peer not connected.");
         }
 
-        
-        TransportListener transportListener=null;
+        TransportListener transportListener = null;
         try {
-            // Disable the peer from changing his state while we try to enqueue onto him.
-            peer.enqueueValve.increment();
-        
-            if (peer.disposed || peer.stopping.get()) {
+            if (peer.disposed.get() || peer.stopping.get()) {
                 throw new TransportDisposedIOException("Peer (" + 
peer.toString() + ") disposed.");
             }
-            
-            if (peer.started) {
+
+            if (peer.started.get()) {
                 if (peer.async) {
-                    peer.getMessageQueue().put(command);
+                    peer.messageQueue.put(command);
                     peer.wakeup();
                 } else {
                     transportListener = peer.transportListener;
                 }
             } else {
-                peer.getMessageQueue().put(command);
+                peer.messageQueue.put(command);
+                synchronized (peer.starting) {
+                    if (peer.started.get() && !peer.messageQueue.isEmpty()) {
+                        // we missed the pending dispatch during start
+                        if (peer.async) {
+                            peer.wakeup();
+                        } else {
+                            transportListener = peer.transportListener;
+                        }
+                    }
+                }
             }
-            
         } catch (InterruptedException e) {
             InterruptedIOException iioe = new 
InterruptedIOException(e.getMessage());
             iioe.initCause(e);
             throw iioe;
-        } finally {
-            // Allow the peer to change state again...
-            peer.enqueueValve.decrement();
         }
-
         dispatch(peer, transportListener, command);
     }
-    
+
     public void dispatch(VMTransport transport, TransportListener 
transportListener, Object command) {
-        if( transportListener!=null ) {
-            if( command == DISCONNECT ) {
-                transportListener.onException(new 
TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+        if (transportListener != null) {
+            if (command == DISCONNECT) {
+                transportListener.onException(
+                        new TransportDisposedIOException("Peer (" + 
peer.toString() + ") disposed."));
             } else {
                 transport.receiveCounter++;
                 transportListener.onCommand(command);
@@ -124,135 +122,81 @@ public class VMTransport implements Tran
     }
 
     public void start() throws Exception {
-        if (transportListener == null) {
-            throw new IOException("TransportListener not set.");
-        }
-        try {
-            enqueueValve.turnOff();
-            if (messageQueue != null && !async) {
+
+        if (starting.compareAndSet(false, true)) {
+
+            if (transportListener == null) {
+                throw new IOException("TransportListener not set.");
+            }
+
+            // ensure there is no missed dispatch during start, sync with oneway
+            synchronized (peer.starting) {
                 Object command;
-                while ((command = messageQueue.poll()) != null && 
!stopping.get() ) {
-                    receiveCounter++;
+                while ((command = messageQueue.poll()) != null && 
!stopping.get()) {
                     dispatch(this, transportListener, command);
                 }
+
+                if (!disposed.get()) {
+
+                    started.set(true);
+
+                    if (async) {
+                        taskRunner.wakeup();
+                    } else {
+                        messageQueue.clear();
+                        messageQueue = null;
+                        taskRunner.shutdown();
+                        taskRunner = null;
+                    }
+                }
             }
-            started = true;
-            wakeup();
-        } finally {
-            enqueueValve.turnOn();
-        }
-        // If we get stopped while starting up, then do the actual stop now 
-        // that the enqueueValve is back on.
-        if( stopping.get() ) {
-            stop();
         }
     }
 
     public void stop() throws Exception {
-        stopping.set(true);
-        
-        // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
-        if( enqueueValve.isOn() ) {
-               
+        if (disposed.compareAndSet(false, true)) {
+            stopping.set(true);
             // let the peer know that we are disconnecting..
             try {
                 peer.transportListener.onCommand(new ShutdownInfo());
             } catch (Exception ignore) {
             }
-               
-               
-            TaskRunner tr = null;
-            try {
-                enqueueValve.turnOff();
-                if (!disposed) {
-                    started = false;
-                    disposed = true;
-                    if (taskRunner != null) {
-                        tr = taskRunner;
-                        taskRunner = null;
-                    }
-                }
-            } finally {
-                stopping.set(false);
-                enqueueValve.turnOn();
+
+            if (messageQueue != null) {
+                messageQueue.clear();
             }
-            if (tr != null) {
-                tr.shutdown(1000);
+            if (taskRunner != null) {
+                taskRunner.shutdown(1000);
+                taskRunner = null;
             }
-            
-
         }
-        
     }
-    
+
     /**
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        
-        final TransportListener tl;
-        try {
-            // Disable changing the state variables while we are running... 
-            enqueueValve.increment();
-            tl = transportListener;
-            if (!started || disposed || tl == null || stopping.get()) {
-                if( stopping.get() ) {
-                    // drain the queue it since folks could be blocked putting on to
-                    // it and that would not allow the stop() method for finishing up.
-                    getMessageQueue().clear();  
-                }
-                return false;
-            }
-        } catch (InterruptedException e) {
+
+        if (disposed.get() || stopping.get()) {
             return false;
-        } finally {
-            enqueueValve.decrement();
         }
 
-        LinkedBlockingQueue<Object> mq = getMessageQueue();
+        LinkedBlockingQueue<Object> mq = messageQueue;
         Object command = mq.poll();
         if (command != null) {
-            if( command == DISCONNECT ) {
-                tl.onException(new TransportDisposedIOException("Peer (" + 
peer.toString() + ") disposed."));
+            if (command == DISCONNECT) {
+                transportListener.onException(new 
TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
             } else {
-                tl.onCommand(command);
+                transportListener.onCommand(command);
             }
             return !mq.isEmpty();
         } else {
             return false;
         }
-        
     }
 
     public void setTransportListener(TransportListener commandListener) {
-        try {
-            // enqueue can block on blocking queue, preventing turnOff
-            // so avoid in that case: https://issues.apache.org/jira/browse/AMQ-3684
-            if (async && getMessageQueue().remainingCapacity() == 0) {
-                // enqueue blocked or will be
-                this.transportListener = commandListener;
-                wakeup();
-            } else {
-                try {
-                    enqueueValve.turnOff();
-                    this.transportListener = commandListener;
-                    wakeup();
-                } finally {
-                    enqueueValve.turnOn();
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private LinkedBlockingQueue<Object> getMessageQueue() {
-        synchronized (lazyInitMutext) {
-            if (messageQueue == null) {
-                messageQueue = new 
LinkedBlockingQueue<Object>(this.asyncQueueDepth);
-            }
-            return messageQueue;
-        }
+        this.transportListener = commandListener;
     }
 
     public FutureResponse asyncRequest(Object command, ResponseCallback 
responseCallback) throws IOException {
@@ -336,11 +280,6 @@ public class VMTransport implements Tran
 
     protected void wakeup() {
         if (async) {
-            synchronized (lazyInitMutext) {
-                if (taskRunner == null) {
-                    taskRunner = 
DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, 
"VMTransport: " + toString());
-                }
-            }
             try {
                 taskRunner.wakeup();
             } catch (InterruptedException e) {
@@ -353,16 +292,16 @@ public class VMTransport implements Tran
         return false;
     }
 
-       public boolean isDisposed() {
-               return disposed;
-       }
-       
-       public boolean isConnected() {
-           return started;
-       }
+    public boolean isDisposed() {
+        return disposed.get();
+    }
 
-       public void reconnect(URI uri) throws IOException {
-        throw new IOException("Not supported");
+    public boolean isConnected() {
+        return started.get();
+    }
+
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("reconnection Not supported by this transport.");
     }
 
     public boolean isReconnectSupported() {
@@ -372,7 +311,8 @@ public class VMTransport implements Tran
     public boolean isUpdateURIsSupported() {
         return false;
     }
-    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+
+    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
         throw new IOException("Not supported");
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
 Fri Feb 10 20:19:44 2012
@@ -75,7 +75,7 @@ public class VMTransportServer implement
         connectionCount.incrementAndGet();
         VMTransport client = new VMTransport(location) {
             public void stop() throws Exception {
-               if (stopping.compareAndSet(false, true) && !disposed) {
+               if (stopping.compareAndSet(false, true) && !disposed.get()) {
                                        super.stop();
                                        if (connectionCount.decrementAndGet() 
== 0
                                                        && disposeOnDisconnect) 
{


Reply via email to