Hello Carl,

I think the problem is in this loop:

for (msg <- msgs) {
  pub.sendNext(msg)
}


You send too many messages downstream, and these messages overflow the 
buffer. You should check how many elements downstream has requested and send 
no more than that. In your last example, Source takes care of 
backpressure for you.
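
To make the point concrete, here is a minimal sketch in plain Scala with no Akka dependency. It only models the idea; it is not the Akka implementation. BoundedDownstream, requestMore, sendRespectingDemand and sendIgnoringDemand are hypothetical names invented for this illustration, and the capacity of 16 is an arbitrary assumption. In the real test, the equivalent would be to wait for the demand reported on the publisher side before each batch of sendNext calls.

```scala
object DemandSketch {
  // Hypothetical stand-in for a downstream stage with a bounded input buffer.
  final class BoundedDownstream(capacity: Int) {
    require(capacity > 0, "capacity must be positive")
    private val buffer = scala.collection.mutable.Queue.empty[String]
    private var outstanding = 0L // demand signalled upstream but not yet fulfilled

    // Signal demand for every currently free buffer slot and return that demand.
    def requestMore(): Long = {
      val free = capacity - buffer.size - outstanding
      outstanding += free
      free
    }

    // Accept one element; fail if upstream sends more than was requested.
    def onNext(msg: String): Unit = {
      if (outstanding <= 0) throw new IllegalStateException("Input buffer overrun")
      outstanding -= 1
      buffer.enqueue(msg)
    }

    // Consume everything buffered so far, freeing the slots again.
    def drain(): List[String] = {
      val out = buffer.toList
      buffer.clear()
      out
    }
  }

  // Sends each batch only after asking how much is currently requested.
  def sendRespectingDemand(msgs: List[String], downstream: BoundedDownstream): List[String] = {
    var remaining = msgs
    val received = List.newBuilder[String]
    while (remaining.nonEmpty) {
      val demand = downstream.requestMore()
      val (batch, rest) = remaining.splitAt(demand.toInt)
      batch.foreach(downstream.onNext)
      received ++= downstream.drain()
      remaining = rest
    }
    received.result()
  }

  // Mirrors the failing loop: one request, then every message pushed at once.
  def sendIgnoringDemand(msgs: List[String], downstream: BoundedDownstream): Unit = {
    downstream.requestMore()
    for (msg <- msgs) downstream.onNext(msg)
  }
}
```

With more messages than the buffer capacity, sendIgnoringDemand throws the overrun exception, while sendRespectingDemand delivers every message in order.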

I think in your scenario you simply don't need PublisherProbe and can use a 
prebuilt Source instead.

On Saturday, 14 February 2015 at 19:38:24 UTC+6, Carl Pulley wrote:
>
> I was hoping someone might be able to shed some light on an odd issue I'm 
> observing with Akka streaming (version 1.0-M3).
>
> I've defined a PushPullStage implementation of a sliding window (see 
> https://github.com/carlpulley/lift/blob/model/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala
> for its implementation details).
>
> Now, depending on how I construct my tests, my testing of the flow for the 
> sliding window either succeeds or fails!
>
> If I define the flow for testing as follows:
>
> def sample1(in: Source[String], out: Sink[List[String]]) =
>   Flow[String].transform(() => SlidingWindow[String](windowSize)).runWith(in, out)
>
>
> and then test that code as follows:
>
> val limit = 1000
> val msgs = (0 to limit).map(n => s"message-$n").toList
> // Simulate source that outputs messages and then blocks
> val in = PublisherProbe[String]()
> val out = SubscriberProbe[List[String]]()
>
> sample1(Source(in), Sink(out))
> val pub = in.expectSubscription()
> val sub = out.expectSubscription()
> sub.request(msgs.length)
> for (msg <- msgs) {
>   pub.sendNext(msg)
> }
>
> for (n <- 0 to (limit - windowSize + 1)) {
>   out.expectNext(msgs.slice(n, n+windowSize))
> }
> out.expectNoMsg() // since buffer is saturated and no more messages are arriving
>
>
> Then the test passes with no issues.
>
> Using the following flow definition for testing:
>
> def sample2(in: Source[String], out: Sink[List[String]]) =
>   Flow[String].transform(() => SlidingWindow[String](windowSize)).map { x => x }.runWith(in, out)
>
>
> I get the following exception:
> [ERROR] [02/14/2015 13:32:49.114] [SlidingWindowTest-akka.actor.default-dispatcher-4] [akka://SlidingWindowTest/user/$a/flow-3-1-stageFactory] Input buffer overrun
> java.lang.IllegalStateException: Input buffer overrun
> at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$enqueue(ActorInterpreter.scala:61)
> at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:130)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
> at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:278)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> If I now test the code as follows:
>
> val limit = 1000
> val msgs = (0 to limit).map(n => s"message-$n").toList
> val out = SubscriberProbe[List[String]]()
>
> sample2(Source(msgs), Sink(out))
> val sub = out.expectSubscription()
> sub.request(msgs.length)
>
> for (n <- 0 to (limit - windowSize + 1)) {
>   out.expectNext(msgs.slice(n, n+windowSize))
> }
> // since iterable source closes, and no more messages are arriving, contents will flush out
> for (n <- (limit - windowSize + 1 + 1) to limit) {
>   out.expectNext(msgs.slice(n, n+windowSize))
> }
>
>
> The test passes!
>
> Many thanks for any help or clarification with this,
>
>   Carl.
>
>
