Author: supun
Date: Tue Jun  8 22:05:58 2010
New Revision: 952841

URL: http://svn.apache.org/viewvc?rev=952841&view=rev
Log:
keeping track of active connections

Modified:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
 Tue Jun  8 22:05:58 2010
@@ -55,10 +55,14 @@ import org.apache.http.nio.entity.Conten
 import org.apache.http.params.DefaultedHttpParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.*;
-import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.synapse.transport.nhttp.debug.ClientConnectionDebug;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The client connection handler. An instance of this class is used by each 
IOReactor, to
@@ -90,6 +94,14 @@ public class ClientHandler implements NH
     /** Array of content types for which warnings are logged if HTTP status 
code is 500. */
     private String[] warnOnHttp500;
 
+    /** weather we are counting the connections to the back end servers */
+    private boolean countConnections = false;
+    /** lock to update the connection counts in a thread safe way */
+    private Lock lock = new ReentrantLock();
+
+    /** A map for holding the number of open connections for a host:port pair 
*/
+    private Map<String, AtomicInteger> openConnections = new HashMap<String, 
AtomicInteger>();
+
     public static final String OUTGOING_MESSAGE_CONTEXT = 
"synapse.axis2_message_context";
     public static final String AXIS2_HTTP_REQUEST = 
"synapse.axis2-http-request";
     public static final String CLIENT_CONNECTION_DEBUG = 
"synapse.client-connection-debug";
@@ -131,6 +143,10 @@ public class ClientHandler implements NH
         if (contentTypeList != null) {
             warnOnHttp500 = (String[]) contentTypeList;
         }
+        // check weather we count the connections
+        this.countConnections = 
NHttpConfiguration.getInstance().isCountConnections();
+        // set the connection map to the configuration context
+        cfgCtx.setProperty(NhttpConstants.OPEN_CONNNECTIONS_MAP, 
openConnections);
     }
 
     public void requestReady(final NHttpClientConnection conn) {
@@ -164,7 +180,10 @@ public class ClientHandler implements NH
         // record connection creation time for debug logging
         conn.getContext().setAttribute(CONNECTION_CREATION_TIME, 
System.currentTimeMillis());
 
-
+        if (countConnections) {
+            recordConnection(conn);
+        }
+        
         try {
             processConnection(conn, (Axis2HttpRequest) attachment);
         } catch (ConnectionClosedException e) {
@@ -886,13 +905,18 @@ public class ClientHandler implements NH
      * @param conn the connection to be shutdown
      */
     private void shutdownConnection(final NHttpClientConnection conn) {
-
-        if (log.isDebugEnabled() && conn instanceof HttpInetConnection) {
+        if (conn instanceof HttpInetConnection) {
             HttpInetConnection inetConnection = (HttpInetConnection) conn;
-            log.debug("Connection to remote address : " + 
inetConnection.getRemoteAddress()
-                    + ":" + inetConnection.getRemotePort() + " from local 
address : "
-                    + inetConnection.getLocalAddress() + ":" + 
inetConnection.getLocalPort() +
-                    " is closed!");
+            if (log.isDebugEnabled()) {
+                log.debug("Connection to remote address : " + 
inetConnection.getRemoteAddress()
+                        + ":" + inetConnection.getRemotePort() + " from local 
address : "
+                        + inetConnection.getLocalAddress() + ":" + 
inetConnection.getLocalPort() +
+                        " is closed!");
+            }
+            
+            if (countConnections) {
+                removeConnectionRecord(inetConnection);
+            }
         }
 
         HttpContext context = conn.getContext();
@@ -917,6 +941,82 @@ public class ClientHandler implements NH
     }
 
     /**
+     * Remove a connection record for this host:port pair from active 
connections records.
+     *
+     * @param inetConnection connection that need to be removed from the 
active connections records
+     */
+    private void removeConnectionRecord(HttpInetConnection inetConnection) {
+        AtomicInteger connections = openConnections.get(
+                inetConnection.getRemoteAddress().getHostName() + ":"
+                        + inetConnection.getRemotePort());
+        if (connections == null) {
+            connections = openConnections.get(
+                    inetConnection.getRemoteAddress().getHostAddress() + ":"
+                            + inetConnection.getRemotePort());
+        }
+
+        if (connections != null) {
+            int no = connections.getAndDecrement();
+            lock.lock();
+            try {
+                if (no == 0) {
+                    if (null == openConnections.remove(
+                            inetConnection.getRemoteAddress().getHostName()
+                            + ":" + inetConnection.getRemotePort())) {
+
+                    } else {
+                        openConnections.remove(
+                                
inetConnection.getRemoteAddress().getHostAddress()
+                                + ":" + inetConnection.getRemotePort());
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Record a connection in the active connection records.
+     *
+     * @param conn connection to be recorded.
+     */
+    private void recordConnection(NHttpClientConnection conn) {
+        if (conn instanceof HttpInetConnection) {
+            HttpInetConnection inetConnection = (HttpInetConnection) conn;
+            // first we try to get the connection with host_addrss:port key
+            AtomicInteger connections = openConnections.get(
+                    inetConnection.getRemoteAddress().getHostName() + ":"
+                            + inetConnection.getRemotePort());
+            // if we fail try to get the connection with ip_address:port key
+            if (connections == null) {
+                connections = openConnections.get(
+                        inetConnection.getRemoteAddress().getHostAddress() + 
":"
+                                + inetConnection.getRemotePort());
+            }
+
+            lock.lock();
+            try {
+                if (connections == null) {
+                    connections = new AtomicInteger();
+                    if (inetConnection.getRemoteAddress().getHostName() != 
null) {
+                        openConnections.put(
+                                
inetConnection.getRemoteAddress().getHostName() + ":"
+                                + inetConnection.getRemotePort(), connections);
+                    } else {
+                        openConnections.put(
+                                
inetConnection.getRemoteAddress().getHostAddress() + ":"
+                                + inetConnection.getRemotePort(), connections);
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+            connections.getAndIncrement();
+        }
+    }
+
+    /**
      * Return the HttpProcessor for requests
      *
      * @return the HttpProcessor that processes requests

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
 Tue Jun  8 22:05:58 2010
@@ -130,6 +130,10 @@ public final class NHttpConfiguration {
         return getBooleanValue(NhttpConstants.SERVER_HEADER_PRESERVE, true);
     }
 
+    public boolean isCountConnections() {
+        return getBooleanValue(NhttpConstants.COUNT_CONNECTIONS, false);
+    }
+
     /**
      * Get properties that tune nhttp transport. Preference to system 
properties
      * @param name name of the system/config property
@@ -168,6 +172,11 @@ public final class NHttpConfiguration {
                 log.debug("Using nhttp tuning parameter : " + name);
             }
             return true;
+        } else if (val != null && !Boolean.parseBoolean(val)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using nhttp tuning parameter : " + name);
+            }
+            return false;
         }
         return def;
     }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 Tue Jun  8 22:05:58 2010
@@ -73,7 +73,7 @@ public class NhttpConstants {
     /** Defines weather synapse needs to preserve the original Server header. 
Configures
      * through nhttp.properties file or via a system property
      * */
-    public static final String SERVER_HEADER_PRESERVE = 
"http.server.preserve"; 
+    public static final String SERVER_HEADER_PRESERVE = "http.server.preserve";
 
     /** Denotes a connection close is forced if set at the NhttpContext */
     public static final String FORCE_CLOSING = "forceClosing";
@@ -104,4 +104,9 @@ public class NhttpConstants {
     protected static final String PRIORITY_CONFIG_FILE_NAME = 
"priorityConfigFile";
     /* This is a workaround  for  axis2 RestUtils behaviour */
     public static final String REST_REQUEST_CONTENT_TYPE = 
"synapse.internal.rest.contentType";
+    
+    /** This constant is used to hold the open connections map in the 
ConfigurationContext*/
+    public static final String OPEN_CONNNECTIONS_MAP = "OPEN_CONNNECTIONS_MAP";
+    /** Configuration in nhttp.properties file for enable connection counting 
*/
+    public static final String COUNT_CONNECTIONS = "http.count.connections";
 }


Reply via email to