up vote
down votefavorite 
<http://stackoverflow.com/questions/38233742/how-would-you-connect-many-independent-graphs-maintaining-backpressure-between#>

Hey guys! I continue learning akka-streams and have new question 
<http://stackoverflow.com/q/38233742/226895>.

*Variables:*

   - Single http client flow with throttling
   - Multiple other flows that want to use first flow simultaneously

*Goal:*

Single http flow is flow that makes requests to particular API that limits 
number of calls to it. Otherwise it bans me. Thus it's very important to 
maintain rate of request regardless of how many clients in my code use it.

There are number of other flows that want to make requests to mentioned API 
but I'd like to have backpressure from http flow. Normally you connect 
whole thing to one graph and it works. But it my case I have multiple 
graphs.

How would you solve it ?

*My attempt to solve it:*

I use Source.queue for http flow so that I can queue http requests and have 
throttling. Problem is that Future from SourceQueue.offer fails if I exceed 
number of requests. Thus somehow I need to "reoffer" when previously 
offered event completes. Thus modified Future from SourceQueue would 
backpressure other graphs (inside their mapAsync) that make http requests.

Here is how I implemented it

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  private val queueHttp = Source.queue[(String, Promise[String])](2, 
OverflowStrategy.backpressure)
    .throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
    .mapAsync(4) {
      case (text, promise) =>
        // Simulate delay of http request
        val delay = (Random.nextDouble() * 1000 / 2).toLong
        Thread.sleep(delay)
        Future.successful(text -> promise)
    }
    .toMat(Sink.foreach({
      case (text, p) =>
        p.success(text)
    }))(Keep.left)
    .run

  val futureDeque = new ConcurrentLinkedDeque[Future[String]]()

  def sendRequest(value: String): Future[String] = {

    val p = Promise[String]()
    val offerFuture = queueHttp.offer(value -> p)

    def addToQueue(future: Future[String]): Future[String] = {
      futureDeque.addLast(future)
      future.onComplete {
        case _ => futureDeque.remove(future)
      }
      future
    }

    offerFuture.flatMap {
      case QueueOfferResult.Enqueued =>
        addToQueue(p.future)
    }.recoverWith {
      case ex =>
        val first = futureDeque.pollFirst()
        if (first != null)
          addToQueue(first.flatMap(_ => sendRequest(value)))
        else
          sendRequest(value)
    }
  }

  def main(args: Array[String]) {

    val allFutures = for (v <- 0 until 15)
      yield {
        val res = sendRequest(s"Text $v")
        res.onSuccess {
          case text =>
            println("> " + text)
        }
        res
      }

    Future.sequence(allFutures).onComplete {
      case Success(text) =>
        println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
        system.terminate()
      case Failure(ex) =>
        ex.printStackTrace()
        system.terminate()
    }

    Await.result(system.whenTerminated, Duration.Inf)
  }}

Disadvantage of this solution is that I have locking on 
ConcurrentLinkedDeque which is probably not that bad for rate of 1 request 
per second but still.

How would you solve this task?

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to