I had another thought about this behaviour and two things came to my mind. 
First, it might be worth mentioning this somewhere in the documentation 
about testing as other users might also experience this and I think didn't 
see this mentioned anywhere in the docs. And second, is it safe to use 
*expectComplete 
*at all if the behaviour is undefined and any such change might break the 
tests unexpectedly ?

Jakub

On Friday, February 16, 2018 at 9:55:45 PM UTC+1, Patrik Nordwall wrote:
>
> It is allowed to ”delay” delivery of the complete signal when there is no 
> demand. This behavior is undefined, i.e. some stages deliver it 
> immediately, some only when demand is requested. This is as expected and in 
> such test you have to request enough to be sure to get the completed signal.
>
> /Patrik
> fre 16 feb. 2018 kl. 09:02 skrev Jakub Kahovec <[email protected] 
> <javascript:>>:
>
>> Hi,
>>
>>
>> when using Akka Streams (2.5.9)  I've recently bumped into a problem when 
>> testing a stream with a mapAsync stage. It unexpectedly ends with an 
>> assertion failing on *"timeout (3 seconds) during expectMsg while 
>> waiting for OnComplete"*
>>
>>
>> To demonstrate it I've created a simple example. 
>>
>>
>> The code below works as expected.
>>
>>
>> val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i 
>> * 2))
>>
>> val c = TestSubscriber.manualProbe[Int]() 
>> val p = sourceUnderTest.to(Sink.fromSubscriber(c)).run() 
>> val sub = c.expectSubscription() 
>>
>> sub.request(2) 
>> c.expectNextN(2)   // List(2,4) 
>> c.expectComplete()   // 
>> akka.stream.testkit.TestSubscriber$ManualProbe@fc258b1
>>
>>
>> However, when I add another mapping function (bold code), the test fails 
>> with a timeout. When I call additional c.request(1) (commented code) it 
>> ends correctly. So it looks like the mapping function adds an additional 
>> item into the stream, which seems strange.
>>
>>   
>> val sourceUnderTest = Source(1 to 2).mapAsync(2)(i => Future.successful(i 
>> * 2)*.**map(identity**)*)
>>
>> val c = TestSubscriber.manualProbe[Int]() 
>> val p = sourceUnderTest.to(Sink.fromSubscriber(c)).run() 
>> val sub = c.expectSubscription() 
>>
>> sub.request(2) 
>> c.expectNextN(2)   // List(2,4) 
>> // c.request(1)
>> c.expectComplete()   // ends with java.lang.AssertionError: assertion 
>> failed: timeout (3 seconds) during expectMsg while waiting for OnComplete
>>
>>
>> Can anyone explain this strange behaviour ?
>>
>>
>> Thanks
>>
>> Jakub
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected] <javascript:>.
>> To post to this group, send email to [email protected] 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to