Author: chirino
Date: Mon Oct 18 18:53:08 2010
New Revision: 1023947
URL: http://svn.apache.org/viewvc?rev=1023947&view=rev
Log:
Time out a connection which does not send its discrimination header.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=1023947&r1=1023946&r2=1023947&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
Mon Oct 18 18:53:08 2010
@@ -24,6 +24,8 @@ import java.nio.channels.{WritableByteCh
import java.nio.ByteBuffer
import java.io.IOException
import java.lang.String
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.ScalaDispatch._
/**
* <p>
@@ -137,7 +139,7 @@ class MultiProtocolHandler extends Proto
def protocol = "multi"
- var connected = false
+ var discriminated = false
override def onTransportCommand(command: Any) = {
@@ -145,6 +147,8 @@ class MultiProtocolHandler extends Proto
throw new ProtocolException("Expected a protocol codec");
}
+ discriminated = true
+
var codec: ProtocolCodec = command.asInstanceOf[ProtocolCodec];
val protocol = codec.protocol()
val protocolHandler = ProtocolFactory.get(protocol) match {
@@ -165,6 +169,17 @@ class MultiProtocolHandler extends Proto
override def onTransportConnected = {
connection.transport.resumeRead
+
+ // Make sure client connects eventually...
+ connection.dispatchQueue.after(5, TimeUnit.SECONDS) {
+ assert_discriminated
+ }
+ }
+
+ def assert_discriminated = {
+ if( connection.serviceState.isStarted && !discriminated ) {
+ connection.stop
+ }
}
}