I've gone ahead and added this to the implementation note section of
Stream.concat:

+     * <p>Subsequent changes to the sequential/parallel execution mode of the
+     * returned stream are not guaranteed to be propagated to the input streams.
+     *

and I've pushed the change.
The situation with Stream.takeWhile/dropWhile seems a bit sticky. On the one
hand, the default implementations seem to make a reasonable set of compromises.
On the other hand, I can't think of anything useful to add to the description
that's already there. If somebody can think of something useful to add, by all
means propose it, but I can't think of anything that usefully clarifies the
situation without diving into a bunch of implementation details.
s'marks
On 8/4/15 12:20 AM, Paul Sandoz wrote:
On 4 Aug 2015, at 01:09, Stuart Marks <stuart.ma...@oracle.com> wrote:
Hi Tagir,
Interesting issues.
Regarding Stream.concat, it may be that, today, changes to the
sequential/parallel execution mode aren't propagated to the streams being
concatenated.
The execution mode is propagated if either of the streams passed to concat is
parallel, i.e. if isParallel() returns true for it.
The issue here is that Stream.spliterator() is a form of terminal operation
that will result in pipeline evaluation if there are non-lazy stateful
operations present.
What we don’t currently do is propagate the parallelism back to a sequential
stream when the other is a parallel stream. We could easily do that as a bug
fix. I agree with Stuart that it does not require any specification, e.g.:
    public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
        Objects.requireNonNull(a);
        Objects.requireNonNull(b);
        boolean isPar = a.isParallel() || b.isParallel();
        @SuppressWarnings("unchecked")
        Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
                (Spliterator<T>) (isPar ? a.parallel() : a).spliterator(),
                (Spliterator<T>) (isPar ? b.parallel() : b).spliterator());
        Stream<T> stream = StreamSupport.stream(split, isPar);
        return stream.onClose(Streams.composedClose(a, b));
    }
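For readers following along, here is a minimal sketch of the part of this behaviour that can be observed from outside: the stream returned by Stream.concat reports isParallel() as true when either input is parallel. The class and method names (ConcatModeDemo, concatIsParallel, concatElements) are made up for illustration and are not part of the JDK or of the proposed patch. The sketch does not show whether the sequential input is itself switched to parallel, which is the part under discussion.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the JDK or the patch discussed above.
final class ConcatModeDemo {

    // Builds two small streams, optionally makes each one parallel, and
    // reports the execution mode of the concatenated stream.
    static boolean concatIsParallel(boolean firstParallel, boolean secondParallel) {
        Stream<Integer> a = Stream.of(1, 2);
        Stream<Integer> b = Stream.of(3, 4);
        if (firstParallel) {
            a = a.parallel();
        }
        if (secondParallel) {
            b = b.parallel();
        }
        // try-with-resources closes the concatenated stream, which in turn
        // runs the close handlers of both inputs.
        try (Stream<Integer> c = Stream.concat(a, b)) {
            return c.isParallel();
        }
    }

    // Both inputs are ordered, so the concatenation keeps encounter order
    // even though one input is parallel.
    static List<Integer> concatElements() {
        return Stream.concat(Stream.of(1, 2).parallel(), Stream.of(3, 4))
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(concatIsParallel(false, false));
        System.out.println(concatIsParallel(true, false));
        System.out.println(concatIsParallel(false, true));
        System.out.println(concatElements());
    }
}
```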
But is that something inherent to the specification of concatenation, or is it
something that might change in the future? It's currently unspecified, so
adding a clarification really sounds more like changing the specification to
reflect the current implementation, which I'd prefer not to do.
Regarding the default implementations of takeWhile/dropWhile, again, today,
they don't propagate the execution mode upstream. But is this just a bug?
Granted the API for doing so isn't obvious, but isn't this something that could
just be fixed?
The default implementations are specified to propagate the execution mode in
terms of correctly reporting isParallel() but may choose not to split:
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
So this is a little different from the Stream.concat case.
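A small sketch of the observable contract quoted above, assuming a JDK with takeWhile/dropWhile (9 or later). Note the caveat: the JDK's own stream pipelines override the default methods, so this exercises the built-in implementation rather than the default one the Javadoc excerpt describes. It only illustrates that isParallel() is preserved and that results on an ordered stream do not depend on the execution mode. TakeWhileModeDemo and its methods are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; requires Java 9+ for takeWhile/dropWhile.
final class TakeWhileModeDemo {

    // An ordered source whose last element would match the predicate again,
    // to show that only the leading prefix is affected.
    private static Stream<Integer> source(boolean parallel) {
        Stream<Integer> s = Stream.of(1, 2, 3, 4, 1);
        return parallel ? s.parallel() : s;
    }

    // The returned stream reports the same execution mode as its upstream.
    static boolean takeWhileIsParallel(boolean parallel) {
        return source(parallel).takeWhile(x -> x < 3).isParallel();
    }

    // Longest prefix of elements matching the predicate.
    static List<Integer> taken(boolean parallel) {
        return source(parallel).takeWhile(x -> x < 3).collect(Collectors.toList());
    }

    // Everything after that prefix.
    static List<Integer> dropped(boolean parallel) {
        return source(parallel).dropWhile(x -> x < 3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(takeWhileIsParallel(true));
        System.out.println(taken(true));
        System.out.println(dropped(true));
    }
}
```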
The only way a default implementation can be implemented is to derive
functionality from the upstream spliterator(). Rather than choosing to add yet
more code to support this (and most likely poorly too), I opted for not
splitting and reusing some existing code (the unordered spliterators that are
configured not to split). Nor did I want to wrap the non-splitting spliterator
around one which copies a prefix, which introduces a different set of poorly
splitting characteristics (and would also penalise sequential operation). This
is a tradeoff, and seems to me a reasonable compromise for a default.
Paul.