This behavior by Source.queue() was not obvious for me too.
I mistakenly assumed futures resulting from overflowing calls to offer() 
will generally remain uncompleted, until backpressuring stops. After 
reading your post and taking a second look, that's clearly not the case. 
Source.queue() seems to be geared towards single-producer use cases.
Lucky for me, I'm using DropNew :)

Perhaps MergeHub.source() will work for you? You will end up with with a 
Sink, which you can then use ad-hoc, e.g.: 
Source.single(foo).runWith(mySink).
This leaves you without the explicit acknowledgement mechanism you have 
available on Source.queue(), but if I'm not mistaken the ad-hoc stream will 
not complete so long as the stream behind the MergeHub is backpressuring. 
If you do need a more explicit acknowledgement mechanism then you can 
supplement your message with a promise, and complete it downstream.


On Tuesday, February 28, 2017 at 9:06:47 AM UTC+2, Marcin Milewski wrote:
>
> Hi
>
> I have a case where I want to use an akka stream with backpressure and 
> insert messages to the stream from multiple threads.
>
> Here is the code to explain better
>
> import akka.actor.ActorSystemimport akka.stream.scaladsl._import 
> akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
> import scala.concurrent.duration._import scala.concurrent.{Await, Future}
> object PublishToSourceQueueFromManyThreads {
>
>   def main(args: Array[String]): Unit = {
>     implicit val system = ActorSystem("QuickStart")
>     implicit val materializer = ActorMaterializer()
>
>     // build the processing pipeline with queue as entry point
>     val queue = Source.queue[Int](bufferSize = 2, 
> OverflowStrategy.backpressure)
>       .groupedWithin(2, 2.seconds)
>       .mapAsyncUnordered(2) { elem =>
>         Future {
>           println(s"${Thread.currentThread().getName} simulating delay, 
> $elem")
>           Thread.sleep(1000L)
>           elem
>         }(scala.concurrent.ExecutionContext.global)
>       }.to(Sink.ignore)
>       .run
>
>     // here we start few threads that push events to the queue in parallel
>     new Thread(() => {
>       while (true) {
>         val offerResult: Future[QueueOfferResult] = queue.offer(1)
>         Await.ready(offerResult, 10.seconds)
>         println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
>       }
>     }).start()
>
>     new Thread(() => {
>       while (true) {
>         val offerResult: Future[QueueOfferResult] = queue.offer(2)
>         Await.ready(offerResult, 10.seconds)
>         println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
>       }
>     }).start()
>
>     println("done")
>
>   }
> }
>
> What I get when I execute the code above is
>
> done
> Thread-1 Emitted 2 Future(Success(Enqueued))
> Thread-0 Emitted 1 Future(Success(Enqueued))
> scala-execution-context-global-18 simulating delay, Vector(1, 2)
> Thread-1 Emitted 2 Future(Success(Enqueued))
> Thread-0 Emitted 1 Future(Success(Enqueued))
> Thread-1 Emitted 2 Future(Success(Enqueued))
> scala-execution-context-global-19 simulating delay, Vector(2, 1)
> Thread-0 Emitted 1 Future(Success(Enqueued))
> Thread-1 Emitted 2 Future(Success(Enqueued))
> Thread-0 Emitted 1 Future(Success(Enqueued))
> Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have 
> to wait for previous offer to be resolved to send another request))
> Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have 
> to wait for previous offer to be resolved to send another request))
> Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have 
> to wait for previous offer to be resolved to send another request))
> Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have 
> to wait for previous offer to be resolved to send another request))
> Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have 
> to wait for previous offer to be resolved to send another request))
>
> I checked with the source code of Queue that this is the expected 
> behaviour 
> https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala#L21-L22
>
>    * - fails when stream is completed or you cannot call offer in this moment 
> because of implementation rules
>    * (like for backpressure mode and full buffer you need to wait for last 
> offer call Future completion)
>
> What I did for now is I wrapped the SourceQueue, used synchronized block 
> and I'm Awaiting result before returning from offerBlocking.
>
> import akka.actor.ActorSystemimport akka.stream.scaladsl._import 
> akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
> import scala.concurrent.duration._import scala.concurrent.{Await, Future, 
> TimeoutException}
> class SyncQueue[T](q: SourceQueue[T]) {
>   /**    * @throws TimeoutException if it couldn't get the value within 
> `maxWait` time    */
>   def offerBlocking(elem: T, maxWait: Duration = 10.seconds): 
> Future[QueueOfferResult] = 
>     synchronized {
>       val result = q.offer(elem)
>       Await.ready(result, maxWait)
>       result
>     }
> }
> object PublishToSourceQueueFromManyThreads {
>
>   def main(args: Array[String]): Unit = {
>     implicit val system = ActorSystem("QuickStart")
>     implicit val materializer = ActorMaterializer()
>
>     // build the queue processing pipeline
>     val queue = Source.queue[Int](bufferSize = 2, 
> OverflowStrategy.backpressure)
>       .groupedWithin(2, 2.seconds)
>       .mapAsyncUnordered(2) { elem =>
>         Future {
>           println(s"${Thread.currentThread().getName} simulating delay, 
> $elem")
>           Thread.sleep(1000L)
>           elem
>         }(scala.concurrent.ExecutionContext.global)
>       }.to(Sink.ignore)
>       .run
>
>     val queue2 = new SyncQueue(queue)
>
>     // here we start few threads that would push events to the queue
>     new Thread(() => {
>       while (true) {
>         val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(1)
>         // Await.ready(offerResult, 10.seconds)
>         println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
>       }
>     }).start()
>
>     new Thread(() => {
>       while (true) {
>         val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(2)
>         // Await.ready(offerResult, 10.seconds)
>         println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
>       }
>     }).start()
>
>     println("done")
>   }
> }
>
> The questions is what is the recommended way to do that? It would be a 
> good example to have on the integration page 
> http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-integrations.html#Source_queue
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to