Author: ruwan
Date: Mon May 3 21:58:40 2010
New Revision: 940649
URL: http://svn.apache.org/viewvc?rev=940649&view=rev
Log:
Fixing the graceful shutdown to properly work on keep alive http connections
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
Mon May 3 21:58:40 2010
@@ -455,6 +455,11 @@ public class Axis2SynapseController impl
log.info(new StringBuilder("Waiting for:
").append(pendingSenderThreads)
.append(" listener threads to complete").toString());
}
+ int activeConnections =
transportHelper.getActiveConnectionsCount();
+ if (activeConnections > 0) {
+ log.info("Waiting for: " + activeConnections
+ + " active connections to be closed..");
+ }
int pendingTransportThreads = pendingListenerThreads +
pendingSenderThreads;
int pendingCallbacks =
ServerManager.getInstance().getCallbackCount();
@@ -480,8 +485,9 @@ public class Axis2SynapseController impl
if (System.currentTimeMillis() < endTime) {
log.info(new StringBuilder("Waiting for a maximum of
another ")
.append((endTime - System.currentTimeMillis()) /
1000)
- .append(" seconds until transport threads and
tasks become idle,")
- .append(" and callbacks complete..").toString());
+ .append(" seconds until transport threads and
tasks become idle, ")
+ .append("active connections to get closed,")
+ .append(" and callbacks to be
completed..").toString());
try {
Thread.sleep(waitIntervalMillis);
} catch (InterruptedException ignore) {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2TransportHelper.java
Mon May 3 21:58:40 2010
@@ -27,6 +27,7 @@ import org.apache.axis2.transport.Transp
import org.apache.axis2.transport.base.ManagementSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.transport.nhttp.HttpCoreNIOListener;
import java.util.Map;
@@ -178,6 +179,19 @@ public class Axis2TransportHelper {
return pendingThreads;
}
+ public int getActiveConnectionsCount() {
+ Map<String, TransportInDescription> trpIns
+ =
configurationContext.getAxisConfiguration().getTransportsIn();
+
+ for (TransportInDescription trpIn : trpIns.values()) {
+ if (trpIn.getReceiver() instanceof HttpCoreNIOListener) {
+ return ((HttpCoreNIOListener)
trpIn.getReceiver()).getActiveConnectionsSize();
+ }
+ }
+
+ return 0;
+ }
+
/**
* Determines the total number of pending sender threads (active + queued).
*
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
Mon May 3 21:58:40 2010
@@ -107,7 +107,7 @@ public class HttpCoreNIOListener impleme
/** Metrics collector for this transport */
private MetricsCollector metrics = new MetricsCollector();
/** state of the listener */
- private int state = BaseConstants.STOPPED;
+ private volatile int state = BaseConstants.STOPPED;
/** The ServerHandler */
private ServerHandler handler = null;
/** This will execute the requests based on calculate priority */
@@ -206,6 +206,10 @@ public class HttpCoreNIOListener impleme
}
}
+ public int getActiveConnectionsSize() {
+ return handler.getActiveConnectionsSize();
+ }
+
private void createPriorityConfiguration(String fileName) throws AxisFault
{
OMElement definitions = null;
try {
@@ -437,6 +441,7 @@ public class HttpCoreNIOListener impleme
if (state != BaseConstants.STARTED) return;
try {
ioReactor.pause();
+ handler.markActiveConnectionsToBeClosed();
state = BaseConstants.PAUSED;
log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener
Paused");
} catch (IOException e) {
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
Mon May 3 21:58:40 2010
@@ -36,6 +36,7 @@ import org.apache.axis2.transport.base.B
import org.apache.axis2.transport.base.ManagementSupport;
import org.apache.axis2.transport.base.MetricsCollector;
import org.apache.axis2.transport.base.TransportMBeanSupport;
+import org.apache.axis2.util.JavaUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.*;
@@ -93,7 +94,7 @@ public class HttpCoreNIOSender extends A
/** Metrics collector for the sender */
private MetricsCollector metrics = new MetricsCollector();
/** state of the listener */
- private int state = BaseConstants.STOPPED;
+ private volatile int state = BaseConstants.STOPPED;
/** The proxy host */
private String proxyHost = null;
/** The proxy port */
@@ -477,7 +478,7 @@ public class HttpCoreNIOSender extends A
}
}
- if (state == BaseConstants.PAUSED) {
+ if
(JavaUtils.isTrueExplicitly(worker.getConn().getContext().getAttribute("forceClosing")))
{
HttpRequest req = (HttpRequest)
worker.getConn().getContext().getAttribute("http.request");
req.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
Mon May 3 21:58:40 2010
@@ -65,6 +65,11 @@ public class NhttpConstants {
/** The message context property name which holds the exception (if any)
for the last encountered exception */
public static final String ERROR_EXCEPTION = "ERROR_EXCEPTION";
+ /** Denotes a connection close is forced if set at the NhttpContext */
+ public static final String FORCE_CLOSING = "forceClosing";
+ /** Denotes a message is being processed by the current connection if this
is set at the context */
+ public static final String MESSAGE_IN_FLIGHT = "message-in-flight";
+
// ********** DO NOT CHANGE THESE UNLESS CORRESPONDING SYNAPSE CONSTANT
ARE CHANGED ************
public static final int RCV_IO_ERROR_SENDING = 101000;
public static final int RCV_IO_ERROR_RECEIVING = 101001;
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=940649&r1=940648&r2=940649&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
Mon May 3 21:58:40 2010
@@ -22,6 +22,7 @@ import org.apache.axis2.context.Configur
import org.apache.axis2.transport.base.MetricsCollector;
import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.util.JavaUtils;
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
@@ -52,6 +53,8 @@ import org.apache.synapse.transport.nhtt
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -86,6 +89,9 @@ public class ServerHandler implements NH
private WorkerPool workerPool = null;
/** the metrics collector */
private MetricsCollector metrics = null;
+
+ /** keeps track of the connection that are alive in the system */
+ private volatile List<NHttpServerConnection> activeConnections = null;
private Parser parser = null;
@@ -108,6 +114,7 @@ public class ServerHandler implements NH
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
this.allocator = new HeapByteBufferAllocator();
+ this.activeConnections = new ArrayList<NHttpServerConnection>();
this.cfg = NHttpConfiguration.getInstance();
if (executor == null) {
@@ -132,6 +139,7 @@ public class ServerHandler implements NH
HttpContext context = conn.getContext();
HttpRequest request = conn.getHttpRequest();
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ context.setAttribute(NhttpConstants.MESSAGE_IN_FLIGHT, "true");
// prepare to collect debug information
conn.getContext().setAttribute(
@@ -357,13 +365,33 @@ public class ServerHandler implements NH
}
// record connection creation time for debug logging
conn.getContext().setAttribute(CONNECTION_CREATION_TIME,
System.currentTimeMillis());
+ if (log.isDebugEnabled()) {
+ log.debug("Adding a connection : "
+ + conn + " to the pool, existing pool size : " +
activeConnections.size());
+ }
+ activeConnections.add(conn);
}
public void responseReady(NHttpServerConnection conn) {
+ if
(JavaUtils.isTrueExplicitly(conn.getContext().getAttribute(NhttpConstants.FORCE_CLOSING))
+ && !JavaUtils.isTrueExplicitly(conn.getContext().getAttribute(
+ NhttpConstants.MESSAGE_IN_FLIGHT))) {
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing a persisted connection since it is
forced : " + conn);
+ }
+ conn.close();
+ } catch (IOException ignore) {}
+
+ return;
+ }
+
metrics.notifyReceivedMessageSize(conn.getMetrics().getReceivedBytesCount());
metrics.notifySentMessageSize(conn.getMetrics().getSentBytesCount());
conn.getMetrics().reset();
+ conn.getContext().removeAttribute(NhttpConstants.MESSAGE_IN_FLIGHT);
if (log.isTraceEnabled()) {
log.trace("Ready to send response");
@@ -384,6 +412,17 @@ public class ServerHandler implements NH
}
}
+ public void markActiveConnectionsToBeClosed() {
+ log.info("Marking the closing signal on the connection pool of size : "
+ + activeConnections.size());
+ synchronized (this) {
+ for (NHttpServerConnection conn : activeConnections) {
+ conn.getContext().setAttribute(NhttpConstants.FORCE_CLOSING,
"true");
+ conn.requestOutput();
+ }
+ }
+ }
+
/**
* Handle HTTP Protocol violations with an error response
* @param conn the connection being processed
@@ -469,11 +508,23 @@ public class ServerHandler implements NH
if (inputBuffer != null) {
inputBuffer.close();
}
+
+ synchronized (this) {
+ if (activeConnections.remove(conn) && log.isDebugEnabled()) {
+ log.debug("Removing the connection : " + conn
+ + " from pool of size : " + activeConnections.size());
+ }
+ }
+
try {
conn.shutdown();
} catch (IOException ignore) {}
}
+ public int getActiveConnectionsSize() {
+ return activeConnections.size();
+ }
+
/**
* Return the HttpProcessor for responses
* @return the HttpProcessor that processes HttpResponses of this server