Author: hiranya
Date: Wed Aug 14 21:35:25 2013
New Revision: 1514059

URL: http://svn.apache.org/r1514059
Log:
Handle the endOfInput callback in the pass-through (PT) transport's
SourceHandler; otherwise the JMX stats get messed up.

Modified:
    
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java

Modified: 
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
 Wed Aug 14 21:35:25 2013
@@ -101,7 +101,7 @@ public class DynamicResourceTest extends
         System.out.println("Testing advanced sequence caching...");
         synCtx = TestUtils.createLightweightSynapseMessageContext("<empty/>", 
config);
         System.out.println("Waiting for the cache to expire...");
-        Thread.sleep(8500L);
+        Thread.sleep(10000L);
         Mediator seq3 = synCtx.getSequence(KEY_DYNAMIC_SEQUENCE_1);
         assertNotNull(seq3);
         assertTrue(((SequenceMediator) seq3).isInitialized());
@@ -114,7 +114,7 @@ public class DynamicResourceTest extends
         System.out.println("Testing sequence reloading...");
         registry.updateResource(KEY_DYNAMIC_SEQUENCE_1, 
TestUtils.createOMElement(DYNAMIC_SEQUENCE_2));
         System.out.println("Waiting for the cache to expire...");
-        Thread.sleep(8500L);
+        Thread.sleep(10000L);
         synCtx = TestUtils.createLightweightSynapseMessageContext("<empty/>", 
config);
         Mediator seq4 = synCtx.getSequence(KEY_DYNAMIC_SEQUENCE_1);
         assertNotNull(seq4);
@@ -158,7 +158,7 @@ public class DynamicResourceTest extends
         System.out.println("Testing advanced endpoint caching...");
         synCtx = TestUtils.createSynapseMessageContext("<empty/>", config);
         System.out.println("Waiting for the cache to expire...");
-        Thread.sleep(8500L);
+        Thread.sleep(10000L);
         Endpoint ep3 = synCtx.getEndpoint(KEY_DYNAMIC_ENDPOINT_1);
         assertNotNull(ep3);
         assertEquals(1, registry.getHitCount());
@@ -168,7 +168,7 @@ public class DynamicResourceTest extends
         System.out.println("Testing endpoint reloading...");
         registry.updateResource(KEY_DYNAMIC_ENDPOINT_1, 
TestUtils.createOMElement(DYNAMIC_ENDPOINT_2));
         System.out.println("Waiting for the cache to expire...");
-        Thread.sleep(8500L);
+        Thread.sleep(10000L);
         synCtx = TestUtils.createSynapseMessageContext("<empty/>", config);
         Endpoint ep4 = synCtx.getEndpoint(KEY_DYNAMIC_ENDPOINT_1);
         assertNotNull(ep4);

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
 Wed Aug 14 21:35:25 2013
@@ -78,7 +78,11 @@ public class SourceContext {
     public void reset() {
         this.request = null;
         this.response = null;
-        this.state = ProtocolState.REQUEST_READY;
+        if (this.state != ProtocolState.CLOSED) {
+            // if the connection is not closed yet, prepare to receive a new 
request
+            // on it
+            this.state = ProtocolState.REQUEST_READY;
+        }
 
         if (writer != null) {
             ByteBuffer buffer = writer.getBuffer();

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
 Wed Aug 14 21:35:25 2013
@@ -250,16 +250,31 @@ public class SourceHandler implements NH
             metrics.incrementBytesSent(bytesSent);
         } catch (IOException e) {
             logIOException(e);
-
             informWriterError(conn);
-
             SourceContext.updateState(conn, ProtocolState.CLOSING);
             
sourceConfiguration.getSourceConnections().shutDownConnection(conn);
         } 
     }
 
     public void endOfInput(NHttpServerConnection conn) throws IOException {
-        closed(conn);
+        ProtocolState state = SourceContext.getState(conn);
+
+        if (state == ProtocolState.REQUEST_READY || state == 
ProtocolState.RESPONSE_DONE) {
+            if (log.isDebugEnabled()) {
+                log.debug("Keep-Alive connection was closed at the remote end: 
" + conn);
+            }
+        } else if (state == ProtocolState.REQUEST_BODY || state == 
ProtocolState.REQUEST_HEAD) {
+            informReaderError(conn);
+            log.warn("Connection closed at the remote end while reading the 
request: " + conn);
+        } else if (state == ProtocolState.RESPONSE_BODY || state == 
ProtocolState.RESPONSE_HEAD) {
+            informWriterError(conn);
+            log.warn("Connection closed at the remote end while writing the 
response: " + conn);
+        } else if (state == ProtocolState.REQUEST_DONE) {
+            log.warn("Connection closed by the client after request is read: " 
+ conn);
+        }
+
+        SourceContext.updateState(conn, ProtocolState.CLOSED);
+        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
     }
 
     public void exception(NHttpServerConnection conn, Exception e) {
@@ -361,10 +376,9 @@ public class SourceHandler implements NH
         try {
             sourceConfiguration.getHttpProcessor().process(response, 
httpContext);
             conn.submitResponse(response);
-            SourceContext.updateState(conn, ProtocolState.CLOSED);
-            conn.close();
         } catch (Exception ex) {
             log.error("Error while handling HttpException", ex);
+        } finally {
             SourceContext.updateState(conn, ProtocolState.CLOSED);
             
sourceConfiguration.getSourceConnections().shutDownConnection(conn);
         }
@@ -403,12 +417,10 @@ public class SourceHandler implements NH
             if (log.isDebugEnabled()) {
                 log.debug("Keep-Alive connection was closed: " + conn);
             }
-        } else if (state == ProtocolState.REQUEST_BODY ||
-                state == ProtocolState.REQUEST_HEAD) {
+        } else if (state == ProtocolState.REQUEST_BODY || state == 
ProtocolState.REQUEST_HEAD) {
             informReaderError(conn);
             log.warn("Connection closed while reading the request: " + conn);
-        } else if (state == ProtocolState.RESPONSE_BODY ||
-                state == ProtocolState.RESPONSE_HEAD) {
+        } else if (state == ProtocolState.RESPONSE_BODY || state == 
ProtocolState.RESPONSE_HEAD) {
             informWriterError(conn);
             log.warn("Connection closed while writing the response: " + conn);
         } else if (state == ProtocolState.REQUEST_DONE) {
@@ -416,9 +428,10 @@ public class SourceHandler implements NH
         }
 
         metrics.disconnected();
-
-        SourceContext.updateState(conn, ProtocolState.CLOSED);
-        sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        if (state != ProtocolState.CLOSED) {
+            SourceContext.updateState(conn, ProtocolState.CLOSED);
+            
sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+        }
     }
 
     private void handleInvalidState(NHttpServerConnection conn, String action) 
{

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
 Wed Aug 14 21:35:25 2013
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.Atomi
  *
  * <ul>
  *  <li>t1 - Receiving a new request (ServerHandler#requestReceived)</li>
- *  <li>t2 - Obtaining a connection to forward the request 
(Clienthandler#processConnection)</li>
+ *  <li>t2 - Obtaining a connection to forward the request 
(ClientHandler#processConnection)</li>
  *  <li>t3 - Reading the complete response from the backend server 
(ClientHandler#inputReady)</li>
  *  <li>t4 - Writing the complete response to the client 
(ServerHandler#outputReady)</li>
  * <ul>
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.Atomi
  */
 public class LatencyView implements LatencyViewMBean {
 
-    private static final String NHTTP_LATENCY_VIEW = "NhttpTransportLatency";
+    private static final String NHTTP_LATENCY_VIEW = 
"PassThroughTransportLatency";
 
     private static final int SMALL_DATA_COLLECTION_PERIOD = 5;
     private static final int LARGE_DATA_COLLECTION_PERIOD = 5 * 60;
@@ -124,7 +124,7 @@ public class LatencyView implements Late
      *
      * @param reqArrival The request arrival time
      * @param reqDeparture The request departure time (backend connection 
establishment)
-     * @param resArrival The resoponse arrival time
+     * @param resArrival The response arrival time
      * @param resDeparture The response departure time
      */
     public void notifyTimes(long reqArrival, long reqDeparture,


Reply via email to