Hello akka-user list,
I was recently surprised by the Akka APIs surrounding streaming HTTP, and I
would like some feedback on the solution I have converged on. I'll begin with
the use case:
- Modules akka-actor, akka-stream, and akka-http-core are used to build a
streaming data application.
- The application ingests data via a purpose-built agent running on an upstream
service. The agent pushes HTTP POST requests whose bodies are, for efficiency,
encoded as raw bytes.
- Latency between upstream agent and service is low, so only a few inbound TCP
connections will saturate the 10Gb/s NIC.
- To ensure at-least-once delivery, the agent is configured to retry every POST
for which the service does not respond with status 200 (OK).
- To ensure at-least-once delivery, the service is configured to sync to disk
before responding with HTTP status 200 (OK).
- Parallel instances of the agent partition the data before transmission.
Given these constraints, it becomes clear that we need to limit the number of
inbound connections. Only a few are necessary to saturate the NIC, and more
only increase the CPU and memory used. An implementation that makes sense to me
is to issue HTTP status 503 (Service Unavailable) to the upstream agent once
maximum parallelism has been reached. To minimize reordering, the agent should
be told as early as possible that a node is busy, so that it can retry against
a different node.
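For concreteness, here is a minimal sketch of the behavior I am after, written
against akka-http-core's model API; slotAvailable and streamToDiskAndSync are
hypothetical placeholders for the capacity check and the disk sink:

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}

// When a slot is free, stream the entity to disk, sync, and answer 200 (OK);
// when the node is saturated, answer 503 immediately so the agent can retry
// against a different node.
def handle(slotAvailable: () => Boolean,
           streamToDiskAndSync: HttpRequest => Future[Unit])
          (req: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] =
  if (slotAvailable())
    streamToDiskAndSync(req).map(_ => HttpResponse(StatusCodes.OK))
  else
    Future.successful(HttpResponse(StatusCodes.ServiceUnavailable))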
Note that since the input is streamed to disk, the akka-streams
ActorMaterializer as well as the HTTP actor must have their parallelism limited.
Once the ActorMaterializer has started a flow, it seems tricky to get it to
stop efficiently.
When I started on the problem, it seemed very likely that one of the many
dispatcher implementations could provide this behavior; the documentation has
plenty of language about "bounded mailboxes" and so on. My three attempts to
solve the problem went like this:
1) Try to find some combination of Typesafe config, .withDispatcher and related
calls that would bound the parallelism of the actor and ActorMaterializer. This
started taking a long time, and every configuration I found enqueued requests
instead of failing fast.
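The kind of configuration I was experimenting with looked roughly like this
(the dispatcher name and sizes are illustrative):

bounded-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 3
  }
  throughput = 1
}

The actor can point at it via Props#withDispatcher and the materializer via
ActorMaterializerSettings#withDispatcher, but in every variant the pool is
bounded while excess work is queued rather than rejected.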
2) I wrote a custom subclass of ThreadPoolExecutor with a hard maximum thread
count, wrapped it with ExecutionContext.fromExecutor, and used it to run my
HTTP handler.
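Roughly, the executor was set up along these lines (shown here configured
rather than subclassed, for brevity; names are illustrative):

import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

// Hard cap of 3 threads with no work queue: submissions beyond the cap are
// rejected by the executor instead of being enqueued.
val pool = new ThreadPoolExecutor(
  3, 3,                              // core and maximum pool size
  0L, TimeUnit.MILLISECONDS,
  new SynchronousQueue[Runnable](),  // direct handoff, no buffering
  new ThreadPoolExecutor.AbortPolicy())

val boundedEc: ExecutionContext = ExecutionContext.fromExecutor(pool)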
This approach failed when I realized that even though the actor and
ActorMaterializer were gated by my counter, the counter was decremented too
soon. Remember, I only need, say, 3 connections, so 3 threads are very cheap. I
had hoped Await.result could sit and keep a thread occupied for the lifetime of
a connection. But as it turns out, even though all the non-blocking bloggers
out there say never to call Await.result, I couldn't get Await.result or any
related API to actually hold a thread! It must be doing something clever...
3) Finally, I abandoned the Dispatcher and ExecutionContext APIs entirely, in
favor of a small Future-based semaphore:
import scala.concurrent.{ExecutionContext, Promise, Future}
import scala.util.Try

// Allows at most numConcurrentMax Futures in flight at once; submit fails
// fast by returning None when the limit has been reached.
class FutureSemaphore(numConcurrentMax: Int, ec: ExecutionContext) {
  implicit val _ec: ExecutionContext = ec
  private var numConcurrent = 0

  def submit[U](fnf: () => Future[U]): Option[Future[U]] = {
    // Check and increment under the same lock, so two callers racing past
    // the check cannot exceed the limit.
    val acquired = synchronized {
      if (numConcurrent < numConcurrentMax) {
        numConcurrent += 1
        true
      } else false
    }
    if (acquired) {
      val g = Promise[U]()
      fnf().onComplete { (t: Try[U]) =>
        synchronized { numConcurrent -= 1 }
        g.complete(t)
      }
      Some(g.future)
    } else None
  }
}
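Wired into the server it looks roughly like this (a sketch: ingest is a
hypothetical handler that streams the entity to disk, syncs, and completes
with 200):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.ActorMaterializer
import scala.concurrent.Future

object IngestServer extends App {
  implicit val system = ActorSystem("ingest")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val semaphore = new FutureSemaphore(3, system.dispatcher)

  // Hypothetical placeholder for the real disk-sink handler.
  def ingest(req: HttpRequest): Future[HttpResponse] = ???

  // Reject with 503 as soon as the semaphore is full, otherwise ingest.
  def handler(req: HttpRequest): Future[HttpResponse] =
    semaphore.submit(() => ingest(req))
      .getOrElse(Future.successful(HttpResponse(StatusCodes.ServiceUnavailable)))

  Http().bindAndHandleAsync(handler, interface = "0.0.0.0", port = 8080)
}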
While the code is small and seems to work as advertised, I am still confused
about why this took so long to figure out. Did I miss something in the
documentation?
Here is the reference documentation:
3.4 Dispatchers
An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of
the machine so to speak. All MessageDispatcher implementations are also an
ExecutionContext, which means that they can be used to execute arbitrary code,
for instance Futures.
If it is true that dispatchers cannot bound parallelism and fail fast, that
seems like basic design information the reference should state. If,
alternatively, some clever configuration of a dispatcher subclass can bound
parallelism and fail fast, then that configuration should be shown. Maybe I
missed it. I dunno.
Thanks in advance for any insights.
Regards,
Jeff Henrikson