Author: chirino
Date: Wed Aug 1 21:48:27 2012
New Revision: 1368280
URL: http://svn.apache.org/viewvc?rev=1368280&view=rev
Log:
Fixes APLO-233: When WebSockets shut down, there is a small chance that the
broker can enter a CPU spin.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1368280&r1=1368279&r2=1368280&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Wed Aug 1 21:48:27 2012
@@ -85,7 +85,9 @@ trait SinkMapper[T,X] extends Sink[T] wi
if( full ) {
false
} else {
- downstream.offer(passing(value))
+ val accepted:Boolean = downstream.offer(passing(value))
+ assert(accepted, "The downstream sink violated it's contract, an offer
was not accepted but it had told us it was not full")
+ accepted
}
}
def passing(value:T):X
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1368280&r1=1368279&r2=1368280&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Wed Aug 1 21:48:27 2012
@@ -153,7 +153,6 @@ object WebSocketTransportFactory extends
} catch {
case ignore =>
}
- server = null
}
on_complete.run
}
@@ -260,8 +259,11 @@ object WebSocketTransportFactory extends
inbound.notify();
}
connection.disconnect()
+ dispatch_queue {
+ protocolCodec = null
+ on_completed.run()
+ }
}
- on_completed.run()
}
def getLocalAddress = new InetSocketAddress(request.getLocalAddr,
request.getLocalPort)
@@ -427,7 +429,9 @@ object WebSocketTransportFactory extends
dispatchQueue.assertExecuting
try {
if (!service_state.is_started) {
- throw new IOException("Not running.")
+ // this command gets dropped since it was issued after
+ // we were stopped..
+ return true;
}
protocolCodec.write(command) match {
case BufferState.FULL =>