Remove!

Sent from my Verizon Wireless Phone

[email protected] wrote:

>Author: lester
>Date: Mon Aug 20 11:16:34 2012
>New Revision: 1374973
>
>URL: http://svn.apache.org/viewvc?rev=1374973&view=rev
>Log:
>In scope of ESME-360: Began work  on XMPP Consumer component action.
>
>Added:
>    esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
>    
> esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>Modified:
>    esme/branches/akka/build.sbt
>    esme/branches/akka/pom.xml
>    esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
>    esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>
>Modified: esme/branches/akka/build.sbt
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/build.sbt?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/build.sbt (original)
>+++ esme/branches/akka/build.sbt Mon Aug 20 11:16:34 2012
>@@ -8,7 +8,7 @@ version := "1.4"
> 
> organization := "Apache Software Foundation"
> 
>-scalaVersion := "2.9.1"    
>+scalaVersion := "2.9.1"
> 
> //scalacOptions ++= Seq("-unchecked", "-deprecation")  
> scalacOptions ++= Seq("-deprecation") 
>@@ -28,7 +28,7 @@ libraryDependencies ++= {
>   val compassVersion = "2.1.1"
>   val luceneVersion = "2.4.0"
>   val scalazVersion = "6.0.4"
>-  val akkaVersion = "2.0.2"
>+  val akkaVersion = "2.1-20120701-002745"
>   val eclipsejettyVersion = "7.3.1.v20110307"
>   val mortbayjettyVersion = "6.1.22"
>   val slf4jVersion = "1.6.4" 
>@@ -47,7 +47,7 @@ libraryDependencies ++= {
>     "net.liftweb" %% "lift-textile" % liftVersion % "compile->default",
>     "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
>     "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
>-    "com.typesafe.akka" % "akka-camel" % "2.1-SNAPSHOT" % "compile->default",
>+    "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % 
>"compile->default",
>     "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
>     "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
>     "org.compass-project" % "compass" % compassVersion % "compile->default",
>
>Modified: esme/branches/akka/pom.xml
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/pom.xml?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/pom.xml (original)
>+++ esme/branches/akka/pom.xml Mon Aug 20 11:16:34 2012
>@@ -76,7 +76,7 @@
>         <lift.version>2.4</lift.version>
>         <scala.version>2.9.1</scala.version>
>         <scalaz.version>6.0.4</scalaz.version>
>-        <akka.version>2.1-SNAPSHOT</akka.version>
>+        <akka.version>2.1-20120701-002745</akka.version>
>         <compass.version>2.1.1</compass.version>
>         <lucene.version>2.4.0</lucene.version>
>         <netbeans.hint.deploy.server>gfv3</netbeans.hint.deploy.server>
>@@ -91,17 +91,6 @@
>             <url>http://repo2.maven.org/maven2/</url>
>         </repository>
>         <repository>
>-            <id>scala-tools.org</id>
>-            <name>Scala-Tools Maven2 Repository</name>
>-            <url>http://scala-tools.org/repo-releases</url>
>-        </repository>
>-        <repository>
>-            <id>scala-tools.org.snapshots</id>
>-            <name>Scala-Tools Maven2 Repository for Snapshots</name>
>-            <url>http://scala-tools.org/repo-snapshots</url>
>-            <snapshots/>
>-        </repository>
>-        <repository>
>             <id>typesafe</id>
>             <name>Typesafe Repository Releases</name>
>             <url>http://repo.typesafe.com/typesafe/releases</url>
>@@ -135,9 +124,9 @@
> 
>     <pluginRepositories>
>         <pluginRepository>
>-            <id>scala-tools.org</id>
>-            <name>Scala-Tools Maven2 Repository</name>
>-            <url>http://scala-tools.org/repo-releases</url>
>+            <id>typesafe</id>
>+            <name>Typesafe Repository Releases</name>
>+            <url>http://repo.typesafe.com/typesafe/releases</url>
>         </pluginRepository>
>     </pluginRepositories>
> 
>@@ -232,7 +221,7 @@
>         <dependency>
>             <groupId>com.typesafe.akka</groupId>
>             <artifactId>akka-camel</artifactId>
>-            <version>2.1-SNAPSHOT</version>
>+            <version>${akka.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>
>Modified: esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala (original)
>+++ esme/branches/akka/src/main/scala/bootstrap/liftweb/Boot.scala Mon Aug 20 
>11:16:34 2012
>@@ -27,6 +27,8 @@ import net.liftweb.http.auth._
> import net.liftweb.sitemap._
> import net.liftweb.sitemap.Loc._
> import Helpers._
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
> //import TimeHelpers.intToTimeSpanBuilder
> //import net.liftweb.mapper.{DB, ConnectionManager, Schemifier, 
> DefaultConnectionIdentifier, ConnectionIdentifier}
> import java.sql.{Connection, DriverManager}
>@@ -249,6 +251,8 @@ class Boot extends Loggable {
>     ConvDistributor.touch
>     // ScalaInterpreter.touch
> 
>+    val sys = ActorSystem("camel")
>+    val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), 
>"XmppSupervisor")
> 
>     // Initiating popular links and resent messages
>     val resentPeriod = Props.getLong("stats.resent.period", 1 week)
>
>Modified: 
>esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala 
>(original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/UserActor.scala 
>Mon Aug 20 11:16:34 2012
>@@ -64,8 +64,8 @@ object UserActor {
>   val xmppUsr = Props.get("xmpp.user") openOr ""
>   val xmppPwd = Props.get("xmpp.password") openOr ""
>   val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>-  lazy val sys = ActorSystem("camel")
>-  lazy val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, 
>xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)))
>+  val sys = ActorSystem("camel")
>+  val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, 
>xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
> }
> 
> 
>@@ -289,8 +289,14 @@ class UserActor extends LiftActor {
>               Distributor !
>               Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
> 
>-            case FetchFeed(_, _) => 
>-              MessagePullActor ! MessagePullActor.Fetch(td.performId)    
>+            case XmppFrom(_) => {
>+              val sys = ActorSystem("camel")
>+              sys.actorFor("XmppSupervisor") ! 
>XmppSupervisor.Fetch(td.performId)
>+            }
>+
>+
>+            case FetchFeed(_, _) =>
>+              MessagePullActor ! MessagePullActor.Fetch(td.performId)
> 
>             case ScalaInterpret => logger.info("Scala interpreter is 
> disabled!")
>             /*if (msg.source.is != "scala")
>
>Added: 
>esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala?rev=1374973&view=auto
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala 
>(added)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppReceiver.scala 
>Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,51 @@
>+package org.apache.esme.actor
>+
>+import akka.camel.{CamelMessage, Consumer}
>+
>+import net.liftweb.common.{Empty, Logger}
>+import collection.immutable.Queue
>+import org.apache.esme.actor.Distributor.UserCreatedMessage
>+import org.apache.esme.model.User
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 2:26
>+ */
>+
>+object XmppReceiver {
>+  val logger: Logger = Logger("org.apache.esme.actor")
>+  case class FetchMessages()
>+}
>+
>+class XmppReceiver(esmeSrv: String, esmePort: Int, esmeUsr: String, esmePwd: 
>String, xmppServiceName: String, participant: String, user: User) extends 
>Consumer {
>+
>+  import XmppReceiver._
>+
>+  var messages: List[(String, Long)] = List.empty
>+
>+  def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format 
>(esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: 
>%s".format(uri)); uri}
>+
>+  def receive = {
>+    case msg: CamelMessage => {
>+      messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
>+    }
>+    case FetchMessages => {
>+      messages.foreach(message =>
>+        Distributor ! UserCreatedMessage(
>+          if (user != null) {user.id} else 0,
>+          message._1,
>+          Nil,
>+          message._2,
>+          Empty,
>+          participant,
>+          Empty,
>+          None
>+        )
>+      )
>+      messages = List.empty
>+    }
>+    case _ => logger.error("Incoming message is not Camel Message!")
>+  }
>+}
>\ No newline at end of file
>
>Added: 
>esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala?rev=1374973&view=auto
>==============================================================================
>--- 
>esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala 
>(added)
>+++ 
>esme/branches/akka/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala 
>Mon Aug 20 11:16:34 2012
>@@ -0,0 +1,66 @@
>+package org.apache.esme.actor
>+
>+import akka.actor.{ActorRef, Actor, Props => AkkaProps}
>+import net.liftweb.util.Props
>+import org.apache.esme.actor.XmppReceiver.FetchMessages
>+import org.apache.esme.model.User
>+import net.liftweb.common.Logger
>+
>+/**
>+ * Created with IntelliJ IDEA.
>+ * User: lester
>+ * Date: 17.08.12
>+ * Time: 4:21
>+ */
>+
>+object XmppSupervisor {
>+  val logger: Logger = Logger("org.apache.esme.actor")
>+
>+
>+  sealed trait XmppSupervisorActions
>+  case class Fetch(id: Long) extends XmppSupervisorActions
>+  case class Start(id: Long, who: String, usr: User) extends 
>XmppSupervisorActions
>+  case class Stop(id: Long) extends XmppSupervisorActions
>+}
>+
>+class XmppSupervisor extends Actor {
>+
>+  import XmppSupervisor._
>+
>+  private var xmppPullActors: Map[Long, ActorRef] = Map.empty
>+
>+  var xmppHost: String = _
>+  var xmppPort: String = _
>+  var xmppUsr: String = _
>+  var xmppPwd: String = _
>+  var xmppServiceName: String = _
>+
>+
>+  override def preStart() {
>+    logger.info("preStart() called")
>+
>+    xmppHost = Props.get("xmpp.host") openOr ""
>+    xmppPort = Props.get("xmpp.port") openOr ""
>+    xmppUsr = Props.get("xmpp.user") openOr ""
>+    xmppPwd = Props.get("xmpp.password") openOr ""
>+    xmppServiceName = Props.get("xmpp.serviceName") openOr ""
>+  }
>+
>+  def receive = {
>+    case Start(id, who, usr) => {
>+      logger.info("Start message received. User: %s, who: %s".format(usr, 
>who))
>+      xmppPullActors += (id -> context.actorOf(AkkaProps(new 
>XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, 
>usr))))
>+    }
>+    case Stop(id) => {
>+      xmppPullActors.get(id).foreach { ref =>
>+        context.stop(ref)
>+        xmppPullActors -= id
>+      }
>+    }
>+    case Fetch(id) => {
>+      logger.info("Fetch message received")
>+      xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
>+    }
>+    case _ => logger.info("Unknown message received")
>+  }
>+}
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala 
>(original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/lib/MsgParser.scala Mon 
>Aug 20 11:16:34 2012
>@@ -153,7 +153,8 @@ object MsgParser extends TextileParsers(
>   lazy val password: Parser[String] = user
> 
>   lazy val mailtoUrl: Parser[String] = accept("mailto:";) ~> emailAddr
>-  lazy val xmppUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+  lazy val xmppToUrl: Parser[String] = accept("xmppto:") ~> xmppAddr
>+  lazy val xmppFromUrl: Parser[String] = accept("xmppfrom:") ~> xmppAddr
> 
>   lazy val emailAddr: Parser[String] = rep1(xchar) ^^ {
>     case xs => xs.mkString
>@@ -268,13 +269,16 @@ object MsgParser extends TextileParsers(
>   (mailtoUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>     case mt ~ text => MailTo(mt, text.map(_ mkString))
>   }) |
>-  (xmppUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+  (xmppToUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>     case mt ~ text => XmppTo(mt, text.map(_ mkString))
>   }) |
>+  (xmppFromUrl ~ opt(rep1(not(EOF) ~ EOL) ~> rep1(anyChar)) <~ EOF ^^ {
>+    case mt ~ text => XmppFrom(mt)
>+  }) |
>   (scheme ~ userPass ~ urlpart ~ rep(httpHeader) ~ httpData <~ EOF ^^ {
>-      case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>-        HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>-    }) |
>+    case protocol ~ userPass ~ urlpart ~ hdrs ~ data =>
>+      HttpTo(protocol + urlpart, userPass._1, userPass._2, hdrs, data)
>+  }) |
>   (acceptCI("atom:") ~> httpUrl ~ tags <~ EOF ^^ {
>     case url ~ tags => FetchAtom(UrlStore.make(url), tags)
>   }) |
>
>Modified: esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala
>URL: 
>http://svn.apache.org/viewvc/esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala?rev=1374973&r1=1374972&r2=1374973&view=diff
>==============================================================================
>--- esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala 
>(original)
>+++ esme/branches/akka/src/main/scala/org/apache/esme/model/Action.scala Mon 
>Aug 20 11:16:34 2012
>@@ -34,7 +34,15 @@ import java.util.Calendar
> import java.util.Date
> import scala.xml.{Text, Node, Elem => XmlElem}
> 
>-object Action extends Action with LongKeyedMetaMapper[Action] {
>+import akka.actor.{Props => AkkaProps, ActorSystem}
>+
>+object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
>+
>+  val logger: Logger = Logger("org.apache.esme.model")
>+  val sys = ActorSystem("camel")
>+  val xmppSupervisor = sys.actorFor("XmppSupervisor")
>+
>+
>   override def afterCommit = notifyDistributor _ :: super.afterCommit
> 
>   private def notifyDistributor(in: Action) {
>@@ -58,6 +66,7 @@ object Action extends Action with LongKe
>     } else {
>       SchedulerActor ! SchedulerActor.StopRegular(in.id)
>       MessagePullActor ! MessagePullActor.StopPullActor(in.id)
>+      xmppSupervisor ! XmppSupervisor.Stop(in.id)
>     }
>   }
>   
>@@ -185,6 +194,9 @@ object Action extends Action with LongKe
>  */
> class Action extends LongKeyedMapper[Action] {
> 
>+  import Action.xmppSupervisor
>+  import Action.logger
>+
>   /**
>    * Actors related to regularly executed actions are started here
>    * This is done when the action is activated or at the start of the 
> application
>@@ -212,7 +224,14 @@ class Action extends LongKeyedMapper[Act
>               case FetchRss(_, _) => new RssFeed(u, url.url, urlSourcePrefix 
> + url.uniqueId, 0, tags)
>             }
>             MessagePullActor ! MessagePullActor.StartPullActor(id.is, 
> lastMsg, feed)
>-          
>+
>+          case _ =>
>+        }
>+      }
>+      case XmppFrom(who) => {
>+        User.find(user) match {
>+          case Full(u) =>
>+            xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
>           case _ =>
>         }
>       }
>@@ -569,6 +588,7 @@ sealed trait Performances
> case class MailTo(who: String, text: Option[String]) extends Performances
> case class HttpTo(url: String, user: String, password: String, headers: 
> List[(String, String)], data: Option[String]) extends Performances
> case class XmppTo(who: String, text: Option[String]) extends Performances
>+case class XmppFrom(who: String) extends Performances
> case class FetchAtom(override val url: UrlStore, override val tags: 
> List[String]) extends FetchFeed(url, tags)
> case class FetchRss(override val url: UrlStore, override val tags: 
> List[String]) extends FetchFeed(url, tags)
> case object PerformResend extends Performances
>
>

Reply via email to