Hi
I have a case where I want to use an akka stream with backpressure and
insert messages to the stream from multiple threads.
Here is the code to explain better
import akka.actor.ActorSystemimport akka.stream.scaladsl._import
akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.duration._import scala.concurrent.{Await, Future}
object PublishToSourceQueueFromManyThreads {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
// build the processing pipeline with queue as entry point
val queue = Source.queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
.groupedWithin(2, 2.seconds)
.mapAsyncUnordered(2) { elem =>
Future {
println(s"${Thread.currentThread().getName} simulating delay, $elem")
Thread.sleep(1000L)
elem
}(scala.concurrent.ExecutionContext.global)
}.to(Sink.ignore)
.run
// here we start few threads that push events to the queue in parallel
new Thread(() => {
while (true) {
val offerResult: Future[QueueOfferResult] = queue.offer(1)
Await.ready(offerResult, 10.seconds)
println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
}
}).start()
new Thread(() => {
while (true) {
val offerResult: Future[QueueOfferResult] = queue.offer(2)
Await.ready(offerResult, 10.seconds)
println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
}
}).start()
println("done")
}
}
What I get when I execute the code above is
done
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
scala-execution-context-global-18 simulating delay, Vector(1, 2)
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-1 Emitted 2 Future(Success(Enqueued))
scala-execution-context-global-19 simulating delay, Vector(2, 1)
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to
wait for previous offer to be resolved to send another request))
I checked with the source code of Queue that this is the expected behaviour
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala#L21-L22
* - fails when stream is completed or you cannot call offer in this moment
because of implementation rules
* (like for backpressure mode and full buffer you need to wait for last
offer call Future completion)
What I did for now is I wrapped the SourceQueue, used synchronized block
and I'm Awaiting result before returning from offerBlocking.
import akka.actor.ActorSystemimport akka.stream.scaladsl._import
akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.duration._import scala.concurrent.{Await, Future,
TimeoutException}
class SyncQueue[T](q: SourceQueue[T]) {
/** * @throws TimeoutException if it couldn't get the value within
`maxWait` time */
def offerBlocking(elem: T, maxWait: Duration = 10.seconds):
Future[QueueOfferResult] =
synchronized {
val result = q.offer(elem)
Await.ready(result, maxWait)
result
}
}
object PublishToSourceQueueFromManyThreads {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
// build the queue processing pipeline
val queue = Source.queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
.groupedWithin(2, 2.seconds)
.mapAsyncUnordered(2) { elem =>
Future {
println(s"${Thread.currentThread().getName} simulating delay, $elem")
Thread.sleep(1000L)
elem
}(scala.concurrent.ExecutionContext.global)
}.to(Sink.ignore)
.run
val queue2 = new SyncQueue(queue)
// here we start few threads that would push events to the queue
new Thread(() => {
while (true) {
val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(1)
// Await.ready(offerResult, 10.seconds)
println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
}
}).start()
new Thread(() => {
while (true) {
val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(2)
// Await.ready(offerResult, 10.seconds)
println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
}
}).start()
println("done")
}
}
The questions is what is the recommended way to do that? It would be a good
example to have on the integration page
http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-integrations.html#Source_queue
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.