I was hoping someone might be able to shed some light on an odd issue I'm
observing with Akka Streams (version 1.0-M3).

I've defined a PushPullStage implementation of a sliding window (see
https://github.com/carlpulley/lift/blob/model/server/exercise/src/main/scala/com/eigengo/lift/exercise/classifiers/workflows/SlidingWindow.scala
for its implementation details).

Depending on how I construct the flow and its test, my test of the sliding
window either passes or fails!

If I define the flow for testing as follows:

def sample1(in: Source[String], out: Sink[List[String]]) =
  Flow[String].transform(() => SlidingWindow[String](windowSize)).runWith(in, out)


and then test that code as follows:

val limit = 1000
val msgs = (0 to limit).map(n => s"message-$n").toList
// Simulate source that outputs messages and then blocks
val in = PublisherProbe[String]()
val out = SubscriberProbe[List[String]]()

sample1(Source(in), Sink(out))
val pub = in.expectSubscription()
val sub = out.expectSubscription()
sub.request(msgs.length)
for (msg <- msgs) {
  pub.sendNext(msg)
}

for (n <- 0 to (limit - windowSize + 1)) {
  out.expectNext(msgs.slice(n, n+windowSize))
}
out.expectNoMsg() // since buffer is saturated and no more messages are arriving


Then the test passes with no issues.

However, if I use the following flow definition (identical apart from a
trailing identity map) with the same test:

def sample2(in: Source[String], out: Sink[List[String]]) =
  Flow[String].transform(() => SlidingWindow[String](windowSize)).map { x => x }.runWith(in, out)


I get the following exception:
[ERROR] [02/14/2015 13:32:49.114] [SlidingWindowTest-akka.actor.default-dispatcher-4] [akka://SlidingWindowTest/user/$a/flow-3-1-stageFactory] Input buffer overrun
java.lang.IllegalStateException: Input buffer overrun
    at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$enqueue(ActorInterpreter.scala:61)
    at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:130)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
    at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:278)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

If I instead drive sample2 from an iterable source rather than a
PublisherProbe, as follows:

val limit = 1000
val msgs = (0 to limit).map(n => s"message-$n").toList
val out = SubscriberProbe[List[String]]()

sample2(Source(msgs), Sink(out))
val sub = out.expectSubscription()
sub.request(msgs.length)

for (n <- 0 to (limit - windowSize + 1)) {
  out.expectNext(msgs.slice(n, n+windowSize))
}
// since iterable source closes, and no more messages are arriving, contents
// will flush out
for (n <- (limit - windowSize + 1 + 1) to limit) {
  out.expectNext(msgs.slice(n, n+windowSize))
}


The test passes!
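
To make the expectations in the two tests above concrete, here is a rough model of the output sequence on plain collections. It is only a sketch inferred from the assertions in the tests, not the linked SlidingWindow stage: the object `SlidingWindowModel`, the helper `expectedWindows` and its `upstreamCompletes` flag are hypothetical names introduced for illustration. It assumes that full windows are emitted while elements are arriving, and that the shrinking tail windows are flushed only once upstream completes.

```scala
object SlidingWindowModel {
  // Hypothetical helper: models the output that the tests expect, using plain
  // collections rather than Akka Streams.
  def expectedWindows[A](msgs: List[A], windowSize: Int, upstreamCompletes: Boolean): List[List[A]] = {
    require(windowSize > 0, "windowSize must be positive")

    // Full windows: what the probe-based test expects before expectNoMsg().
    val full = msgs.sliding(windowSize).filter(_.length == windowSize).toList

    // Shrinking tail windows: what the iterable-source test expects once the
    // source has closed (assumption: nothing is flushed while upstream is open).
    val flushed =
      if (upstreamCompletes)
        (full.length until msgs.length).map(n => msgs.slice(n, n + windowSize)).toList
      else
        Nil

    full ++ flushed
  }
}
```

For example, `SlidingWindowModel.expectedWindows(List("a", "b", "c", "d"), 2, upstreamCompletes = true)` yields the windows (a, b), (b, c), (c, d) and then the flushed remainder (d); with `upstreamCompletes = false` only the first three windows appear.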

Many thanks for any help or clarification with this,

  Carl.
