Author: hiranya
Date: Thu Aug 15 20:53:20 2013
New Revision: 1514487
URL: http://svn.apache.org/r1514487
Log:
Refactored the code related to closing/releasing http connections in the PT
transport
Modified:
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
Modified:
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
(original)
+++
synapse/trunk/java/modules/documentation/src/site/xdoc/userguide/transports/pass_through.xml
Thu Aug 15 20:53:20 2013
@@ -707,7 +707,7 @@
and updated with new values. Default value
is 50.
</li>
<li>
- CacheDurationMins: Set the time duration
(in minutes) between
+ CacheDurationMins: Sets the time duration
(in minutes) between
two consecutive runs of the CacheManager
task which periodically
performs housekeeping work in each cache.
Default value is 15.
</li>
@@ -751,7 +751,7 @@
<tr>
<td>http.socket.timeout</td>
<td>
- Set the TCP socket timeout in milliseconds
+ Sets the TCP socket timeout in milliseconds
(See <a
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_TIMEOUT">SO_TIMEOUT</a>).
<div
class="xmlConf">http.socket.timeout=20000</div>
</td>
@@ -761,7 +761,7 @@
<tr>
<td>http.connection.timeout</td>
<td>
- Set the TCP connection timeout in milliseconds.
This determines the timeout
+ Sets the TCP connection timeout in milliseconds.
This determines the timeout
value for non-blocking connection requests.
Setting this property to
0 disables connection timeout (i.e. no timeout).
<div
class="xmlConf">http.connection.timeout=30000</div>
@@ -798,7 +798,7 @@
<tr>
<td>http.socket.buffer-size</td>
<td>
- Set the size of the I/O session buffers (in bytes)
used by the transport
+ Sets the size of the I/O session buffers (in
bytes) used by the transport
for reading incoming data and writing outgoing
data.
<div
class="xmlConf">http.socket.buffer-size=4096</div>
</td>
@@ -808,7 +808,7 @@
<tr>
<td>http.socket.rcv-buffer-size</td>
<td>
- Set the size of the buffers (in bytes) used by the
underlying platform
+ Sets the size of the buffers (in bytes) used by
the underlying platform
for incoming network I/O. This value is only a
hint. When set, this is a
suggestion to the OS kernel from Synapse about the
size of buffers to
use for the data to be received over the socket
@@ -821,7 +821,7 @@
<tr>
<td>http.socket.snd-buffer-size</td>
<td>
- Set the size of the buffers (in bytes) used by the
underlying platform
+ Sets the size of the buffers (in bytes) used by
the underlying platform
for outgoing network I/O. This value is only a
hint. When set, this is a
suggestion to the OS kernel from Synapse about the
size of buffers to
use for the data to be sent over the socket
@@ -846,7 +846,7 @@
<tr>
<td>http.socket.reuseaddr</td>
<td>
- Set the <a
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_REUSEADDR">SO_REUSEADDR</a>
+ Sets the <a
href="http://docs.oracle.com/javase/1.5.0/docs/api/java/net/SocketOptions.html#SO_REUSEADDR">SO_REUSEADDR</a>
socket option for the sockets created by the HTTP
transport. Accepted
values are either 'true' or 'false'.
<div
class="xmlConf">http.socket.reuseaddr=true</div>
@@ -857,7 +857,7 @@
<tr>
<td>http.nio.select-interval</td>
<td>
- Set the time interval in milliseconds at which the
I/O reactor wakes up
+ Sets the time interval in milliseconds at which
the I/O reactor wakes up
to check for timed out sessions and session
requests.
<div
class="xmlConf">http.nio.select-interval=2500</div>
</td>
@@ -867,7 +867,7 @@
<tr>
<td>io_threads_per_reactor <a
name="io_threads_per_reactor"/></td>
<td>
- Set the number of I/O dispatcher threads to be
used by each I/O reactor.
+ Sets the number of I/O dispatcher threads to be
used by each I/O reactor.
Typically, this property controls the ability of
the HTTP transport
to handle concurrent I/O events.
It is recommended that this property is set to the
number of CPU cores
@@ -920,7 +920,7 @@
<tr>
<td>io_buffer_size</td>
<td>
- Set the size of the I/O buffers (in bytes) used as
the pipes between HTTP
+ Sets the size of the I/O buffers (in bytes) used
as the pipes between HTTP
listener and sender. Typically, the HTTP listener
would write the incoming
message data to one of these buffers, and the
sender would read from it to
send the message out.
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
Thu Aug 15 20:53:20 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.transport.passthru.config.SourceConfiguration;
import org.apache.synapse.transport.passthru.jmx.LatencyView;
import
org.apache.synapse.transport.passthru.jmx.PassThroughTransportMetricsCollector;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import java.io.IOException;
@@ -202,7 +203,8 @@ public class SourceHandler implements NH
if (!outBuf.hasData() && encoder.isCompleted()) {
// We are done - At this point the entire response
payload has been
// written out to the SimpleOutputBuffer
-
sourceConfiguration.getSourceConnections().releaseConnection(conn);
+
PassThroughTransportUtils.finishUsingSourceConnection(conn.getHttpResponse(),
+ conn,
sourceConfiguration.getSourceConnections());
}
}
return;
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceResponse.java
Thu Aug 15 20:53:20 2013
@@ -21,13 +21,13 @@ package org.apache.synapse.transport.pas
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.synapse.transport.passthru.config.SourceConfiguration;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import java.io.IOException;
import java.util.HashMap;
@@ -56,9 +56,6 @@ public class SourceResponse {
/** Version of the response */
private ProtocolVersion version = HttpVersion.HTTP_1_1;
- /** Connection strategy */
- private ConnectionReuseStrategy connStrategy = new
DefaultConnectionReuseStrategy();
-
private SourceRequest request = null;
public SourceResponse(SourceConfiguration config, int status,
SourceRequest request) {
@@ -157,23 +154,10 @@ public class SourceResponse {
// Update connection state
if (encoder.isCompleted()) {
SourceContext.updateState(conn, ProtocolState.RESPONSE_DONE);
-
- sourceConfiguration.getMetrics().
-
notifySentMessageSize(conn.getMetrics().getSentBytesCount());
-
- if (!this.connStrategy.keepAlive(response, conn.getContext())) {
- SourceContext.updateState(conn, ProtocolState.CLOSING);
-
sourceConfiguration.getSourceConnections().closeConnection(conn);
- } else if (SourceContext.get(conn).isShutDown()) {
- // we need to shut down if the shutdown flag is set
- SourceContext.updateState(conn, ProtocolState.CLOSING);
-
sourceConfiguration.getSourceConnections().closeConnection(conn);
- } else {
- // Reset connection state
-
sourceConfiguration.getSourceConnections().releaseConnection(conn);
- // Ready to deal with a new request
- conn.requestInput();
- }
+ sourceConfiguration.getMetrics().notifySentMessageSize(
+ conn.getMetrics().getSentBytesCount());
+ PassThroughTransportUtils.finishUsingSourceConnection(response,
conn,
+ sourceConfiguration.getSourceConnections());
}
return bytes;
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetResponse.java
Thu Aug 15 20:53:20 2013
@@ -21,10 +21,10 @@ package org.apache.synapse.transport.pas
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
+import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import java.io.IOException;
import java.util.HashMap;
@@ -56,10 +56,6 @@ public class TargetResponse {
/** Protocol version */
private ProtocolVersion version = HttpVersion.HTTP_1_1;
- /** This utility class is used for determining weather we need to close
the connection
- * after submitting the response */
- private ConnectionReuseStrategy connStrategy = new
DefaultConnectionReuseStrategy();
-
/** The connection */
private NHttpClientConnection connection;
@@ -104,19 +100,9 @@ public class TargetResponse {
entity.setChunked(true);
}
response.setEntity(entity);
- } else {
- if (!connStrategy.keepAlive(response, conn.getContext())) {
- try {
- // this is a connection we should not re-use
- TargetContext.updateState(conn, ProtocolState.CLOSING);
-
targetConfiguration.getConnections().shutdownConnection(conn);
-
- } catch (Exception ignore) {
-
- }
- } else {
- targetConfiguration.getConnections().releaseConnection(conn);
- }
+ } else {
+ PassThroughTransportUtils.finishUsingTargetConnection(response,
conn,
+ targetConfiguration.getConnections());
}
}
@@ -140,13 +126,8 @@ public class TargetResponse {
TargetContext.updateState(conn, ProtocolState.RESPONSE_DONE);
targetConfiguration.getMetrics().notifyReceivedMessageSize(
conn.getMetrics().getReceivedBytesCount());
-
- if (!this.connStrategy.keepAlive(response, conn.getContext())) {
- TargetContext.updateState(conn, ProtocolState.CLOSED);
- targetConfiguration.getConnections().shutdownConnection(conn);
- } else {
- targetConfiguration.getConnections().releaseConnection(conn);
- }
+ PassThroughTransportUtils.finishUsingTargetConnection(response,
conn,
+ targetConfiguration.getConnections());
}
return bytes;
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java
Thu Aug 15 20:53:20 2013
@@ -104,7 +104,31 @@ public class TargetConnections {
}
/**
- * This connection is no longer valid. So we need to shutdownConnection
connection.
+ * This connection is no longer needed. So we need to close connection.
+ *
+ * @param conn connection to shutdownConnection
+ */
+ public void closeConnection(NHttpClientConnection conn) {
+ HostConnections pool = (HostConnections)
conn.getContext().getAttribute(
+ PassThroughConstants.CONNECTION_POOL);
+
+ TargetContext.get(conn).reset();
+
+ if (pool != null) {
+ pool.forget(conn);
+ } else {
+ // we shouldn't get here
+ log.fatal("Connection without a pool. Something wrong. Need to
fix.");
+ }
+
+ try {
+ conn.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ /**
+ * This connection is no longer valid. So we need to shutdown connection.
*
* @param conn connection to shutdownConnection
*/
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java?rev=1514487&r1=1514486&r2=1514487&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java
Thu Aug 15 20:53:20 2013
@@ -24,11 +24,21 @@ import org.apache.axis2.addressing.Endpo
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.Constants;
import org.apache.axis2.transport.TransportUtils;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.protocol.HTTP;
import org.apache.http.HttpStatus;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.passthru.PassThroughConstants;
+import org.apache.synapse.transport.passthru.ProtocolState;
+import org.apache.synapse.transport.passthru.SourceContext;
+import org.apache.synapse.transport.passthru.TargetContext;
+import org.apache.synapse.transport.passthru.connections.SourceConnections;
+import org.apache.synapse.transport.passthru.connections.TargetConnections;
import java.net.InetAddress;
import java.util.Map;
@@ -41,6 +51,8 @@ public class PassThroughTransportUtils {
private static final Log log =
LogFactory.getLog(PassThroughTransportUtils.class);
+ private static final ConnectionReuseStrategy connStrategy = new
DefaultConnectionReuseStrategy();
+
/**
* This method tries to determine the hostname of the given InetAddress
without
* triggering a reverse DNS lookup. {@link
java.net.InetAddress#getHostName()}
@@ -212,4 +224,31 @@ public class PassThroughTransportUtils {
PassThroughConstants.MESSAGE_BUILDER_INVOKED));
}
+ public static void finishUsingSourceConnection(HttpResponse response,
+ NHttpServerConnection conn,
+ SourceConnections
connections) {
+ if (!connStrategy.keepAlive(response, conn.getContext()) ||
+ SourceContext.get(conn).isShutDown()) {
+ SourceContext.updateState(conn, ProtocolState.CLOSING);
+ connections.closeConnection(conn);
+ } else {
+ // Reset connection state
+ connections.releaseConnection(conn);
+ // Ready to deal with a new request
+ conn.requestInput();
+ }
+ }
+
+ public static void finishUsingTargetConnection(HttpResponse response,
+ NHttpClientConnection conn,
+ TargetConnections
connections) {
+ if (!connStrategy.keepAlive(response, conn.getContext())) {
+ // this is a connection we should not re-use
+ TargetContext.updateState(conn, ProtocolState.CLOSING);
+ connections.closeConnection(conn);
+ } else {
+ connections.releaseConnection(conn);
+ }
+ }
+
}