Perhaps I've misunderstood, but it sounds like you need to process the 
stream twice i.e. sequential, not parallel, processing.
That really isn't a use case for broadcast, AFIK.
A few questions:

   1. Can the underlying stream be opened multiple times? For example, are 
   you reading elements from a file?
   2. Does the first stage in your processing produce an aggregate i.e. a 
   result whose value is dependent upon the entire stream but whose size is 
   independent of the size of the stream? For example, a stage that sums 
   elements or computes a hash.

If the answer to the above are yes, you should be able to easily construct 
a flow which opens the file, produces the aggregate, and feeds it 
downstream, zipping it with elements from the second reading of the file (a 
second subscriber on the same source), all within a single flow.

It would help to understand your requirements to suggest a more specific, 
correct solution.

-Lance

On Monday, July 27, 2015 at 10:50:45 AM UTC-4, wwagner4 wrote:
>
> I do have the same poblem but increasing the buffer size does not really 
> help. In my case all elements from one branche must be read before I read 
> all the elements of the second branch. So the maximum difference is always 
> the complete length of the stream which I do not know in advance (and which 
> can be very big). 
> In any case it seems not to be a good idea to rely on a buffer size that 
> depends on the amount of elements provided by a stream. As the 
> documentation says: "Adjusting buffer size is only for inreasing 
> performance".
>
> Perhaps some kind of BroadcastPreferred like MergePeferred could be a 
> solution.
>
> Am Dienstag, 2. Juni 2015 00:10:34 UTC+2 schrieb Lance Arlaus:
>>
>> Circling back on this, I created a blog post that explains the issue I 
>> encountered along with the solution of using a balancing buffer.
>>
>>
>> http://blog.lancearlaus.com/akka/streams/scala/2015/05/27/Akka-Streams-Balancing-Buffer/
>>
>> I hope it helps those who encounter the same issue.
>>
>> On Saturday, May 9, 2015 at 11:49:48 AM UTC-4, Lance Arlaus wrote:
>>>
>>> No problem.
>>> Thanks for the quick response and here's the corresponding issue: 
>>> https://github.com/akka/akka/issues/17435
>>>
>>> On Saturday, May 9, 2015 at 5:05:44 AM UTC-4, drewhk wrote:
>>>>
>>>> Hi Lance,
>>>>
>>>> On Sat, May 9, 2015 at 12:49 AM, Lance Arlaus <[email protected]> 
>>>> wrote:
>>>>
>>>>> Hi-
>>>>>
>>>>> I've encountered an issue with processing a stream that I fan out via 
>>>>> broadcast and fan in via zip.
>>>>> The broadcast splits the stream in two with one branch containing a 
>>>>> drop element.
>>>>> According to my read of the docs, I would expect the terminating zip 
>>>>> to complete when the shorter of the two streams (the one with the drop) 
>>>>> completes.
>>>>> However, the flow hangs waiting indefinitely.
>>>>>
>>>>> Here's the relevant part of a test case I put together to reproduce 
>>>>> the problem.
>>>>> Note that the flow without the drop (the first flow) works fine with 
>>>>> different length streams.
>>>>> What am I doing wrong?
>>>>>
>>>>
>>>> I don't think you are doing anything wrong. Btw, I suspect the bug 
>>>> being in Broadcast instead. Can you file a ticket please?
>>>>
>>>> -Endre
>>>>  
>>>>
>>>>>
>>>>> Akka Stream Version: 1.0-RC2
>>>>>
>>>>> Thanks,
>>>>> Lance
>>>>>
>>>>>   // This flow works fine
>>>>>   def zipSource(num: Int, diff: Int) = Source() { implicit b =>
>>>>>     import akka.stream.scaladsl.FlowGraph.Implicits._
>>>>>
>>>>>     val source0 = b.add(Source(1 to num))
>>>>>     val source1 = b.add(Source(1 to (num + diff)))
>>>>>     val zip = b.add(Zip[Int, Int])
>>>>>
>>>>>     source0 ~> zip.in0
>>>>>     source1 ~> zip.in1
>>>>>
>>>>>     (zip.out)
>>>>>   }
>>>>>
>>>>>   // This flow waits indefinitely when diff > 0
>>>>>   def zipDropSource(num: Int, diff: Int) = Source() {  implicit b =>
>>>>>     import akka.stream.scaladsl.FlowGraph.Implicits._
>>>>>
>>>>>     val source = b.add(Source(1 to (num + diff)))
>>>>>     val bcast = b.add(Broadcast[Int](2))
>>>>>     val drop = b.add(Flow[Int].drop(diff))
>>>>>     val zip = b.add(Zip[Int, Int])
>>>>>
>>>>>     source ~> bcast ~>         zip.in0
>>>>>               bcast ~> drop ~> zip.in1
>>>>>
>>>>>     (zip.out)
>>>>>   }
>>>>>
>>>>>   // PASS
>>>>>   "Zip" should "complete with same length streams" in {
>>>>>     val future: Future[Int] = zipSource(10, 
>>>>> 10).runWith(Sink.fold(0)((s, i) => s + 1))
>>>>>     whenReady(future)(_ shouldBe 10)
>>>>>   }
>>>>>
>>>>>   // PASS
>>>>>   it should "complete with different length streams" in {
>>>>>     val future: Future[Int] = zipSource(10, 
>>>>> 20).runWith(Sink.fold(0)((s, i) => s + 1))
>>>>>     whenReady(future)(_ shouldBe 10)
>>>>>   }
>>>>>
>>>>>   // PASS
>>>>>   "Zip with drop" should "complete with same length streams" in {
>>>>>     val future: Future[Int] = zipDropSource(10, 
>>>>> 0).runWith(Sink.fold(0)((s, i) => s + 1))
>>>>>     whenReady(future)(_ shouldBe 10)
>>>>>   }
>>>>>
>>>>>   // FAIL
>>>>>   it should "complete with different length streams" in {
>>>>>     val future: Future[Int] = zipDropSource(10, 
>>>>> 10).runWith(Sink.fold(0)((s, i) => s + 1))
>>>>>     whenReady(future)(_ shouldBe 10)
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>>  -- 
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: 
>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: 
>>>>> https://groups.google.com/group/akka-user
>>>>> --- 
>>>>> You received this message because you are subscribed to the Google 
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to