Hi,

The reason is that BalancingPool uses a special dispatcher when it
creates its routees, so that they all share one mailbox. Here you create
and add the routees yourself, without that dispatcher.
Try this instead:

  def replaceWorkers(oldRoutees: Seq[Routee]): Unit = {
    pool ! AdjustPoolSize(-numberOfWorkers)
    pool ! AdjustPoolSize(numberOfWorkers)
    Thread.sleep(500) //Give some time to BalancingPool
  }

However, wouldn't it be better to let the routees throw an exception and
thereby restart?
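
A minimal sketch of that alternative, assuming Akka 2.3.x. The names
FlakyWorker, RestartingPool, restartOnFailure and the "fail" message are
hypothetical and only for illustration, and the retry limits are arbitrary.
By default a pool escalates routee failures to its own parent, so the
sketch passes an explicit supervisor strategy that restarts only the
failing routee:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.BalancingPool
import scala.concurrent.duration._

// Hypothetical worker: it signals a problem by throwing instead of being replaced by hand.
class FlakyWorker extends Actor {
  def receive = {
    case "fail" => throw new IllegalStateException("worker is in a bad state")
    case task   => sender() ! s"done: $task"
  }
}

object RestartingPool {
  // Restart only the failing routee, at most 10 times per minute (illustrative limits).
  val restartOnFailure: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: Exception => SupervisorStrategy.Restart
    }

  // The pool still creates the routees itself, so they keep the balancing dispatcher.
  def create(system: ActorSystem, numberOfWorkers: Int): ActorRef =
    system.actorOf(
      BalancingPool(numberOfWorkers, supervisorStrategy = restartOnFailure)
        .props(Props[FlakyWorker]),
      "restarting-pool")
}
```

With this, a misbehaving worker gets a fresh instance without any
AddRoutee/RemoveRoutee messages being sent to the pool.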

/Patrik


On Mon, Mar 23, 2015 at 8:13 PM, Stanislav Kurilin <[email protected]>
wrote:

> Hi
>
> I'm using akka BalancingPool
> <http://doc.akka.io/api/akka/2.3.4/index.html#akka.routing.BalancingPool> to
> distribute tasks over workers. It works pretty well until I add or remove
> workers in the pool. I want to do this because some of the workers are
> unreliable and perform badly. However, the balancing pool sends all messages
> to only one worker after the replacement.
>
> Here is a scala test for this
>
>
>
> import scala.concurrent.duration._
> import org.scalatest._
> import akka.util.Timeout
> import akka.actor._
> import akka.routing._
> import akka.testkit._
>
>
> class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec"))
> with ImplicitSender
>   with WordSpecLike with Matchers with BeforeAndAfterAll {
>
>
>   override def afterAll {
>     TestKit.shutdownActorSystem(system)
>   }
>
>
>   val numberOfTestMessages = 5
>   val numberOfWorkers = 3
>   val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool")
>
>
>   def sendMessagesAndCollectStatistic = {
>     for (i <- 1 to numberOfTestMessages) pool ! "task"
>     (currentRoutes, collectResponces)
>   }
>
>
>   def collectResponces = receiveN(numberOfTestMessages, 10.second)
>     .groupBy(l => l).map(t => (t._1, t._2.length))
>
>
>   def currentRoutes = {
>     pool ! GetRoutees
>     val Routees(routees) = expectMsgAnyClassOf(classOf[Routees])
>     routees
>   }
>
>
>   def replaceWorkers(oldRoutees: Seq[Routee]) = {
>     //Adding new Routees before removing old ones to make it work :)
>     for (i <- 1 to numberOfWorkers)
>       pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker])))
>     for (r <- oldRoutees) pool ! RemoveRoutee(r)
>     Thread.sleep(500) //Give some time to BalancingPool
>   }
>
>
>   "test" in {
>     val (routees1, responces1) = sendMessagesAndCollectStatistic
>     replaceWorkers(routees1)
>     val (routees2, responces2) = sendMessagesAndCollectStatistic
>
>
>     assert(responces2.size > 1 , s"""
>       Before replacement distribution over ${routees1.size} workers: ${responces1}
>       After replacement distribution over ${routees2.size} workers: ${responces2}""")
>   }
> }
>
>
>
>
> // For each task, the worker simulates some work for 1 second and sends
> // its id back to the sender
> object Worker {
>   var i = 0
>   def newId = synchronized {
>     i += 1
>     i
>   }
> }
>
>
> class Worker extends Actor {
>   val id = Worker.newId
>   def receive = {
>     case _ => Thread.sleep(1000); sender ! id
>   }
> }
>
>
>
> The test passes for RoundRobinPool and RandomPool.
>
> I also asked this on Stack Overflow, but with no success so far:
>
> http://stackoverflow.com/questions/29198908/replacing-workers-in-balancingpool
>
>
> Thanks for any help.
>
>
>
>
>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw
