> On 18 Aug 2016, at 03:54, Doug Lea <d...@cs.oswego.edu> wrote:
> 
> On 08/17/2016 09:01 AM, Tagir F. Valeev wrote:
>> Hello!
>> 
>> I found no information in Stream API documentation on how it behaves
>> in case if exception occurs during the stream processing. While it's
>> quite evident for sequential stream (stream processing is terminated
>> and exception is propagated to the caller as is), the behavior for
>> parallel streams differs from one might expect. Consider the following
>> test:
> 
> In your example, you can witness the delayed termination of other threads
> upon exception in another because you add side-effecting operations
> (here, just printing). Avoiding all this would force sequential processing.
> 
> But if the supplied functions follow the documented properties,
> it should not matter that parallel processing of some elements continues
> when another hits an exception. Which is why nothing is said about it.
> Similar effects occur in findAny and other methods. I don't see any benefit
> in trying to specify exactly what happens in these cases.
> 

Short circuiting terminal operations such as findAny and findFirst will not 
return until the computation has “quiesced”, so once a result has been found 
the result will not be returned until all executing f/j tasks have completed. 
We consciously decided to do that so as the common pool would not contain 
straggling tasks beyond evaluation of the pipeline.

When an behavioural operation throws an exception the stream can return before 
all related f/j tasks have finished. I thought i had a good grasp of the 
CountedCompleter exception propagation behaviour vs. normal completion 
propagation behaviour, but i am not so sure now, since i cannot bias things to 
make the non-quiescent behaviour very obvious. I think it’s because the main 
non-worker thread tries to help out executing tasks before waiting.

Here is another reproducer:
try {
    int s = 1024 * 16;
    IntStream.range(0, s).parallel()
            .peek(e -> {
                if (e < s / 2) throw new RuntimeException();
                BigInteger.probablePrime(256, ThreadLocalRandom.current());
            })
            .forEach(e -> System.out.println(Thread.currentThread().getId() + " 
" + e));
} catch (RuntimeException e) {
    System.out.println("FAILED: " + ForkJoinPool.commonPool().isQuiescent());
}
Paul.

Reply via email to