Hello akka-user list,

I was recently surprised by the Akka APIs surrounding streaming HTTP.  I would 
like some feedback on the solution I have converged on.  I'll begin with the 
use case:

- Modules akka-actor, akka-stream, and akka-http-core are used to build a 
streaming data application.
- The application ingests data via a purpose-built agent running on an upstream 
service.  The upstream agent pushes HTTP POST requests whose bodies are, for 
efficiency, encoded as raw bytes.
- Latency between upstream agent and service is low, so only a few inbound TCP 
connections will saturate the 10Gb/s NIC.
- To ensure at-least-once delivery, the agent is configured to retry every 
POST for which the service does not respond with status 200 (OK).
- To ensure at-least-once delivery, the service syncs to disk before 
responding with HTTP status 200 (OK).
- Parallel instances of the agent partition the data before transmission.

Given these constraints, it becomes clear that we need to limit the number of 
inbound connections.  Only a few are necessary to saturate the link, and more 
only increase CPU and memory usage.  A design that makes sense to me is to 
respond with HTTP status 503 (Service Unavailable) once maximum parallelism 
has been reached.  To minimize disorder, the agent should be notified of the 
busy node as soon as possible, so that it can retry against a different node.
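Concretely, the fail-fast admission check I have in mind is nothing more than a 
non-blocking counter (the names here are illustrative, not an existing API):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Fail-fast connection gate: tryAcquire never blocks or queues; it either
// claims a slot or reports "busy" so the caller can answer 503 immediately.
final class ConnectionGate(maxConnections: Int) {
  private val active = new AtomicInteger(0)

  def tryAcquire(): Boolean = {
    val n = active.incrementAndGet()
    if (n <= maxConnections) true
    else {
      active.decrementAndGet() // over the limit: roll back the claim
      false
    }
  }

  def release(): Unit = active.decrementAndGet()
}
```

A request handler would call tryAcquire() on accept, answer 503 immediately 
when it returns false, and call release() once the response has been flushed.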

Note that since the input streams to disk, the akka-streams ActorMaterializer, 
as well as the HTTP actor, must have its parallelism limited.  Once the 
ActorMaterializer has started a flow, it seems tricky to stop it efficiently.

When I began with the problem, it seemed very likely that one of the many 
dispatcher implementations could provide this behavior.  There is much 
language about "bounded mailboxes" and so on.  My three attempts to solve the 
problem went like this:

1) Try to find some Typesafe config, .withDispatcher, and related calls to 
bound the parallelism of the actor and the ActorMaterializer.  This started 
taking a long time, and every configuration I found enqueued requests instead 
of failing fast.
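For concreteness, this is roughly the shape of dispatcher configuration I was 
experimenting with (the dispatcher name is illustrative).  As far as I can 
tell, parallelism-max bounds the number of threads, but messages beyond that 
simply wait in the mailbox; nothing is rejected:

```
custom-bounded-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 1.0
    parallelism-max = 3
  }
}
```

So this caps threads, not admission: there is no fail-fast path back to the 
client.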

2) I wrote a custom subclass of ThreadPoolExecutor with a hard maximum thread 
count, wrapped it with ExecutionContext.fromExecutor, and ran my HTTP handler 
on it.

This approach failed when I realized that even though the actor and 
ActorMaterializer were gated by my counter, the counter was decremented too 
soon.  Remember, I only need, say, 3 connections, so 3 threads are very 
cheap.  I had hoped Await.result could sit and keep a thread occupied for the 
duration of a connection.  But as it turns out, even though all the 
non-blocking bloggers out there say never to call Await.result, I couldn't get 
Await.result or any related API to hold a thread!  It must be doing something 
clever: presumably scala.concurrent.blocking and BlockContext, which let a 
fork-join pool spawn compensating threads when one blocks, so the pool never 
really stays capped at its nominal size.
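For reference, a genuinely fail-fast pool of this kind can be had from the 
plain JDK classes, without subclassing: with a SynchronousQueue and the 
default AbortPolicy, execute() throws RejectedExecutionException as soon as 
all maxPoolSize threads are busy, instead of queueing the task.  A sketch:

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Fail-fast pool: tasks are handed directly to a thread or rejected.
val failFastPool = new ThreadPoolExecutor(
  0,                                // corePoolSize
  3,                                // maximumPoolSize: hard cap, e.g. 3 connections
  60L, TimeUnit.SECONDS,            // idle threads retire after 60s
  new SynchronousQueue[Runnable]()  // zero-capacity handoff: no waiting line
)
```

Of course, as described above, this still doesn't help by itself, because 
nothing holds the thread for the lifetime of the connection.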

3) Finally, I abandoned the Dispatcher and ExecutionContext APIs entirely, in 
favor of a simple Future-based semaphore:

    import scala.concurrent.{ExecutionContext, Future, Promise}
    import scala.util.Try
    import scala.util.control.NonFatal
    
    class FutureSemaphore(numConcurrentMax: Int, ec: ExecutionContext)
    {
      private implicit val _ec: ExecutionContext = ec
      private var numConcurrent = 0
    
      def submit[U](fnf: () => Future[U]): Option[Future[U]] = {
        // The check and the increment must happen under the same lock;
        // otherwise two callers can both observe numConcurrent < max.
        val acquired = synchronized {
          if (numConcurrent < numConcurrentMax) {
            numConcurrent += 1
            true
          } else false
        }
        if (!acquired) None
        else {
          // If fnf throws before returning a Future, still release the slot.
          val f =
            try fnf()
            catch { case NonFatal(e) => Future.failed(e) }
          val g = Promise[U]()
          f.onComplete { (t: Try[U]) =>
            synchronized { numConcurrent -= 1 }
            g.complete(t)
          }
          Some(g.future)
        }
      }
    }
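Here is the class again in standalone form (with the slot check and the 
increment done inside one synchronized block, which matters under contention), 
plus a minimal driver demonstrating the fail-fast behavior:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ExecutionContext, Future, Promise}

// Same semaphore, repeated so this snippet compiles on its own.
class FutureSemaphore(numConcurrentMax: Int, ec: ExecutionContext) {
  private implicit val _ec: ExecutionContext = ec
  private var numConcurrent = 0

  def submit[U](fnf: () => Future[U]): Option[Future[U]] = {
    // Atomically claim a slot, or refuse.
    val acquired = synchronized {
      if (numConcurrent < numConcurrentMax) { numConcurrent += 1; true }
      else false
    }
    if (!acquired) None
    else {
      val g = Promise[U]()
      fnf().onComplete { t =>
        synchronized { numConcurrent -= 1 } // release the slot on completion
        g.complete(t)
      }
      Some(g.future)
    }
  }
}

// Driver: with 2 slots, a third concurrent submit is refused immediately,
// which is the point where the service would answer 503.
val sem = new FutureSemaphore(2, implicitly[ExecutionContext])
val gate = Promise[Unit]()                      // keeps the first two "connections" open
val a = sem.submit(() => gate.future)           // occupies slot 1
val b = sem.submit(() => gate.future)           // occupies slot 2
val c = sem.submit(() => Future.successful(())) // None: both slots taken
gate.success(())                                // connections finish, slots free up
```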
While the code is small and seems to work as advertised, I am still confused 
about why this took so long to figure out.  Did I miss something in the 
documentation?

Here is the reference documentation:
3.4 Dispatchers

An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of 
the machine so to speak. All MessageDispatcher implementations are also an 
ExecutionContext, which means that they can be used to execute arbitrary code, 
for instance Futures. 

If it is true that dispatchers cannot bound parallelism in a fail-fast way, 
that seems like basic design information, and the reference should say so.  
If, alternatively, some clever configuration of a dispatcher subclass can 
bound parallelism and fail fast, then that configuration should be shown.  
Maybe I missed it.  I dunno.

Thanks in advance for any insights.

Regards,


Jeff Henrikson










