Author: kwall
Date: Thu Nov 19 14:59:31 2015
New Revision: 1715193

URL: http://svn.apache.org/viewvc?rev=1715193&view=rev
Log:
QPID-6871: [Java Broker] Make the accept (listen) socket backlog of AMQP ports
configurable via a context variable (qpid.port.amqp.acceptBacklog)

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 Thu Nov 19 14:59:31 2015
@@ -57,6 +57,7 @@ public interface AmqpPort<X extends Amqp
     String PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 
"qpid.port.amqp.threadPool.keep_alive_timeout";
 
     String PORT_AMQP_NUMBER_OF_SELECTORS = 
"qpid.port.amqp.threadPool.numberOfSelectors";
+    String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
     String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 
"qpid.port.amqp.outboundMessageBufferSize";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
@@ -76,9 +77,13 @@ public interface AmqpPort<X extends Amqp
     long DEFAULT_PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
 
     @SuppressWarnings("unused")
-    @ManagedContextDefault( name = PORT_AMQP_NUMBER_OF_SELECTORS)
+    @ManagedContextDefault(name = PORT_AMQP_NUMBER_OF_SELECTORS)
     long DEFAULT_PORT_AMQP_NUMBER_OF_SELECTORS = 
Math.max(DEFAULT_PORT_AMQP_THREAD_POOL_SIZE / 8, 1);
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = PORT_AMQP_ACCEPT_BACKLOG)
+    int DEFAULT_PORT_AMQP_ACCEPT_BACKLOG = 1024;
+
     String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
 
     @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Thu Nov 19 14:59:31 2015
@@ -38,14 +38,13 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 public class NonBlockingNetworkTransport
 {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(IoNetworkTransport.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
     private static final int TIMEOUT = 
Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANDSHAKE_TIMEOUT = 
Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
@@ -55,13 +54,13 @@ public class NonBlockingNetworkTransport
     private final ServerSocketChannel _serverSocket;
     private final int _timeout;
     private final NetworkConnectionScheduler _scheduler;
-    private final AmqpPort _port;
+    private final AmqpPort<?> _port;
     private final InetSocketAddress _address;
 
     public NonBlockingNetworkTransport(final MultiVersionProtocolEngineFactory 
factory,
                                        final EnumSet<TransportEncryption> 
encryptionSet,
                                        final NetworkConnectionScheduler 
scheduler,
-                                       final AmqpPort port)
+                                       final AmqpPort<?> port)
     {
         try
         {
@@ -85,10 +84,11 @@ public class NonBlockingNetworkTransport
                 _address = new InetSocketAddress(bindingAddress, portNumber);
             }
 
+            int acceptBacklog = port.getContextValue(Integer.class, 
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG);
             _serverSocket =  ServerSocketChannel.open();
 
             _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-            _serverSocket.bind(_address);
+            _serverSocket.bind(_address, acceptBacklog);
             _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
             _scheduler = scheduler;
@@ -99,7 +99,6 @@ public class NonBlockingNetworkTransport
         {
             throw new TransportException("Failed to start AMQP on port : " + 
port, e);
         }
-
     }
 
     public void start()
@@ -142,7 +141,6 @@ public class NonBlockingNetworkTransport
                 if (engine != null)
                 {
                     socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, 
_port.isTcpNoDelay());
-                    socketChannel.socket().setSoTimeout(1000 * 
HANDSHAKE_TIMEOUT);
 
                     final int bufferSize = _port.getNetworkBufferSize();
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1715193&r1=1715192&r2=1715193&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 Thu Nov 19 14:59:31 2015
@@ -117,6 +117,7 @@ public class TCPandSSLTransportTest exte
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+        when(port.getContextValue(Integer.class, 
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
 
         TCPandSSLTransport transport = new TCPandSSLTransport(new 
HashSet<>(Arrays.asList(transports)),
                                                               port,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to