Hi-

I've encountered an issue with processing a stream that I fan out via 
broadcast and fan in via zip.
The broadcast splits the stream in two with one branch containing a drop 
element.
According to my read of the docs, I would expect the terminating zip to 
complete when the shorter of the two streams (the one with the drop) 
completes.
However, the flow hangs waiting indefinitely.

Here's the relevant part of a test case I put together to reproduce the 
problem.
Note that the flow without the drop (the first flow) works fine with 
different length streams.
What am I doing wrong?

Akka Stream Version: 1.0-RC2

Thanks,
Lance

  // This flow works fine
  def zipSource(num: Int, diff: Int) = Source() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source0 = b.add(Source(1 to num))
    val source1 = b.add(Source(1 to (num + diff)))
    val zip = b.add(Zip[Int, Int])

    source0 ~> zip.in0
    source1 ~> zip.in1

    (zip.out)
  }

  // This flow waits indefinitely when diff > 0
  def zipDropSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

    (zip.out)
  }

  // PASS
  "Zip" should "complete with same length streams" in {
    val future: Future[Int] = zipSource(10, 10).runWith(Sink.fold(0)((s, i) 
=> s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  it should "complete with different length streams" in {
    val future: Future[Int] = zipSource(10, 20).runWith(Sink.fold(0)((s, i) 
=> s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Zip with drop" should "complete with same length streams" in {
    val future: Future[Int] = zipDropSource(10, 0).runWith(Sink.fold(0)((s, 
i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // FAIL
  it should "complete with different length streams" in {
    val future: Future[Int] = zipDropSource(10, 
10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to