Author: chirino
Date: Wed Mar 30 03:49:31 2011
New Revision: 1086830
URL: http://svn.apache.org/viewvc?rev=1086830&view=rev
Log:
Simpler assertions and error logging.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
Wed Mar 30 03:49:31 2011
@@ -20,10 +20,10 @@ import _root_.java.io.{IOException}
import _root_.java.lang.{String}
import org.fusesource.hawtdispatch._
import protocol.{ProtocolHandler}
-import org.apache.activemq.apollo.util.{Log, BaseService}
import org.apache.activemq.apollo.filter.BooleanExpression
import org.apache.activemq.apollo.transport.{TransportListener,
DefaultTransportListener, Transport}
import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
+import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,7 +32,7 @@ object Connection extends Log
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class Connection() extends BaseService {
+abstract class Connection() extends BaseService with Dispatched {
import Connection._
val dispatch_queue = createQueue()
@@ -99,20 +99,24 @@ class BrokerConnection(var connector: Co
override def toString = "id: "+id.toString
protected override def _start(on_completed:Runnable) = {
- connector.broker.connection_log.info("Client connected from: %s",
transport.getRemoteAddress)
protocol_handler.set_connection(this);
super._start(on_completed)
}
protected override def _stop(on_completed:Runnable) = {
- connector.broker.connection_log.info("Client disconnected from: %s",
transport.getRemoteAddress)
connector.stopped(this)
super._stop(on_completed)
}
- protected override def on_transport_connected() =
protocol_handler.on_transport_connected
+ protected override def on_transport_connected() = {
+ connector.broker.connection_log.info("connected: %s",
transport.getRemoteAddress)
+ protocol_handler.on_transport_connected
+ }
- protected override def on_transport_disconnected() =
protocol_handler.on_transport_disconnected
+ protected override def on_transport_disconnected() = {
+ connector.broker.connection_log.info("disconnected: %s",
transport.getRemoteAddress)
+ protocol_handler.on_transport_disconnected
+ }
protected override def on_transport_command(command: Object) = {
try {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Wed Mar 30 03:49:31 2011
@@ -151,7 +151,7 @@ class OverflowSink[T](val downstream:Sin
class MutableSink[T] extends Sink[T] {
var refiller:Runnable = NOOP
- private var _downstream:Option[Sink[T]] =_
+ private var _downstream:Option[Sink[T]] = None
def downstream_=(value: Option[Sink[T]]) {
_downstream.foreach(d => d.refiller = NOOP )
@@ -292,7 +292,7 @@ class Session[T](val producer_queue:Disp
override def full = {
- assert(getCurrentQueue eq producer_queue)
+ assert(producer_queue.isExecuting)
_full
}
@@ -310,7 +310,7 @@ class Session[T](val producer_queue:Disp
def close = {
if( !closed ) {
closed=true
- assert(getCurrentQueue eq producer_queue)
+ assert(producer_queue.isExecuting)
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
Wed Mar 30 03:49:31 2011
@@ -136,10 +136,9 @@ class JettyWebServer(val broker:Broker)
var server:Server = _
-
override def toString: String = "jetty webserver"
- protected val dispatch_queue = createQueue()
+ val dispatch_queue = createQueue()
protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
this.synchronized {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
Wed Mar 30 03:49:31 2011
@@ -18,11 +18,11 @@ package org.apache.activemq.apollo.broke
import java.io.{IOException}
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
-import org.apache.activemq.apollo.util.ClassFinder
import org.apache.activemq.apollo.broker.store.MessageRecord
import org.apache.activemq.apollo.transport._
import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
import org.apache.activemq.apollo.dto.ConnectionStatusDTO
+import org.apache.activemq.apollo.util.{Log, ClassFinder}
/**
* <p>
@@ -58,11 +58,13 @@ trait Protocol extends ProtocolCodecFact
}
+object ProtocolHandler extends Log
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait ProtocolHandler {
+ import ProtocolHandler._
def protocol:String
@@ -75,6 +77,7 @@ trait ProtocolHandler {
def create_connection_status = new ConnectionStatusDTO
def on_transport_failure(error:IOException) = {
+ trace(error)
connection.stop()
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Mar 30 03:49:31 2011
@@ -261,7 +261,7 @@ class StompProtocolHandler extends Proto
val session = session_manager.open(producer.dispatch_queue)
def close = {
- assert(getCurrentQueue == producer.dispatch_queue)
+ assert(producer.dispatch_queue.isExecuting)
if( !closed ) {
closed = true
if( browser ) {
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Wed Mar 30 03:49:31 2011
@@ -28,7 +28,7 @@ object BaseService extends Log
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait BaseService extends Service {
+trait BaseService extends Service with Dispatched {
import BaseService._
@@ -58,8 +58,6 @@ trait BaseService extends Service {
protected class STOPPING extends State with CallbackSupport { override def
is_stopping = true }
protected class STOPPED extends State { override def is_stopped = true }
- protected def dispatch_queue:DispatchQueue
-
final def start() = start(null)
final def stop() = stop(null)
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1086830&r1=1086829&r2=1086830&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Wed Mar 30 03:49:31 2011
@@ -28,5 +28,10 @@ import org.fusesource.hawtdispatch._
trait Dispatched {
def dispatch_queue:DispatchQueue
- protected def assert_dispatched = assert( getCurrentQueue == dispatch_queue )
+ protected def assert_executing = assert( dispatch_queue.isExecuting,
+ "Dispatch queue '%s' was not executing, (currently executing: %s)".format(
+ Option(dispatch_queue.getLabel).getOrElse(""),
+ Option(getCurrentThreadQueue).map(_.getLabel).getOrElse("None") )
+ )
+
}
\ No newline at end of file