Remove! Sent from my Verizon Wireless Phone
[email protected] wrote: >Author: lester >Date: Mon Aug 20 11:16:34 2012 >New Revision: 1374973 > >URL: http://svn.apache.org/viewvc?rev=1374973&view=rev >Log: >In scope of ESME-360: Began work on XMPP Consumer component action. > >Added: > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala > > esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala >Modified: > esme/branches/akka/build.sbt > esme/branches/akka/pom.xml > esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala > esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala > esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala > esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala > >Modified: esme/branches/akka/build.sbt >URL: >http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/build.sbt (original) >+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012 >@@ -8,7 +8,7 @@ version := "1.4" > > organization := "Apache Software Foundation" > >-scalaVersion := "2.9.1" >+scalaVersion := "2.9.1" > > //scalacOptions ++= Seq("-unchecked", "-deprecation") > scalacOptions ++= Seq("-deprecation") >@@ -28,7 +28,7 @@ libraryDependencies ++= { > val compassVersion = "2.1.1" > val luceneVersion = "2.4.0" > val scalazVersion = "6.0.4" >- val akkaVersion = "2.0.2" >+ val akkaVersion = "2.1-20120701-002745" > val eclipsejettyVersion = "7.3.1.v20110307" > val mortbayjettyVersion = "6.1.22" > val slf4jVersion = "1.6.4" >@@ -47,7 +47,7 @@ libraryDependencies ++= { > "net.liftweb" %% "lift-textile" % liftVersion % "compile->default", > "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default", > "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default", >- "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default", >+ "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % >"compile->default", > "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default", > "javax.servlet" % "servlet-api" % "2.5" % "provided->default", > "org.compass-project" % "compass" % compassVersion % "compile->default", > >Modified: esme/branches/akka/pom.xml >URL: >http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/pom.xml (original) >+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012 >@@ -76,7 +76,7 @@ > <lift.version>2.4</lift.version> > <scala.version>2.9.1</scala.version> > <scalaz.version>6.0.4</scalaz.version> >- <akka.version>2.1-SNAPSHOT</akka.version> >+ <akka.version>2.1-20120701-002745</akka.version> > <compass.version>2.1.1</compass.version> > <lucene.version>2.4.0</lucene.version> > <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server> >@@ -91,17 +91,6 @@ > <url>http://repo2.maven.org/maven2/</url> > </repository> > <repository> >- <id>scala-tools.org</id> >- <name>Scala-Tools Maven2 Repository</name> >- <url>http://scala-tools.org/repo-releases</url> >- </repository> >- <repository> >- <id>scala-tools.org.snapshots</id> >- <name>Scala-Tools Maven2 Repository for Snapshots</name> >- <url>http://scala-tools.org/repo-snapshots</url> >- <snapshots/> >- </repository> >- <repository> > <id>typesafe</id> > <name>Typesafe Repository Releases</name> > <url>http://repo.typesafe.com/typesafe/releases</url> >@@ -135,9 +124,9 @@ > > <pluginRepositories> > <pluginRepository> >- <id>scala-tools.org</id> >- <name>Scala-Tools Maven2 Repository</name> >- <url>http://scala-tools.org/repo-releases</url> >+ <id>typesafe</id> >+ <name>Typesafe Repository Releases</name> >+ <url>http://repo.typesafe.com/typesafe/releases</url> > </pluginRepository> > </pluginRepositories> > >@@ -232,7 +221,7 @@ > <dependency> > <groupId>com.typesafe.akka</groupId> > <artifactId>akka-camel</artifactId> >- <version>2.1-SNAPSHOT</version> >+ <version>${akka.version}</version> > </dependency> > <dependency> > <groupId>org.apache.camel</groupId> > >Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original) >+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 >11:16:34 2012 >@@ -27,6 +27,8 @@ import net.liftweb.http.auth._ > import net.liftweb.sitemap._ > import net.liftweb.sitemap.Loc._ > import Helpers._ >+import akka.actor.{Props => AkkaProps, ActorSystem} >+ > //import TimeHelpers.intToTimeSpanBuilder > //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, > DefaultConnectionIdentifier, ConnectionIdentifier} > import java.sql.{Connection, DriverManager} >@@ -249,6 +251,8 @@ class Boot extends Loggable { > ConvDistributor.touch > // ScalaInterpreter.touch > >+ val sys = ActorSystem("camel") >+ val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), >"XmppSupervisor") > > // Initiating popular links and resent messages > val resentPeriod = Props.getLong("stats.resent.period", 1 week) > >Modified: >esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala >(original) >+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala >Mon Aug 20 11:16:34 2012 >@@ -64,8 +64,8 @@ object UserActor { > val xmppUsr = Props.get("xmpp.user") openOr "" > val xmppPwd = Props.get("xmpp.password") openOr "" > val xmppServiceName = Props.get("xmpp.serviceName") openOr "" >- lazy val sys = ActorSystem("camel") >- lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, >xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName))) >+ val sys = ActorSystem("camel") >+ val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, >xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender") > } > > >@@ -289,8 +289,14 @@ class UserActor extends LiftActor { > Distributor ! > Distributor.AddMessageToMailbox(id, msg, ResendReason(userId)) > >- case FetchFeed(_, _) => >- MessagePullActor ! MessagePullActor.Fetch(td.performId) >+ case XmppFrom(_) => { >+ val sys = ActorSystem("camel") >+ sys.actorFor("XmppSupervisor") ! >XmppSupervisor.Fetch(td.performId) >+ } >+ >+ >+ case FetchFeed(_, _) => >+ MessagePullActor ! MessagePullActor.Fetch(td.performId) > > case ScalaInterpret => logger.info("Scala interpreter is > disabled!") > /*if (msg.source.is != "scala") > >Added: >esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto >============================================================================== >--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala >(added) >+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala >Mon Aug 20 11:16:34 2012 >@@ -0,0 +1,51 @@ >+package org.apache.esme.actor >+ >+import akka.camel.{CamelMessage, Consumer} >+ >+import net.liftweb.common.{Empty, Logger} >+import collection.immutable.Queue >+import org.apache.esme.actor.Distributor.UserCreatedMessage >+import org.apache.esme.model.User >+ >+/** >+ * Created with IntelliJ IDEA. >+ * User: lester >+ * Date: 17.08.12 >+ * Time: 2:26 >+ */ >+ >+object XmppReceiver { >+ val logger: Logger = Logger("org.apache.esme.actor") >+ case class FetchMessages() >+} >+ >+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: >String, xmppServiceName: String, participant: String, user: User) extends >Consumer { >+ >+ import XmppReceiver._ >+ >+ var messages: List[(String, Long)] = List.empty >+ >+ def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format >(esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: >%s".format(uri)); uri} >+ >+ def receive = { >+ case msg: CamelMessage => { >+ messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages >+ } >+ case FetchMessages => { >+ messages.foreach(message => >+ Distributor ! UserCreatedMessage( >+ if (user != null) {user.id} else 0, >+ message._1, >+ Nil, >+ message._2, >+ Empty, >+ participant, >+ Empty, >+ None >+ ) >+ ) >+ messages = List.empty >+ } >+ case _ => logger.error("Incoming message is not Camel Message!") >+ } >+} >\ No newline at end of file > >Added: >esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto >============================================================================== >--- >esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala >(added) >+++ >esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala >Mon Aug 20 11:16:34 2012 >@@ -0,0 +1,66 @@ >+package org.apache.esme.actor >+ >+import akka.actor.{ActorRef, Actor, Props => AkkaProps} >+import net.liftweb.util.Props >+import org.apache.esme.actor.XmppReceiver.FetchMessages >+import org.apache.esme.model.User >+import net.liftweb.common.Logger >+ >+/** >+ * Created with IntelliJ IDEA. >+ * User: lester >+ * Date: 17.08.12 >+ * Time: 4:21 >+ */ >+ >+object XmppSupervisor { >+ val logger: Logger = Logger("org.apache.esme.actor") >+ >+ >+ sealed trait XmppSupervisorActions >+ case class Fetch(id: Long) extends XmppSupervisorActions >+ case class Start(id: Long, who: String, usr: User) extends >XmppSupervisorActions >+ case class Stop(id: Long) extends XmppSupervisorActions >+} >+ >+class XmppSupervisor extends Actor { >+ >+ import XmppSupervisor._ >+ >+ private var xmppPullActors: Map[Long, ActorRef] = Map.empty >+ >+ var xmppHost: String = _ >+ var xmppPort: String = _ >+ var xmppUsr: String = _ >+ var xmppPwd: String = _ >+ var xmppServiceName: String = _ >+ >+ >+ override def preStart() { >+ logger.info("preStart() called") >+ >+ xmppHost = Props.get("xmpp.host") openOr "" >+ xmppPort = Props.get("xmpp.port") openOr "" >+ xmppUsr = Props.get("xmpp.user") openOr "" >+ xmppPwd = Props.get("xmpp.password") openOr "" >+ xmppServiceName = Props.get("xmpp.serviceName") openOr "" >+ } >+ >+ def receive = { >+ case Start(id, who, usr) => { >+ logger.info("Start message received. User: %s, who: %s".format(usr, >who)) >+ xmppPullActors += (id -> context.actorOf(AkkaProps(new >XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, >usr)))) >+ } >+ case Stop(id) => { >+ xmppPullActors.get(id).foreach { ref => >+ context.stop(ref) >+ xmppPullActors -= id >+ } >+ } >+ case Fetch(id) => { >+ logger.info("Fetch message received") >+ xmppPullActors.get(id).foreach(ref => ref ! FetchMessages) >+ } >+ case _ => logger.info("Unknown message received") >+ } >+} > >Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala >(original) >+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon >Aug 20 11:16:34 2012 >@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers( > lazy val password: Parser[String] = user > > lazy val mailtoUrl: Parser[String] = accept("mailto:") ~> emailAddr >- lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr >+ lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr >+ lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr > > lazy val emailAddr: Parser[String] = rep1(xchar) ^^ { > case xs => xs.mkString >@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers( > (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > case mt ~ text => MailTo(mt, text.map(_ mkString)) > }) | >- (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { >+ (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { > case mt ~ text => XmppTo(mt, text.map(_ mkString)) > }) | >+ (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ { >+ case mt ~ text => XmppFrom(mt) >+ }) | > (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ { >- case protocol ~ userPass ~ urlpart ~ hdrs ~ data => >- HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data) >- }) | >+ case protocol ~ userPass ~ urlpart ~ hdrs ~ data => >+ HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data) >+ }) | > (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ { > case url ~ tags => FetchAtom(UrlStore.make(url), tags) > }) | > >Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala >URL: >http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff >============================================================================== >--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala >(original) >+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon >Aug 20 11:16:34 2012 >@@ -34,7 +34,15 @@ import java.util.Calendar > import java.util.Date > import scala.xml.{Text, Node, Elem => XmlElem} > >-object Action extends Action with LongKeyedMetaMapper[Action] { >+import akka.actor.{Props => AkkaProps, ActorSystem} >+ >+object Action extends Action with LongKeyedMetaMapper[Action] with Logger { >+ >+ val logger: Logger = Logger("org.apache.esme.model") >+ val sys = ActorSystem("camel") >+ val xmppSupervisor = sys.actorFor("XmppSupervisor") >+ >+ > override def afterCommit = notifyDistributor _ :: super.afterCommit > > private def notifyDistributor(in: Action) { >@@ -58,6 +66,7 @@ object Action extends Action with LongKe > } else { > SchedulerActor ! SchedulerActor.StopRegular(in.id) > MessagePullActor ! MessagePullActor.StopPullActor(in.id) >+ xmppSupervisor ! XmppSupervisor.Stop(in.id) > } > } > >@@ -185,6 +194,9 @@ object Action extends Action with LongKe > */ > class Action extends LongKeyedMapper[Action] { > >+ import Action.xmppSupervisor >+ import Action.logger >+ > /** > * Actors related to regularly executed actions are started here > * This is done when the action is activated or at the start of the > application >@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act > case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix > + url.uniqueId, 0, tags) > } > MessagePullActor ! MessagePullActor.StartPullActor(id.is, > lastMsg, feed) >- >+ >+ case _ => >+ } >+ } >+ case XmppFrom(who) => { >+ User.find(user) match { >+ case Full(u) => >+ xmppSupervisor ! XmppSupervisor.Start(id.is, who, u) > case _ => > } > } >@@ -569,6 +588,7 @@ sealed trait Performances > case class MailTo(who: String, text: Option[String]) extends Performances > case class HttpTo(url: String, user: String, password: String, headers: > List[(String, String)], data: Option[String]) extends Performances > case class XmppTo(who: String, text: Option[String]) extends Performances >+case class XmppFrom(who: String) extends Performances > case class FetchAtom(override val url: UrlStore, override val tags: > List[String]) extends FetchFeed(url, tags) > case class FetchRss(override val url: UrlStore, override val tags: > List[String]) extends FetchFeed(url, tags) > case object PerformResend extends Performances > >
