Author: chirino
Date: Mon Oct 18 18:53:00 2010
New Revision: 1023946

URL: http://svn.apache.org/viewvc?rev=1023946&view=rev
Log:
Set SO_LINGER with a zero timeout on TCP connections so they shut down fast
once closed. The STOMP protocol handler now adds a delay when it sends an
error to the client before closing out the connection.

Modified:
    
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
 Mon Oct 18 18:53:00 2010
@@ -34,6 +34,7 @@ import org.apache.activemq.apollo.transp
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.dto.{BindingDTO, 
DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+import java.util.concurrent.TimeUnit
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -592,9 +593,11 @@ class StompProtocolHandler extends Proto
     if( !connection.stopped ) {
       connection.transport.suspendRead
       connection.transport.offer(StompFrame(ERROR, headers, 
BufferContent(ascii(explained))) )
-      ^ {
+      // TODO: if there are too many open connections we should just close the 
connection
+      // without waiting for the error to get sent to the client.
+      queue.after(5, TimeUnit.SECONDS) {
         connection.stop()
-      } >>: queue
+      }
     }
   }
 

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
 Mon Oct 18 18:53:00 2010
@@ -17,13 +17,27 @@
 package org.apache.activemq.apollo.stomp
 
 import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.activemq.apollo.util.FunSuiteSupport
 import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
 
-class StompTest extends FunSuiteSupport with ShouldMatchers {
+class StompTestSupport extends FunSuiteSupport with ShouldMatchers {
   var broker: Broker = null
 
+  override protected def beforeAll() = {
+    val uri = "xml:classpath:activemq-stomp.xml"
+    info("Loading broker configuration from the classpath with URI: " + uri)
+    broker = BrokerFactory.createBroker(uri, true)
+    Thread.sleep(1000); //TODO implement waitUntilStarted
+  }
+
+  override protected def afterAll() = {
+    broker.stop
+  }
+
+
+}
+
+class Stomp10Test extends StompTestSupport {
 
   test("Stomp 1.0 CONNECT") {
     val client = new StompClient
@@ -38,6 +52,9 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.0\n")
   }
 
+}
+
+class Stomp11Test extends StompTestSupport {
 
   test("Stomp 1.1 CONNECT") {
     val client = new StompClient
@@ -69,7 +86,7 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.1\n")
   }
 
-  test("Stomp 1.1 CONNECT /w Version Fallback") {
+  test("Stomp 1.1 CONNECT /w valid version fallback") {
     val client = new StompClient
     client.open("localhost", 61613)
 
@@ -84,30 +101,33 @@ class StompTest extends FunSuiteSupport 
     frame should include("version:1.0\n")
   }
 
-  test("Stomp CONNECT /w invalid virtual host") {
+  test("Stomp 1.1 CONNECT /w invalid version fallback") {
     val client = new StompClient
     client.open("localhost", 61613)
 
     client.send(
       "CONNECT\n" +
-      "accept-version:1.0,1.1\n" +
-      "host:invalid\n" +
+      "accept-version:9.0,10.0\n" +
+      "host:default\n" +
       "\n")
     val frame = client.receive()
     frame should startWith("ERROR\n")
+    frame should include regex("""version:.+?\n""")
     frame should include regex("""message:.+?\n""")
   }
 
+  test("Stomp CONNECT /w invalid virtual host") {
+    val client = new StompClient
+    client.open("localhost", 61613)
 
-  override protected def beforeAll() = {
-    val uri = "xml:classpath:activemq-stomp.xml"
-    info("Loading broker configuration from the classpath with URI: " + uri)
-    broker = BrokerFactory.createBroker(uri, true)
-    Thread.sleep(1000); //TODO implement waitUntilStarted
-  }
-
-  override protected def afterAll() = {
-    broker.stop
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.0,1.1\n" +
+      "host:invalid\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:.+?\n""")
   }
 
 }
\ No newline at end of file

Modified: 
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
 Mon Oct 18 18:53:00 2010
@@ -201,6 +201,8 @@ public class TcpTransport extends JavaBa
 
         this.channel.configureBlocking(false);
         this.remoteAddress = 
channel.socket().getRemoteSocketAddress().toString();
+        channel.socket().setSoLinger(true, 0);
+
         this.socketState = new CONNECTED();
     }
 


Reply via email to