Author: rgodfrey
Date: Tue Jul 22 12:18:33 2014
New Revision: 1612555

URL: http://svn.apache.org/r1612555
Log:
QPID-5576: Detect socket closure more reliably and do not leave threads waiting
for input that will never arrive

Modified:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 Tue Jul 22 12:18:33 2014
@@ -239,7 +239,12 @@ public class MessageConsumerImpl impleme
     public MessageImpl receive() throws JMSException
     {
         checkClosed();
-        return receiveImpl(-1L);
+        MessageImpl message = receiveImpl(-1L);
+        if(message == null)
+        {
+            throw new JMSException("Message could not be retrieved");
+        }
+        return message;
     }
 
     public MessageImpl receive(final long timeout) throws JMSException

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 Tue Jul 22 12:18:33 2014
@@ -891,9 +891,10 @@ public class SessionImpl implements Sess
         {
             synchronized(getLock())
             {
-                while(!_closed)
+
+                while(!(_closed || getClientSession().getEndpoint().isEnded()))
                 {
-                    while(!_closed && (!_started || (_recoveredMessage == null 
&& _messageConsumerList.isEmpty())))
+                    while(!(_closed || 
getClientSession().getEndpoint().isEnded()) && (!_started || (_recoveredMessage 
== null && _messageConsumerList.isEmpty())))
                     {
                         try
                         {
@@ -904,7 +905,7 @@ public class SessionImpl implements Sess
                             return;
                         }
                     }
-                    while(!_closed && (_started && (_recoveredMessage != null 
|| !_messageConsumerList.isEmpty())))
+                    while(!(_closed || 
getClientSession().getEndpoint().isEnded()) && (_started && (_recoveredMessage 
!= null || !_messageConsumerList.isEmpty())))
                     {
                         Message msg;
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
 Tue Jul 22 12:18:33 2014
@@ -20,6 +20,18 @@
  */
 package org.apache.qpid.amqp_1_0.client.websocket;
 
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+
 import org.apache.qpid.amqp_1_0.client.ConnectionException;
 import org.apache.qpid.amqp_1_0.client.TransportProvider;
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
@@ -29,16 +41,6 @@ import org.apache.qpid.amqp_1_0.framing.
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
 import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketClient;
-import org.eclipse.jetty.websocket.WebSocketClientFactory;
-
-import javax.net.ssl.SSLContext;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
 
 class WebSocketProvider implements TransportProvider
 {
@@ -49,6 +51,7 @@ class WebSocketProvider implements Trans
     private static QueuedThreadPool _threadPool;
     private final String _transport;
     private static WebSocketClientFactory _factory;
+    private WebSocket.Connection _connection;
 
     public WebSocketProvider(final String transport)
     {
@@ -134,7 +137,7 @@ class WebSocketProvider implements Trans
                                                                                
                (byte)1,
                                                                                
                (byte)0,
                                                                                
                (byte)0),
-                                                                   saslOut,
+                                                                   
saslOut.asFrameSource(),
                                                                    new 
HeaderFrameSource((byte)'A',
                                                                                
                (byte)'M',
                                                                                
                (byte)'Q',
@@ -143,7 +146,7 @@ class WebSocketProvider implements Trans
                                                                                
                (byte)1,
                                                                                
                (byte)0,
                                                                                
                (byte)0),
-                                                                   out);
+                                                                   
out.asFrameSource());
 
                 conn.setSaslFrameOutput(saslOut);
             }
@@ -157,13 +160,13 @@ class WebSocketProvider implements Trans
                                                                                
                (byte)1,
                                                                                
                (byte)0,
                                                                                
                (byte)0),
-                                                               out);
+                                                               
out.asFrameSource());
             }
 
             final ConnectionHandler handler = new ConnectionHandler(conn);
             conn.setFrameOutputHandler(out);
             final URI uri = new URI(_transport +"://"+ address+":"+ port +"/");
-            WebSocket.Connection connection = client.open(uri, new 
WebSocket.OnBinaryMessage()
+            _connection = client.open(uri, new WebSocket.OnBinaryMessage()
             {
                 public void onOpen(Connection connection)
                 {
@@ -192,6 +195,11 @@ class WebSocketProvider implements Trans
 
     }
 
+    @Override
+    public void close()
+    {
+        _connection.close();
+    }
 
 
     public static class HeaderFrameSource implements 
ConnectionHandler.FrameSource
@@ -225,6 +233,12 @@ class WebSocketProvider implements Trans
             return _closed;
         }
 
+        @Override
+        public void close()
+        {
+            _closed = true;
+        }
+
     }
 
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
 Tue Jul 22 12:18:33 2014
@@ -25,8 +25,10 @@ import java.security.Principal;
 import java.util.ServiceLoader;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
+import javax.net.ssl.SSLContext;
+
 import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.Container;
 import org.apache.qpid.amqp_1_0.transport.Predicate;
@@ -37,8 +39,6 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 
-import javax.net.ssl.SSLContext;
-
 public class Connection implements ExceptionHandler
 {
     private static final int MAX_FRAME_SIZE = 65536;
@@ -225,7 +225,7 @@ public class Connection implements Excep
                                                                                
                        (byte)1,
                                                                                
                        (byte)0,
                                                                                
                        (byte)0),
-                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),_conn.getDescribedTypeRegistry()),
                                                                new 
ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
                                                                                
                        (byte)'M',
                                                                                
                        (byte)'Q',
@@ -234,7 +234,7 @@ public class Connection implements Excep
                                                                                
                        (byte)1,
                                                                                
                        (byte)0,
                                                                                
                        (byte)0),
-                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
             );
 
             _conn.setSaslFrameOutput(saslOut);
@@ -249,7 +249,7 @@ public class Connection implements Excep
                                                                                
                        (byte)1,
                                                                                
                        (byte)0,
                                                                                
                        (byte)0),
-                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+                                                               new 
ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
             );
         }
 
@@ -258,7 +258,14 @@ public class Connection implements Excep
         transportProvider.connect(_conn,address,port, sslContext, this);
 
 
-        _conn.open();
+        try
+        {
+            _conn.open();
+        }
+        catch(RuntimeException e)
+        {
+            transportProvider.close();
+        }
 
     }
 
@@ -295,7 +302,14 @@ public class Connection implements Excep
     {
         if(getEndpoint().isClosed())
         {
-            throw new 
ConnectionClosedException(getEndpoint().getRemoteError());
+            Error remoteError = getEndpoint().getRemoteError();
+            if(remoteError == null)
+            {
+                remoteError = new Error();
+                remoteError.setDescription("Connection closed for unknown 
reason");
+
+            }
+            throw new ConnectionClosedException(remoteError);
         }
     }
 
@@ -377,7 +391,7 @@ public class Connection implements Excep
         if(_connectionErrorTask != null)
         {
             Thread thread = new Thread(_connectionErrorTask);
-            thread.run();
+            thread.start();
         }
     }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 Tue Jul 22 12:18:33 2014
@@ -20,26 +20,41 @@
  */
 package org.apache.qpid.amqp_1_0.client;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.Predicate;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
 import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
 
 public class Receiver implements DeliveryStateHandler
 {
@@ -193,7 +208,8 @@ public class Receiver implements Deliver
     {
         if(_remoteErrorTask != null)
         {
-            _remoteErrorTask.run();
+            Thread thread = new Thread(_remoteErrorTask);
+            thread.start();
         }
     }
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
 Tue Jul 22 12:18:33 2014
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.amqp_1_0.client;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
@@ -27,22 +34,21 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.Predicate;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.Source;
 import org.apache.qpid.amqp_1_0.type.Target;
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
 
 public class Sender implements DeliveryStateHandler
 {
@@ -488,7 +494,8 @@ public class Sender implements DeliveryS
     {
         if(_remoteErrorTask != null)
         {
-            _remoteErrorTask.run();
+            Thread thread = new Thread(_remoteErrorTask);
+            thread.start();
         }
     }
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
 Tue Jul 22 12:18:33 2014
@@ -26,6 +26,9 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
@@ -39,6 +42,9 @@ import org.apache.qpid.amqp_1_0.type.Sas
 
 class TCPTransportProvier implements TransportProvider
 {
+    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+    private Socket _socket;
     private final String _transport;
     
     // Defines read socket timeout in milliseconds.  A value of 0 means that 
the socket
@@ -49,6 +55,7 @@ class TCPTransportProvier implements Tra
     // the event of a SocketTimeoutException.  A value of -1L will disable 
idle read timeout checking.
     // Default value is set to -1L, which means disable idle read checks.
     private long _readIdleTimeout = 
Long.getLong("qpid.connection_read_idle_timeout", -1L);
+    private final AtomicLong _threadNameIndex = new AtomicLong();
 
     public TCPTransportProvier(final String transport)
     {
@@ -64,7 +71,6 @@ class TCPTransportProvier implements Tra
     {
         try
         {
-            final Socket s;
             if(sslContext != null)
             {
                 final SSLSocketFactory socketFactory = 
sslContext.getSocketFactory();
@@ -72,16 +78,16 @@ class TCPTransportProvier implements Tra
                 SSLSocket sslSocket = (SSLSocket) 
socketFactory.createSocket(address, port);
 
                 
conn.setExternalPrincipal(sslSocket.getSession().getLocalPrincipal());
-                s=sslSocket;
+                _socket=sslSocket;
             }
             else
             {
-                s = new Socket(address, port);
+                _socket = new Socket(address, port);
             }
             // set socket read timeout
-            s.setSoTimeout(_readTimeout);
+            _socket.setSoTimeout(_readTimeout);
 
-            conn.setRemoteAddress(s.getRemoteSocketAddress());
+            conn.setRemoteAddress(_socket.getRemoteSocketAddress());
 
             ConnectionHandler.FrameOutput<FrameBody> out = new 
ConnectionHandler.FrameOutput<FrameBody>(conn);
 
@@ -99,7 +105,7 @@ class TCPTransportProvier implements Tra
                                                                                
                            (byte)1,
                                                                                
                            (byte)0,
                                                                                
                            (byte)0),
-                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()),
+                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),conn.getDescribedTypeRegistry()),
                                                                    new 
ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
                                                                                
                            (byte)'M',
                                                                                
                            (byte)'Q',
@@ -108,7 +114,7 @@ class TCPTransportProvier implements Tra
                                                                                
                            (byte)1,
                                                                                
                            (byte)0,
                                                                                
                            (byte)0),
-                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
                 );
 
                 conn.setSaslFrameOutput(saslOut);
@@ -123,22 +129,24 @@ class TCPTransportProvier implements Tra
                                                                                
                            (byte)1,
                                                                                
                            (byte)0,
                                                                                
                            (byte)0),
-                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+                                                                   new 
ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
                 );
             }
 
 
-            final OutputStream outputStream = s.getOutputStream();
+            final OutputStream outputStream = _socket.getOutputStream();
             ConnectionHandler.BytesOutputHandler outputHandler =
                     new ConnectionHandler.BytesOutputHandler(outputStream, 
src, conn, exceptionHandler);
-            Thread outputThread = new Thread(outputHandler);
+            long threadIndex = _threadNameIndex.getAndIncrement();
+            Thread outputThread = new Thread(outputHandler, 
"QpidConnectionOutputThread-"+threadIndex);
+
             outputThread.setDaemon(true);
             outputThread.start();
             conn.setFrameOutputHandler(out);
 
 
             final ConnectionHandler handler = new ConnectionHandler(conn);
-            final InputStream inputStream = s.getInputStream();
+            final InputStream inputStream = _socket.getInputStream();
 
             Thread inputThread = new Thread(new Runnable()
             {
@@ -153,21 +161,11 @@ class TCPTransportProvier implements Tra
                     {
                         if(conn.closedForInput() && conn.closedForOutput())
                         {
-                            try
-                            {
-                                synchronized (outputStream)
-                                {
-                                    s.close();
-                                }
-                            }
-                            catch (IOException e)
-                            {
-                                e.printStackTrace();  //To change body of 
catch statement use File | Settings | File Templates.
-                            }
+                            close();
                         }
                     }
                 }
-            });
+            },"QpidConnectionInputThread-"+threadIndex);
 
             inputThread.setDaemon(true);
             inputThread.start();
@@ -178,6 +176,20 @@ class TCPTransportProvier implements Tra
             throw new ConnectionException(e);
         }
     }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            _socket.close();
+        }
+        catch (IOException e)
+        {
+            RAW_LOGGER.log(Level.WARNING, "Unexpected Error during 
TCPTransportProvider socket close", e);
+        }
+    }
+
     private void doRead(final ConnectionEndpoint conn, final ConnectionHandler 
handler, final InputStream inputStream)
     {
         byte[] buf = new byte[2<<15];

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
 Tue Jul 22 12:18:33 2014
@@ -20,12 +20,10 @@
  */
 package org.apache.qpid.amqp_1_0.client;
 
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import javax.net.ssl.SSLContext;
+
 import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-
-import javax.net.ssl.SSLContext;
 
 public interface TransportProvider
 {
@@ -34,4 +32,6 @@ public interface TransportProvider
                  int port,
                  SSLContext sslContext,
                  ExceptionHandler exceptionHandler) throws ConnectionException;
+
+    void close();
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
 Tue Jul 22 12:18:33 2014
@@ -20,6 +20,17 @@
  */
 package org.apache.qpid.amqp_1_0.framing;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
 import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
@@ -27,25 +38,13 @@ import org.apache.qpid.amqp_1_0.codec.Va
 import org.apache.qpid.amqp_1_0.codec.ValueWriter;
 import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-
 import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.transport.Open;
 import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.UnsignedShort;
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import org.apache.qpid.amqp_1_0.type.transport.Open;
 
 public class ConnectionHandler
 {
@@ -87,7 +86,7 @@ public class ConnectionHandler
 
     // ----------------------------------------------------------------
 
-    public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource
+    public static class FrameOutput<T> implements FrameOutputHandler<T>
     {
 
         private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
@@ -116,6 +115,39 @@ public class ConnectionHandler
             _conn = conn;
         }
 
+        public FrameSource asFrameSource()
+        {
+            return new FrameSource()
+            {
+                @Override
+                public AMQFrame getNextFrame(final boolean wait)
+                {
+                    return FrameOutput.this.getNextFrame(wait);
+                }
+
+                @Override
+                public boolean closed()
+                {
+                    return FrameOutput.this.closed();
+                }
+
+                @Override
+                public void close()
+                {
+                    FrameOutput.this.immediateClose();
+                }
+            };
+        }
+
+        private void immediateClose()
+        {
+            synchronized (_conn.getLock())
+            {
+                _closed = true;
+                _conn.getLock().notifyAll();
+            }
+        }
+
         public boolean canSend()
         {
             return _queue.remainingCapacity() != 0;
@@ -239,6 +271,8 @@ public class ConnectionHandler
     {
         AMQFrame<T> getNextFrame(boolean wait);
         boolean closed();
+
+        void close();
     }
 
 
@@ -246,6 +280,8 @@ public class ConnectionHandler
     {
         void getBytes(BytesProcessor processor, boolean wait);
         boolean closed();
+
+        void close();
     }
 
     public static class FrameToBytesSourceAdapter implements BytesSource
@@ -320,6 +356,12 @@ public class ConnectionHandler
         {
             return _buffer.position() == 0 && _frameSource.closed();
         }
+
+        @Override
+        public void close()
+        {
+            _frameSource.close();
+        }
     }
 
 
@@ -344,6 +386,11 @@ public class ConnectionHandler
         {
             return !_buffer.hasRemaining();
         }
+
+        @Override
+        public void close()
+        {
+        }
     }
 
     public static class SequentialBytesSource implements BytesSource
@@ -379,6 +426,19 @@ public class ConnectionHandler
         {
             return _sources.isEmpty();
         }
+
+        @Override
+        public void close()
+        {
+            BytesSource src = _sources.peek();
+            while (src != null)
+            {
+                src.close();
+                _sources.poll();
+                src = _sources.peek();
+            }
+
+        }
     }
 
 
@@ -420,6 +480,19 @@ public class ConnectionHandler
         {
             return _sources.isEmpty();
         }
+
+        @Override
+        public void close()
+        {
+            FrameSource src = _sources.peek();
+            while (src != null)
+            {
+                src.close();
+                _sources.poll();
+                src = _sources.peek();
+            }
+
+        }
     }
 
 
@@ -470,6 +543,7 @@ public class ConnectionHandler
             catch (IOException e)
             {
                 _closed = true;
+                _bytesSource.close();
                 _exceptionHandler.handleException(e);
             }
         }

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1612555&r1=1612554&r2=1612555&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 Tue Jul 22 12:18:33 2014
@@ -179,20 +179,27 @@ public class ConnectionEndpoint implemen
     {
         if (_requiresSASLClient)
         {
-            synchronized (getLock())
+            try
             {
-                while (!(_saslComplete || _closedForInput))
+                waitUntil(new Predicate()
                 {
-                    try
-                    {
-                        getLock().wait();
-                    }
-                    catch (InterruptedException e)
+
+                    @Override
+                    public boolean isSatisfied()
                     {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                        return _saslComplete || _closedForInput;
                     }
-                }
+                });
+            }
+            catch (TimeoutException e)
+            {
+                throw new RuntimeException("Could not connect - authentication error");
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
             }
+
             if (!_authenticated)
             {
                 throw new RuntimeException("Could not connect - authentication error");
@@ -471,6 +478,10 @@ public class ConnectionEndpoint implemen
                     }
                 }
             }
+            if(_connectionEventListener != null)
+            {
+                _connectionEventListener.closeReceived();
+            }
         }
         notifyAll();
     }
@@ -801,9 +812,9 @@ public class ConnectionEndpoint implemen
         return _describedTypeRegistry;
     }
 
-    public synchronized void setClosedForOutput(boolean b)
+    public synchronized void setClosedForOutput(boolean closed)
     {
-        _closedForOutput = true;
+        _closedForOutput = closed;
         notifyAll();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to