Hello!
To me, it looks like it's possible to write a better default
implementation. It could even be done as a separate static method:
    static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
        return StreamSupport.stream(new Spliterator<>() {
            // Buffered view of the pushed elements, created only on demand
            private Spliterator<T> delegate;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                initDelegate();
                return delegate.tryAdvance(action);
            }

            private void initDelegate() {
                if (delegate == null) {
                    // or use SpinedBuffer directly
                    Stream.Builder<T> builder = Stream.builder();
                    pusher.accept(builder);
                    delegate = builder.build().spliterator();
                }
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                if (delegate != null) {
                    delegate.forEachRemaining(action);
                } else {
                    // Nothing buffered yet: push straight into the action
                    pusher.accept(action);
                }
            }

            @Override
            public Spliterator<T> trySplit() {
                initDelegate();
                return delegate.trySplit();
            }

            @Override
            public long estimateSize() {
                return Long.MAX_VALUE;
            }

            @Override
            public int characteristics() {
                return 0;
            }
        }, false);
    }
In this case, we buffer only if a short-circuiting operation or
splitting is requested. Otherwise, forEachRemaining just delegates
to the pusher.
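
To make the two paths concrete, here is a minimal sketch that traces the
order of pushes and deliveries. The class name PusherDemo and the helper
trace are hypothetical and exist only for this illustration; ofPusher is
a condensed copy of the method above. The expected ordering assumes the
sequential behavior of the current stream implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PusherDemo {
    // Condensed copy of the ofPusher sketch from this message.
    static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
        return StreamSupport.stream(new Spliterator<T>() {
            private Spliterator<T> delegate;

            private void initDelegate() {
                if (delegate == null) {
                    Stream.Builder<T> builder = Stream.builder();
                    pusher.accept(builder); // buffers every pushed element
                    delegate = builder.build().spliterator();
                }
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                initDelegate();
                return delegate.tryAdvance(action);
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                if (delegate != null) {
                    delegate.forEachRemaining(action);
                } else {
                    pusher.accept(action); // no buffer: push straight downstream
                }
            }

            @Override
            public Spliterator<T> trySplit() {
                initDelegate();
                return delegate.trySplit();
            }

            @Override
            public long estimateSize() { return Long.MAX_VALUE; }

            @Override
            public int characteristics() { return 0; }
        }, false);
    }

    // Hypothetical helper: records when elements are pushed and delivered.
    static List<String> trace(boolean shortCircuit) {
        List<String> log = new ArrayList<>();
        Stream<Integer> s = PusherDemo.<Integer>ofPusher(sink -> {
            log.add("push 1");
            sink.accept(1);
            log.add("push 2");
            sink.accept(2);
        });
        if (shortCircuit) {
            // findFirst is short-circuiting, so tryAdvance is used
            s.peek(x -> log.add("got " + x)).findFirst();
        } else {
            // forEach on a sequential stream uses forEachRemaining
            s.forEach(x -> log.add("got " + x));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

With forEach, each element should be delivered as soon as it is pushed.
With findFirst, the pusher should run to completion into the buffer
before the first element is delivered.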
Now, the default implementation could be rewritten as
    <T, R> Stream<R> mapMulti(Stream<T> stream,
            BiConsumer<Consumer<? super R>, ? super T> mapper) {
        Objects.requireNonNull(mapper);
        return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e)));
    }
And now, I don't think it's necessary to specialize it at all.
It's probably not necessary to introduce mapMulti at all either, as
it's now a trivial delegate to ofPusher.
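
As a rough usage sketch of the flatMap-based mapMulti described above:
the class name MapMultiDemo and the helper evensWithTenfold are
hypothetical, and ofPusher is again a condensed copy. The explicit type
arguments are there because, as far as I can tell, R cannot be inferred
from the lambda alone when the call is the start of a method chain.

```java
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class MapMultiDemo {
    // Condensed copy of the ofPusher sketch from this message.
    static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
        return StreamSupport.stream(new Spliterator<T>() {
            private Spliterator<T> delegate;

            private void initDelegate() {
                if (delegate == null) {
                    Stream.Builder<T> builder = Stream.builder();
                    pusher.accept(builder);
                    delegate = builder.build().spliterator();
                }
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                initDelegate();
                return delegate.tryAdvance(action);
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                if (delegate != null) {
                    delegate.forEachRemaining(action);
                } else {
                    pusher.accept(action);
                }
            }

            @Override
            public Spliterator<T> trySplit() {
                initDelegate();
                return delegate.trySplit();
            }

            @Override
            public long estimateSize() { return Long.MAX_VALUE; }

            @Override
            public int characteristics() { return 0; }
        }, false);
    }

    // mapMulti as a plain delegate to flatMap + ofPusher
    // (sink first, element second, as in the proposal under review).
    static <T, R> Stream<R> mapMulti(Stream<T> stream,
            BiConsumer<Consumer<? super R>, ? super T> mapper) {
        Objects.requireNonNull(mapper);
        return stream.flatMap(
                e -> MapMultiDemo.<R>ofPusher(sink -> mapper.accept(sink, e)));
    }

    // Hypothetical example: drop odd numbers, emit each even number
    // followed by ten times its value.
    static List<Integer> evensWithTenfold(List<Integer> input) {
        return MapMultiDemo.<Integer, Integer>mapMulti(input.stream(), (sink, n) -> {
            if (n % 2 == 0) {
                sink.accept(n);
                sink.accept(n * 10);
            }
        }).collect(Collectors.toList());
    }
}
```

Here the mapper pushes zero or two values per element, which is the
kind of case where building an intermediate stream per element in
flatMap feels heavyweight.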
With best regards,
Tagir Valeev.
On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon
<[email protected]> wrote:
>
> Hi,
>
> Could someone please review Julia's and my RFE and CSR for JDK-8238286 -
> 'Add new flatMap stream operation that is more amenable to pushing'?
>
> This proposal is to add a new flatMap-like operation:
>
> `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)`
>
> to the java.util.stream.Stream interface. This operation is more receptive to the
> pushing or yielding of values than the current implementation that internally
> assembles values (if any) into one or more streams. This addition includes
> the primitive variations of the operation i.e. mapMultiToInt, IntStream
> mapMulti, etc.
>
> issue: https://bugs.openjdk.java.net/browse/JDK-8238286
> csr: https://bugs.openjdk.java.net/browse/JDK-8248166
>
> webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/
> specdiff: http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html
>
>
> Kind regards,
> Patrick & Julia