Author: aidan
Date: Tue Aug 18 16:16:10 2009
New Revision: 805477
URL: http://svn.apache.org/viewvc?rev=805477&view=rev
Log:
QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver
implementation that uses MINA.
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
- copied, changed from r805454,
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
Removed:
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
Modified:
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Modified:
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=805477&r1=805476&r2=805477&view=diff
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Tue Aug 18 16:16:10 2009
@@ -31,6 +31,7 @@
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.thread.QpidThreadExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.net.SocketAddress;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Receiver;
+
+/**
+ * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data
passed to it in the received
+ * decodes it and then process the result.
+ */
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+{
+ // Sets the network driver providing data for this ProtocolEngine
+ void setNetworkDriver (NetworkDriver driver);
+
+ // Returns the remote address of the NetworkDriver
+ SocketAddress getRemoteAddress();
+
+ // Returns number of bytes written
+ long getWrittenBytes();
+
+ // Returns number of bytes read
+ long getReadBytes();
+
+ // Called by the NetworkDriver when the socket has been closed for reading
+ void closed();
+
+ // Called when the NetworkEngine has not written data for the specified
period of time (will trigger a
+ // heartbeat)
+ void writerIdle();
+
+ // Called when the NetworkEngine has not read data for the specified period
of time (will close the connection)
+ void readerIdle();
+
+ /**
+ * Accepts an AMQFrame for writing to the network. The ProtocolEngine
encodes the frame into bytes and
+ * passes the data onto the NetworkDriver for sending
+ */
+ void writeFrame(AMQDataBlock frame);
+}
\ No newline at end of file
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolEngineFactory
+{
+
+ // Returns a new instance of a ProtocolEngine
+ ProtocolEngine newProtocolEngine();
+
+}
\ No newline at end of file
Copied:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
(from r805454,
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java)
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?p2=qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java&p1=qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java&r1=805454&r2=805477&rev=805477&view=diff
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
(original)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
Tue Aug 18 16:16:10 2009
@@ -1,4 +1,25 @@
-package org.apache.qpid.client.transport;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.thread;
import org.apache.qpid.thread.Threading;
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
+{
+ // Creates a NetworkDriver which attempts to connect to destination on port
and attaches the ProtocolEngine to
+ // it using the SSLEngine if provided
+ void open(int port, InetAddress destination, ProtocolEngine engine,
+ NetworkDriverConfiguration config, SSLEngine sslEngine)
+ throws OpenException;
+
+ // listens for incoming connections on the specified ports and address and
creates a new NetworkDriver which
+ // processes incoming connections with ProtocolEngines created from factory
using the SSLEngine if provided
+ void bind (int port, InetAddress[] addresses, ProtocolEngineFactory
protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws BindException;
+
+ // Returns the remote address of underlying socket
+ SocketAddress getRemoteAddress();
+
+ /**
+ * The length of time after which the ProtocolEngines readIdle() method
should be called if no data has been
+ * read in seconds
+ */
+ void setMaxReadIdle(int idleTime);
+
+ /**
+ * The length of time after which the ProtocolEngines writeIdle() method
should be called if no data has been
+ * written in seconds
+ */
+ void setMaxWriteIdle(int idleTime);
+
+}
\ No newline at end of file
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * This interface provides a means for NetworkDrivers to configure TCP options
such as incoming and outgoing
+ * buffer sizes and set particular options on the socket. NetworkDrivers
should honour the values returned
+ * from here if the underlying implementation supports them.
+ */
+public interface NetworkDriverConfiguration
+{
+ // Taken from Socket
+ boolean getKeepAlive();
+ boolean getOOBInline();
+ boolean getReuseAddress();
+ Integer getSoLinger(); // null means off
+ int getSoTimeout();
+ boolean getTcpNoDelay();
+ int getTrafficClass();
+
+ // The amount of memory in bytes to allocate to the incoming buffer
+ int getReceiveBufferSize();
+
+ // The amount of memory in bytes to allocate to the outgoing buffer
+ int getSendBufferSize();
+}
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport;
+
+public class OpenException extends Exception
+{
+
+ public OpenException(String string, Throwable lastException)
+ {
+ super(string, lastException);
+ }
+
+}
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,379 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriver extends IoHandlerAdapter implements
NetworkDriver
+{
+
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ ProtocolEngine _protocolEngine;
+ private boolean _useNIO = false;
+ private int _processors = 4;
+ private boolean _executorPool = false;
+ private SSLContextFactory _sslFactory = null;
+ private SocketConnector _socketConnector;
+ private IoAcceptor _acceptor;
+ private IoSession _ioSession;
+ private ProtocolEngineFactory _factory;
+ private boolean _protectIO;
+ private NetworkDriverConfiguration _config;
+ private Throwable _lastException;
+ private boolean _acceptingConnections = false;
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean
executorPool, boolean protectIO)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ }
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean
executorPool, boolean protectIO,
+ ProtocolEngine protocolEngine, IoSession session)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ _protocolEngine = protocolEngine;
+ _ioSession = session;
+ }
+
+ public MINANetworkDriver()
+ {
+
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory
factory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws BindException
+ {
+
+ _factory = factory;
+ _config = config;
+
+ if (_useNIO)
+ {
+ _acceptor = new
org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
+ new NewThreadExecutor());
+ }
+ else
+ {
+ _acceptor = new
org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new
NewThreadExecutor());
+ }
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig)
_acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig)
sconfig.getSessionConfig();
+
+ if (config != null)
+ {
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ }
+
+ // if we do not use the executor pool threading model we get the
default
+ // leader follower
+ // implementation provided by MINA
+ if (_executorPool)
+ {
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (addresses != null && addresses.length > 0)
+ {
+ for (InetAddress addr : addresses)
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(addr, port), this,
sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to
{0}:{2}", addr, port));
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to
*:{1}", port));
+ }
+ }
+ _acceptingConnections = true;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _ioSession.getRemoteAddress();
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine,
NetworkDriverConfiguration config,
+ SSLEngine sslEngine) throws OpenException
+ {
+ if (_useNIO)
+ {
+ _socketConnector = new MultiThreadSocketConnector(1, new
QpidThreadExecutor());
+ }
+ else
+ {
+ _socketConnector = new SocketConnector(1, new
QpidThreadExecutor()); // non-blocking
+
// connector
+ }
+
+
org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ // the MINA default is currently to use the pooled allocator although
this may change in future
+ // once more testing of the performance of the simple allocator has
been done
+ if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator(new
SimpleByteBufferAllocator());
+ }
+
+
+ SocketConnectorConfig cfg = (SocketConnectorConfig)
_socketConnector.getDefaultConfig();
+
+ // if we do not use our own thread model we get the MINA default which
is to use
+ // its own leader-follower model
+ boolean readWriteThreading =
Boolean.getBoolean("amqj.shared_read_write_pool");
+ if (readWriteThreading)
+ {
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ SocketSessionConfig scfg = (SocketSessionConfig)
cfg.getSessionConfig();
+ scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
+ scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() :
DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize((config != null) ?
config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use
+ // one SocketConnector per connection at the moment anyway). This
allows
+ // short-running
+ // clients (like unit tests) to complete quickly.
+ _socketConnector.setWorkerTimeout(0);
+ ConnectFuture future = _socketConnector.connect(new
InetSocketAddress(destination, port), this, cfg);
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new OpenException("Could not open connection",
_lastException);
+ }
+ _ioSession = future.getSession();
+
ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
+
ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
+ _ioSession.setAttachment(engine);
+ engine.setNetworkDriver(this);
+ _protocolEngine = engine;
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
+ }
+
+ public void close()
+ {
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ if (_ioSession != null)
+ {
+ _ioSession.close();
+ }
+ }
+
+ public void flush()
+ {
+ // MINA doesn't support flush
+ }
+
+ public void send(ByteBuffer msg)
+ {
+ WriteFuture future =
_ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg));
+ future.join();
+ }
+
+ public void setIdleTimeout(long l)
+ {
+ // MINA doesn't support setting SO_TIMEOUT
+ }
+
+ public void exceptionCaught(IoSession protocolSession, Throwable
throwable) throws Exception
+ {
+ if (_protocolEngine != null)
+ {
+ _protocolEngine.exception(throwable);
+ }
+ _lastException = throwable;
+ }
+
+ /**
+ * Invoked when a message is received on a particular protocol session.
Note
+ * that a protocol session is directly tied to a particular physical
+ * connection.
+ *
+ * @param protocolSession
+ * the protocol session that received the message
+ * @param message
+ * the message itself (i.e. a decoded frame)
+ *
+ * @throws Exception
+ * if the message cannot be processed
+ */
+ public void messageReceived(IoSession protocolSession, Object message)
throws Exception
+ {
+ if (message instanceof org.apache.mina.common.ByteBuffer)
+ {
+ ((ProtocolEngine)
protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer)
message).buf());
+ }
+ else
+ {
+ throw new IllegalStateException("Handed unhandled message.
message.class = " + message.getClass() + " message = " + message);
+ }
+ }
+
+ public void sessionClosed(IoSession protocolSession) throws Exception
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).closed();
+ }
+
+ public void sessionCreated(IoSession protocolSession) throws Exception
+ {
+ if (_acceptingConnections)
+ {
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
+ {
+ if (_sslFactory != null)
+ {
+
protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ else
+ {
+ if (_sslFactory != null)
+ {
+
protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+
protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder",
new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new
ReadThrottleFilterBuilder();
+
readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new
WriteBufferLimitFilterBuilder();
+
writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
+
+
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
+
+ // Set up the protocol engine
+ ProtocolEngine protocolEngine = _factory.newProtocolEngine();
+ MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO,
_processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ protocolEngine.setNetworkDriver(newDriver);
+ protocolSession.setAttachment(protocolEngine);
+ }
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws
Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+ private ProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
+}
Added:
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=805477&view=auto
==============================================================================
---
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
(added)
+++
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
Tue Aug 18 16:16:10 2009
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriverTest extends TestCase
+{
+
+ private static final String TEST_DATA = "YHALOTHAR";
+ private static final int TEST_PORT = 2323;
+ private NetworkDriver _server;
+ private NetworkDriver _client;
+ private CountingProtocolEngine _countingEngine; // Keeps a count of how
many bytes it's read
+ private Exception _thrownEx;
+
+ @Override
+ public void setUp()
+ {
+ _server = new MINANetworkDriver();
+ _client = new MINANetworkDriver();
+ _thrownEx = null;
+ _countingEngine = new CountingProtocolEngine();
+ }
+
+ @Override
+ public void tearDown()
+ {
+ if (_server != null)
+ {
+ _server.close();
+ }
+
+ if (_client != null)
+ {
+ _client.close();
+ }
+ }
+
+ /**
+ * Tests that a socket can't be opened if a driver hasn't been bound
+ * to the port and can be opened if a driver has been bound.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpen() throws BindException, UnknownHostException,
OpenException
+ {
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(),
_countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+
+ assertNotNull("Open should have failed since no engine bound",
_thrownEx);
+
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ }
+
+ /**
+ * Tests that a socket can't be opened after a bound NetworkDriver has
been closed
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpenCloseOpen() throws BindException,
UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ _client.close();
+ _server.close();
+
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(),
_countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Open should have failed", _thrownEx);
+ }
+
+ /**
+ * Checks that the right exception is thrown when binding a NetworkDriver
to an already
+ * existing socket.
+ */
+ public void testBindPortInUse()
+ {
+ try
+ {
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ fail("First bind should not fail");
+ }
+
+ try
+ {
+ _client.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Second bind should throw BindException", _thrownEx);
+ }
+
+ /**
+ * tests that bytes sent on a network driver are received at the other end
+ *
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ * @throws BindException
+ */
+ public void testSend() throws UnknownHostException, OpenException,
InterruptedException, BindException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+
+ // Tell the counting engine how much data we're sending
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+
+ // Send the data and wait for up to 2 seconds to get it back
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+
+ // Check what we got
+ assertEquals("Wrong amount of data recieved",
TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
+ }
+
+ /**
+ * Opens a connection with a low read idle and check that it gets triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetReadIdle() throws BindException, UnknownHostException,
OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ assertFalse("Reader should not have been idle",
_countingEngine.getReaderHasBeenIdle());
+ _client.setMaxReadIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle",
_countingEngine.getReaderHasBeenIdle());
+ }
+
+ /**
+ * Opens a connection with a low write idle and check that it gets
triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetWriteIdle() throws BindException, UnknownHostException,
OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ assertFalse("Reader should not have been idle",
_countingEngine.getWriterHasBeenIdle());
+ _client.setMaxWriteIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle",
_countingEngine.getWriterHasBeenIdle());
+ }
+
+
+ /**
+ * Creates and then closes a connection from client to server and checks
that the server
+ * has its closed() method called. Then creates a new client and closes
the server to check
+ * that the client has its closed() method called.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testClosed() throws BindException, UnknownHostException,
OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ EchoProtocolEngineSingletonFactory factory = new
EchoProtocolEngineSingletonFactory();
+ _server.bind(TEST_PORT, null, factory, null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ EchoProtocolEngine serverEngine = null;
+ while (serverEngine == null)
+ {
+ serverEngine = factory.getEngine();
+ if (serverEngine == null)
+ {
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ assertFalse("Server should not have been closed",
serverEngine.getClosed());
+ serverEngine.setNewLatch(1);
+ _client.close();
+ try
+ {
+ serverEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Server should have been closed", serverEngine.getClosed());
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ _countingEngine.setClosed(false);
+ assertFalse("Client should not have been closed",
_countingEngine.getClosed());
+ _countingEngine.setNewLatch(1);
+ _server.close();
+ try
+ {
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Client should have been closed",
_countingEngine.getClosed());
+ }
+
+ /**
+ * Create a connection and instruct the client to throw an exception when
it gets some data
+ * and that the latch gets counted down.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ */
+ public void testExceptionCaught() throws BindException,
UnknownHostException, OpenException, InterruptedException
+ {
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+
+
+ assertEquals("Exception should not have been thrown", 1,
+ _countingEngine.getExceptionLatch().getCount());
+ _countingEngine.setErrorOnNextRead(true);
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
+ assertEquals("Exception should not been thrown", 0,
+ _countingEngine.getExceptionLatch().getCount());
+ }
+
+ /**
+ * Opens a connection and checks that the remote address is the one that
was asked for
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testGetRemoteAddress() throws BindException,
UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new
EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine,
null, null);
+ assertEquals(new InetSocketAddress(InetAddress.getLocalHost(),
TEST_PORT),
+ _client.getRemoteAddress());
+ }
+
+ private class EchoProtocolEngineSingletonFactory implements
ProtocolEngineFactory
+ {
+ EchoProtocolEngine _engine = null;
+
+ public ProtocolEngine newProtocolEngine()
+ {
+ if (_engine == null)
+ {
+ _engine = new EchoProtocolEngine();
+ }
+ return getEngine();
+ }
+
+ public EchoProtocolEngine getEngine()
+ {
+ return _engine;
+ }
+ }
+
+ public class CountingProtocolEngine implements ProtocolEngine
+ {
+
+ protected NetworkDriver _driver;
+ public ArrayList<ByteBuffer> _receivedBytes = new
ArrayList<ByteBuffer>();
+ private int _readBytes;
+ private CountDownLatch _latch = new CountDownLatch(0);
+ private boolean _readerHasBeenIdle;
+ private boolean _writerHasBeenIdle;
+ private boolean _closed = false;
+ private boolean _nextReadErrors = false;
+ private CountDownLatch _exceptionLatch = new CountDownLatch(1);
+
+ public void closed()
+ {
+ setClosed(true);
+ _latch.countDown();
+ }
+
+ public void setErrorOnNextRead(boolean b)
+ {
+ _nextReadErrors = b;
+ }
+
+ public void setNewLatch(int length)
+ {
+ _latch = new CountDownLatch(length);
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getRemoteAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public void readerIdle()
+ {
+ _readerHasBeenIdle = true;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _driver = driver;
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+
+ }
+
+ public void writerIdle()
+ {
+ _writerHasBeenIdle = true;
+ }
+
+ public void exception(Throwable t)
+ {
+ _exceptionLatch.countDown();
+ }
+
+ public CountDownLatch getExceptionLatch()
+ {
+ return _exceptionLatch;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ // increment read bytes and count down the latch for that many
+ int bytes = msg.remaining();
+ _readBytes += bytes;
+ for (int i = 0; i < bytes; i++)
+ {
+ _latch.countDown();
+ }
+
+ // Throw an error if we've been asked too, but we can still count
+ if (_nextReadErrors)
+ {
+ throw new RuntimeException("Was asked to error");
+ }
+ }
+
+ public CountDownLatch getLatch()
+ {
+ return _latch;
+ }
+
+ public boolean getWriterHasBeenIdle()
+ {
+ return _writerHasBeenIdle;
+ }
+
+ public boolean getReaderHasBeenIdle()
+ {
+ return _readerHasBeenIdle;
+ }
+
+ public void setClosed(boolean _closed)
+ {
+ this._closed = _closed;
+ }
+
+ public boolean getClosed()
+ {
+ return _closed;
+ }
+ }
+
+ private class EchoProtocolEngine extends CountingProtocolEngine
+ {
+
+ public void received(ByteBuffer msg)
+ {
+ super.received(msg);
+ msg.rewind();
+ _driver.send(msg);
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]