Done. Sorry, the comments were inserted due to a recent upgrade of my IDE.
Best Regards, Vladimir 2012/8/20 AJ Prudhomme <[email protected]> > Remove! > > Sent from my Verizon Wireless Phone > > [email protected] wrote: > > >Author: lester > >Date: Mon Aug 20 11:16:34 2012 > >New Revision: 1374973 > > > >URL: http://svn.apache.org/viewvc?rev=1374973&view=rev > >Log: > >In scope of ESME-360: Began work on XMPP Consumer component action. > > > >Added: > > > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala > > > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala > >Modified: > > esme/branches/akka/build.sbt > > esme/branches/akka/pom.xml > > esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala > > > esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala > > esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala > > esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala > > > >Modified: esme/branches/akka/build.sbt > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- esme/branches/akka/build.sbt (original) > >+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012 > >@@ -8,7 +8,7 @@ version := "1.4" > > > > organization := "Apache Software Foundation" > > > >-scalaVersion := "2.9.1" > >+scalaVersion := "2.9.1" > > > > //scalacOptions ++= Seq("-unchecked", "-deprecation") > > scalacOptions ++= Seq("-deprecation") > >@@ -28,7 +28,7 @@ libraryDependencies ++= { > > val compassVersion = "2.1.1" > > val luceneVersion = "2.4.0" > > val scalazVersion = "6.0.4" > >- val akkaVersion = "2.0.2" > >+ val akkaVersion = "2.1-20120701-002745" > > val eclipsejettyVersion = "7.3.1.v20110307" > > val mortbayjettyVersion = "6.1.22" > > val slf4jVersion = "1.6.4" > >@@ -47,7 +47,7 @@ libraryDependencies ++= { > > "net.liftweb" %% "lift-textile" % liftVersion % "compile->default", > > "org.scalaz" %% 
"scalaz-core" % scalazVersion % "compile->default", > > "com.typesafe.akka" % "akka-actor" % akkaVersion % > "compile->default", > >- "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % > "compile->default", > >+ "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % > "compile->default", > > "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default", > > "javax.servlet" % "servlet-api" % "2.5" % "provided->default", > > "org.compass-project" % "compass" % compassVersion % > "compile->default", > > > >Modified: esme/branches/akka/pom.xml > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- esme/branches/akka/pom.xml (original) > >+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012 > >@@ -76,7 +76,7 @@ > > <lift.version>2.4</lift.version> > > <scala.version>2.9.1</scala.version> > > <scalaz.version>6.0.4</scalaz.version> > >- <akka.version>2.1-SNAPSHOT</akka.version> > >+ <akka.version>2.1-20120701-002745</akka.version> > > <compass.version>2.1.1</compass.version> > > <lucene.version>2.4.0</lucene.version> > > <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server> > >@@ -91,17 +91,6 @@ > > <url>http://repo2.maven.org/maven2/</url> > > </repository> > > <repository> > >- <id>scala-tools.org</id> > >- <name>Scala-Tools Maven2 Repository</name> > >- <url>http://scala-tools.org/repo-releases</url> > >- </repository> > >- <repository> > >- <id>scala-tools.org.snapshots</id> > >- <name>Scala-Tools Maven2 Repository for Snapshots</name> > >- <url>http://scala-tools.org/repo-snapshots</url> > >- <snapshots/> > >- </repository> > >- <repository> > > <id>typesafe</id> > > <name>Typesafe Repository Releases</name> > > <url>http://repo.typesafe.com/typesafe/releases</url> > >@@ -135,9 +124,9 @@ > > > > <pluginRepositories> > > <pluginRepository> > >- <id>scala-tools.org</id> > >- <name>Scala-Tools Maven2 
Repository</name> > >- <url>http://scala-tools.org/repo-releases</url> > >+ <id>typesafe</id> > >+ <name>Typesafe Repository Releases</name> > >+ <url>http://repo.typesafe.com/typesafe/releases</url> > > </pluginRepository> > > </pluginRepositories> > > > >@@ -232,7 +221,7 @@ > > <dependency> > > <groupId>com.typesafe.akka</groupId> > > <artifactId>akka-camel</artifactId> > >- <version>2.1-SNAPSHOT</version> > >+ <version>${akka.version}</version> > > </dependency> > > <dependency> > > <groupId>org.apache.camel</groupId> > > > >Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala > (original) > >+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon > Aug 20 11:16:34 2012 > >@@ -27,6 +27,8 @@ import net.liftweb.http.auth._ > > import net.liftweb.sitemap._ > > import net.liftweb.sitemap.Loc._ > > import Helpers._ > >+import akka.actor.{Props => AkkaProps, ActorSystem} > >+ > > //import TimeHelpers.intToTimeSpanBuilder > > //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, > DefaultConnectionIdentifier, ConnectionIdentifier} > > import java.sql.{Connection, DriverManager} > >@@ -249,6 +251,8 @@ class Boot extends Loggable { > > ConvDistributor.touch > > // ScalaInterpreter.touch > > > >+ val sys = ActorSystem("camel") > >+ val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), > "XmppSupervisor") > > > > // Initiating popular links and resent messages > > val resentPeriod = Props.getLong("stats.resent.period", 1 week) > > > >Modified: > esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala > >URL: > 
http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- > esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala > (original) > >+++ > esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala Mon > Aug 20 11:16:34 2012 > >@@ -64,8 +64,8 @@ object UserActor { > > val xmppUsr = Props.get("xmpp.user") openOr "" > > val xmppPwd = Props.get("xmpp.password") openOr "" > > val xmppServiceName = Props.get("xmpp.serviceName") openOr "" > >- lazy val sys = ActorSystem("camel") > >- lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, > xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName))) > >+ val sys = ActorSystem("camel") > >+ val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, > xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender") > > } > > > > > >@@ -289,8 +289,14 @@ class UserActor extends LiftActor { > > Distributor ! > > Distributor.AddMessageToMailbox(id, msg, > ResendReason(userId)) > > > >- case FetchFeed(_, _) => > >- MessagePullActor ! MessagePullActor.Fetch(td.performId) > >+ case XmppFrom(_) => { > >+ val sys = ActorSystem("camel") > >+ sys.actorFor("XmppSupervisor") ! > XmppSupervisor.Fetch(td.performId) > >+ } > >+ > >+ > >+ case FetchFeed(_, _) => > >+ MessagePullActor ! 
MessagePullActor.Fetch(td.performId) > > > > case ScalaInterpret => logger.info("Scala interpreter is > disabled!") > > /*if (msg.source.is != "scala") > > > >Added: > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto > > >============================================================================== > >--- > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala > (added) > >+++ > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala > Mon Aug 20 11:16:34 2012 > >@@ -0,0 +1,51 @@ > >+package org.apache.esme.actor > >+ > >+import akka.camel.{CamelMessage, Consumer} > >+ > >+import net.liftweb.common.{Empty, Logger} > >+import collection.immutable.Queue > >+import org.apache.esme.actor.Distributor.UserCreatedMessage > >+import org.apache.esme.model.User > >+ > >+/** > >+ * Created with IntelliJ IDEA. > >+ * User: lester > >+ * Date: 17.08.12 > >+ * Time: 2:26 > >+ */ > >+ > >+object XmppReceiver { > >+ val logger: Logger = Logger("org.apache.esme.actor") > >+ case class FetchMessages() > >+} > >+ > >+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, > esmePwd: String, xmppServiceName: String, participant: String, user: User) > extends Consumer { > >+ > >+ import XmppReceiver._ > >+ > >+ var messages: List[(String, Long)] = List.empty > >+ > >+ def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format > (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI > is: %s".format(uri)); uri} > >+ > >+ def receive = { > >+ case msg: CamelMessage => { > >+ messages = (msg.bodyAs[String], System.currentTimeMillis) :: > messages > >+ } > >+ case FetchMessages => { > >+ messages.foreach(message => > >+ Distributor ! 
UserCreatedMessage( > >+ if (user != null) {user.id} else 0, > >+ message._1, > >+ Nil, > >+ message._2, > >+ Empty, > >+ participant, > >+ Empty, > >+ None > >+ ) > >+ ) > >+ messages = List.empty > >+ } > >+ case _ => logger.error("Incoming message is not Camel Message!") > >+ } > >+} > >\ No newline at end of file > > > >Added: > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto > > >============================================================================== > >--- > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala > (added) > >+++ > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala > Mon Aug 20 11:16:34 2012 > >@@ -0,0 +1,66 @@ > >+package org.apache.esme.actor > >+ > >+import akka.actor.{ActorRef, Actor, Props => AkkaProps} > >+import net.liftweb.util.Props > >+import org.apache.esme.actor.XmppReceiver.FetchMessages > >+import org.apache.esme.model.User > >+import net.liftweb.common.Logger > >+ > >+/** > >+ * Created with IntelliJ IDEA. 
> >+ * User: lester > >+ * Date: 17.08.12 > >+ * Time: 4:21 > >+ */ > >+ > >+object XmppSupervisor { > >+ val logger: Logger = Logger("org.apache.esme.actor") > >+ > >+ > >+ sealed trait XmppSupervisorActions > >+ case class Fetch(id: Long) extends XmppSupervisorActions > >+ case class Start(id: Long, who: String, usr: User) extends > XmppSupervisorActions > >+ case class Stop(id: Long) extends XmppSupervisorActions > >+} > >+ > >+class XmppSupervisor extends Actor { > >+ > >+ import XmppSupervisor._ > >+ > >+ private var xmppPullActors: Map[Long, ActorRef] = Map.empty > >+ > >+ var xmppHost: String = _ > >+ var xmppPort: String = _ > >+ var xmppUsr: String = _ > >+ var xmppPwd: String = _ > >+ var xmppServiceName: String = _ > >+ > >+ > >+ override def preStart() { > >+ logger.info("preStart() called") > >+ > >+ xmppHost = Props.get("xmpp.host") openOr "" > >+ xmppPort = Props.get("xmpp.port") openOr "" > >+ xmppUsr = Props.get("xmpp.user") openOr "" > >+ xmppPwd = Props.get("xmpp.password") openOr "" > >+ xmppServiceName = Props.get("xmpp.serviceName") openOr "" > >+ } > >+ > >+ def receive = { > >+ case Start(id, who, usr) => { > >+ logger.info("Start message received. User: %s, who: > %s".format(usr, who)) > >+ xmppPullActors += (id -> context.actorOf(AkkaProps(new > XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, > who, usr)))) > >+ } > >+ case Stop(id) => { > >+ xmppPullActors.get(id).foreach { ref => > >+ context.stop(ref) > >+ xmppPullActors -= id > >+ } > >+ } > >+ case Fetch(id) => { > >+ logger.info("Fetch message received") > >+ xmppPullActors.get(id).foreach(ref => ref ! 
FetchMessages) > >+ } > >+ case _ => logger.info("Unknown message received") > >+ } > >+} > > > >Modified: > esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala > (original) > >+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala > Mon Aug 20 11:16:34 2012 > >@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers( > > lazy val password: Parser[String] = user > > > > lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr > >- lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr > >+ lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr > >+ lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr > > > > lazy val emailAddr: Parser[String] = rep1(xchar) ^^ { > > case xs => xs.mkString > >@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers( > > (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > > case mt ~ text => MailTo(mt, text.map(_ mkString)) > > }) | > >- (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > >+ (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > > case mt ~ text => XmppTo(mt, text.map(_ mkString)) > > }) | > >+ (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > >+ case mt ~ text => XmppFrom(mt) > >+ }) | > > (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ { > >- case protocol ~ userPass ~ urlpart ~ hdrs ~ data => > >- HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data) > >- }) | > >+ case protocol ~ userPass ~ urlpart ~ hdrs ~ data => > >+ HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data) > >+ }) | > > 
(acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ { > > case url ~ tags => FetchAtom(UrlStore.make(url), tags) > > }) | > > > >Modified: > esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala > >URL: > http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff > > >============================================================================== > >--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala > (original) > >+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala > Mon Aug 20 11:16:34 2012 > >@@ -34,7 +34,15 @@ import java.util.Calendar > > import java.util.Date > > import scala.xml.{Text, Node, Elem => XmlElem} > > > >-object Action extends Action with LongKeyedMetaMapper[Action] { > >+import akka.actor.{Props => AkkaProps, ActorSystem} > >+ > >+object Action extends Action with LongKeyedMetaMapper[Action] with > Logger { > >+ > >+ val logger: Logger = Logger("org.apache.esme.model") > >+ val sys = ActorSystem("camel") > >+ val xmppSupervisor = sys.actorFor("XmppSupervisor") > >+ > >+ > > override def afterCommit = notifyDistributor _ :: super.afterCommit > > > > private def notifyDistributor(in: Action) { > >@@ -58,6 +66,7 @@ object Action extends Action with LongKe > > } else { > > SchedulerActor ! SchedulerActor.StopRegular(in.id) > > MessagePullActor ! MessagePullActor.StopPullActor(in.id) > >+ xmppSupervisor ! 
XmppSupervisor.Stop(in.id) > > } > > } > > > >@@ -185,6 +194,9 @@ object Action extends Action with LongKe > > */ > > class Action extends LongKeyedMapper[Action] { > > > >+ import Action.xmppSupervisor > >+ import Action.logger > >+ > > /** > > * Actors related to regularly executed actions are started here > > * This is done when the action is activated or at the start of the > application > >@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act > > case FetchRss(_, _) => new RssFeed(u, url.url, > urlSourcePrefix + url.uniqueId, 0, tags) > > } > > MessagePullActor ! MessagePullActor.StartPullActor(id.is, > lastMsg, feed) > >- > >+ > >+ case _ => > >+ } > >+ } > >+ case XmppFrom(who) => { > >+ User.find(user) match { > >+ case Full(u) => > >+ xmppSupervisor ! XmppSupervisor.Start(id.is, who, u) > > case _ => > > } > > } > >@@ -569,6 +588,7 @@ sealed trait Performances > > case class MailTo(who: String, text: Option[String]) extends Performances > > case class HttpTo(url: String, user: String, password: String, headers: > List[(String, String)], data: Option[String]) extends Performances > > case class XmppTo(who: String, text: Option[String]) extends Performances > >+case class XmppFrom(who: String) extends Performances > > case class FetchAtom(override val url: UrlStore, override val tags: > List[String]) extends FetchFeed(url, tags) > > case class FetchRss(override val url: UrlStore, override val tags: > List[String]) extends FetchFeed(url, tags) > > case object PerformResend extends Performances > > > > > -- Best Regards, Vladimir Ivanov
