Hi Tagir, Yes, i forgot about that, thanks, nice catch. Patch updated:
http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/ See below for a diff. Paul. diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/DoubleStream.java --- a/src/java.base/share/classes/java/util/stream/DoubleStream.java Tue Jun 09 07:10:03 2015 +0100 +++ b/src/java.base/share/classes/java/util/stream/DoubleStream.java Wed Jul 08 10:20:25 2015 +0200 @@ -300,7 +300,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code takeWhile()} is generally a cheap operation on sequential @@ -328,7 +329,7 @@ // is safe to use as long as it configured not to split return StreamSupport.doubleStream( new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** @@ -358,7 +359,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code dropWhile()} is generally a cheap operation on sequential @@ -386,7 +388,7 @@ // is safe to use as long as it configured not to split return StreamSupport.doubleStream( new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/IntStream.java --- a/src/java.base/share/classes/java/util/stream/IntStream.java Tue Jun 09 07:10:03 2015 +0100 +++ b/src/java.base/share/classes/java/util/stream/IntStream.java Wed Jul 08 10:20:25 2015 +0200 @@ -298,7 +298,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code takeWhile()} is generally a cheap operation on sequential @@ -325,7 +326,7 @@ // is safe to use as long as it configured not to split return StreamSupport.intStream( new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** @@ -355,7 +356,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code dropWhile()} is generally a cheap operation on sequential @@ -382,7 +384,7 @@ // is safe to use as long as it configured not to split return StreamSupport.intStream( new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/LongStream.java --- a/src/java.base/share/classes/java/util/stream/LongStream.java Tue Jun 09 07:10:03 2015 +0100 +++ b/src/java.base/share/classes/java/util/stream/LongStream.java Wed Jul 08 10:20:25 2015 +0200 @@ -298,7 +298,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code takeWhile()} is generally a cheap operation on sequential @@ -326,7 +327,7 @@ // is safe to use as long as it configured not to split return StreamSupport.longStream( new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** @@ -356,7 +357,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code dropWhile()} is generally a cheap operation on sequential @@ -384,7 +386,7 @@ // is safe to use as long as it configured not to split return StreamSupport.longStream( new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/Stream.java --- a/src/java.base/share/classes/java/util/stream/Stream.java Tue Jun 09 07:10:03 2015 +0100 +++ b/src/java.base/share/classes/java/util/stream/Stream.java Wed Jul 08 10:20:25 2015 +0200 @@ -505,7 +505,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code takeWhile()} is generally a cheap operation on sequential @@ -532,7 +533,7 @@ // is safe to use as long as it configured not to split return StreamSupport.stream( new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** @@ -562,7 +563,8 @@ * the wrapped spliterator. The returned stream preserves the execution * characteristics of this stream (namely parallel or sequential execution * as per {@link #isParallel()}) but the wrapped spliterator may choose to - * not support splitting. + * not support splitting. When the returned stream is closed, the close + * handlers for both the returned and this stream are invoked. * * @apiNote * While {@code dropWhile()} is generally a cheap operation on sequential @@ -589,7 +591,7 @@ // is safe to use as long as it configured not to split return StreamSupport.stream( new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate), - isParallel()); + isParallel()).onClose(this::close); } /** diff -r 61bc2fe3b2c6 test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java --- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java Tue Jun 09 07:10:03 2015 +0100 +++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java Wed Jul 08 10:20:25 2015 +0200 @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.DefaultMethodStreams; @@ -318,4 +319,43 @@ } } + @Test + public void testRefDefaultClose() { + AtomicBoolean isClosed = new AtomicBoolean(); + Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true)); + try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) { + ds.count(); + } + assertTrue(isClosed.get()); + } + + @Test + public void testIntDefaultClose() { + AtomicBoolean isClosed = new AtomicBoolean(); + IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true)); + try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) { + ds.count(); + } + assertTrue(isClosed.get()); + } + + @Test + public void testLongDefaultClose() { + AtomicBoolean isClosed = new AtomicBoolean(); + LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true)); + try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) { + ds.count(); + } + assertTrue(isClosed.get()); + } + + @Test + public void testDoubleDefaultClose() { + AtomicBoolean isClosed = new AtomicBoolean(); + DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true)); + try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) { + ds.count(); + } + assertTrue(isClosed.get()); + } } On Jul 8, 2015, at 4:58 AM, Tagir F. Valeev <[email protected]> wrote: > Hello, > > I was looking at default implementation of new Stream > takeWhile/dropWhile methods: > > http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/src/java.base/share/classes/java/util/stream/Stream.java.cdiff.html > > I think in order to make this default behavior more consistent, the > original Stream must be closed when the returned one is closed. Thus, > takeWhile return statement should look like this: > > return StreamSupport.stream( > new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<> > (spliterator(), true, predicate), > isParallel()).onClose(this::close); > > The same for dropWhile method. > > What do you think? > > Regards, > Tagir Valeev. >
