Hi
I'm using Akka's BalancingPool <http://doc.akka.io/api/akka/2.3.4/index.html#akka.routing.BalancingPool> to distribute tasks over workers. It works well until I add or remove workers in the pool. I want to do this because some of the workers are unreliable and perform badly. However, after the replacement the balancing pool sends all messages to only one worker.

Here is a ScalaTest spec that reproduces this:

import scala.concurrent.duration._
import org.scalatest._
import akka.util.Timeout
import akka.actor._
import akka.routing._
import akka.testkit._

class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec")) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  val numberOfTestMessages = 5
  val numberOfWorkers = 3
  val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool")

  def sendMessagesAndCollectStatistic = {
    for (i <- 1 to numberOfTestMessages) pool ! "task"
    (currentRoutes, collectResponces)
  }

  // Maps each worker id to the number of responses received from that worker
  def collectResponces =
    receiveN(numberOfTestMessages, 10.second).groupBy(l => l).map(t => (t._1, t._2.length))

  def currentRoutes = {
    pool ! GetRoutees
    val Routees(routees) = expectMsgAnyClassOf(classOf[Routees])
    routees
  }

  def replaceWorkers(oldRoutees: Seq[Routee]) = {
    // Adding new Routees before removing old ones to make it work :)
    for (i <- 1 to numberOfWorkers) pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker])))
    for (r <- oldRoutees) pool ! RemoveRoutee(r)
    Thread.sleep(500) // Give the BalancingPool some time
  }

  "test" in {
    val (routees1, responces1) = sendMessagesAndCollectStatistic
    replaceWorkers(routees1)
    val (routees2, responces2) = sendMessagesAndCollectStatistic

    assert(responces2.size > 1, s"""
      Before replacement distribution over ${routees1.size} workers: ${responces1}
      After replacement distribution over ${routees2.size} workers: ${responces2}""")
  }
}

// For each task, the worker simulates 1 second of work and sends its id back to the sender
object Worker {
  var i = 0
  def newId = synchronized {
    i += 1
    i
  }
}

class Worker extends Actor {
  val id = Worker.newId
  def receive = {
    case _ => Thread.sleep(1000); sender ! id
  }
}

The test passes for RoundRobinPool and RandomPool.

I also asked this on Stack Overflow, but have had no success so far:
http://stackoverflow.com/questions/29198908/replacing-workers-in-balancingpool

Thanks for any help.
