This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch asJavaStream in repository https://gitbox.apache.org/repos/asf/pekko.git
commit aa703832b72224796775342ee4631cc81d742f75 Author: He-Pin <[email protected]> AuthorDate: Sat Sep 6 17:08:34 2025 +0800 feat: Add asJavaStream to Sink --- .../org/apache/pekko/stream/io/SinkAsJavaSourceTest.java | 8 ++++++++ .../pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala | 3 +++ .../main/scala/org/apache/pekko/stream/javadsl/Sink.scala | 15 +++++++++++++++ .../scala/org/apache/pekko/stream/scaladsl/Sink.scala | 15 +++++++++++++++ 4 files changed, 41 insertions(+) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java index 054e33a6c4..e56d3ea4cd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java @@ -44,4 +44,12 @@ public class SinkAsJavaSourceTest extends StreamTest { java.util.stream.Stream<Integer> javaStream = Source.from(list).runWith(streamSink, system); assertEquals(list, javaStream.collect(Collectors.toList())); } + + @Test + public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception { + final List<Integer> list = Arrays.asList(1, 2, 3); + java.util.stream.Stream<Integer> javaStream = + Source.from(list).runWith(Sink.asJavaStream(), system); + assertEquals(list, javaStream.collect(Collectors.toList())); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala index a9ace43651..a15ee557d1 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala @@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) { "work in happy case" in { val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream()) javaSource.count() should ===(100L) + // + Source(1 to 100).runWith(Sink.asJavaStream()) + .count() should ===(100L) } "fail if parent stream is failed" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 76db0ef98c..ca059f90c2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -212,6 +212,21 @@ object Sink { def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * Java 8 ``Stream`` throws exception in case reactive stream failed. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * configured through the ``pekko.stream.blocking-io-dispatcher``. + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream()) + /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 5112a69db2..c337f074f7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -292,6 +292,21 @@ object Sink { if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * configured through the ``pekko.stream.blocking-io-dispatcher``. + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = StreamConverters.asJavaStream() + /** * A `Sink` that will consume the stream and discard the elements. */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
