Hi all,
Attached is a patch file containing basic stats gathering code. I introduced
a dependency on a scala-stats project I forked from the twitter guys and
modified. The scala-stats dependency can be downloaded from
http://github.com/andythedestroyer/scala-stats/downloads.
I initially set up 19 counters for:
* liftSessions - ( actually grabbed from SessionWatcher )
* usersLoggedIn ( incremented and decremented through User.logUserIn and
User logUserOut)
* userCount ( User actors started since start/reset )
* groupCount
* messagesCreated
* userMessagesCreated
* interpreterMessagesCreated
* schedulerMessagesCreated
* messagesPulled ( messages pulled from MessagePullActor )
* messagesDelivered
* messagesDeliveredTrackReason
* messagesDeliveredDirectReason
* messagesDeliveredConversationReason
* messagesDeliveredResendReason
* messagesDeliveredRegularReason
* messagesDeliveredInterpreterReason
* messagesMailed
* messagesSentViaHttp
* messagesFiltered
and 3 gauges for:
( Gauges are partial functions that run to grab a point in time statistic
when the stat is requested )
* users ( from Distributor )
* groups ""
* listeners ""
The stats MBean is dynamic and the stats are created when they are first
incremented, which means if you connect your jconsole at the start of the
server you may not see all the stats. You will need to reconnect later or
invoke the DynamicMBean.getBeanInfo method which jconsole only does once
when it connects.
There are two JMX operations: reset and getGatheringTime. Reset will reset
all statistics to zero and it will actually remove the gauges, which
shouldn't happen. It definitly needs some more work but I wanted to get some
opinions.
Here are a few items that can be added or improved.
* Timers
Timers are functions that wrap other functions, time how long it takes for
the wrapped function to execute and gathers max / min / avg.
Maybe wrap the message parser?.. Could also add other metrics like std
deviation and/or rate of increase/descrease in duration over time periods.
* Intigrate lift statistics
Lower level stats like http request counters and timers. Http response code
counters. DB connection pool counters and maybe a small array of the longest
running db queries.
* Add initializer stack for scala-stats
Add a mutable list of functions that will be called when the Stats object is
started or reset. This could initialize statistics from a db or elsewhere.
Let me know if this is the direction you guys are thinking of or if I am way
off mark.
Thanks
Andy
***************************
Index: src/main/scala/org/apache/esme/model/User.scala
===================================================================
--- src/main/scala/org/apache/esme/model/User.scala (revision 800477)
+++ src/main/scala/org/apache/esme/model/User.scala (working copy)
@@ -45,6 +45,8 @@
import java.net.URL
import java.util.logging._
+import com.twitter.service.{ Stats => ESMEStatistics }
+
object User extends User with KeyedMetaMapper[Long, User] {
override def afterSave = profileChanged _ :: notifyActors _ ::
super.afterSave
@@ -382,6 +384,9 @@
curUser.remove()
curUserId(Full(who.id.toString))
onLogIn.foreach(_(who))
+
+ // This may not be accurate because of users timing out instead of
logging out
+ ESMEStatistics incr "usersLoggedIn"
}
def logoutCurrentUser = logUserOut()
@@ -391,6 +396,7 @@
curUserId.remove()
curUser.remove()
S.request.foreach(_.request.getSession.invalidate)
+ ESMEStatistics getCounter "usersLoggedIn" decr
}
private object curUserId extends SessionVar[Box[String]](Empty)
Index: src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala (revision
800477)
+++ src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala (working copy)
@@ -85,7 +85,7 @@
}
bind("add", in,
- "poolName" -> text("", addNewPool, "id" -> theInput)
+ "poolName" -> text("", addNewPool _, "id" -> theInput)
)
}
@@ -138,7 +138,7 @@
poolId.set(p.toLong);
redisplayPool()},
"id" -> editPoolName),
- "username" -> text(username, username = _, "id" -> editUsername),
+ "username" -> text(username, username = _:String, "id" ->
editUsername),
"permission" -> select(permissions, Empty, addPoolUser, "id" ->
editPermission)
)
Index: src/main/scala/org/apache/esme/lib/ActionMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/ActionMgr.scala (revision 800477)
+++ src/main/scala/org/apache/esme/lib/ActionMgr.scala (working copy)
@@ -123,7 +123,7 @@
bind("main", in,
- "name" -> text(name, name = _, "id" -> mainName),
+ "name" -> text(name, name = _:String, "id" -> mainName),
"test" -> textarea(test, test = _, "id" -> mainTest),
"action" -> textarea("", doSave, "id" -> mainAction)
)
Index: src/main/scala/org/apache/esme/lib/AuthMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/AuthMgr.scala (revision 800477)
+++ src/main/scala/org/apache/esme/lib/AuthMgr.scala (working copy)
@@ -100,7 +100,7 @@
}
bind("main", in,
- "token" -> text("", addAuthToken, "id" -> theInput)
+ "token" -> text("", addAuthToken _, "id" -> theInput)
)
}
Index: src/main/scala/org/apache/esme/lib/TrackMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/TrackMgr.scala (revision 800477)
+++ src/main/scala/org/apache/esme/lib/TrackMgr.scala (working copy)
@@ -101,7 +101,7 @@
}
bind("main", in,
- "track" -> text("", addTrack, "id" -> theInput)
+ "track" -> text("", addTrack _, "id" -> theInput)
)
}
Index: src/main/scala/org/apache/esme/actor/GroupActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/GroupActor.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/GroupActor.scala (working copy)
@@ -30,6 +30,7 @@
import org.apache.esme._
import model._
+import com.twitter.service.{ Stats => ESMEStatistics }
object GroupActor {
case class StartMeUp(group: Long)
@@ -42,6 +43,7 @@
react {
case StartMeUp =>
link(ActorWatcher)
+ ESMEStatistics incr "groupCount"
}
}
}
Index: src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala (working
copy)
@@ -31,6 +31,8 @@
import org.apache.esme.model._
import net.liftweb.util.{Empty,Props}
+import com.twitter.service.{ Stats => ESMEStatistics }
+
object ScalaInterpreter extends Actor{
val settings = new Settings()
@@ -57,6 +59,9 @@
Distributor !
Distributor.AddMessageToMailbox(from, msg, InterpreterReason(from))
}
}
+
+ ESMEStatistics incr "interpreterMessagesCreated"
+ ESMEStatistics incr "messagesCreated"
}
}
Index: src/main/scala/org/apache/esme/actor/SchedulerActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/SchedulerActor.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/SchedulerActor.scala (working
copy)
@@ -28,6 +28,7 @@
import org.apache.esme.model._
import net.liftweb.http.ActorWatcher
import net.liftweb.util.{Full,Empty,TimeHelpers}
+import com.twitter.service.{ Stats => ESMEStatistics }
class SchedulerActor(val messageProcessor: Actor, val user: Long, val
everySeconds: Int, val reason: MailboxReason) extends Actor {
@@ -64,6 +65,8 @@
messageProcessor !
Distributor.AddMessageToMailbox(user, msg, reason)
// }
}
+ ESMEStatistics incr "schedulerMessagesCreated"
+ ESMEStatistics incr "messagesCreated"
}
}
Index: src/main/scala/org/apache/esme/actor/UserActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/UserActor.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/UserActor.scala (working copy)
@@ -36,6 +36,7 @@
import java.util.logging._
import java.util.{TimeZone, Calendar}
import scala.xml.{Elem}
+import com.twitter.service.{ Stats => ESMEStatistics }
object UserActor {
private[actor] case class StartMeUp(user: Long)
@@ -106,6 +107,8 @@
this ! UpdateTracking(Distributor.TrackTrackingType)
this ! UpdateTracking(Distributor.PerformTrackingType)
+ ESMEStatistics incr "userCount"
+
case RunFunc(f) =>
f()
@@ -135,7 +138,9 @@
else null
msg.saveMe
-
+ ESMEStatistics incr "userMessagesCreated"
+ ESMEStatistics incr "messagesCreated"
+
Distributor ! Distributor.AddMessageToMailbox(userId, msg,
NoReason)
for (id <- followers)
@@ -222,13 +227,20 @@
val mb = Mailbox.create.user(userId).message(msg)
reason match {
case TrackReason(trackId) => mb.viaTrack(trackId)
+ ESMEStatistics incr "messagesDeliveredTrackReason"
case DirectReason(fromId) => mb.directlyFrom(fromId)
+ ESMEStatistics incr "messagesDeliveredDirectReason"
case ConversationReason(convId) => mb.conversation(convId)
+ ESMEStatistics incr "messagesDeliveredConversationReason"
case ResendReason(resender) => mb.resentBy(resender)
+ ESMEStatistics incr "messagesDeliveredResendReason"
+ case RegularReason(id) => ESMEStatistics incr
"messagesDeliveredRegularReason"
+ case InterpreterReason(id) => ESMEStatistics incr
"messagesDeliveredInterpreterReason"
case _ =>
}
mb.saveMe
-
+ ESMEStatistics incr "messagesDelivered"
+
_mailbox = ((msg.id.is, reason) ::
_mailbox.toList).take(500).toArray
listeners.foreach(_ ! MessageReceived(msg, reason))
@@ -238,12 +250,14 @@
td.whatToDo match {
case m @ MailTo(_, _) =>
- User.find(userId).foreach( u =>
- HttpSender ! HttpSender.SendAMessage(m, msg, u, reason,
td.uniqueId))
+ User.find(userId).foreach( u => {
+ HttpSender ! HttpSender.SendAMessage(m, msg, u, reason,
td.uniqueId)
+ ESMEStatistics incr "messagesMailed"})
case h @ HttpTo(_, _, _, _, _) =>
- User.find(userId).foreach( u =>
- HttpSender ! HttpSender.SendAMessage(h, msg, u, reason,
td.uniqueId))
+ User.find(userId).foreach( u => {
+ HttpSender ! HttpSender.SendAMessage(h, msg, u, reason,
td.uniqueId)
+ ESMEStatistics incr "messagesSentViaHTTP"})
case PerformResend =>
if (! msg.saved_?) msg.save
@@ -256,7 +270,7 @@
case ScalaInterpret => if (msg.source.is != "scala")
ScalaInterpreter ! ScalaInterpreter.ScalaExcerpt(userId,
msg.id.is, msg.pool.is, msg.getText)
- case PerformFilter => // IGNORE
+ case PerformFilter => ESMEStatistics incr "messagesFiltered" //
IGNORE
}
}
}
Index: src/main/scala/org/apache/esme/actor/Distributor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/Distributor.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/Distributor.scala (working copy)
@@ -159,4 +159,8 @@
ret
}
}
+
+ def getUsersCount = users.size
+ def getGroupsCount = groups.size
+ def getListenersCount = listeners.size
}
Index: src/main/scala/org/apache/esme/actor/MessagePullActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/MessagePullActor.scala (revision
800477)
+++ src/main/scala/org/apache/esme/actor/MessagePullActor.scala (working
copy)
@@ -25,6 +25,7 @@
import scala.actors.Actor._
import net.liftweb.http.ActorWatcher
import org.apache.esme.actor.Distributor.{UserCreatedMessage=>Msg}
+import com.twitter.service.{ Stats => ESMEStatistics }
class MessagePullActor(val messageProcessor: Actor, private var
lastMessage: Option[Msg], val messageSource: UniqueMessageSource) extends
Actor {
@@ -45,6 +46,7 @@
for (message <- lastMessages) {
messageProcessor ! message
lastMessage = Some(message)
+ ESMEStatistics incr "messagesPulled"
}
}
case FetchMessages => actor {
Index: src/main/scala/bootstrap/liftweb/Boot.scala
===================================================================
--- src/main/scala/bootstrap/liftweb/Boot.scala (revision 800477)
+++ src/main/scala/bootstrap/liftweb/Boot.scala (working copy)
@@ -43,7 +43,7 @@
import org.compass.core.config.CompassConfiguration
import scala.actors.Actor
import Actor._
-
+import com.twitter.service.{StatsMBean, Stats => ESMEStatistics }
/**
* A class that's instantiated early and run. It allows the application
* to modify lift's environment
@@ -140,11 +140,18 @@
LiftRules.early.append(makeUtf8)
+ // register stats gathering object with platform mbean server
+ StatsMBean("org.apache.esme.stats")
+
Distributor.touch
SchedulerActor.touch
MessagePullActor.touch
ScalaInterpreter.touch
+ ESMEStatistics.makeGauge("users") { Distributor.getUsersCount }
+ ESMEStatistics.makeGauge("groups") { Distributor.getGroupsCount }
+ ESMEStatistics.makeGauge("listeners") { Distributor.getListenersCount }
+
Action.findAll(By(Action.disabled, false), By(Action.removed,
false)).foreach {
_.startActors
}
@@ -265,6 +272,7 @@
loop {
react {
case SessionWatcherInfo(sessions) =>
+ ESMEStatistics.getCounter("liftSessions").set(sessions.size)
if ((millis - tenMinutes) > lastTime) {
lastTime = millis
val rt = Runtime.getRuntime
Index: pom.xml
===================================================================
--- pom.xml (revision 800477)
+++ pom.xml (working copy)
@@ -137,6 +137,13 @@
<scope>provided</scope>
</dependency>
+ <!-- for stats gathering and stats MBean code can be found at
+ git://github.com/andythedestroyer/scala-stats.git -->
+ <dependency>
+ <groupId>com.twitter.service</groupId>
+ <artifactId>stats</artifactId>
+ <version>1.0</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>