Hi

I'm using Akka's BalancingPool 
<http://doc.akka.io/api/akka/2.3.4/index.html#akka.routing.BalancingPool> to 
distribute tasks over workers. It works pretty well until I add or remove 
workers in the pool. I want to do this because some of the workers are 
unreliable and perform badly. However, after the replacement the balancing 
pool sends all messages to only one worker.

Here is a ScalaTest spec that reproduces this:



import scala.concurrent.duration._
import org.scalatest._
import akka.util.Timeout
import akka.actor._
import akka.routing._ 
import akka.testkit._


/**
 * Reproduction spec: sends a batch of tasks to a BalancingPool, replaces all of
 * its routees through AddRoutee / RemoveRoutee, sends a second batch, and
 * asserts that the second batch was answered by more than one worker.
 */
class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec"))
  with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  /** Shuts the test actor system down once the spec has finished. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  val numberOfTestMessages = 5
  val numberOfWorkers = 3
  val pool: ActorRef =
    system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool")

  /**
   * Sends `numberOfTestMessages` tasks to the pool and gathers the replies.
   *
   * @return the pool's routees together with a map from each replied worker id
   *         to the number of replies that carried it
   */
  def sendMessagesAndCollectStatistic: (scala.collection.immutable.IndexedSeq[Routee], Map[AnyRef, Int]) = {
    // Query the routees before any task is in flight, so the Routees reply can
    // never be interleaved with worker replies in the test actor's queue.
    val routees = currentRoutes
    (1 to numberOfTestMessages).foreach(_ => pool ! "task")
    (routees, collectResponces)
  }

  /**
   * Receives exactly `numberOfTestMessages` replies (waiting up to 10 seconds)
   * and counts how many times each distinct reply value occurred.
   */
  def collectResponces: Map[AnyRef, Int] =
    receiveN(numberOfTestMessages, 10.seconds)
      .groupBy(identity)
      .map { case (workerId, replies) => workerId -> replies.length }

  /** Asks the pool for its routees with GetRoutees and returns the answer. */
  def currentRoutes: scala.collection.immutable.IndexedSeq[Routee] = {
    pool ! GetRoutees
    expectMsgType[Routees].routees
  }

  /**
   * Adds `numberOfWorkers` freshly created workers to the pool and then removes
   * every routee in `oldRoutees`.
   *
   * @param oldRoutees routees to remove from the pool
   */
  def replaceWorkers(oldRoutees: Seq[Routee]): Unit = {
    // Adding new Routees before removing old ones to make it work :)
    // NOTE(review): the replacements are created with system.actorOf, i.e. they
    // are not routees created by the pool itself — presumably they do not share
    // the pool's balancing dispatcher/mailbox; confirm against the Akka
    // BalancingPool documentation.
    (1 to numberOfWorkers).foreach { _ =>
      pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker])))
    }
    oldRoutees.foreach(routee => pool ! RemoveRoutee(routee))
    Thread.sleep(500) // Give some time to BalancingPool
  }

  "test" in {
    val (routeesBefore, responsesBefore) = sendMessagesAndCollectStatistic
    replaceWorkers(routeesBefore)
    val (routeesAfter, responsesAfter) = sendMessagesAndCollectStatistic

    // More than one distinct worker id must have answered the second batch.
    assert(responsesAfter.size > 1, s"""
      Before replacement distribution over ${routeesBefore.size} workers: ${responsesBefore}
      After replacement distribution over ${routeesAfter.size} workers: ${responsesAfter}""")
  }
}




// For each task, a worker simulates some work for 1 second and then sends its
// own id back to the sender.
/** Hands out sequential integer ids, starting at 1. */
object Worker {
  /** The id most recently handed out; 0 until `newId` has been called. */
  var i: Int = 0

  /** Advances the counter while holding this object's monitor and returns the new value. */
  def newId: Int = this.synchronized {
    val next = i + 1
    i = next
    next
  }
}


/** Actor that answers every message with its own id after sleeping for one second. */
class Worker extends Actor {
  // Taken from Worker.newId when the actor instance is constructed.
  val id: Int = Worker.newId

  def receive: Receive = {
    case _ =>
      // Blocks for one second to simulate work before replying.
      Thread.sleep(1000)
      sender() ! id
  }
}



The test passes for RoundRobinPool and RandomPool. 

I also asked this on Stack Overflow, but have had no success so far:
http://stackoverflow.com/questions/29198908/replacing-workers-in-balancingpool


Thanks for any help.






-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to