Hi Paul,
After reading your notes here, and in the bug reports, and the comments in the
code, and banging my head against the code (before and after) for a while, I'm
starting to see that this is on the right track. Sorry to hedge a bit but I have
to admit that I don't fully understand the code.
However, I do see the comment you referred to (old version, line 460) and that
the new code is essentially a merge of the old parallelPrepare() code into
sourceSpliterator(). And I do see the crucial addition of the flag modification
based on the spliterator's SIZED characteristic. So that seems right, and if the
tests pass, so much the better.
A few comments for future maintenance/cleanups in this area.
* This package seems like a curious mixture of abstraction of bit-twiddling into
small methods (e.g., combineOpFlags), and bit-twiddling in open code (evaluation
of Spliterator.SIZED into thisOpFlags, new version lines 459ff). While not
incorrect, this is jarring.
* The loop variables over the pipeline stages are hard to follow. The variable
'p' is used in the loop at line 413 as the "current" pipeline stage, whereas 'p'
is used for the "next" stage in the loop at 434, and 'u' is the "current" stage.
This is confusing, and I don't know what 'u' means.
* Also, in the same loop, 'e' is initialized to 'this' and is checked as the
loop exit condition (I guess that's what 'e' stands for) but it's not used
elsewhere, so it's not clear to me how much value this adds.
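To illustrate the first point, here is a minimal sketch of what pulling the SIZED adjustment into a small named method might look like. The class name, the method name withSizedFrom, and the two-bit IS_SIZED/NOT_SIZED values are all hypothetical; they are not the encoding or the helpers used in the actual package.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;

public class SizedFlagSketch {
    // Hypothetical two-bit encoding for illustration only; these are
    // not the values used by the streams implementation.
    static final int IS_SIZED  = 0b01;
    static final int NOT_SIZED = 0b10;

    // Named helper: make the "sized" bits in opFlags agree with the
    // SIZED characteristic reported by the given spliterator.
    static int withSizedFrom(Spliterator<?> spliterator, int opFlags) {
        return spliterator.hasCharacteristics(Spliterator.SIZED)
                ? (opFlags & ~NOT_SIZED) | IS_SIZED
                : (opFlags & ~IS_SIZED) | NOT_SIZED;
    }

    public static void main(String[] args) {
        Spliterator<Integer> sized = Arrays.asList(1, 2, 3).spliterator();
        // An iterator-backed spliterator of unknown size never reports SIZED.
        Spliterator<Integer> unsized = Spliterators.spliteratorUnknownSize(
                Arrays.asList(1, 2, 3).iterator(), 0);
        System.out.println(withSizedFrom(sized, NOT_SIZED) == IS_SIZED);
        System.out.println(withSizedFrom(unsized, IS_SIZED) == NOT_SIZED);
    }
}
```

With a helper like this, the call site would read as a single named step rather than open-coded masking.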
Stepping back from this a bit, this is clearly an area of some complexity, and
it might benefit from some additional testing. The streams library is overall
well-tested, but mostly from a functional level, i.e. running a bunch of streams
in various combinations to ensure the right output is produced. For this case it
might be helpful to assemble (but not execute) a bunch of combinations of
pipelines and then make sure that each stage ends up with the right flags. (I
didn't find such a test when I went looking, but I might have missed it.)
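As a rough sketch of the kind of check I have in mind: the internal per-stage flags aren't reachable from outside the package, so this version only observes SIZED through the public API, via the characteristics of the spliterator each assembled pipeline hands back. The class and method names are made up for illustration, and a real test would live inside the package and inspect the flags of each stage directly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

public class SizedProbe {
    // Reports whether the assembled pipeline claims a known size.
    // Stream.spliterator() is a terminal operation, so each pipeline
    // passed in must be freshly built.
    static boolean reportsSized(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        System.out.println("source:   " + reportsSized(list.stream()));
        // map is size-preserving, filter is not.
        System.out.println("map:      " + reportsSized(list.stream().map(x -> x + 1)));
        System.out.println("filter:   " + reportsSized(list.stream().filter(x -> x > 1)));
        // The combination from the bug report.
        System.out.println("distinct: " + reportsSized(
                list.stream().parallel().unordered().distinct()));
    }
}
```

A fuller version would iterate over many combinations of operations, sequential and parallel, and compare each result against an expected table.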
In any case I think it's reasonable to proceed with this patch as it stands
instead of tinkering with it. Some additional cleanups are warranted at some
point, and we should keep an eye on these for the future.
s'marks
On 3/18/15 7:09 AM, Paul Sandoz wrote:
Hi,
The fix for https://bugs.openjdk.java.net/browse/JDK-8067969 (optimized
Stream.count()) caused a regression in the JCK tests.
A test exposed a known weakness in the way pipeline flags are prepared for parallel
execution. In certain cases the pipeline may report SIZED but the source spliterator does
not report the SIZED characteristic. This is because the preparation always assumed that
stateful operations are a full barrier and thus inject SIZED. This is not always the case
when a stateful operation is "lazy" and can wrap the source spliterator and
operations.
For example:
    List<Integer> list = Arrays.asList(1, 2, 3);
    Stream<Integer> s = list.stream().parallel().unordered().distinct();
    long count = s.count();
    System.out.println(count);
The stateful distinct op can wrap the source spliterator of the list since we don't
care about order, but as a result the size of the distinct->count pipeline
slice is unknown.
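A self-contained variant of the same pipeline, with duplicates added to the list, makes the point concrete: the source knows its size, but the number of elements that survive distinct can only be found by traversal. This is a sketch for illustration; the class and method names are made up.

```java
import java.util.Arrays;
import java.util.List;

public class DistinctCount {
    // Same shape as the example above: parallel, unordered, distinct, count.
    static long distinctCount(List<Integer> list) {
        return list.stream().parallel().unordered().distinct().count();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 2, 3, 3, 3);
        System.out.println(list.size());         // 6 elements at the source
        System.out.println(distinctCount(list)); // 3 distinct elements
    }
}
```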
While I could fix the counting functionality to check for the inconsistency, the
right thing to do is fix the preparation of flags (a particularly sensitive
area to get right) by merging that into the sourceSpliterator method (as
suggested in the comments):
http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8075307-stream-parallel-prepare-flags/webrev/
This also makes things marginally more efficient when there are no stateful
operations. There is no need to do any work if there are no stateful ops (I
needed to relax tests in UnorderedTest which were overly aggressive when there
were no stateful ops present; similarly I relaxed a test in DistinctOps that
is no longer generally applicable).
Furthermore, as a consequence the terminal counting reduce ops now declare they
don't care about encounter order [*], which avoids the distinct becoming a full
barrier that reduces elements to a linked hash set.
A JPRT test run reported no relevant failures and local execution of relevant
JCK tests passes.
Paul.
[*] Note that a count can also be performed with:
    LongAdder l = new LongAdder();
    Stream<T> s = ...
    s.parallel().forEach(e -> l.increment());
    long count = l.sum();
Which is a possible alternative implementation to that of reduction, but I
prefer to stick with the latter for now.
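For reference, a runnable sketch of the LongAdder variant, with a concrete stream filled in where the snippet above leaves the source open. The class name, the method name, and the choice of IntStream.range as the source are assumptions made for illustration only.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class AdderCount {
    // Counts elements by side effect. LongAdder is safe to increment
    // concurrently, so the parallel forEach needs no extra synchronization.
    static <T> long countWithAdder(Stream<T> s) {
        LongAdder adder = new LongAdder();
        s.parallel().forEach(e -> adder.increment());
        return adder.sum();
    }

    public static void main(String[] args) {
        long count = countWithAdder(IntStream.range(0, 1000).boxed());
        System.out.println(count); // 1000
    }
}
```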