Hi Tagir,

I wonder if lambdas are the right tool in Java to express such generators. They have something in common - each generator uses a very specific internal state. So maybe an abstract Splitterator class that just needs one method to be implemented is perhaps concise enough and easier to understand.

For example, with the following utility class:

    public abstract class IntGenerator
        extends Spliterators.AbstractIntSpliterator {

        public IntGenerator() {
            super(Long.MAX_VALUE, ORDERED | IMMUTABLE | NONNULL);
        }

        public IntStream stream() {
            return StreamSupport.intStream(this, false);
        }
    }

... uses cases can be expressed as good old anonymous inner classes:

        // collatz
        int start = 10;
        new IntGenerator() {
            int curr = start;
            public boolean tryAdvance(IntConsumer action) {
                if (curr == 1) return false;
                action.accept(curr);
                curr = curr % 2 == 0 ? curr / 2 : curr * 3 + 1;
                return true;
            }
        }.stream().forEach(System.out::println);

        // scanner
        Scanner sc = new Scanner("1 2 3 4 foo");
        new IntGenerator() {
            public boolean tryAdvance(IntConsumer action) {
                if (sc.hasNextInt()) {
                    action.accept(sc.nextInt());
                    return true;
                } else {
                    return false;
                }
            }
        }.stream().forEach(System.out::println);

        // craps
        Random r = new Random();
        IntSupplier dice = () -> r.nextInt(6) + r.nextInt(6) + 2;
        new IntGenerator() {
            int point;
            boolean end;
            public boolean tryAdvance(IntConsumer action) {
                if (!end) {
                    int roll = dice.getAsInt();
                    action.accept(roll);
                    if (roll == 7
                        || (point == 0 && (roll > 10 || roll < 4))
                        || (point != 0 && roll == point)) {
                        end = true;
                    } else if (point == 0) {
                        point = roll;
                    }
                }
                return !end;
            }
        }.stream().forEach(System.out::println);


It is general enough, uses as little state as possible and does not produce garbage for each emitted element.

Regards, Peter

On 03/11/2016 09:27 AM, Tagir F. Valeev wrote:
Using the IntEmitter the Craps round could also be implemented easily
and without additional classes:

public static IntEmitter crapsEmitter(IntSupplier dice, int point) {
   return action -> {
     int roll = dice.getAsInt();
     action.accept(roll);
     return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 
&& roll == point)
         ? null : crapsEmitter(dice, point == 0 ? roll : point);
   };
}

Usage:

Random r = new Random();
IntSupplier dice = () -> r.nextInt(6) + r.nextInt(6) + 2;
crapsEmitter(dice, 0).stream().forEach(System.out::println);

TFV> Here's the source code I wrote for this test:

TFV> http://cr.openjdk.java.net/~tvaleev/patches/produce/ProduceTest.java

TFV> I also have somewhat weird idea of emitters (added to this test class
TFV> as well). Here's example:

TFV> public static IntEmitter collatzEmitter(int val) {
TFV>     return action -> {
TFV>         action.accept(val);
TFV>         return val == 1 ? null : collatzEmitter(val % 2 == 0 ? val / 2 : 
val * 3 + 1);
TFV>     };
TFV> }

TFV> Usage:

TFV> collatzEmitter(17).stream().forEach(System.out::println);

TFV> It does not need any special value as well as special handling of
TFV> starting value. No additional code for finishing is also necessary.
TFV> However such solution surely produces garbage (at least one lambda
TFV> instance per iteration), so the performance would degrade. However I
TFV> like it.

TFV> With best regards,
TFV> Tagir Valeev.

TFV>> Hello!

TFV>> Just for your information I implemented two alternative stream
TFV>> producers as you suggested:

TFV>> // Produces stream of values returned by producer until the first
TFV>> // empty optional is produced
TFV>> public static IntStream produce(Supplier<OptionalInt> producer);

TFV>> // Produces stream of values until predicate returns false
TFV>> // iff predicate returns true it must call the supplied IntConsumer
TFV>> // exactly once (like in Spliterator.tryAdvance)
TFV>> public static IntStream produce(Predicate<IntConsumer> producer);

TFV>> // Produces stream of values until advancer doesn't call the
TFV>> // IntConsumer at least once.
TFV>> // Calling the IntConsumer multiple times is also acceptable.
TFV>> public static IntStream produce(Consumer<IntConsumer> advancer);

TFV>> I tried to produce the Collatz sequence starting from given number and
TFV>> including the trailing one. Unfortunately I don't see how such
TFV>> signatures might simplify the thing. Here's three-arg iterate
TFV>> implementation posted by me before, for reference:

TFV>> IntStream.iterate(start, val -> val != -1,
TFV>>     val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1);

TFV>> Here's plain-old-loop implementation (without stream):

TFV>> public static void collatzLoop(int start) {
TFV>>   int val = start;
TFV>>   System.out.println(val);
TFV>>   while(val != 1) {
TFV>>     val = val % 2 == 0 ? val / 2 : val * 3 + 1;
TFV>>     System.out.println(val);
TFV>>   }
TFV>> }

TFV>> The problem here is that in the easiest implementation we need to emit
TFV>> the value at two places. Otherwise we either have problems with the
TFV>> last value or with the first value. Here's Supplier<OptionalInt>
TFV>> implementation:

TFV>> public static IntStream collatzSupplier(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(() ->
TFV>>     cur[0] == 1 ? OptionalInt.empty()
TFV>>         : OptionalInt.of(cur[0] =
TFV>>           (cur[0] == -1 ? start :
TFV>>             cur[0] % 2 == 0 ?
TFV>>               cur[0] / 2 : cur[0] * 3 + 1)));
TFV>> }

TFV>> Of course we have to maintain the shared state (here I used dirty
TFV>> one-element array trick). Here we have the same three conditions: to
TFV>> detect starting case, to detect finishing case and to separate
TFV>> even/odd cases. Also we also need some special value. Predicate-based
TFV>> solution is not simpler and has the same three conditions:

TFV>> public static IntStream collatzPredicate(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(action -> {
TFV>>     if(cur[0] == 1) return false;
TFV>>     action.accept(cur[0] = (
TFV>>         cur[0] == -1 ? start :
TFV>>         cur[0] % 2 == 0 ? cur[0] / 2 : cur[0] * 3 + 1));
TFV>>     return true;
TFV>>   });
TFV>> }

TFV>> So, at least for Collatz problem these signatures are not
TFV>> well-suitable (at least not better than three-arg iterate).

TFV>> Using Consumer<IntConsumer> advancer is somewhat better:

TFV>> public static IntStream collatzConsumer(int start) {
TFV>>   int[] cur = {-1};
TFV>>   return produce(action -> {
TFV>>     if(cur[0] == -1) action.accept(cur[0] = start);
TFV>>     if(cur[0] != 1) action.accept((cur[0] = cur[0] % 2 == 0 ? cur[0] / 2 
: cur[0] * 3 + 1));
TFV>>   });
TFV>> }

TFV>> However the implementation of such producer is not very good
TFV>> performance-wise as in tryAdvance we need to buffer emitted values.
TFV>> The forEachRemaining performs much better though.

TFV>> In general I like Supplier<OptionalInt> version better (it also used
TFV>> in your Scanner example), though we need to check how much overhead it
TFV>> creates. We cannot rely on JIT inlining here as functional interfaces
TFV>> are really ubiquitous.

TFV>> Having continuations in Java would actually solve this problem, but I
TFV>> guess we will unlikely to see them in the near future.

TFV>> With best regards,
TFV>> Tagir Valeev.

SM>>> Finally getting back to this.

SM>>> So, now that this has been clarified to model a for-loop, this seems 
fine. It
SM>>> looks like Paul is sponsoring this for you. Great, let's move ahead with 
it.

SM>>> As the author of this particular RFE, though, I should say that this 
isn't what
SM>>> I had in mind when I wrote the RFE. :-)

SM>>> There are two main issues I'm concerned about:

SM>>> 1) modeling loops that aren't for-loops; and

SM>>> 2) modeling loops where the termination condition is known to the stream 
source
SM>>> but isn't part of the stream value.

SM>>> If covering these requires a different API and RFE, then that's fine. I 
just
SM>>> want to make sure these cases aren't dropped.

SM>>> For case (1) I think the Collatz example is a good one, since you 
basically had
SM>>> to cheat in order to get it to work with this API, either by appending 1, 
or by
SM>>> introducing a sentinel value. It's not always possible to do this.

SM>>> The kind of loops I'm interested in look like this:

SM>>>      do {
SM>>>          emitValue();
SM>>>      } while (condition);

SM>>> or

SM>>>      while (true) {
SM>>>          emitValue();
SM>>>          if (condition)
SM>>>              break;
SM>>>          stuff();
SM>>>      }

SM>>> Clearly it's possible to rearrange these to fit into the 3-arg iterate() 
method,
SM>>> but it's unnatural to do so. For example, to implement the craps dice 
game, the
SM>>> obvious Java approach is to maintain the game state in an object or even 
local
SM>>> variables that are mutated as the game progresses. To use the 3-arg 
iterate()
SM>>> method, you wrote a State object (a value), which knows how to generate 
its
SM>>> successor State, and from which the int values are extracted. This is a 
very
SM>>> nice functional approach, but I expect that many people will have data 
sources
SM>>> that rely on mutation or external resources. It doesn't seem reasonable to
SM>>> require rewriting the source logic in order to produce a stream from it. 
And in
SM>>> general it might be arbitrarily difficult.

SM>>> It's probably easier to wrap a Spliterator around the source logic and 
simply
SM>>> call it from tryAdvance(). In fact, this is what I'd like to see in a
SM>>> stream-source API, to make it easier to create such stream sources without
SM>>> having to subclass AbstractSpliterator.

SM>>> For (2) it's critical that the stream source have control over production 
of
SM>>> values. Placing takeWhile() / takeWhileInclusive() downstream doesn't 
work,
SM>>> since the stream machinery (particularly in parallel) may aggressively 
pull
SM>>> elements from the source before they're presented to takeWhile().

SM>>> For example, consider parsing int values from a Scanner into an 
IntStream, until
SM>>> there are no more, leaving the scanner at a known state. Suppose there 
were an
SM>>> iterate(Supplier<OptionalInt>) overload. Then something like this would 
work:

SM>>>      Scanner sc = new Scanner("1 2 3 4 foo");
SM>>>      IntStream.iterate(
SM>>>              () -> sc.hasNextInt() ? OptionalInt.of(sc.nextInt())
SM>>>                                    : OptionalInt.empty())
SM>>>          .forEach(System.out::println);

SM>>>      assert sc.hasNext();
SM>>>      System.out.println(sc.next()); // prints "foo"

SM>>> Boxing things into optionals is a bit cumbersome, and I'm also concerned 
about
SM>>> the overhead. But it illustrates the point that the termination condition 
is
SM>>> separate from the values that go into the stream. In particular there's no
SM>>> "seed" value to which a termination condition can be applied. (And 
"iterate"
SM>>> might not be the best name for this method, either.)

SM>>> Oh well, I think this is turning into another RFE.

SM>>> s'marks




SM>>> On 2/16/16 9:48 PM, Tagir F. Valeev wrote:
Hello, Stuart!

Thank you for your comments.

SM> I'd suggest focusing on the API first before worrying about how to track the
SM> stream state with booleans, etc. Is the API convenient to use, and how well 
does
SM> it support the use cases we envision for it?

As Brian already noted, the most benefit of such signature is the
resemblance to the good old for loop. Also it's good, because the
lambdas don't need to maintain external mutable state in this case
(the state is encapsulated in the current element). Most of your
proposed examples, however, need to do it as they don't receive the
existing state. Also I see no reason to create a method which is
compatible with iterator::hasNext/iterator::next or even
spliterator::tryAdvance. If you already have a spliterator, you can
create a stream using StreamSupport.stream(spliterator, false). If you
have an iterator, you can convert it to spliterator using
Spliterators.spliterator[UnknownSize]. Well, this probably looks ugly,
but more flexible and signals that some low-level stuff is performed.

Supplier<Stream<T>> is definitely bad in terms of performance.
Creating a new stream is not nearly free. To illustrate this I wrote a
simple benchmark which compares .flatMapToInt(OptionalInt::stream)
with Java8 way .filter(OptionalInt::isPresent).mapToInt(OptionalInt::getAsInt)

Here's source code and full output:
http://cr.openjdk.java.net/~tvaleev/jmh/optionalstream/

Benchmark                               (n)  Mode  Cnt      Score     Error  
Units
OptionalTest.testOptionalFilterGet       10  avgt   30      0,171 ±   0,011  
us/op
OptionalTest.testOptionalFilterGet     1000  avgt   30      6,295 ±   0,046  
us/op
OptionalTest.testOptionalFilterGet  1000000  avgt   30  12597,706 ±  69,214  
us/op
OptionalTest.testOptionalStream          10  avgt   30      0,330 ±   0,002  
us/op
OptionalTest.testOptionalStream        1000  avgt   30     27,552 ±   0,577  
us/op
OptionalTest.testOptionalStream     1000000  avgt   30  30837,240 ± 812,420  
us/op

Involving intermediate streams makes the thing at least twice slower.
Surely this delay could become negligible in some scenarios, but I
think it's inacceptable to enforce users to create new source with a
bunch of streams. At least primitive specializations will become
meaningless in this case: boxing would eat much less time compared to
stream creation.

As for elements drawn from the queue, it's much better to use existing
takeWhile method:

queue.stream().takeWhile(x -> x.equals(sentinel));

True, such approach will not include the sentinel element to the
result, and there's no easy way to do it with current API. Probably
additional method (takeWhileInclusive?) could be considered to solve
such problems. Still, I think, drawing from the queue is not the
problem which should be solved with new iterate() method.

As for Collatz conjecture, it's quite easy to iterate without trailing
one:

IntStream.iterate(start, val -> val != 1,
    val -> val % 2 == 0 ? val / 2 : val * 3 + 1)

If having one is desired, then it would be easier just to append one
to the stream (even if Collatz conjecture is false we will have an
infinite stream, so appended one will never appear):

IntStream.concat(
    IntStream.iterate(start, val -> val != 1,
        val -> val % 2 == 0 ? val / 2 : val * 3 + 1),
    IntStream.of(1))

A side note: having IntStream.append(int... numbers) would be really
nice:

IntStream.iterate(start, val -> val != 1,
    val -> val % 2 == 0 ? val / 2 : val * 3 + 1).append(1)

Another approach would be to introduce a special stop value (for
example, -1):

IntStream.iterate(start, val -> val != -1,
    val -> val == 1 ? -1 : val % 2 == 0 ? val / 2 : val * 3 + 1)

This stream produces Collatz series, including the trailing one.

As for Craps, I never heard about such game. If I understood the rules
correctly, it's good to represent the state as separate object and
define state transition via its method. Something like this should
work:

Random r = new Random();
IntSupplier dice = () -> r.nextInt(6)+r.nextInt(6)+2;
class State {
    int roll, point;

    State(int roll, int point) {
      this.roll = roll;
      this.point = point;
    }

    State() {
      this(dice.getAsInt(), 0);
    }

    boolean isStopRound() {
      return roll == 7 || (point == 0 && (roll > 10 || roll < 4)) || (point != 0 
&& roll == point);
    }

    State next() {
      return isStopRound() ? null : new State(dice.getAsInt(), point == 0 ? 
roll : point);
    }
}
Stream.iterate(new State(), Objects::nonNull, State::next)
        .mapToInt(state -> state.roll)
        .forEach(System.out::println);

With best regards,
Tagir Valeev.


SM> In particular, I can imagine a number of cases where it would be very 
helpful to
SM> be able to support an empty stream, or where the computation to produce the
SM> first element is the same as the computation to produce subsequent elements.
SM> Requiring a value for the first stream element is at odds with that.

SM> Here are some ideas for use cases to try out:

SM>   - a series of dice rolls representing a round of craps [1]
SM>   - elements drawn from a queue until the queue is empty or until
SM>     a sentinel is reached
SM>   - a sequence of numbers that (probably) terminates but whose length
SM>     isn't necessarily known in advance (e.g. Collatz sequence [2])

SM> [1] https://en.wikipedia.org/wiki/Craps

SM> [2] https://en.wikipedia.org/wiki/Collatz_conjecture

SM> Note that in some cases the sentinel value that terminates the stream 
should be
SM> part of the stream, and in other cases it's not.

SM> I'm sure you can find more uses cases by perusing Stack Overflow. :-)

SM> I'm a bit skeptical of the use of "iterate" for producing a finite stream. 
There
SM> are the usual issues with overloading, but there's also potential confusion 
as
SM> some forms of iterate() are infinite and others finite. I'll suggest the 
name
SM> "produce" instead, but there are surely better terms.

SM> One thing to think about is where the state of the producer is stored. Is it
SM> expected to be in an argument that's passed to each invocation of the 
functional
SM> argument, or is it expected to be captured? I don't think there's an answer 
in
SM> isolation; examining use cases would probably shed some light here.

SM> Here are a few API ideas (wildcards elided):

SM> --

SM> <T> Stream<T> iterate(T seed, Predicate<T> predicate, UnaryOperator<T> f)

SM> The API from your proposal, for comparison purposes.

SM> --

SM> <T> Stream<T> produce(Supplier<Optional<T>>)

SM> Produces elements until empty Optional is returned. This box/unboxes every
SM> element, maybe(?) alleviated by Valhalla.

SM> --

SM> <T> Stream<T> produce(BooleanSupplier, Supplier<T>)

SM> Calls the BooleanSupplier; if true the next stream element is what's 
returned by
SM> calling the Supplier. If BooleanSupplier returns false, end of stream. If 
you
SM> have an iterator already, this enables

SM> produce(iterator::hasNext, iterator::next)

SM> But if you don't have an iterator already, coming up with the functions to
SM> satisfy the iterator-style protocol is sometimes painful.

SM> --

SM> <T> Stream<T> produce(Predicate<Consumer<T>> advancer)

SM> This has an odd signature, but the function is like 
Spliterator.tryAdvance(). It
SM> must either call the consumer once and return true, or return false without
SM> calling the consumer.

SM> --

SM> <T> Stream<T> produce(Consumer<Consumer<T>> advancer)

SM> A variation of the above, without a boolean return. The advancer calls the
SM> consumer one or more times to add elements to the stream. End of stream 
occurs
SM> when the advancer doesn't call the consumer.

SM> --

SM> <T> Stream<T> produce(Supplier<Stream<T>>)

SM> A variation of Supplier<Optional<T>> where the supplier returns a stream
SM> containing zero or more elements. The stream terminates if the supplier 
returns
SM> an empty stream. There "boxing" overhead here, but we don't seem to be 
bothered
SM> by this with flatMap().

SM> --

SM> s'marks


SM> On 2/14/16 6:53 AM, Tagir F. Valeev wrote:
Hello!

I wanted to work on foldLeft, but Brian asked me to take this issue
instead. So here's webrev:
http://cr.openjdk.java.net/~tvaleev/webrev/8072727/r1/

I don't like iterator-based Stream source implementations, so I made
them AbstractSpliterator-based. I also implemented manually
forEachRemaining as, I believe, this improves the performance in
non-short-circuiting cases.

I also decided to keep two flags (started and finished) to track the
state. Currently existing implementation of infinite iterate() does
not use started flag, but instead reads one element ahead for
primitive streams. This seems wrong to me and may even lead to
unexpected exceptions (*). I could get rid of "started" flag for
Stream.iterate() using Streams.NONE, but this would make object
implementation different from primitive implementations. It would also
be possible to keep single three-state variable (byte or int,
NOT_STARTED, STARTED, FINISHED), but I doubt that this would improve
the performance or footprint. Having two flags looks more readable to
me.

Currently existing two-arg iterate methods can now be expressed as a
partial case of the new method:

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
       return iterate(seed, x -> true, f);
}
(same for primitive streams). I may do this if you think it's
reasonable.

I created new test class and added new iterate sources to existing
data providers.

Please review and sponsor!

With best regards,
Tagir Valeev.

(*) Consider the following code:

int[] data = {1,2,3,4,-1};
IntStream.iterate(0, x -> data[x])
            .takeWhile(x -> x >= 0)
            .forEach(System.out::println);

Currently this unexpectedly throws an AIOOBE, because
IntStream.iterate unnecessarily tries to read one element ahead.


Reply via email to