Considered Flow.throttle?
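
For reference, a minimal sketch of rate limiting with throttle packaged as a reusable Flow, so the limit lives in one place. The names ThrottleSketch and rateLimited are hypothetical and the interval is shortened for the demo; the throttle parameters otherwise mirror the ones used in the quoted code below.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottleSketch {
  implicit val system: ActorSystem = ActorSystem("throttle-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Lets at most one element through per `per` interval (burst of 1).
  // ThrottleMode.Shaping delays elements instead of failing the stream.
  def rateLimited(per: FiniteDuration): Flow[String, String, NotUsed] =
    Flow[String].throttle(1, per, 1, ThrottleMode.Shaping)

  def main(args: Array[String]): Unit = {
    val done = Source(List("a", "b", "c"))
      .via(rateLimited(100.millis))
      .runWith(Sink.seq)

    println(Await.result(done, 5.seconds))
    system.terminate()
  }
}
```

Note that each materialization of such a Flow gets its own throttle, so on its own this limits one stream, not the total rate across several independent graphs.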

On Fri, Jul 8, 2016 at 11:23 AM, Jack Daniels <rus...@walkmind.com> wrote:

> Hey guys! I'm still learning akka-streams and have a new question
> <http://stackoverflow.com/q/38233742/226895>.
>
> *Variables:*
>
>    - A single HTTP client flow with throttling
>    - Multiple other flows that want to use the first flow simultaneously
>
> *Goal:*
>
> The single HTTP flow makes requests to a particular API that limits the
> number of calls to it; if I exceed the limit, it bans me. It is therefore
> very important to maintain the request rate regardless of how many clients
> in my code use the flow.
>
> A number of other flows want to make requests to that API, and I'd like
> them to get backpressure from the HTTP flow. Normally you connect the
> whole thing into one graph and it works, but in my case I have multiple
> graphs.
>
> How would you solve it?
>
> *My attempt to solve it:*
>
> I use Source.queue for the HTTP flow so that I can queue HTTP requests and
> throttle them. The problem is that the Future from SourceQueue.offer fails
> if I exceed the number of requests. So I need to somehow "re-offer" once a
> previously offered event completes. The modified Future from the
> SourceQueue would then backpressure the other graphs (inside their
> mapAsync) that make HTTP requests.
>
> Here is how I implemented it:
>
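> import java.util.concurrent.ConcurrentLinkedDeque
>
> import akka.actor.ActorSystem
> import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
> import akka.stream.scaladsl.{Keep, Sink, Source}
>
> import scala.concurrent.{Await, Future, Promise}
> import scala.concurrent.duration._
> import scala.util.{Failure, Random, Success}
>
> object Main {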
>
>   implicit val system = ActorSystem("root")
>   implicit val executor = system.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   private val queueHttp = Source.queue[(String, Promise[String])](2, OverflowStrategy.backpressure)
>     .throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
>     .mapAsync(4) {
>       case (text, promise) =>
>         // Simulate delay of http request
>         val delay = (Random.nextDouble() * 1000 / 2).toLong
>         Thread.sleep(delay)
>         Future.successful(text -> promise)
>     }
>     .toMat(Sink.foreach({
>       case (text, p) =>
>         p.success(text)
>     }))(Keep.left)
>     .run
>
>   val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
>
>   def sendRequest(value: String): Future[String] = {
>
>     val p = Promise[String]()
>     val offerFuture = queueHttp.offer(value -> p)
>
>     def addToQueue(future: Future[String]): Future[String] = {
>       futureDeque.addLast(future)
>       future.onComplete {
>         case _ => futureDeque.remove(future)
>       }
>       future
>     }
>
>     offerFuture.flatMap {
>       case QueueOfferResult.Enqueued =>
>         addToQueue(p.future)
>     }.recoverWith {
>       case ex =>
>         val first = futureDeque.pollFirst()
>         if (first != null)
>           addToQueue(first.flatMap(_ => sendRequest(value)))
>         else
>           sendRequest(value)
>     }
>   }
>
>   def main(args: Array[String]) {
>
>     val allFutures = for (v <- 0 until 15)
>       yield {
>         val res = sendRequest(s"Text $v")
>         res.onSuccess {
>           case text =>
>             println("> " + text)
>         }
>         res
>       }
>
>     Future.sequence(allFutures).onComplete {
>       case Success(text) =>
>         println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
>         system.terminate()
>       case Failure(ex) =>
>         ex.printStackTrace()
>         system.terminate()
>     }
>
>     Await.result(system.whenTerminated, Duration.Inf)
>   }
> }
>
> The disadvantage of this solution is that I have locking on the
> ConcurrentLinkedDeque, which is probably not that bad for a rate of 1
> request per second, but still.
>
> How would you solve this task?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
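
One possible way to share a single throttled flow among independent graphs, sketched here under assumptions: it relies on MergeHub, which is only available in Akka versions newer than the one the quoted code appears to target, and the names SharedThrottledFlow, callApi and sharedSink are hypothetical. It is an alternative to the re-offer logic above, not a drop-in fix for it.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{MergeHub, Sink, Source}

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

object SharedThrottledFlow {
  implicit val system: ActorSystem = ActorSystem("root")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical stand-in for the real HTTP call.
  def callApi(text: String): Future[String] =
    Future.successful(s"response to $text")

  // One throttled stream. Materializing it yields a Sink that any number
  // of independent graphs can attach to; producers are backpressured
  // when the throttled stream is not ready for more elements.
  val sharedSink: Sink[(String, Promise[String]), NotUsed] =
    MergeHub.source[(String, Promise[String])](perProducerBufferSize = 1)
      .throttle(1, 1.second, 1, ThrottleMode.Shaping)
      .mapAsync(4) { case (text, promise) =>
        val response = callApi(text)
        promise.completeWith(response)
        // Swallow failures here so that one failed call does not
        // tear down the shared stream; the caller still sees the
        // failure through its own promise.
        response.map(_ => ()).recover { case _ => () }
      }
      .to(Sink.ignore)
      .run()

  // Sends one request through the shared stream and returns its response.
  def sendRequest(text: String): Future[String] = {
    val promise = Promise[String]()
    Source.single(text -> promise).runWith(sharedSink)
    promise.future
  }
}
```

A client graph could then call `mapAsync(1)(v => SharedThrottledFlow.sendRequest(v))`, so that it waits for each response before issuing the next request.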



-- 
Cheers,
√
