I'm cross posting this here for better coverage. 

http://stackoverflow.com/questions/25174504/multiple-future-calls-in-an-actors-receive-method


I'm trying to make two external calls (to a Redis database) inside an 
Actor's receive method. Both calls return a Future and I need the result of 
the first Future inside the second. I'm wrapping both calls inside a Redis 
transaction to avoid anyone else from modifying the value in the database 
while I'm reading it.

The internal state of the actor is updated based on the value of the second 
Future.

Here is what my current code looks like which I is incorrect because I'm 
updating the internal state of the actor inside a Future.onComplete
 callback.

I cannot use the PipeTo pattern because I need both both Future have to be 
in a transaction. If I use Await for the first Future then my receive 
method will *block*. Any idea how to fix this ?

My *second question* is related to how I'm using Futures. Is this usage of 
Futures below correct? Is there a better way of dealing with multiple 
Futures in general? Imagine if there were 3 or 4 Future each depending on 
the previous one.

import akka.actor.{Props, ActorLogging, Actor}import akka.util.ByteStringimport 
redis.RedisClient
import scala.concurrent.Futureimport scala.util.{Failure, Success}

object GetSubscriptionsDemo extends App {
  val akkaSystem = akka.actor.ActorSystem("redis-demo")
  val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", 
"dummyzset")), name = "simpleactor")
  actor ! UpdateState}
case object UpdateState
class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging 
{

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  //required by Future
  implicit val ctx = context dispatcher

  var rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => {
      log.info("Start of UpdateState ...")

      val tran = rClient.transaction()

      val zf: Future[Long] = tran.zcard(key)  //FIRST Future 
      zf.onComplete {

        case Success(z) => {
          //SECOND Future, depends on result of FIRST Future 
          val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 
          rf.onComplete {
            case Success(x) => {
              //convert ByteString to UTF8 String
              val v = x.map(_.utf8String)
              log.info(s"Updating state with $v ")
              //update actor's internal state inside callback for a Future
              //IS THIS CORRECT ?
              mutableState ++ v
            }
            case Failure(e) => {
              log.warning("ZRANGE future failed ...", e)
            }
          }
        }
        case Failure(f) => log.warning("ZCARD future failed ...", f)
      }
      tran.exec()

    }
  }
}

The compiles but when I run it gets struck.

2014-08-07  INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger 
- Slf4jLogger started2014-08-07 04:38:35.106UTC INFO 
[redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - 
Start of UpdateState ...2014-08-07 04:38:35.134UTC INFO 
[redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to 
localhost/127.0.0.1:63792014-08-07 04:38:35.172UTC INFO 
[redis-demo-akka.actor.default-dispatcher-4] r.a.R

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to