Hi Paul,

After reading your notes here, and in the bug reports, and the comments in the code, and banging my head against the code (before and after) for a while, I'm starting to see that this is on the right track. Sorry to hedge a bit, but I have to admit that I don't fully understand the code.

However, I do see the comment you referred to (old version, line 460) and that the new code is essentially a merge of the old parallelPrepare() code into sourceSpliterator(). And I do see the crucial addition of the flag modification based on the spliterator's SIZED characteristic. So that seems right, and if the tests pass, so much the better.

A few comments for future maintenance/cleanups in this area.

* This package seems like a curious mixture of abstraction of bit-twiddling into small methods (e.g., combineOpFlags) and bit-twiddling in open code (the evaluation of Spliterator.SIZED into thisOpFlags, new version lines 459ff). While not incorrect, this is jarring; see the sketch after this list for what a small helper might look like.

* The loop variables over the pipeline stages are hard to follow. The variable 'p' is used in the loop at line 413 as the "current" pipeline stage, whereas in the loop at line 434 'p' is the "next" stage and 'u' is the "current" stage. This is confusing, and I don't know what 'u' means.

* Also, in the same loop, 'e' is initialized to 'this' and is checked as the loop exit condition (I guess that's what 'e' stands for), but it's not used elsewhere, so it's not clear to me how much value this adds.
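
To illustrate the first point, here's a rough sketch of what folding that open-coded SIZED adjustment into a small helper might look like. This is purely illustrative: it would have to live in java.util.stream next to the existing flag helpers (StreamOpFlag and its constants are package-private), the helper name is made up, and I've only eyeballed it against the webrev, not compiled it.

    // Hypothetical helper, mirroring the open-coded logic at new version lines 459ff:
    // back-propagate the source spliterator's SIZED characteristic into the op flags.
    private static int adjustSizedFlag(int opFlags, Spliterator<?> spliterator) {
        return spliterator.hasCharacteristics(Spliterator.SIZED)
               ? (opFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
               : (opFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
    }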

Stepping back from this a bit, this is clearly an area of some complexity, and it might benefit from some additional testing. The streams library is overall well-tested, but mostly at a functional level, i.e., running a bunch of streams in various combinations to ensure the right output is produced. For this case it might be helpful to assemble (but not execute) a bunch of combinations of pipelines and then make sure that each stage ends up with the right flags. (I didn't find such a test when I went looking, but I might have missed it.)
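
Something along these lines might be a starting point. It's only an approximation of what I mean, since it inspects the characteristics of the spliterator returned by Stream.spliterator() rather than the internal StreamOpFlag values (which aren't reachable from outside java.util.stream), and the expectations in the comments are my assumptions, not verified behavior:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Spliterator;

    public class PipelineSizedCheck {
        public static void main(String[] args) {
            List<Integer> list = Arrays.asList(1, 2, 3);

            // Stateless ops should preserve SIZED from a sized source.
            Spliterator<Integer> stateless =
                list.stream().parallel().map(i -> i + 1).spliterator();
            report("stateless pipeline SIZED", stateless.hasCharacteristics(Spliterator.SIZED));

            // An unordered parallel distinct() can wrap the source lazily, so I'd
            // expect the downstream slice not to claim SIZED (my assumption).
            Spliterator<Integer> lazyStateful =
                list.stream().parallel().unordered().distinct().spliterator();
            report("lazy distinct pipeline SIZED", lazyStateful.hasCharacteristics(Spliterator.SIZED));
        }

        static void report(String label, boolean sized) {
            System.out.println(label + ": " + sized);
        }
    }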

In any case I think it's reasonable to proceed with this patch as it stands instead of tinkering with it further. Some additional cleanups are warranted at some point, but we can keep an eye on those for the future.

s'marks

On 3/18/15 7:09 AM, Paul Sandoz wrote:
Hi,

The fix for https://bugs.openjdk.java.net/browse/JDK-8067969 (optimized 
Stream.count()) caused a regression in the JCK tests.

A test exposed a known weakness in the way pipeline flags are prepared for parallel 
execution. In certain cases the pipeline may report SIZED but the source spliterator does 
not report the SIZED characteristic. This is because the preparation always assumed that 
stateful operations are a full barrier and thus inject SIZED. This is not always the case 
when a stateful operation is "lazy" and can wrap the source spliterator and upstream 
operations.

For example:

   List<Integer> list = Arrays.asList(1, 2, 3);
   Stream<Integer> s = list.stream().parallel().unordered().distinct();
   long count = s.count();
   System.out.println(count);

The stateful distinct op can wrap the source spliterator of the list since we don't 
care about order, but as a result the size of the distinct->count pipeline 
slice is unknown.

While I could fix the counting functionality to check for the inconsistency, the 
right thing to do is to fix the preparation of flags (a particularly sensitive 
area to get right) by merging that preparation into the sourceSpliterator method (as 
suggested in the comments):

   http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8075307-stream-parallel-prepare-flags/webrev/

This also makes things marginally more efficient when there are no stateful 
operations, since in that case there is no need to do any preparation work. (I 
needed to relax tests in UnorderedTest which were overly aggressive when there 
were no stateful ops present; similarly, I relaxed a test in DistinctOps that 
is no longer generally applicable.)

Furthermore, as a consequence the terminal counting reduce ops now declare they 
don't care about encounter order [*], which avoids the distinct op becoming a full 
barrier that reduces elements into a linked hash set.

A JPRT test run reported no relevant failures, and local execution of the relevant 
JCK tests passes.

Paul.

[*] Note that a count can also be performed with:

   LongAdder l = new LongAdder();
   Stream<T> s = ...
   s.parallel().forEach(e -> l.increment());
   long count = l.sum();

This is a possible alternative to the reduction-based implementation, but I 
prefer to stick with the latter for now.
