Author: supun
Date: Thu Oct  7 06:25:32 2010
New Revision: 1005337

URL: http://svn.apache.org/viewvc?rev=1005337&view=rev
Log:
adding the capability to force content length

Modified:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=1005337&r1=1005336&r2=1005337&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 Thu Oct  7 06:25:32 2010
@@ -40,6 +40,7 @@ import org.apache.axis2.util.JavaUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.*;
+import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
 import org.apache.http.nio.NHttpClientConnection;
@@ -60,6 +61,7 @@ import org.apache.synapse.transport.nhtt
 import 
org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
 import org.apache.synapse.transport.nhttp.util.NhttpUtil;
 import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
+import org.apache.synapse.commons.util.TemporaryData;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
@@ -458,9 +460,11 @@ public class HttpCoreNIOSender extends A
      */
     private void sendAsyncResponse(MessageContext msgContext) throws AxisFault 
{
 
+        int contentLength = extractContentLength(msgContext);
+
         // remove unwanted HTTP headers (if any from the current message)
         removeUnwantedHeaders(msgContext);
-        
+
         ServerWorker worker = (ServerWorker) 
msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
         HttpResponse response = worker.getResponse();
 
@@ -488,31 +492,24 @@ public class HttpCoreNIOSender extends A
             }
         }
 
-        if 
(JavaUtils.isTrueExplicitly(worker.getConn().getContext().getAttribute("forceClosing")))
 {
-            HttpRequest req = (HttpRequest)
-                    worker.getConn().getContext().getAttribute("http.request");
-            req.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
-        }
-
-        // pass ClientConnectionDebug to the Server side
-        ServerConnectionDebug scd = (ServerConnectionDebug)
-                
worker.getConn().getContext().getAttribute(ServerHandler.SERVER_CONNECTION_DEBUG);
-        ClientConnectionDebug ccd = (ClientConnectionDebug)
-            msgContext.getProperty(ClientHandler.CLIENT_CONNECTION_DEBUG);
-
-        if (scd != null && ccd != null) {
-            scd.setClientConnectionDebug(ccd);
-        } else if (scd == null && ccd != null) {
-            scd = ccd.getServerConnectionDebug();
-            scd.setClientConnectionDebug(ccd);
-        }
+        boolean forceContentLength = msgContext.isPropertyTrue(
+                NhttpConstants.FORCE_HTTP_CONTENT_LENGTH);
+        boolean forceContentLengthCopy = msgContext.isPropertyTrue(
+                NhttpConstants.COPY_CONTENT_LENGTH_FROM_INCOMING);
 
-        if (scd != null) {
-            scd.recordResponseStartTime();
-        }
+        BasicHttpEntity entity = (BasicHttpEntity) response.getEntity();
 
         MetricsCollector lstMetrics = worker.getServiceHandler().getMetrics();
         try {
+            if (forceContentLength) {
+                entity.setChunked(false);
+                if (forceContentLengthCopy && contentLength > 0) {
+                    entity.setContentLength(contentLength);
+                } else {
+                    setStreamAsTempData(entity, messageFormatter, msgContext, 
format);
+                }
+            }
+
             worker.getServiceHandler().commitResponse(worker.getConn(), 
response);
             
lstMetrics.reportResponseCode(response.getStatusLine().getStatusCode());
             OutputStream out = worker.getOutputStream();
@@ -525,41 +522,49 @@ public class HttpCoreNIOSender extends A
                 || Boolean.TRUE == noEntityBody) {
                 out.write(new byte[0]);
             } else {
-                messageFormatter.writeTo(msgContext, format, out, false);
+                if (forceContentLength) {
+                    if (forceContentLengthCopy && contentLength > 0) {
+                        messageFormatter.writeTo(msgContext, format, out, 
false);
+                    } else {
+                        writeMessageFromTempData(out, msgContext);
+                    }
+                } else {
+                    messageFormatter.writeTo(msgContext, format, out, false);
+                }
             }
             out.close();
-            lstMetrics.incrementMessagesSent();
+            if (lstMetrics != null) {
+                lstMetrics.incrementMessagesSent();
+            }
 
         } catch (HttpException e) {
             if (lstMetrics != null) {
                 lstMetrics.incrementFaultsSending();
             }
             handleException("Unexpected HTTP protocol error sending response 
to : " +
-                worker.getRemoteAddress() + "\n" + scd.dump(), e);
+                worker.getRemoteAddress(), e);
         } catch (ConnectionClosedException e) {
             if (lstMetrics != null) {
                 lstMetrics.incrementFaultsSending();
             }
-            log.warn("Connection closed by client : "
-                    + worker.getRemoteAddress() + "\n" + scd.dump());
+            log.warn("Connection closed by client : " + 
worker.getRemoteAddress());
         } catch (IllegalStateException e) {
             if (lstMetrics != null) {
                 lstMetrics.incrementFaultsSending();
             }
-            log.warn("Connection closed by client : "
-                    + worker.getRemoteAddress() + "\n" + scd.dump());
+            log.warn("Connection closed by client : " + 
worker.getRemoteAddress());
         } catch (IOException e) {
             if (lstMetrics != null) {
                 lstMetrics.incrementFaultsSending();
             }
             handleException("IO Error sending response message to : " +
-                worker.getRemoteAddress() + "\n" + scd.dump(), e);
+                worker.getRemoteAddress(), e);
         } catch (Exception e) {
             if (lstMetrics != null) {
                 lstMetrics.incrementFaultsSending();
             }
             handleException("General Error sending response message to : " +
-                worker.getRemoteAddress() + "\n" + scd.dump(), e);
+                worker.getRemoteAddress(), e);
         }
 
         InputStream is = worker.getIs();
@@ -571,6 +576,76 @@ public class HttpCoreNIOSender extends A
     }
 
     /**
+     * Extract the content length from the incoming message
+     *
+     * @param msgContext current MessageContext
+     * @return the length of the message
+     */
+    private int extractContentLength(MessageContext msgContext) {
+        Map headers = (Map) 
msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
+
+        if (headers == null || headers.isEmpty()) {
+            return -1;
+        }
+
+        for (Object o : headers.keySet()) {
+            String headerName = (String) o;
+            if (HTTP.CONTENT_LEN.equalsIgnoreCase(headerName)) {
+                Object value = headers.get(headerName);
+
+                if (value != null && value instanceof String) {
+                    try {
+                        return Integer.parseInt((String) value);
+                    } catch (NumberFormatException e) {
+                        return -1;
+                    }
+                }
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Write the stream to a temporary storage and calculate the content length
+     * @param entity HTTPEntity
+     * @param messageFormatter message formatter
+     * @param msgContext current message context
+     * @param format message format
+     * @throws IOException if an exception occurred while writing data
+     */
+    private void setStreamAsTempData(BasicHttpEntity entity, MessageFormatter 
messageFormatter,
+                                     MessageContext msgContext, OMOutputFormat 
format)
+            throws IOException {
+        TemporaryData serialized = new TemporaryData(256, 4096, "http-nio_", 
".dat");
+        OutputStream out = serialized.getOutputStream();
+        try {
+            messageFormatter.writeTo(msgContext, format, out, true);
+        } finally {
+            out.close();
+        }
+        msgContext.setProperty(NhttpConstants.SERIALIZED_BYTES, serialized);
+        entity.setContentLength(serialized.getLength());
+    }
+
+    /**
+     * Take the data from temporary storage and write it to the output stream
+     * @param out output stream output stream
+     * @param msgContext messagecontext
+     * @throws IOException if an exception occurred while writing data
+     */
+    private void writeMessageFromTempData(OutputStream out, MessageContext 
msgContext)
+            throws IOException {
+        TemporaryData serialized =
+                (TemporaryData) 
msgContext.getProperty(NhttpConstants.SERIALIZED_BYTES);
+        try {
+            serialized.writeTo(out);
+        } finally {
+            serialized.release();
+        }
+    }
+
+    /**
      * Determine the HttpStatusCodedepending on the message type processed <br>
      * (normal response versus fault response) as well as Axis2 message 
context properties set
      * via Synapse configuration or MessageBuilders.

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=1005337&r1=1005336&r2=1005337&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
 Thu Oct  7 06:25:32 2010
@@ -30,6 +30,9 @@ public class NhttpConstants {
     public static final String SC_ACCEPTED = "SC_ACCEPTED";
     public static final String HTTP_SC = "HTTP_SC";
     public static final String FORCE_HTTP_1_0 = "FORCE_HTTP_1.0";
+    public static final String FORCE_HTTP_CONTENT_LENGTH = 
"FORCE_HTTP_CONTENT_LENGTH";
+    public static final String COPY_CONTENT_LENGTH_FROM_INCOMING =
+            "COPY_CONTENT_LENGTH_FROM_INCOMING";
     public static final String DISABLE_CHUNKING = "DISABLE_CHUNKING";
     public static final String POST_TO_URI = "POST_TO_URI";
     public static final String NO_KEEPALIVE = "NO_KEEPALIVE";


Reply via email to