Hello!

Thank you for your feedback. I disagree that my solution is more
complex. In fact, it's much simpler: it doesn't require any changes
in the *Pipeline classes. Unlike your proposal, it also doesn't
create different behavior for the JDK stream implementation and for
third-party stream implementations. It's more user-friendly as well.
Your proposal assumes that push-like behavior is needed only in
flatMap-like scenarios. But what if somebody just wants to return
the stream to the caller? With your proposal, that would have to be
done in some hacky way, like 'return Stream.of("bogus
element").mapMulti((c, ignored) -> ...)', instead of the
straightforward 'return ofPusher(c -> ...)'.

By the way, the proposed API offers no way to short-circuit the
pusher. If mapMulti produces many elements and a short-circuiting
terminal operation like anyMatch finds a match, the pusher will keep
pushing anyway. Have you considered using
BiConsumer<Predicate<T>, T>, so that the Stream API can signal to the
pusher that it's OK to stop pushing? Note that in modern Java
versions, flatMap can actually short-circuit.
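
A rough sketch of how such a signal could look from the mapper's side
follows. It does not use the Stream API at all. The class and method
names are hypothetical, and the convention that the Predicate returns
false to mean "stop pushing" is my assumption, not part of any proposal.

```java
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class ShortCircuitSketch {
    // Hypothetical mapper in the BiConsumer<Predicate<T>, T> shape: pushes up
    // to 1000 consecutive integers, but stops as soon as the sink returns
    // false (assumed convention: false means "no more elements wanted").
    static final BiConsumer<Predicate<Integer>, Integer> COUNT_UP = (sink, start) -> {
        for (int i = 0; i < 1000; i++) {
            if (!sink.test(start + i)) {
                return;
            }
        }
    };

    // Simulates a short-circuiting consumer such as anyMatch and returns how
    // many elements the mapper pushed before it stopped.
    static int pushedUntilMatch(int start, int target) {
        int[] pushed = {0};
        Predicate<Integer> sink = value -> {
            pushed[0]++;
            return value != target; // false once the match is found
        };
        COUNT_UP.accept(sink, start);
        return pushed[0];
    }

    public static void main(String[] args) {
        System.out.println(pushedUntilMatch(10, 13)); // prints 4
        System.out.println(pushedUntilMatch(10, 5));  // prints 1000
    }
}
```

With a plain Consumer as the sink, the first call would also have to
push all 1000 elements, because the mapper has no way to learn that the
match was already found.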

With best regards,
Tagir Valeev.

On Thu, Jul 2, 2020 at 9:39 PM Patrick Concannon
<patrick.concan...@oracle.com> wrote:
>
> Hi Tagir,
>
> Thank you for your input. However, we feel that this approach may add more 
> complexity than is needed at this stage. It's a good point, and one that we 
> think is reasonable to consider and evaluate as a possible follow on from 
> this initial proposal.
>
> Kind regards,
>
> Patrick
>
> > On 25 Jun 2020, at 05:04, Tagir Valeev <amae...@gmail.com> wrote:
> >
> > Hello!
> >
> > To me, it looks like it's possible to make a better default
> > implementation. It could even be done as a separate static method:
> >
> > static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
> >  return StreamSupport.stream(new Spliterator<>() {
> >    private Spliterator<T> delegate;
> >
> >    @Override
> >    public boolean tryAdvance(Consumer<? super T> action) {
> >      initDelegate();
> >      return delegate.tryAdvance(action);
> >    }
> >
> >    private void initDelegate() {
> >      if (delegate == null) {
> >        Stream.Builder<T> builder = Stream.builder(); // or use SpinedBuffer directly
> >        pusher.accept(builder);
> >        delegate = builder.build().spliterator();
> >      }
> >    }
> >
> >    @Override
> >    public void forEachRemaining(Consumer<? super T> action) {
> >      if (delegate != null) {
> >        delegate.forEachRemaining(action);
> >      } else {
> >        pusher.accept(action);
> >      }
> >    }
> >
> >    @Override
> >    public Spliterator<T> trySplit() {
> >      initDelegate();
> >      return delegate.trySplit();
> >    }
> >
> >    @Override
> >    public long estimateSize() {
> >      return Long.MAX_VALUE;
> >    }
> >
> >    @Override
> >    public int characteristics() {
> >      return 0;
> >    }
> >  }, false);
> > }
> >
> > In this case, we buffer only if a short-circuiting operation or
> > splitting is requested. Otherwise, forEachRemaining just delegates
> > to the pusher.
> > Now, the default implementation could be rewritten as
> >
> > <T, R> Stream<R> mapMulti(Stream<T> stream,
> >     BiConsumer<Consumer<? super R>, ? super T> mapper) {
> >  Objects.requireNonNull(mapper);
> >  return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e)));
> > }
> >
> > And now, I don't think it's necessary to specialize it at all.
> > It's probably not necessary to introduce mapMulti at all either, as
> > it's now a trivial delegate to ofPusher.
> >
> > With best regards,
> > Tagir Valeev.
> >
> > On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon
> > <patrick.concan...@oracle.com> wrote:
> >>
> >> Hi,
> >>
> > >> Could someone please review Julia's and my RFE and CSR for JDK-8238286
> > >> - 'Add new flatMap stream operation that is more amenable to pushing'?
> >>
> >> This proposal is to add a new flatMap-like operation:
> >>
> >> `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)`
> >>
> > >> to the java.util.stream.Stream interface. This operation is more receptive to the
> >> pushing or yielding of values than the current implementation that 
> >> internally assembles values (if any) into one or more streams. This 
> >> addition includes the primitive variations of the operation i.e. 
> >> mapMultiToInt, IntStream mapMulti, etc.
> >>
> > >> issue: https://bugs.openjdk.java.net/browse/JDK-8238286
> > >> csr: https://bugs.openjdk.java.net/browse/JDK-8248166
> > >>
> > >> webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/
> > >> specdiff: http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html
> >>
> >>
> >> Kind regards,
> >> Patrick & Julia
>
