Hi Jack, Queue.offer returns a Future, you can use that in a mapAsync in the stream you want to transitively throttle. As you noted, this can fail though, if the queue is full. What I would do is probably just use Future composition (recoverWith in this case) to implement a retry logic (and probably add some delay) with a bound (like 2-3 retires). This have many benefits
- due to how mapAsync works, it will attempt to backpressure individual clients according to the global rate (the offer Future). - it does have a retry, but it has an upper bound on how many times it retries and after it fails/cancels the client connection - there are no unbounded queues -Endre On Wed, Jul 13, 2016 at 2:16 PM, Viktor Klang <[email protected]> wrote: > Ah, sorry, I should've looked more carefully. AFAICT you'll need to figure > out what throttling means for a fan-in operation. > > (Because now you're adding an unbounded queue (the CLQ)) > > On Sun, Jul 10, 2016 at 4:31 PM, Jack Daniels <[email protected]> wrote: > >> Hi! I use it in the example provided above. The problem is how to >> throttle other graphs that are not connected to throttled graph. >> >> On Friday, July 8, 2016 at 2:24:04 PM UTC+3, √ wrote: >>> >>> Considered Flow.throttle? >>> >>> On Fri, Jul 8, 2016 at 11:23 AM, Jack Daniels <[email protected]> >>> wrote: >>> >>>> up vote >>>> down votefavorite >>>> <http://stackoverflow.com/questions/38233742/how-would-you-connect-many-independent-graphs-maintaining-backpressure-between#> >>>> >>>> Hey guys! I continue learning akka-streams and have new question >>>> <http://stackoverflow.com/q/38233742/226895>. >>>> >>>> *Variables:* >>>> >>>> - Single http client flow with throttling >>>> - Multiple other flows that want to use first flow simultaneously >>>> >>>> *Goal:* >>>> >>>> Single http flow is flow that makes requests to particular API that >>>> limits number of calls to it. Otherwise it bans me. Thus it's very >>>> important to maintain rate of request regardless of how many clients in my >>>> code use it. >>>> >>>> There are number of other flows that want to make requests to mentioned >>>> API but I'd like to have backpressure from http flow. Normally you connect >>>> whole thing to one graph and it works. But it my case I have multiple >>>> graphs. >>>> >>>> How would you solve it ? >>>> >>>> *My attempt to solve it:* >>>> >>>> I use Source.queue for http flow so that I can queue http requests and >>>> have throttling. Problem is that Future from SourceQueue.offer fails >>>> if I exceed number of requests. Thus somehow I need to "reoffer" when >>>> previously offered event completes. Thus modified Future from >>>> SourceQueue would backpressure other graphs (inside their mapAsync) >>>> that make http requests. >>>> >>>> Here is how I implemented it >>>> >>>> object Main { >>>> >>>> implicit val system = ActorSystem("root") >>>> implicit val executor = system.dispatcher >>>> implicit val materializer = ActorMaterializer() >>>> >>>> private val queueHttp = Source.queue[(String, Promise[String])](2, >>>> OverflowStrategy.backpressure) >>>> .throttle(1, FiniteDuration(1000, MILLISECONDS), 1, >>>> ThrottleMode.Shaping) >>>> .mapAsync(4) { >>>> case (text, promise) => >>>> // Simulate delay of http request >>>> val delay = (Random.nextDouble() * 1000 / 2).toLong >>>> Thread.sleep(delay) >>>> Future.successful(text -> promise) >>>> } >>>> .toMat(Sink.foreach({ >>>> case (text, p) => >>>> p.success(text) >>>> }))(Keep.left) >>>> .run >>>> >>>> val futureDeque = new ConcurrentLinkedDeque[Future[String]]() >>>> >>>> def sendRequest(value: String): Future[String] = { >>>> >>>> val p = Promise[String]() >>>> val offerFuture = queueHttp.offer(value -> p) >>>> >>>> def addToQueue(future: Future[String]): Future[String] = { >>>> futureDeque.addLast(future) >>>> future.onComplete { >>>> case _ => futureDeque.remove(future) >>>> } >>>> future >>>> } >>>> >>>> offerFuture.flatMap { >>>> case QueueOfferResult.Enqueued => >>>> addToQueue(p.future) >>>> }.recoverWith { >>>> case ex => >>>> val first = futureDeque.pollFirst() >>>> if (first != null) >>>> addToQueue(first.flatMap(_ => sendRequest(value))) >>>> else >>>> sendRequest(value) >>>> } >>>> } >>>> >>>> def main(args: Array[String]) { >>>> >>>> val allFutures = for (v <- 0 until 15) >>>> yield { >>>> val res = sendRequest(s"Text $v") >>>> res.onSuccess { >>>> case text => >>>> println("> " + text) >>>> } >>>> res >>>> } >>>> >>>> Future.sequence(allFutures).onComplete { >>>> case Success(text) => >>>> println(s">>> TOTAL: ${text.length} [in queue: >>>> ${futureDeque.size()}]") >>>> system.terminate() >>>> case Failure(ex) => >>>> ex.printStackTrace() >>>> system.terminate() >>>> } >>>> >>>> Await.result(system.whenTerminated, Duration.Inf) >>>> }} >>>> >>>> Disadvantage of this solution is that I have locking on >>>> ConcurrentLinkedDeque which is probably not that bad for rate of 1 >>>> request per second but still. >>>> >>>> How would you solve this task? >>>> >>>> -- >>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>> >>>>>>>>>> Check the FAQ: >>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>> >>>>>>>>>> Search the archives: >>>> https://groups.google.com/group/akka-user >>>> --- >>>> You received this message because you are subscribed to the Google >>>> Groups "Akka User List" group. >>>> To unsubscribe from this group and stop receiving emails from it, send >>>> an email to [email protected]. >>>> To post to this group, send email to [email protected]. >>>> Visit this group at https://groups.google.com/group/akka-user. >>>> For more options, visit https://groups.google.com/d/optout. >>>> >>> >>> >>> >>> -- >>> Cheers, >>> √ >>> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected]. >> To post to this group, send email to [email protected]. >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > Cheers, > √ > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Akka Team Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM Twitter: @akkateam -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
