Author: robbie
Date: Thu Jul 7 15:09:14 2011
New Revision: 1143866
URL: http://svn.apache.org/viewvc?rev=1143866&view=rev
Log:
QPID-3342: rationalise the existing 0-10 transport code and introduce new
NetworkTransport + NetworkConnection abstraction. Decouple IoSender and
IoReceiver, initiate their threads after the constructor completes.
Applied patch by Keith Wall and myself
Added:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
- copied, changed from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
- copied, changed from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
- copied, changed from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
Removed:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Jul 7 15:09:14 2011
@@ -27,6 +27,7 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.OPENING;
import static org.apache.qpid.transport.Connection.State.RESUMING;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -40,6 +41,12 @@ import java.util.concurrent.atomic.Atomi
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
@@ -235,13 +242,15 @@ public class Connection extends Connecti
state = OPENING;
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
-
- TransportBuilder transport = new TransportBuilder();
- transport.init(this);
- this.sender = transport.buildSenderPipe();
- transport.buildReceiverPipe(this);
- this.securityLayer = transport.getSecurityLayer();
-
+
+ securityLayer = new SecurityLayer();
+ securityLayer.init(this);
+
+ OutgoingNetworkTransport transport = new IoNetworkTransport();
+ Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ NetworkConnection network = transport.connect(settings, receiver, null);
+ sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
Copied:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
(from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Thu Jul 7 15:09:14 2011
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,29 +7,37 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.transport.network;
-import java.net.Socket;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Sender;
-public interface IoContext
+public interface NetworkConnection
{
Sender<ByteBuffer> getSender();
-
- IoReceiver getReceiver();
- Socket getSocket();
-}
+ void close();
+
+ /**
+ * Returns the remote address of the underlying socket.
+ */
+ SocketAddress getRemoteAddress();
+
+ /**
+ * Returns the local address of the underlying socket.
+ */
+ SocketAddress getLocalAddress();
+}
\ No newline at end of file
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Thu Jul 7 15:09:14 2011
@@ -20,19 +20,9 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.ConnectionSettings;
-
public interface NetworkTransport
{
- public void init(ConnectionSettings settings);
-
- public Sender<ByteBuffer> sender();
-
- public void receiver(Receiver<ByteBuffer> delegate);
-
public void close();
-}
\ No newline at end of file
+
+ public NetworkConnection getConnection();
+}
Copied:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
(from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Thu Jul 7 15:09:14 2011
@@ -22,17 +22,11 @@ package org.apache.qpid.transport.networ
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
-public interface NetworkTransport
+public interface OutgoingNetworkTransport extends NetworkTransport
{
- public void init(ConnectionSettings settings);
-
- public Sender<ByteBuffer> sender();
-
- public void receiver(Receiver<ByteBuffer> delegate);
-
- public void close();
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
}
\ No newline at end of file
Added:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1143866&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Thu Jul 7 15:09:14 2011
@@ -0,0 +1,82 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IoNetworkConnection implements NetworkConnection
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class);
+ private final Socket _socket;
+ private final long _timeout;
+ private final IoSender _ioSender;
+ private final IoReceiver _ioReceiver;
+
+ public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout)
+ {
+ _socket = socket;
+ _timeout = timeout;
+
+ _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+ _ioSender.registerCloseListener(_ioReceiver);
+
+ _ioReceiver.initiate();
+ _ioSender.initiate();
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _ioSender;
+ }
+
+ public void close()
+ {
+ try
+ {
+ _ioSender.close();
+ }
+ finally
+ {
+ _ioReceiver.close(false);
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socket.getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socket.getLocalSocketAddress();
+ }
+
+}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Jul 7 15:09:14 2011
@@ -27,14 +27,15 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport
{
static
{
@@ -44,34 +45,31 @@ public class IoNetworkTransport implemen
(Boolean.getBoolean("amqj.enableDirectBuffers"));
}
- private static final Logger log = Logger.get(IoNetworkTransport.class);
+ private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
- private Socket socket;
- private Sender<ByteBuffer> sender;
- private IoReceiver receiver;
- private long timeout = 60000;
- private ConnectionSettings settings;
+ private Socket _socket;
+ private IoNetworkConnection _connection;
+ private long _timeout = 60000;
- public void init(ConnectionSettings settings)
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
{
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
try
{
- this.settings = settings;
- InetAddress address = InetAddress.getByName(settings.getHost());
- socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(settings.isTcpNodelay());
-
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
- socket.setSendBufferSize(settings.getWriteBufferSize());
- socket.setReceiveBufferSize(settings.getReadBufferSize());
+ LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+ InetAddress address = InetAddress.getByName(settings.getHost());
- socket.connect(new InetSocketAddress(address, settings.getPort()));
+ _socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -81,36 +79,35 @@ public class IoNetworkTransport implemen
{
throw new TransportException("Error connecting to broker", e);
}
- }
- public void receiver(Receiver<ByteBuffer> delegate)
- {
- receiver = new IoReceiver(this, delegate,
- 2*settings.getReadBufferSize() , timeout);
- }
-
- public Sender<ByteBuffer> sender()
- {
- return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
- }
+ try
+ {
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
- public void close()
- {
-
- }
+ throw new TransportException("Error creating network connection", e);
+ }
- public Sender<ByteBuffer> getSender()
- {
- return sender;
+ return _connection;
}
- public IoReceiver getReceiver()
+ public void close()
{
- return receiver;
+ _connection.close();
}
- public Socket getSocket()
+ public NetworkConnection getConnection()
{
- return socket;
+ return _connection;
}
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Jul 7 15:09:14 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
@@ -37,43 +38,54 @@ import java.util.concurrent.atomic.Atomi
*
*/
-final class IoReceiver implements Runnable
+final class IoReceiver implements Runnable, Closeable
{
private static final Logger log = Logger.get(IoReceiver.class);
- private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
- private final boolean shutdownBroken =
- ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+ private static final boolean shutdownBroken;
+ static
+ {
+ String osName = System.getProperty("os.name");
+ shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
+ }
- public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
- int bufferSize, long timeout)
+ public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.timeout = timeout;
try
{
+ //Create but deliberately don't start the thread.
receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
- throw new Error("Error creating IOReceiver thread",e);
+ throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
receiverThread.start();
}
+ public void close()
+ {
+ close(false);
+ }
+
void close(boolean block)
{
if (!closed.getAndSet(true))
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Jul 7 15:09:14 2011
@@ -24,8 +24,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
@@ -43,7 +46,6 @@ public final class IoSender implements R
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
- private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -56,14 +58,13 @@ public final class IoSender implements R
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
+ private final List<Closeable> _listeners = new ArrayList<Closeable>();
private volatile Throwable exception = null;
-
- public IoSender(IoContext ioCtx, int bufferSize, long timeout)
+ public IoSender(Socket socket, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -78,6 +79,7 @@ public final class IoSender implements R
try
{
+ //Create but deliberately don't start the thread.
senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
@@ -87,6 +89,10 @@ public final class IoSender implements R
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
senderThread.start();
}
@@ -204,16 +210,20 @@ public final class IoSender implements R
senderThread.join(timeout);
if (senderThread.isAlive())
{
+ log.error("join timed out");
throw new SenderException("join timed out");
}
}
- ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
+ log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
-
+ finally
+ {
+ closeListeners();
+ }
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -221,6 +231,28 @@ public final class IoSender implements R
}
}
+ private void closeListeners()
+ {
+ Exception ex = null;
+ for(Closeable listener : _listeners)
+ {
+ try
+ {
+ listener.close();
+ }
+ catch(Exception e)
+ {
+ log.error("Exception closing listener: " + e.getMessage());
+ ex = e;
+ }
+ }
+
+ if (ex != null)
+ {
+ throw new SenderException(ex.getMessage(), ex);
+ }
+ }
+
public void run()
{
final int size = buffer.length;
@@ -304,4 +336,9 @@ public final class IoSender implements R
throw new SenderException(e);
}
}
+
+ public void registerCloseListener(Closeable listener)
+ {
+ _listeners.add(listener);
+ }
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Jul 7 15:09:14 2011
@@ -42,7 +42,7 @@ import org.apache.qpid.transport.util.Lo
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport<E> implements IoContext
+public final class IoTransport<E>
{
static
@@ -70,44 +70,63 @@ public final class IoTransport<E> implem
IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
{
this.socket = socket;
-
+
if (ssl)
{
- SSLEngine engine = null;
- SSLContext sslCtx;
- try
- {
- sslCtx = createSSLContext();
- }
- catch (Exception e)
- {
- throw new TransportException("Error creating SSL Context", e);
- }
-
- try
- {
- engine = sslCtx.createSSLEngine();
- engine.setUseClientMode(true);
- }
- catch(Exception e)
- {
- throw new TransportException("Error creating SSL Engine", e);
- }
-
- this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
- 2*readBufferSize, timeout);
-
- log.info("SSL Sender and Receiver initiated");
+ setupSSLTransport(socket, binding);
}
else
{
- this.sender = new IoSender(this, 2*writeBufferSize, timeout);
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(this, binding.receiver(endpoint),
- 2*readBufferSize, timeout);
+ setupTransport(socket, binding);
+ }
+ }
+
+ private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ {
+ IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+ ios.initiate();
+
+ this.sender = ios;
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(socket, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ this.receiver.initiate();
+
+ ios.registerCloseListener(this.receiver);
+ }
+
+ private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ {
+ SSLEngine engine = null;
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = createSSLContext();
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
}
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+ IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+ ios.initiate();
+ this.sender = new SSLSender(engine,ios);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+ 2*readBufferSize, timeout);
+ this.receiver.initiate();
+ ios.registerCloseListener(this.receiver);
+
+ log.info("SSL Sender and Receiver initiated");
}
public Sender<ByteBuffer> getSender()
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java Thu Jul 7 15:09:14 2011
@@ -43,8 +43,7 @@ public class SASLSender extends SASLEncr
this.delegate = delegate;
log.debug("SASL Sender enabled");
}
-
- @Override
+
public void close()
{
@@ -65,13 +64,11 @@ public class SASLSender extends SASLEncr
}
}
- @Override
public void flush()
{
delegate.flush();
}
- @Override
public void send(ByteBuffer buf)
{
if (closed.get())
@@ -108,7 +105,6 @@ public class SASLSender extends SASLEncr
}
}
- @Override
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1143866&r1=1143865&r2=1143866&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java Thu Jul 7 15:09:14 2011
@@ -48,51 +48,45 @@ public class QpidClientX509KeyManager ex
kmf.init(ks, keyStorePassword.toCharArray());
this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
}
-
- @Override
+
public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
{
log.debug("chooseClientAlias:Returning alias " + alias);
return alias;
}
- @Override
public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
{
return delegate.chooseServerAlias(keyType, issuers, socket);
}
- @Override
public X509Certificate[] getCertificateChain(String alias)
{
return delegate.getCertificateChain(alias);
}
- @Override
public String[] getClientAliases(String keyType, Principal[] issuers)
{
log.debug("getClientAliases:Returning alias " + alias);
return new String[]{alias};
}
- @Override
public PrivateKey getPrivateKey(String alias)
{
return delegate.getPrivateKey(alias);
}
- @Override
public String[] getServerAliases(String keyType, Principal[] issuers)
{
return delegate.getServerAliases(keyType, issuers);
}
-
+
public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
{
log.debug("chooseEngineClientAlias:Returning alias " + alias);
return alias;
}
-
+
public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
{
return delegate.chooseEngineServerAlias(keyType, issuers, engine);
Copied:
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
(from r1143865,
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java&r1=1143865&r2=1143866&rev=1143866&view=diff
==============================================================================
(empty)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]