Author: supun
Date: Tue Jun 8 22:05:58 2010
New Revision: 952841
URL: http://svn.apache.org/viewvc?rev=952841&view=rev
Log:
keeping track of active connections
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
Tue Jun 8 22:05:58 2010
@@ -55,10 +55,14 @@ import org.apache.http.nio.entity.Conten
import org.apache.http.params.DefaultedHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.*;
-import org.apache.sandesha2.Sandesha2Constants;
import org.apache.synapse.transport.nhttp.debug.ClientConnectionDebug;
import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* The client connection handler. An instance of this class is used by each
IOReactor, to
@@ -90,6 +94,14 @@ public class ClientHandler implements NH
/** Array of content types for which warnings are logged if HTTP status
code is 500. */
private String[] warnOnHttp500;
+ /** whether we are counting the connections to the back end servers */
+ private boolean countConnections = false;
+ /** lock to update the connection counts in a thread safe way */
+ private Lock lock = new ReentrantLock();
+
+ /** A map for holding the number of open connections for a host:port pair
*/
+ private Map<String, AtomicInteger> openConnections = new HashMap<String,
AtomicInteger>();
+
public static final String OUTGOING_MESSAGE_CONTEXT =
"synapse.axis2_message_context";
public static final String AXIS2_HTTP_REQUEST =
"synapse.axis2-http-request";
public static final String CLIENT_CONNECTION_DEBUG =
"synapse.client-connection-debug";
@@ -131,6 +143,10 @@ public class ClientHandler implements NH
if (contentTypeList != null) {
warnOnHttp500 = (String[]) contentTypeList;
}
+ // check whether we count the connections
+ this.countConnections =
NHttpConfiguration.getInstance().isCountConnections();
+ // set the connection map to the configuration context
+ cfgCtx.setProperty(NhttpConstants.OPEN_CONNNECTIONS_MAP,
openConnections);
}
public void requestReady(final NHttpClientConnection conn) {
@@ -164,7 +180,10 @@ public class ClientHandler implements NH
// record connection creation time for debug logging
conn.getContext().setAttribute(CONNECTION_CREATION_TIME,
System.currentTimeMillis());
-
+ if (countConnections) {
+ recordConnection(conn);
+ }
+
try {
processConnection(conn, (Axis2HttpRequest) attachment);
} catch (ConnectionClosedException e) {
@@ -886,13 +905,18 @@ public class ClientHandler implements NH
* @param conn the connection to be shutdown
*/
private void shutdownConnection(final NHttpClientConnection conn) {
-
- if (log.isDebugEnabled() && conn instanceof HttpInetConnection) {
+ if (conn instanceof HttpInetConnection) {
HttpInetConnection inetConnection = (HttpInetConnection) conn;
- log.debug("Connection to remote address : " +
inetConnection.getRemoteAddress()
- + ":" + inetConnection.getRemotePort() + " from local
address : "
- + inetConnection.getLocalAddress() + ":" +
inetConnection.getLocalPort() +
- " is closed!");
+ if (log.isDebugEnabled()) {
+ log.debug("Connection to remote address : " +
inetConnection.getRemoteAddress()
+ + ":" + inetConnection.getRemotePort() + " from local
address : "
+ + inetConnection.getLocalAddress() + ":" +
inetConnection.getLocalPort() +
+ " is closed!");
+ }
+
+ if (countConnections) {
+ removeConnectionRecord(inetConnection);
+ }
}
HttpContext context = conn.getContext();
@@ -917,6 +941,82 @@ public class ClientHandler implements NH
}
/**
+ * Remove a connection record for this host:port pair from active
connections records.
+ *
+ * @param inetConnection connection that need to be removed from the
active connections records
+ */
+ private void removeConnectionRecord(HttpInetConnection inetConnection) {
+ AtomicInteger connections = openConnections.get(
+ inetConnection.getRemoteAddress().getHostName() + ":"
+ + inetConnection.getRemotePort());
+ if (connections == null) {
+ connections = openConnections.get(
+ inetConnection.getRemoteAddress().getHostAddress() + ":"
+ + inetConnection.getRemotePort());
+ }
+
+ if (connections != null) {
+ int no = connections.getAndDecrement();
+ lock.lock();
+ try {
+ if (no == 0) {
+ if (null == openConnections.remove(
+ inetConnection.getRemoteAddress().getHostName()
+ + ":" + inetConnection.getRemotePort())) {
+
+ } else {
+ openConnections.remove(
+
inetConnection.getRemoteAddress().getHostAddress()
+ + ":" + inetConnection.getRemotePort());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Record a connection in the active connection records.
+ *
+ * @param conn connection to be recorded.
+ */
+ private void recordConnection(NHttpClientConnection conn) {
+ if (conn instanceof HttpInetConnection) {
+ HttpInetConnection inetConnection = (HttpInetConnection) conn;
+ // first we try to get the connection with host_addrss:port key
+ AtomicInteger connections = openConnections.get(
+ inetConnection.getRemoteAddress().getHostName() + ":"
+ + inetConnection.getRemotePort());
+ // if we fail try to get the connection with ip_address:port key
+ if (connections == null) {
+ connections = openConnections.get(
+ inetConnection.getRemoteAddress().getHostAddress() +
":"
+ + inetConnection.getRemotePort());
+ }
+
+ lock.lock();
+ try {
+ if (connections == null) {
+ connections = new AtomicInteger();
+ if (inetConnection.getRemoteAddress().getHostName() !=
null) {
+ openConnections.put(
+
inetConnection.getRemoteAddress().getHostName() + ":"
+ + inetConnection.getRemotePort(), connections);
+ } else {
+ openConnections.put(
+
inetConnection.getRemoteAddress().getHostAddress() + ":"
+ + inetConnection.getRemotePort(), connections);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ connections.getAndIncrement();
+ }
+ }
+
+ /**
* Return the HttpProcessor for requests
*
* @return the HttpProcessor that processes requests
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NHttpConfiguration.java
Tue Jun 8 22:05:58 2010
@@ -130,6 +130,10 @@ public final class NHttpConfiguration {
return getBooleanValue(NhttpConstants.SERVER_HEADER_PRESERVE, true);
}
+ /**
+ * Tells whether open connections to the back end servers should be counted.
+ * The value is read from the {@code NhttpConstants.COUNT_CONNECTIONS}
+ * tuning parameter; counting is off when the parameter is not set.
+ *
+ * @return true if connection counting is enabled
+ */
+ public boolean isCountConnections() {
+     final boolean disabledByDefault = false;
+     return getBooleanValue(NhttpConstants.COUNT_CONNECTIONS, disabledByDefault);
+ }
+
/**
* Get properties that tune nhttp transport. Preference to system
properties
* @param name name of the system/config property
@@ -168,6 +172,11 @@ public final class NHttpConfiguration {
log.debug("Using nhttp tuning parameter : " + name);
}
return true;
+ } else if (val != null && !Boolean.parseBoolean(val)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using nhttp tuning parameter : " + name);
+ }
+ return false;
}
return def;
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=952841&r1=952840&r2=952841&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
Tue Jun 8 22:05:58 2010
@@ -73,7 +73,7 @@ public class NhttpConstants {
/** Defines whether Synapse needs to preserve the original Server header.
Configured
* through the nhttp.properties file or via a system property
* */
- public static final String SERVER_HEADER_PRESERVE =
"http.server.preserve";
+ public static final String SERVER_HEADER_PRESERVE = "http.server.preserve";
/** Denotes a connection close is forced if set at the NhttpContext */
public static final String FORCE_CLOSING = "forceClosing";
@@ -104,4 +104,9 @@ public class NhttpConstants {
protected static final String PRIORITY_CONFIG_FILE_NAME =
"priorityConfigFile";
/* This is a workaround for axis2 RestUtils behaviour */
public static final String REST_REQUEST_CONTENT_TYPE =
"synapse.internal.rest.contentType";
+
+ /** This constant is used to hold the open connections map in the
ConfigurationContext*/
+ public static final String OPEN_CONNNECTIONS_MAP = "OPEN_CONNNECTIONS_MAP";
+ /** Configuration in nhttp.properties file to enable connection counting
*/
+ public static final String COUNT_CONNECTIONS = "http.count.connections";
}