Hi Jeff,

I think there is a misunderstanding here. Dispatchers are essentially thread pools, and yes, they bound parallelism, because there is only a finite number of threads in the pool: the OS will execute at most as many things in parallel as there are threads. However, *concurrency*, the number of actors that can be scheduled at any time onto one of the pool's threads, is *unbounded*, and in fact it must stay that way.
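To make the distinction concrete with plain scala.concurrent (no Akka required), here is a small demo of mine, not from the original thread: the pool below bounds parallelism at two threads, yet happily accepts a hundred concurrent tasks.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedParallelismDemo extends App {
  // Parallelism is bounded: this pool has exactly 2 threads,
  // so at most 2 tasks ever run at the same instant.
  val pool = Executors.newFixedThreadPool(2)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Concurrency is unbounded: 100 tasks can be in flight at once,
  // dynamically mapped onto those 2 threads.
  val tasks = (1 to 100).map(i => Future(i))
  println(Await.result(Future.sequence(tasks), 10.seconds).sum) // prints 5050

  pool.shutdown()
}
```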
> Given these constraints, it becomes clear that we need to limit the number
> of inbound connections. Only a few are necessary to saturate the
> connection, and more only increase the CPU and memory used. As an
> implementation that makes sense to me, HTTP status 503 can be issued to the
> upstream agent when maximum parallelism has been reached. In order to
> minimize disorder, the agent should be notified as soon as possible of the
> busy node so the agent can proceed with a retry to a different node.

You can configure the number of connections the HTTP server accepts, though.

> Note that since the input is streaming to disk, an akka-streams
> ActorMaterializer as well as the http actor must have its parallelism
> limited. Once ActorMaterializer gets a flow started, it seems tricky to
> get it to stop efficiently.

No, this is not true. Only the part that streams to disk needs its parallelism limited. You can have a pool of actors handle disk streaming for all the parallel streams you have; this pool can be fixed in size. You can also model the pool with a MergeHub (http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub), exposing the input side as a Source, and pipe that merged stream into a balanced group of Sinks that stream to disk (http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers).

> When I began with the problem, it appeared very likely that one of the
> many implementations of dispatcher could provide this behavior.

No, this is a misunderstanding. Dispatchers are always bounded; they just schedule an unbounded number of actors onto the limited set of threads in the pool. This is fully analogous to a PC having a finite number of cores and an unbounded number of threads dynamically mapped onto them.

> There is much language about "bounded mailboxes" etc.
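(For concreteness, here is a rough sketch of the MergeHub-plus-Balance pool described above. It assumes Akka 2.4 streams on the classpath; the `balancer` helper is taken from the streams cookbook pattern linked above, while `DiskWriterPool` and the `diskWriter` flow are placeholder names of mine, not a real API — a real worker would write chunks via something like FileIO.)

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, MergeHub, Sink}
import akka.util.ByteString

object DiskWriterPool extends App {
  implicit val system = ActorSystem("writer-pool")
  implicit val mat = ActorMaterializer()

  // Cookbook pattern: fan work out to a fixed pool of `count` workers.
  def balancer[In, Out](worker: Flow[In, Out, Any], count: Int): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val balance = b.add(Balance[In](count, waitForAllDownstreams = true))
      val merge   = b.add(Merge[Out](count))
      for (_ <- 1 to count) balance ~> worker.async ~> merge
      FlowShape(balance.in, merge.out)
    })

  // Placeholder worker: a real one would append each chunk to a file.
  val diskWriter: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map(identity)

  // MergeHub materializes to a single reusable Sink: every incoming HTTP
  // upload can be piped into it, while only `count` workers ever run.
  val uploadSink: Sink[ByteString, NotUsed] =
    MergeHub.source[ByteString](perProducerBufferSize = 16)
      .via(balancer(diskWriter, count = 3))
      .to(Sink.ignore)
      .run()
}
```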
> My three attempts to solve this problem went like this:
>
> 1) Try to find some Typesafe config, .withDispatcher, and related calls to
> bound the parallelism of the actor and ActorMaterializer. This started
> taking a long time, and all configurations found were enqueuing requests
> instead of failing.
>
> 2) I wrote a custom subclass of ThreadPoolExecutor with a hard maximum
> thread count. I wrapped it with ExecutionContext.fromExecutor and called
> my HTTP handler.

This is not how it works. Threads are bounded already; you did not make any difference here.

> This approach failed when I realized that even though the actor and
> ActorMaterializer were gated by my counter, my counter was decremented too
> soon. Remember, I only have to have, say, 3 connections, so 3 threads are
> very cheap. I had hoped Async.result could sit and keep a thread running
> for a connection. But as it turns out, even though all the non-blocking
> bloggers out there say not to call Async.result, I couldn't get
> Async.result or any related API to hold a thread! It must be doing
> something clever...

You mean Await.result? Well, it is designed to protect the underlying thread pool from having all of its threads blocked (which is what you think you wanted here). As such, it may launch compensating threads so that the rest of the application using the dispatcher stays healthy and does not stall on blocked threads.
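(The compensating-thread behavior can be seen with plain Futures; this demo is mine, not from the thread. Each of the 32 tasks blocks until all 32 have started, so on a pool with fewer threads than tasks this would deadlock — but `scala.concurrent.blocking` tells the global fork-join pool to spawn compensating threads, and they all complete.)

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CompensatingThreadsDemo extends App {
  val n = 32 // likely more than the pool's core parallelism (# of CPU cores)
  val latch = new CountDownLatch(n)

  // Without `blocking { ... }` this latch rendezvous would deadlock on a
  // pool smaller than n; with it, the pool compensates with extra threads.
  val fs = (1 to n).map { _ =>
    Future {
      blocking {
        latch.countDown()
        latch.await()
      }
      1
    }
  }
  println(Await.result(Future.sequence(fs), 30.seconds).sum) // prints 32
}
```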
If you want to dedicate part of your application to actual blocking work (I don't think you need it here, though), then you can use the default dispatcher with the "thread-pool-executor" instead of the default fork-join-executor: http://doc.akka.io/docs/akka/2.4/scala/dispatchers.html#Types_of_dispatchers

> 3) Finally, I abandoned the Dispatcher and ExecutionContext APIs entirely,
> in favor of a simple Future:
>
>     import scala.concurrent.{ExecutionContext, Promise, Future}
>     import scala.util.Try
>
>     class FutureSemaphore(numConcurrentMax: Int, ec: ExecutionContext) {
>       implicit val _ec = ec
>       var numConcurrent = 0
>
>       def submit[U](fnf: Unit => Future[U]): Option[Future[U]] = {
>         if (numConcurrent < numConcurrentMax) {
>           synchronized {
>             numConcurrent = numConcurrent + 1
>           }
>           val f = fnf()
>           val g = Promise[U]()
>           f.onComplete { (t: Try[U]) =>
>             synchronized {
>               numConcurrent = numConcurrent - 1
>             }
>             g.complete(t)
>           }
>           Some(g.future)
>         }
>         else
>           None
>       }
>     }
>
> While the code is small and seems to work as advertised, I am still
> confused about why this took so long to figure out. Did I miss something
> in the documentation?

I think you missed the patterns that are relevant to your use case. You want to have a limited number of running entities streaming to disk. You therefore have to create a pool, either using a set of plain actors (maybe behind a router) with a PinnedDispatcher (to dedicate threads), or a MergeHub-based stream pool.

> Here is the reference documentation:
>
>     3.4 Dispatchers
>
>     An Akka MessageDispatcher is what makes Akka Actors "tick", it is the
>     engine of the machine so to speak. All MessageDispatcher implementations
>     are also an ExecutionContext, which means that they can be used to
>     execute arbitrary code, for instance Futures.
>
> If it is true that dispatchers cannot bound parallelism, it seems like
> basic design information and the reference should say so.

They do bound parallelism. They don't bound concurrency.
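(A side note on the FutureSemaphore code above: the `numConcurrent < numConcurrentMax` check happens outside the synchronized block, so two callers can pass the check simultaneously and exceed the limit. A plain java.util.concurrent.Semaphore gives the same fail-fast behavior without hand-rolled locking, since tryAcquire makes the check-and-increment one atomic step. A sketch of mine — `FutureLimiter` is not an Akka API:)

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Fail-fast limiter: at most `maxConcurrent` submitted Futures in flight;
// further submissions are rejected with None instead of being queued.
class FutureLimiter(maxConcurrent: Int)(implicit ec: ExecutionContext) {
  private val permits = new Semaphore(maxConcurrent)

  def submit[U](task: () => Future[U]): Option[Future[U]] =
    if (permits.tryAcquire()) { // atomic check-and-decrement
      val f =
        try task()
        catch { case NonFatal(e) => permits.release(); throw e }
      f.onComplete(_ => permits.release()) // give the slot back when done
      Some(f)
    } else None
}

object FutureLimiterDemo extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val limiter = new FutureLimiter(2)
  val gate = Promise[Int]()

  val a = limiter.submit(() => gate.future)
  val b = limiter.submit(() => gate.future)
  val c = limiter.submit(() => gate.future) // third caller is rejected

  println(Seq(a, b, c).map(_.isDefined).mkString(",")) // true,true,false
  gate.success(1)
  println(Await.result(a.get, 5.seconds)) // 1
}
```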
You have concurrency even with one CPU core on any modern OS (well, you had it on older ones too, because of interrupts), but you don't have parallelism.

> If, alternatively, some clever configuration of a dispatcher subclass can
> bound parallelism and fail fast, then the configuration should be shown.
> Maybe I missed it. I dunno.

For this problem you should not reach for dispatchers at all; that is a dead end here. You should pool the resources whose concurrency you want to limit, using the patterns available.

-Endre

> Thanks in advance for any insights.
>
> Regards,
>
> Jeff Henrikson

--
Akka Team
Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
Twitter: @akkateam

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
