Hi, Thank you for reply. It do work this way. However I can not control with workers I wanna replace in this way.
The reason is why I'm tying to put logic of when worker should be terminated is basically concept separation. In real life low performed workers will be fired by their bosses. But perhaps that is bad point. I did try to make discussion here http://stackoverflow.com/questions/29161763/replacing-bad-performing-workers-in-pool . But there was no any answers. Anyway, thank you for response. I will try to do workers supervising inside the worker and make it work. On Thursday, March 26, 2015 at 4:11:51 PM UTC+1, Patrik Nordwall wrote: > > Hi, > > The reason is that the BalancingPool use a special dispatcher when it > creates the routees. Here you create and add the routees without that. > Try this in instead: > > def replaceWorkers(oldRoutees: Seq[Routee]): Unit = { > pool ! AdjustPoolSize(-numberOfWorkers) > pool ! AdjustPoolSize(numberOfWorkers) > Thread.sleep(500) //Give some time to BalancingPool > } > > However, wouldn't it be better to let the routees throw an exception and > thereby restart? > > /Patrik > > > On Mon, Mar 23, 2015 at 8:13 PM, Stanislav Kurilin <[email protected] > <javascript:>> wrote: > >> Hi >> >> I'm using akka BalancingPool >> <http://doc.akka.io/api/akka/2.3.4/index.html#akka.routing.BalancingPool> to >> distribute tasks over workers. It works pretty well until I add/remove >> workers in pool. I wanna do because some of workers are unreliable and bad >> performing. However, balancing pool send all messages only to one worker >> after replacement. >> >> Here is a scala test for this >> >> >> >> import scala.concurrent.duration._ >> import org.scalatest._ >> import akka.util.Timeout >> import akka.actor._ >> import akka.routing._ >> import akka.testkit._ >> >> >> class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec")) >> with ImplicitSender >> with WordSpecLike with Matchers with BeforeAndAfterAll { >> >> >> override def afterAll { >> TestKit.shutdownActorSystem(system) >> } >> >> >> val numberOfTestMessages = 5 >> val numberOfWorkers = 3 >> val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[ >> Worker]), "pool") >> >> >> def sendMessagesAndCollectStatistic = { >> for (i <- 1 to numberOfTestMessages) pool ! "task" >> (currentRoutes, collectResponces) >> } >> >> >> def collectResponces = receiveN(numberOfTestMessages, 10.second). >> groupBy(l => l).map(t => (t._1, t._2.length)) >> >> >> def currentRoutes = { >> pool ! GetRoutees >> val Routees(routees) = expectMsgAnyClassOf(classOf[Routees]) >> routees >> } >> >> >> def replaceWorkers(oldRoutees: Seq[Routee]) = { >> //Adding new Routees before removing old ones to make it work :) >> for (i <- 1 to numberOfWorkers) pool ! AddRoutee(ActorRefRoutee( >> system.actorOf(Props[Worker]))) >> for (r <- oldRoutees) pool ! RemoveRoutee(r) >> Thread.sleep(500) //Give some time to BalancingPool >> } >> >> >> "test" in { >> val (routees1, responces1) = sendMessagesAndCollectStatistic >> replaceWorkers(routees1) >> val (routees2, responces2) = sendMessagesAndCollectStatistic >> >> >> assert(responces2.size > 1 , s""" >> Before replacement distribution over ${routees1.size} workers: >> ${responces1} >> After replacement distribution over ${routees2.size} workers: >> ${responces2}""") >> } >> } >> >> >> >> >> //For each task worker simulate some work for 1 second and sends back to >> sender worker's id >> object Worker { >> var i = 0 >> def newId = synchronized { >> i += 1 >> i >> } >> } >> >> >> class Worker extends Actor { >> val id = Worker.newId >> def receive = { >> case _ => Thread.sleep(1000); sender ! id >> } >> } >> >> >> >> The test passes for RoundRobinPool and RandomPool. >> >> I did ask it on stackoverflow, but no success so far >> >> http://stackoverflow.com/questions/29198908/replacing-workers-in-balancingpool >> >> >> Thanks for any help. >> >> >> >> >> >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <javascript:>. >> To post to this group, send email to [email protected] >> <javascript:>. >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > > Patrik Nordwall > Typesafe <http://typesafe.com/> - Reactive apps on the JVM > Twitter: @patriknw > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
