Author: supun
Date: Thu Oct 7 06:25:32 2010
New Revision: 1005337
URL: http://svn.apache.org/viewvc?rev=1005337&view=rev
Log:
adding the capability to force content length
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=1005337&r1=1005336&r2=1005337&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
Thu Oct 7 06:25:32 2010
@@ -40,6 +40,7 @@ import org.apache.axis2.util.JavaUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.*;
+import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
import org.apache.http.nio.NHttpClientConnection;
@@ -60,6 +61,7 @@ import org.apache.synapse.transport.nhtt
import
org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
import org.apache.synapse.transport.nhttp.util.NhttpUtil;
import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
+import org.apache.synapse.commons.util.TemporaryData;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@@ -458,9 +460,11 @@ public class HttpCoreNIOSender extends A
*/
private void sendAsyncResponse(MessageContext msgContext) throws AxisFault
{
+ int contentLength = extractContentLength(msgContext);
+
// remove unwanted HTTP headers (if any from the current message)
removeUnwantedHeaders(msgContext);
-
+
ServerWorker worker = (ServerWorker)
msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
HttpResponse response = worker.getResponse();
@@ -488,31 +492,24 @@ public class HttpCoreNIOSender extends A
}
}
- if
(JavaUtils.isTrueExplicitly(worker.getConn().getContext().getAttribute("forceClosing")))
{
- HttpRequest req = (HttpRequest)
- worker.getConn().getContext().getAttribute("http.request");
- req.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
- }
-
- // pass ClientConnectionDebug to the Server side
- ServerConnectionDebug scd = (ServerConnectionDebug)
-
worker.getConn().getContext().getAttribute(ServerHandler.SERVER_CONNECTION_DEBUG);
- ClientConnectionDebug ccd = (ClientConnectionDebug)
- msgContext.getProperty(ClientHandler.CLIENT_CONNECTION_DEBUG);
-
- if (scd != null && ccd != null) {
- scd.setClientConnectionDebug(ccd);
- } else if (scd == null && ccd != null) {
- scd = ccd.getServerConnectionDebug();
- scd.setClientConnectionDebug(ccd);
- }
+ boolean forceContentLength = msgContext.isPropertyTrue(
+ NhttpConstants.FORCE_HTTP_CONTENT_LENGTH);
+ boolean forceContentLengthCopy = msgContext.isPropertyTrue(
+ NhttpConstants.COPY_CONTENT_LENGTH_FROM_INCOMING);
- if (scd != null) {
- scd.recordResponseStartTime();
- }
+ BasicHttpEntity entity = (BasicHttpEntity) response.getEntity();
MetricsCollector lstMetrics = worker.getServiceHandler().getMetrics();
try {
+ if (forceContentLength) {
+ entity.setChunked(false);
+ if (forceContentLengthCopy && contentLength > 0) {
+ entity.setContentLength(contentLength);
+ } else {
+ setStreamAsTempData(entity, messageFormatter, msgContext,
format);
+ }
+ }
+
worker.getServiceHandler().commitResponse(worker.getConn(),
response);
lstMetrics.reportResponseCode(response.getStatusLine().getStatusCode());
OutputStream out = worker.getOutputStream();
@@ -525,41 +522,49 @@ public class HttpCoreNIOSender extends A
|| Boolean.TRUE == noEntityBody) {
out.write(new byte[0]);
} else {
- messageFormatter.writeTo(msgContext, format, out, false);
+ if (forceContentLength) {
+ if (forceContentLengthCopy && contentLength > 0) {
+ messageFormatter.writeTo(msgContext, format, out,
false);
+ } else {
+ writeMessageFromTempData(out, msgContext);
+ }
+ } else {
+ messageFormatter.writeTo(msgContext, format, out, false);
+ }
}
out.close();
- lstMetrics.incrementMessagesSent();
+ if (lstMetrics != null) {
+ lstMetrics.incrementMessagesSent();
+ }
} catch (HttpException e) {
if (lstMetrics != null) {
lstMetrics.incrementFaultsSending();
}
handleException("Unexpected HTTP protocol error sending response
to : " +
- worker.getRemoteAddress() + "\n" + scd.dump(), e);
+ worker.getRemoteAddress(), e);
} catch (ConnectionClosedException e) {
if (lstMetrics != null) {
lstMetrics.incrementFaultsSending();
}
- log.warn("Connection closed by client : "
- + worker.getRemoteAddress() + "\n" + scd.dump());
+ log.warn("Connection closed by client : " +
worker.getRemoteAddress());
} catch (IllegalStateException e) {
if (lstMetrics != null) {
lstMetrics.incrementFaultsSending();
}
- log.warn("Connection closed by client : "
- + worker.getRemoteAddress() + "\n" + scd.dump());
+ log.warn("Connection closed by client : " +
worker.getRemoteAddress());
} catch (IOException e) {
if (lstMetrics != null) {
lstMetrics.incrementFaultsSending();
}
handleException("IO Error sending response message to : " +
- worker.getRemoteAddress() + "\n" + scd.dump(), e);
+ worker.getRemoteAddress(), e);
} catch (Exception e) {
if (lstMetrics != null) {
lstMetrics.incrementFaultsSending();
}
handleException("General Error sending response message to : " +
- worker.getRemoteAddress() + "\n" + scd.dump(), e);
+ worker.getRemoteAddress(), e);
}
InputStream is = worker.getIs();
@@ -571,6 +576,76 @@ public class HttpCoreNIOSender extends A
}
/**
+ * Extract the content length from the incoming message
+ *
+ * @param msgContext current MessageContext
+ * @return the length of the message
+ */
+ private int extractContentLength(MessageContext msgContext) {
+ Map headers = (Map)
msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
+
+ if (headers == null || headers.isEmpty()) {
+ return -1;
+ }
+
+ for (Object o : headers.keySet()) {
+ String headerName = (String) o;
+ if (HTTP.CONTENT_LEN.equalsIgnoreCase(headerName)) {
+ Object value = headers.get(headerName);
+
+ if (value != null && value instanceof String) {
+ try {
+ return Integer.parseInt((String) value);
+ } catch (NumberFormatException e) {
+ return -1;
+ }
+ }
+ }
+ }
+
+ return -1;
+ }
+
+ /**
+ * Write the stream to a temporary storage and calculate the content length
+ * @param entity HTTPEntity
+ * @param messageFormatter message formatter
+ * @param msgContext current message context
+ * @param format message format
+ * @throws IOException if an exception occurred while writing data
+ */
+ private void setStreamAsTempData(BasicHttpEntity entity, MessageFormatter
messageFormatter,
+ MessageContext msgContext, OMOutputFormat
format)
+ throws IOException {
+ TemporaryData serialized = new TemporaryData(256, 4096, "http-nio_",
".dat");
+ OutputStream out = serialized.getOutputStream();
+ try {
+ messageFormatter.writeTo(msgContext, format, out, true);
+ } finally {
+ out.close();
+ }
+ msgContext.setProperty(NhttpConstants.SERIALIZED_BYTES, serialized);
+ entity.setContentLength(serialized.getLength());
+ }
+
+ /**
+ * Take the data from temporary storage and write it to the output stream
+ * @param out output stream output stream
+ * @param msgContext messagecontext
+ * @throws IOException if an exception occurred while writing data
+ */
+ private void writeMessageFromTempData(OutputStream out, MessageContext
msgContext)
+ throws IOException {
+ TemporaryData serialized =
+ (TemporaryData)
msgContext.getProperty(NhttpConstants.SERIALIZED_BYTES);
+ try {
+ serialized.writeTo(out);
+ } finally {
+ serialized.release();
+ }
+ }
+
+ /**
* Determine the HttpStatusCodedepending on the message type processed <br>
* (normal response versus fault response) as well as Axis2 message
context properties set
* via Synapse configuration or MessageBuilders.
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=1005337&r1=1005336&r2=1005337&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
Thu Oct 7 06:25:32 2010
@@ -30,6 +30,9 @@ public class NhttpConstants {
public static final String SC_ACCEPTED = "SC_ACCEPTED";
public static final String HTTP_SC = "HTTP_SC";
public static final String FORCE_HTTP_1_0 = "FORCE_HTTP_1.0";
+ public static final String FORCE_HTTP_CONTENT_LENGTH =
"FORCE_HTTP_CONTENT_LENGTH";
+ public static final String COPY_CONTENT_LENGTH_FROM_INCOMING =
+ "COPY_CONTENT_LENGTH_FROM_INCOMING";
public static final String DISABLE_CHUNKING = "DISABLE_CHUNKING";
public static final String POST_TO_URI = "POST_TO_URI";
public static final String NO_KEEPALIVE = "NO_KEEPALIVE";