Author: robbie
Date: Thu Jul 7 15:08:44 2011
New Revision: 1143865
URL: http://svn.apache.org/viewvc?rev=1143865&view=rev
Log:
QPID-3341: remove unused/dead transport code and accompanying implementation
classes
Applied patch by Keith Wall and myself.
Removed:
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
Modified:
qpid/trunk/qpid/java/broker/etc/config.xml
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
qpid/trunk/qpid/java/systests/etc/config-systests-firewall-2.xml
qpid/trunk/qpid/java/systests/etc/config-systests-firewall-3.xml
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
Modified: qpid/trunk/qpid/java/broker/etc/config.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/config.xml?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/etc/config.xml (original)
+++ qpid/trunk/qpid/java/broker/etc/config.xml Thu Jul 7 15:08:44 2011
@@ -37,17 +37,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>true</enabled>
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
Thu Jul 7 15:08:44 2011
@@ -52,9 +52,7 @@ public class ServerConfiguration extends
protected static final Logger _logger =
Logger.getLogger(ServerConfiguration.class);
// Default Configuration values
- public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
- public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
- public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED =
false;
+ public static final int DEFAULT_BUFFER_SIZE = 262144;
public static final String DEFAULT_STATUS_UPDATES = "on";
public static final String SECURITY_CONFIG_RELOADED = "SECURITY
CONFIGURATION RELOADED";
@@ -84,9 +82,6 @@ public class ServerConfiguration extends
// Configuration values to be read from the configuration file
//todo Move all properties to static values to ensure system testing can
be performed.
- public static final String CONNECTOR_PROTECTIO_ENABLED =
"connector.protectio.enabled";
- public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE =
"connector.protectio.readBufferLimitSize";
- public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE =
"connector.protectio.writeBufferLimitSize";
public static final String MGMT_CUSTOM_REGISTRY_SOCKET =
"management.custom-registry-socket";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
@@ -95,7 +90,6 @@ public class ServerConfiguration extends
envVarMap.put("QPID_PORT", "connector.port");
envVarMap.put("QPID_ENABLEDIRECTBUFFERS",
"advanced.enableDirectBuffers");
envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
- envVarMap.put("QPID_NIO", "connector.qpidnio");
envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
envVarMap.put("QPID_JMXPORT", "management.jmxport");
envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
@@ -545,21 +539,6 @@ public class ServerConfiguration extends
return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE);
}
- public boolean getProtectIOEnabled()
- {
- return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED,
DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED);
- }
-
- public int getBufferReadLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE,
DEFAULT_BUFFER_READ_LIMIT_SIZE);
- }
-
- public int getBufferWriteLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE,
DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
- }
-
public boolean getSynchedClocks()
{
return getBooleanValue("advanced.synced-clocks");
@@ -687,12 +666,12 @@ public class ServerConfiguration extends
public int getReceiveBufferSize()
{
- return getIntValue("connector.socketReceiveBuffer", 32767);
+ return getIntValue("connector.socketReceiveBuffer",
DEFAULT_BUFFER_SIZE);
}
public int getWriteBufferSize()
{
- return getIntValue("connector.socketWriteBuffer", 32767);
+ return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE);
}
public boolean getTcpNoDelay()
@@ -735,11 +714,6 @@ public class ServerConfiguration extends
return getStringValue("connector.ssl.certType", "SunX509");
}
- public boolean getQpidNIO()
- {
- return getBooleanValue("connector.qpidnio");
- }
-
public boolean getUseBiasedWrites()
{
return getBooleanValue("advanced.useWriteBiasedPool");
@@ -809,8 +783,7 @@ public class ServerConfiguration extends
public Boolean getTcpNoDelay()
{
- // Can't call parent getTcpNoDelay since it just calls this one
- return getBooleanValue("connector.tcpNoDelay", true);
+ return ServerConfiguration.this.getTcpNoDelay();
}
public Integer getSoTimeout()
@@ -825,7 +798,7 @@ public class ServerConfiguration extends
public Integer getSendBufferSize()
{
- return getBufferWriteLimit();
+ return ServerConfiguration.this.getWriteBufferSize();
}
public Boolean getReuseAddress()
@@ -835,7 +808,7 @@ public class ServerConfiguration extends
public Integer getReceiveBufferSize()
{
- return getBufferReadLimit();
+ return ServerConfiguration.this.getReceiveBufferSize();
}
public Boolean getOOBInline()
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
Thu Jul 7 15:08:44 2011
@@ -187,49 +187,6 @@ public class ServerConfigurationTest ext
assertEquals(23, serverConfig.getFrameSize());
}
- public void testGetProtectIOEnabled() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(false, serverConfig.getProtectIOEnabled());
-
- // Check value we set
- _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED,
true);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(true, serverConfig.getProtectIOEnabled());
- }
-
- public void testGetBufferReadLimit() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(262144, serverConfig.getBufferReadLimit());
-
- // Check value we set
-
_config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE,
23);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(23, serverConfig.getBufferReadLimit());
- }
-
- public void testGetBufferWriteLimit() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(262144, serverConfig.getBufferWriteLimit());
-
- // Check value we set
-
_config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE,
23);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(23, serverConfig.getBufferWriteLimit());
- }
-
-
public void testGetStatusEnabled() throws ConfigurationException
{
// Check default
@@ -543,7 +500,7 @@ public class ServerConfigurationTest ext
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(32767, serverConfig.getReceiveBufferSize());
+ assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE,
serverConfig.getReceiveBufferSize());
// Check value we set
_config.setProperty("connector.socketReceiveBuffer", "23");
@@ -557,7 +514,7 @@ public class ServerConfigurationTest ext
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(32767, serverConfig.getWriteBufferSize());
+ assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE,
serverConfig.getWriteBufferSize());
// Check value we set
_config.setProperty("connector.socketWriteBuffer", "23");
@@ -678,20 +635,6 @@ public class ServerConfigurationTest ext
assertEquals("a", serverConfig.getCertType());
}
- public void testGetQpidNIO() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(false, serverConfig.getQpidNIO());
-
- // Check value we set
- _config.setProperty("connector.qpidnio", true);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(true, serverConfig.getQpidNIO());
- }
-
public void testGetUseBiasedWrites() throws ConfigurationException
{
// Check default
@@ -756,7 +699,7 @@ public class ServerConfigurationTest ext
out.close();
out = new FileWriter(fileB);
-
out.write("<broker><connector><ssl><port>2345</port></ssl><qpidnio>true</qpidnio></connector></broker>");
+
out.write("<broker><connector><ssl><port>2345</port></ssl></connector></broker>");
out.close();
ServerConfiguration config = new
ServerConfiguration(mainFile.getAbsoluteFile());
@@ -767,8 +710,6 @@ public class ServerConfigurationTest ext
assertEquals(1, config.getPorts().size());
assertEquals("2342", config.getPorts().get(0)); // From the first
file, not
// present in the second
- assertEquals(true, config.getQpidNIO()); // From the second file, not
- // present in the first
}
public void testVariableInterpolation() throws Exception
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Thu Jul 7 15:08:44 2011
@@ -89,15 +89,8 @@ public class AMQConnectionDelegate_8_0 i
StateWaiter waiter =
_conn._protocolHandler.createWaiter(openOrClosedStates);
- // TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
- {
-
TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler,
brokerDetail);
- }
- else
- {
- _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
- }
+
TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler,
brokerDetail);
+
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Thu Jul 7 15:08:44 2011
@@ -211,21 +211,6 @@ public class AMQProtocolHandler implemen
}
/**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
-
brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
* Called when the network connection is closed. This can happen, either
because the client explicitly requested
* that the connection be closed, in which case nothing is done, or
because the connection died. In the case
* where the connection died, an attempt to failover automatically to a
new connection may be started. The failover
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Thu Jul 7 15:08:44 2011
@@ -30,7 +30,6 @@ import org.apache.mina.common.IoConnecto
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -116,20 +115,8 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina
MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.warn("Using Qpid MultiThreaded NIO - " +
(System.getProperties().containsKey("qpidnio")
-
? "Qpid NIO is new default"
-
: "Sysproperty 'qpidnio' is set"));
- result = new MultiThreadSocketConnector(1, new
QpidThreadExecutor());
- }
- else
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(1, new
QpidThreadExecutor()); // non-blocking connector
- }
+ SocketConnector result = new SocketConnector(1, new
QpidThreadExecutor()); // non-blocking connector
+
// Don't have the connector's worker thread wait
around for other connections (we only use
// one SocketConnector per connection at the moment
anyway). This allows short-running
// clients (like unit tests) to complete quickly.
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
Thu Jul 7 15:08:44 2011
@@ -86,35 +86,6 @@ public class ClientProperties
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT =
"qpid.use_legacy_map_message";
- /**
- * ==========================================================
- * Those properties are used when the io size should be bounded
- * ==========================================================
- */
-
- /**
- * When set to true the io layer throttle down producers and consumers
- * when written or read messages are accumulating and exceeding a certain
size.
- * This is especially useful when a the producer rate is greater than the
network
- * speed.
- * type: boolean
- */
- public static final String PROTECTIO_PROP_NAME = "protectio";
-
- //=== The following properties are only used when the previous one is true.
- /**
- * Max size of read messages that can be stored within the MINA layer
- * type: int
- */
- public static final String READ_BUFFER_LIMIT_PROP_NAME =
"qpid.read.buffer.limit";
- public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
- /**
- * Max size of written messages that can be stored within the MINA layer
- * type: int
- */
- public static final String WRITE_BUFFER_LIMIT_PROP_NAME =
"qpid.read.buffer.limit";
- public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
-
public static final String AMQP_VERSION = "qpid.amqp.version";
private static ClientProperties _instance = new ClientProperties();
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
Thu Jul 7 15:08:44 2011
@@ -19,25 +19,16 @@
package org.apache.qpid.transport.network.io;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.util.Logger;
@@ -134,82 +125,6 @@ public final class IoTransport<E> implem
return socket;
}
- public static final <E> E connect(String host, int port,
- Binding<E,ByteBuffer> binding,
- boolean ssl)
- {
- Socket socket = createSocket(host, port);
- IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
- return transport.endpoint;
- }
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate,
- boolean ssl)
- {
- return connect(host, port, ConnectionBinding.get(delegate),ssl);
- }
-
- public static void connect_0_9(AMQVersionAwareProtocolSession session,
String host, int port, boolean ssl)
- {
- connect(host, port, new Binding_0_9(session),ssl);
- }
-
- private static class Binding_0_9
- implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
- {
-
- private AMQVersionAwareProtocolSession session;
-
- Binding_0_9(AMQVersionAwareProtocolSession session)
- {
- this.session = session;
- }
-
- public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer>
sender)
- {
- session.setSender(sender);
- return session;
- }
-
- public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession
ssn)
- {
- return new InputHandler_0_9(ssn);
- }
-
- }
-
- private static Socket createSocket(String host, int port)
- {
- try
- {
- InetAddress address = InetAddress.getByName(host);
- Socket socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
-
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
-
- socket.setSendBufferSize(writeBufferSize);
- socket.setReceiveBufferSize(readBufferSize);
-
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
-
- socket.connect(new InetSocketAddress(address, port));
- return socket;
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- }
-
private SSLContext createSSLContext() throws Exception
{
String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
Thu Jul 7 15:08:44 2011
@@ -26,16 +26,11 @@ import org.apache.mina.common.ExecutorTh
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
@@ -66,16 +61,12 @@ public class MINANetworkDriver extends I
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
ProtocolEngine _protocolEngine;
- private boolean _useNIO = false;
private int _processors = 4;
- private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
- private boolean _protectIO;
- private NetworkDriverConfiguration _config;
private Throwable _lastException;
private boolean _acceptingConnections = false;
@@ -91,21 +82,9 @@ public class MINANetworkDriver extends I
org.apache.mina.common.ByteBuffer.setAllocator(new
SimpleByteBufferAllocator());
}
- public MINANetworkDriver(boolean useNIO, int processors, boolean
executorPool, boolean protectIO)
+ public MINANetworkDriver(int processors, ProtocolEngine protocolEngine,
IoSession session)
{
- _useNIO = useNIO;
_processors = processors;
- _executorPool = executorPool;
- _protectIO = protectIO;
- }
-
- public MINANetworkDriver(boolean useNIO, int processors, boolean
executorPool, boolean protectIO,
- ProtocolEngine protocolEngine, IoSession session)
- {
- _useNIO = useNIO;
- _processors = processors;
- _executorPool = executorPool;
- _protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
_ioSession.setAttachment(_protocolEngine);
@@ -132,17 +111,8 @@ public class MINANetworkDriver extends I
{
_factory = factory;
- _config = config;
- if (_useNIO)
- {
- _acceptor = new
org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
- new NewThreadExecutor());
- }
- else
- {
- _acceptor = new
org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new
NewThreadExecutor());
- }
+ _acceptor = new
org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new
NewThreadExecutor());
SocketAcceptorConfig sconfig = (SocketAcceptorConfig)
_acceptor.getDefaultConfig();
sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
@@ -207,15 +177,7 @@ public class MINANetworkDriver extends I
_sslFactory = sslFactory;
}
- if (_useNIO)
- {
- _socketConnector = new MultiThreadSocketConnector(1, new
QpidThreadExecutor());
- }
- else
- {
- _socketConnector = new SocketConnector(1, new
QpidThreadExecutor()); // non-blocking
-
// connector
- }
+ _socketConnector = new SocketConnector(1, new QpidThreadExecutor());
// non-blocking connector
SocketConnectorConfig cfg = (SocketConnectorConfig)
_socketConnector.getDefaultConfig();
String s = "";
@@ -351,39 +313,10 @@ public class MINANetworkDriver extends I
{
// Configure the session with SSL if necessary
SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
-
protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
- }
- else
- {
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter",
"sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
- }
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
-
-
protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder",
new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new
ReadThrottleFilterBuilder();
-
readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new
WriteBufferLimitFilterBuilder();
-
writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
-
-
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ protocolSession.getFilterChain().addBefore("protocolFilter",
"sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
if (_ioSession == null)
@@ -395,7 +328,7 @@ public class MINANetworkDriver extends I
{
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
- MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO,
_processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ MINANetworkDriver newDriver = new MINANetworkDriver(_processors,
protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
}
}
Modified: qpid/trunk/qpid/java/systests/etc/config-systests-firewall-2.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/etc/config-systests-firewall-2.xml?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/etc/config-systests-firewall-2.xml (original)
+++ qpid/trunk/qpid/java/systests/etc/config-systests-firewall-2.xml Thu Jul 7
15:08:44 2011
@@ -35,17 +35,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
Modified: qpid/trunk/qpid/java/systests/etc/config-systests-firewall-3.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/etc/config-systests-firewall-3.xml?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/etc/config-systests-firewall-3.xml (original)
+++ qpid/trunk/qpid/java/systests/etc/config-systests-firewall-3.xml Thu Jul 7
15:08:44 2011
@@ -35,17 +35,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java?rev=1143865&r1=1143864&r2=1143865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
Thu Jul 7 15:08:44 2011
@@ -61,21 +61,6 @@ public class ServerConfigurationFileTest
_serverConfig.getConfig().getProperty(property));
}
- public void testProtectIOEnabled() throws ConfigurationException
- {
-
validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED);
- }
-
- public void testProtectIOReadBufferLimitSize() throws
ConfigurationException
- {
-
validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE);
- }
-
- public void testProtectIOWriteBufferLimitSize() throws
ConfigurationException
- {
-
validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE);
- }
-
public void testStatusUpdates() throws ConfigurationException
{
validatePropertyDefinedInFile(ServerConfiguration.STATUS_UPDATES);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]