Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java Thu Jul 7 15:10:30 2011
@@ -0,0 +1,180 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MINA handler that forwards session events to the {@link ProtocolEngine}
+ * stored as the session attachment.
+ * <p>
+ * When a {@link ProtocolEngineFactory} is supplied, an engine is created and attached in
+ * {@link #sessionCreated(IoSession)}; without one this class never attaches an engine itself.
+ */
+public class MinaNetworkHandler extends IoHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
+
+    private final ProtocolEngineFactory _factory;
+    private final SSLContextFactory _sslFactory;
+
+    static
+    {
+        boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
+        LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
+        ByteBuffer.setUseDirectBuffers(directBuffers);
+
+        //override the MINA defaults to prevent use of the PooledByteBufferAllocator
+        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+    }
+
+    /** Creates a handler; a null {@code sslFactory} skips the SSL filter, a null {@code factory} skips engine creation. */
+    public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory)
+    {
+        _sslFactory = sslFactory;
+        _factory = factory;
+    }
+
+    /** Creates a handler that never creates engines itself; a null {@code sslFactory} skips the SSL filter. */
+    public MinaNetworkHandler(SSLContextFactory sslFactory)
+    {
+        this(sslFactory, null);
+    }
+
+    /**
+     * Passes the received bytes to the attached engine; a runtime failure raised while
+     * doing so is reported back to the engine through its exception callback.
+     */
+    public void messageReceived(IoSession session, Object message)
+    {
+        ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
+        if (engine == null)
+        {
+            LOGGER.error("Received data but no protocol engine is attached to session: " + session.getRemoteAddress());
+            return;
+        }
+        ByteBuffer buf = (ByteBuffer) message;
+        try
+        {
+            engine.received(buf.buf());
+        }
+        catch (RuntimeException re)
+        {
+            engine.exception(re);
+        }
+    }
+
+    /** Logs the failure and, when an engine is attached, also hands the failure to that engine. */
+    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
+    {
+        ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+        if(engine != null)
+        {
+            LOGGER.error("Exception caught by Mina", throwable);
+            engine.exception(throwable);
+        }
+        else
+        {
+            LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable);
+        }
+    }
+
+    /** Initialises the session, installs the SSL filter if configured and attaches a new engine if a factory was given. */
+    public void sessionCreated(IoSession ioSession) throws Exception
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
+        }
+
+        SessionUtil.initialize(ioSession);
+
+        if (_sslFactory != null)
+        {
+            ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+                    new SSLFilter(_sslFactory.buildServerContext()));
+        }
+
+        if (_factory != null)
+        {
+            NetworkConnection netConn = new MinaNetworkConnection(ioSession);
+
+            ProtocolEngine engine = _factory.newProtocolEngine(netConn);
+            ioSession.setAttachment(engine);
+        }
+    }
+
+    /** Tells the attached engine that the session has closed; logs an error when no engine is attached. */
+    public void sessionClosed(IoSession ioSession) throws Exception
+    {
+        if(LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("closed: " + ioSession.getRemoteAddress());
+        }
+
+        ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
+        if(engine != null)
+        {
+            engine.closed();
+        }
+        else
+        {
+            LOGGER.error("Unable to close ProtocolEngine as none was present");
+        }
+    }
+
+
+    /**
+     * Notifies the attached engine of writer or reader idleness; ignored when no engine is attached.
+     */
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    {
+        ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
+        if (engine == null)
+        {
+            // No engine attached: there is nobody to notify, so the idle event is ignored.
+            return;
+        }
+
+        if (IdleStatus.WRITER_IDLE.equals(status))
+        {
+            engine.writerIdle();
+        }
+        else if (IdleStatus.READER_IDLE.equals(status))
+        {
+            engine.readerIdle();
+        }
+    }
+
+}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java Thu Jul 7 15:10:30 2011
@@ -0,0 +1,260 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SocketConnectorFactory;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.VMBrokerMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MINA-based transport that opens outgoing TCP or VM connections ({@code connect})
+ * and binds an acceptor for incoming TCP connections ({@code accept}).
+ */
+public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+    private static final int UNKNOWN = -1;
+    private static final int TCP = 0;
+    private static final int VM = 1;
+
+    // NOTE(review): public, mutable field - left public for compatibility; prefer getConnection().
+    public NetworkConnection _connection;
+    private SocketAcceptor _acceptor;
+    private InetSocketAddress _address;
+
+    /** Opens a connection for the protocol named in {@code settings} and remembers it as the current connection. */
+    public NetworkConnection connect(ConnectionSettings settings,
+            Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory)
+    {
+        int transport = getTransport(settings.getProtocol());
+
+        IoConnectorCreator stc;
+        switch(transport)
+        {
+            case TCP:
+                stc = new IoConnectorCreator(new SocketConnectorFactory()
+                {
+                    public IoConnector newConnector()
+                    {
+                        return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
+                    }
+                });
+                _connection = stc.connect(delegate, settings, sslFactory);
+                break;
+            case VM:
+                stc = new IoConnectorCreator(new SocketConnectorFactory()
+                {
+                    public IoConnector newConnector()
+                    {
+                        return new QpidVmPipeConnector();
+                    }
+                });
+                _connection = stc.connect(delegate, settings, sslFactory);
+                break;
+            case UNKNOWN:
+            default:
+                throw new TransportException("Unknown protocol: " + settings.getProtocol());
+        }
+
+        return _connection;
+    }
+
+    /** Maps a protocol name (case-insensitive, may be null) to TCP or VM, or to UNKNOWN when unrecognised. */
+    private static int getTransport(String transport)
+    {
+        if (Transport.TCP.equalsIgnoreCase(transport))
+        {
+            return TCP;
+        }
+
+        if (Transport.VM.equalsIgnoreCase(transport))
+        {
+            return VM;
+        }
+
+        return UNKNOWN;
+    }
+
+    /** Closes the current connection, if there is one, and unbinds the acceptor, if there is one. */
+    public void close()
+    {
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+        if (_acceptor != null)
+        {
+            _acceptor.unbindAll();
+        }
+    }
+
+    public NetworkConnection getConnection()
+    {
+        return _connection;
+    }
+
+    /** Binds a TCP acceptor configured from {@code config}; any other transport raises a TransportException. */
+    public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory,
+            final SSLContextFactory sslFactory)
+    {
+        int processors = config.getConnectorProcessors();
+
+        if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
+        {
+            _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
+
+            SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+            sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
+            SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+            sc.setTcpNoDelay(config.getTcpNoDelay());
+            sc.setSendBufferSize(config.getSendBufferSize());
+            sc.setReceiveBufferSize(config.getReceiveBufferSize());
+
+            if (config.getHost().equals(WILDCARD_ADDRESS))
+            {
+                _address = new InetSocketAddress(config.getPort());
+            }
+            else
+            {
+                _address = new InetSocketAddress(config.getHost(), config.getPort());
+            }
+        }
+        else
+        {
+            throw new TransportException("Unknown transport: " + config.getTransport());
+        }
+
+        try
+        {
+            _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory));
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Could not bind to " + _address, e);
+        }
+    }
+
+
+    /** Obtains a connector from the supplied factory and uses it to open one connection. */
+    private static class IoConnectorCreator
+    {
+        private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
+
+        private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+        private final SocketConnectorFactory _ioConnectorFactory;
+
+        public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
+        {
+            _ioConnectorFactory = socketConnectorFactory;
+        }
+
+        public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory)
+        {
+            final IoConnector ioConnector = _ioConnectorFactory.newConnector();
+            final SocketAddress address;
+            final String protocol = settings.getProtocol();
+            final int port = settings.getPort();
+
+            if (Transport.TCP.equalsIgnoreCase(protocol))
+            {
+                address = new InetSocketAddress(settings.getHost(), port);
+            }
+            else if(Transport.VM.equalsIgnoreCase(protocol))
+            {
+                synchronized (VMBrokerMap.class)
+                {
+                    if(!VMBrokerMap.contains(port))
+                    {
+                        throw new TransportException("VM broker on port " + port + " does not exist.");
+                    }
+                }
+
+                address = new VmPipeAddress(port);
+            }
+            else
+            {
+                throw new TransportException("Unknown transport: " + protocol);
+            }
+
+            LOGGER.info("Attempting connection to " + address);
+
+            if (ioConnector instanceof SocketConnector)
+            {
+                SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+                cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
+
+                SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+                scfg.setTcpNoDelay(true);
+                scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+                scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
+
+                // Don't have the connector's worker thread wait around for other
+                // connections (we only use one SocketConnector per connection
+                // at the moment anyway). This allows short-running
+                // clients (like unit tests) to complete quickly.
+                ((SocketConnector) ioConnector).setWorkerTimeout(0);
+            }
+
+            ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig());
+            future.join();
+            if (!future.isConnected())
+            {
+                throw new TransportException("Could not open connection");
+            }
+
+            IoSession session = future.getSession();
+            session.setAttachment(receiver);
+
+            return new MinaNetworkConnection(session);
+        }
+    }
+}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java?rev=1143867&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java Thu Jul 7 15:10:30 2011
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.transport.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MinaSender - a {@link Sender} that writes NIO byte buffers to a MINA {@link IoSession}.
+ */
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+    private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
+
+    private final IoSession _session;
+    private WriteFuture _lastWrite;
+
+    public MinaSender(IoSession session)
+    {
+        _session = session;
+    }
+
+    /** Copies the remaining bytes of {@code msg} into a MINA buffer and writes that buffer to the session. */
+    public synchronized void send(java.nio.ByteBuffer msg)
+    {
+        _log.debug("sending data:");
+        ByteBuffer mina = ByteBuffer.allocate(msg.remaining());
+        mina.put(msg);
+        mina.flip();
+        _lastWrite = _session.write(mina);
+        _log.debug("sent data:");
+    }
+
+    /** Waits for the most recent write issued through {@code send} to complete, if there has been one. */
+    public synchronized void flush()
+    {
+        if (_lastWrite != null)
+        {
+            _lastWrite.join();
+        }
+    }
+
+    public void close()
+    {
+        // MINA will sometimes throw away in-progress writes when you ask it to close
+        flush();
+        CloseFuture closed = _session.close();
+        closed.join();
+    }
+
+    public void setIdleTimeout(int i)
+    {
+        //TODO:
+        //We are instead using the setMax[Read|Write]IdleTime methods in
+        //MinaNetworkConnection for this. Should remove this method from
+        //sender interface, but currently being used by IoSender for 0-10.
+    }
+}
Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java (from r1143866, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java&r1=1143866&r2=1143867&rev=1143867&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/MockSender.java Thu Jul 7 15:10:30 2011
@@ -18,14 +18,30 @@
  * under the License.
  *
  */
-package org.apache.qpid.protocol;
+package org.apache.qpid.transport;
 
-import org.apache.qpid.transport.NetworkDriver;
+import java.nio.ByteBuffer;
 
-public interface ProtocolEngineFactory
-{
-
-    // Returns a new instance of a ProtocolEngine
-    ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
-
-}
\ No newline at end of file
+public class MockSender implements Sender<ByteBuffer>
+{
+
+    public void setIdleTimeout(int i)
+    {
+
+    }
+
+    public void send(ByteBuffer msg)
+    {
+
+    }
+
+    public void flush()
+    {
+
+    }
+
+    public void close()
+    {
+
+    }
+}
Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (from r1143866, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java) URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java&r1=1143866&r2=1143867&rev=1143867&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java Thu Jul 7 15:10:30 2011 @@ -25,32 +25,32 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; /** * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, * so if this class is being used and some methods are to be used, then please update those. 
*/ -public class TestNetworkDriver implements NetworkDriver +public class TestNetworkConnection implements NetworkConnection { - private final ConcurrentMap attributes = new ConcurrentHashMap(); private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = 1; private SocketAddress _localAddress = null; private SocketAddress _remoteAddress = null; + private final MockSender _sender; - public TestNetworkDriver() + public TestNetworkConnection() { + _sender = new MockSender(); } public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException { } @@ -65,7 +65,7 @@ public class TestNetworkDriver implement return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); } - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws OpenException { @@ -130,4 +130,9 @@ public class TestNetworkDriver implement { _remoteAddress = address; } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } } Copied: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (from r1143866, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java) URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?p2=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java&r1=1143866&r2=1143867&rev=1143867&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java Thu Jul 7 15:10:30 2011 @@ -21,44 +21,58 @@ package org.apache.qpid.transport.network.mina; -import java.net.BindException; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import junit.framework.TestCase; - +import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.OpenException; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; 
-public class MINANetworkDriverTest extends TestCase +public class MinaNetworkHandlerTest extends QpidTestCase { private static final String TEST_DATA = "YHALOTHAR"; - private static int TEST_PORT = 2323; - private NetworkDriver _server; - private NetworkDriver _client; + private int _testPort; + private IncomingNetworkTransport _server; + private OutgoingNetworkTransport _client; private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read private Exception _thrownEx; + private ConnectionSettings _clientSettings; + private NetworkConnection _network; + private TestNetworkTransportConfiguration _brokerSettings; @Override - public void setUp() + public void setUp() throws Exception { - _server = new MINANetworkDriver(); - _client = new MINANetworkDriver(); + String host = InetAddress.getLocalHost().getHostName(); + _testPort = AvailablePortFinder.getNextAvailable(10000); + + _clientSettings = new ConnectionSettings(); + _clientSettings.setHost(host); + _clientSettings.setPort(_testPort); + + _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host); + + _server = new MinaNetworkTransport(); + _client = new MinaNetworkTransport(); _thrownEx = null; _countingEngine = new CountingProtocolEngine(); - // increment the port to prevent tests clashing with each other when - // the port is in TIMED_WAIT state. - TEST_PORT++; } @Override @@ -78,46 +92,40 @@ public class MINANetworkDriverTest exten /** * Tests that a socket can't be opened if a driver hasn't been bound * to the port and can be opened if a driver has been bound. 
- * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testBindOpen() throws BindException, UnknownHostException, OpenException + public void testBindOpen() throws Exception { try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } - catch (OpenException e) + catch (TransportException e) { _thrownEx = e; } assertNotNull("Open should have failed since no engine bound", _thrownEx); - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_brokerSettings, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } /** * Tests that a socket can't be opened after a bound NetworkDriver has been closed - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException + public void testBindOpenCloseOpen() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _client.connect(_clientSettings, _countingEngine, null); _client.close(); _server.close(); try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } - catch (OpenException e) + catch (TransportException e) { _thrownEx = e; } @@ -132,43 +140,60 @@ public class MINANetworkDriverTest exten { try { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); } - catch (BindException e) + catch 
(TransportException e) { fail("First bind should not fail"); } try { - _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + IncomingNetworkTransport second = new MinaNetworkTransport(); + second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); } - catch (BindException e) + catch (TransportException e) { _thrownEx = e; } assertNotNull("Second bind should throw BindException", _thrownEx); - } - + } + + /** + * Tests that binding to the wildcard address succeeds and a client can + * connect via localhost. + */ + public void testWildcardBind() throws Exception + { + TestNetworkTransportConfiguration serverSettings = + new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS); + + _server.accept(serverSettings, null, null); + + try + { + _client.connect(_clientSettings, _countingEngine, null); + } + catch (TransportException e) + { + fail("Open should have succeeded since we used a wildcard bind"); + } + } + /** * tests that bytes sent on a network driver are received at the other end - * - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - * @throws BindException */ - public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException + public void testSend() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); // Tell the counting engine how much data we're sending _countingEngine.setNewLatch(TEST_DATA.getBytes().length); // Send the data and wait for up to 2 seconds to get it back - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + 
_network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getLatch().await(2, TimeUnit.SECONDS); // Check what we got @@ -177,36 +202,30 @@ public class MINANetworkDriverTest exten /** * Opens a connection with a low read idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException * */ - public void testSetReadIdle() throws BindException, UnknownHostException, OpenException + public void testSetReadIdle() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _client.setMaxReadIdle(1); + _network.setMaxReadIdle(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); } /** * Opens a connection with a low write idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException * */ - public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException + public void testSetWriteIdle() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _client.setMaxWriteIdle(1); + 
_network.setMaxWriteIdle(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); } @@ -216,16 +235,13 @@ public class MINANetworkDriverTest exten * Creates and then closes a connection from client to server and checks that the server * has its closed() method called. Then creates a new client and closes the server to check * that the client has its closed() method called. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testClosed() throws BindException, UnknownHostException, OpenException + public void testClosed() throws Exception { // Open a connection from a counting engine to an echo engine EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.bind(TEST_PORT, null, factory, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, factory, null); + _network = _client.connect(_clientSettings, _countingEngine, null); EchoProtocolEngine serverEngine = null; while (serverEngine == null) { @@ -253,7 +269,7 @@ public class MINANetworkDriverTest exten } assertTrue("Server should have been closed", serverEngine.getClosed()); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); _countingEngine.setClosed(false); assertFalse("Client should not have been closed", _countingEngine.getClosed()); _countingEngine.setNewLatch(1); @@ -271,22 +287,18 @@ public class MINANetworkDriverTest exten /** * Create a connection and instruct the client to throw an exception when it gets some data * and that the latch gets counted down. 
- * @throws BindException - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException */ - public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException + public void testExceptionCaught() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertEquals("Exception should not have been thrown", 1, _countingEngine.getExceptionLatch().getCount()); _countingEngine.setErrorOnNextRead(true); _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); @@ -294,28 +306,24 @@ public class MINANetworkDriverTest exten /** * Opens a connection and checks that the remote address is the one that was asked for - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException + public void testGetRemoteAddress() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), - _client.getRemoteAddress()); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), 
_testPort), + _network.getRemoteAddress()); } private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory { - EchoProtocolEngine _engine = null; + private EchoProtocolEngine _engine = null; - public ProtocolEngine newProtocolEngine(NetworkDriver driver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { if (_engine == null) { - _engine = new EchoProtocolEngine(); - _engine.setNetworkDriver(driver); + _engine = new EchoProtocolEngine(network); } return getEngine(); } @@ -328,8 +336,6 @@ public class MINANetworkDriverTest exten public class CountingProtocolEngine implements ProtocolEngine { - - protected NetworkDriver _driver; public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>(); private int _readBytes; private CountDownLatch _latch = new CountDownLatch(0); @@ -362,26 +368,12 @@ public class MINANetworkDriverTest exten public SocketAddress getRemoteAddress() { - if (_driver != null) - { - return _driver.getRemoteAddress(); - } - else - { - return null; - } + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - if (_driver != null) - { - return _driver.getLocalAddress(); - } - else - { - return null; - } + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -394,11 +386,6 @@ public class MINANetworkDriverTest exten _readerHasBeenIdle = true; } - public void setNetworkDriver(NetworkDriver driver) - { - _driver = driver; - } - public void writeFrame(AMQDataBlock frame) { @@ -465,12 +452,18 @@ public class MINANetworkDriverTest exten private class EchoProtocolEngine extends CountingProtocolEngine { + private NetworkConnection _echoNetwork; + + public EchoProtocolEngine(NetworkConnection network) + { + _echoNetwork = network; + } public void received(ByteBuffer msg) { super.received(msg); msg.rewind(); - _driver.send(msg); + _echoNetwork.getSender().send(msg); } } @@ -491,4 +484,52 @@ public class MINANetworkDriverTest exten timeLeft = period - 
(System.currentTimeMillis() - start); } } + + private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration + { + private int _port; + private String _host; + + public TestNetworkTransportConfiguration(final int port, final String host) + { + _port = port; + _host = host; + } + + public Boolean getTcpNoDelay() + { + return true; + } + + public Integer getReceiveBufferSize() + { + return 32768; + } + + public Integer getSendBufferSize() + { + return 32768; + } + + public Integer getPort() + { + return _port; + } + + public String getHost() + { + return _host; + } + + public String getTransport() + { + return Transport.TCP; + } + + public Integer getConnectorProcessors() + { + return 4; + } + + } } \ No newline at end of file Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=1143867&r1=1143866&r2=1143867&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Thu Jul 7 15:10:30 2011 @@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.A import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; public class AMQProtocolSessionTest extends QpidBrokerTestCase { - private static class AMQProtSession extends AMQProtocolSession + private static class TestProtocolSession extends AMQProtocolSession { - public 
AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { super(protocolHandler,connection); } - public TestNetworkDriver getNetworkDriver() + public TestNetworkConnection getNetworkConnection() { - return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); + return (TestNetworkConnection) _protocolHandler.getNetworkConnection(); } public AMQShortString genQueueName() @@ -54,7 +54,7 @@ public class AMQProtocolSessionTest exte } } - private AMQProtSession _testSession; + private TestProtocolSession _testSession; protected void setUp() throws Exception { @@ -62,10 +62,10 @@ public class AMQProtocolSessionTest exte AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); - protocolHandler.setNetworkDriver(new TestNetworkDriver()); - + protocolHandler.setNetworkConnection(new TestNetworkConnection()); + //don't care about the values set here apart from the dummy IoSession - _testSession = new AMQProtSession(protocolHandler , con); + _testSession = new TestProtocolSession(protocolHandler , con); } public void testTemporaryQueueWildcard() throws UnknownHostException @@ -100,7 +100,7 @@ public class AMQProtocolSessionTest exte private void checkTempQueueName(SocketAddress address, String queueName) { - _testSession.getNetworkDriver().setLocalAddress(address); + _testSession.getNetworkConnection().setLocalAddress(address); assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); } } Modified: qpid/trunk/qpid/java/test-profiles/Excludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=1143867&r1=1143866&r2=1143867&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/Excludes (original) +++ qpid/trunk/qpid/java/test-profiles/Excludes Thu Jul 7 
15:10:30 2011 @@ -50,4 +50,3 @@ org.apache.qpid.test.unit.ack.Acknowledg // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart - Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1143867&r1=1143866&r2=1143867&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original) +++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Thu Jul 7 15:10:30 2011 @@ -77,6 +77,5 @@ org.apache.qpid.test.unit.client.channel //Temporarily adding the following until the issues are sorted out. //Should probably raise JIRAs for them. -org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org
