Author: chirino
Date: Mon Oct 18 18:53:00 2010
New Revision: 1023946
URL: http://svn.apache.org/viewvc?rev=1023946&view=rev
Log:
Disable solinger in tcp connections so they shutdown fast once closed. Stomp
protocol handler now adds a delay when it it sends an error to the client
before closing out the connection.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Mon Oct 18 18:53:00 2010
@@ -34,6 +34,7 @@ import org.apache.activemq.apollo.transp
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.dto.{BindingDTO,
DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+import java.util.concurrent.TimeUnit
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -592,9 +593,11 @@ class StompProtocolHandler extends Proto
if( !connection.stopped ) {
connection.transport.suspendRead
connection.transport.offer(StompFrame(ERROR, headers,
BufferContent(ascii(explained))) )
- ^ {
+ // TODO: if there are too many open connections we should just close the
connection
+ // without waiting for the error to get sent to the client.
+ queue.after(5, TimeUnit.SECONDS) {
connection.stop()
- } >>: queue
+ }
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Oct 18 18:53:00 2010
@@ -17,13 +17,27 @@
package org.apache.activemq.apollo.stomp
import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.activemq.apollo.util.FunSuiteSupport
import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
-class StompTest extends FunSuiteSupport with ShouldMatchers {
+class StompTestSupport extends FunSuiteSupport with ShouldMatchers {
var broker: Broker = null
+ override protected def beforeAll() = {
+ val uri = "xml:classpath:activemq-stomp.xml"
+ info("Loading broker configuration from the classpath with URI: " + uri)
+ broker = BrokerFactory.createBroker(uri, true)
+ Thread.sleep(1000); //TODO implement waitUntilStarted
+ }
+
+ override protected def afterAll() = {
+ broker.stop
+ }
+
+
+}
+
+class Stomp10Test extends StompTestSupport {
test("Stomp 1.0 CONNECT") {
val client = new StompClient
@@ -38,6 +52,9 @@ class StompTest extends FunSuiteSupport
frame should include("version:1.0\n")
}
+}
+
+class Stomp11Test extends StompTestSupport {
test("Stomp 1.1 CONNECT") {
val client = new StompClient
@@ -69,7 +86,7 @@ class StompTest extends FunSuiteSupport
frame should include("version:1.1\n")
}
- test("Stomp 1.1 CONNECT /w Version Fallback") {
+ test("Stomp 1.1 CONNECT /w valid version fallback") {
val client = new StompClient
client.open("localhost", 61613)
@@ -84,30 +101,33 @@ class StompTest extends FunSuiteSupport
frame should include("version:1.0\n")
}
- test("Stomp CONNECT /w invalid virtual host") {
+ test("Stomp 1.1 CONNECT /w invalid version fallback") {
val client = new StompClient
client.open("localhost", 61613)
client.send(
"CONNECT\n" +
- "accept-version:1.0,1.1\n" +
- "host:invalid\n" +
+ "accept-version:9.0,10.0\n" +
+ "host:default\n" +
"\n")
val frame = client.receive()
frame should startWith("ERROR\n")
+ frame should include regex("""version:.+?\n""")
frame should include regex("""message:.+?\n""")
}
+ test("Stomp CONNECT /w invalid virtual host") {
+ val client = new StompClient
+ client.open("localhost", 61613)
- override protected def beforeAll() = {
- val uri = "xml:classpath:activemq-stomp.xml"
- info("Loading broker configuration from the classpath with URI: " + uri)
- broker = BrokerFactory.createBroker(uri, true)
- Thread.sleep(1000); //TODO implement waitUntilStarted
- }
-
- override protected def afterAll() = {
- broker.stop
+ client.send(
+ "CONNECT\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:invalid\n" +
+ "\n")
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include regex("""message:.+?\n""")
}
}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1023946&r1=1023945&r2=1023946&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Mon Oct 18 18:53:00 2010
@@ -201,6 +201,8 @@ public class TcpTransport extends JavaBa
this.channel.configureBlocking(false);
this.remoteAddress =
channel.socket().getRemoteSocketAddress().toString();
+ channel.socket().setSoLinger(true, 0);
+
this.socketState = new CONNECTED();
}