This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch asJavaStream
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit aa703832b72224796775342ee4631cc81d742f75
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 6 17:08:34 2025 +0800

    feat: Add asJavaStream to Sink
---
 .../org/apache/pekko/stream/io/SinkAsJavaSourceTest.java  |  8 ++++++++
 .../pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala      |  3 +++
 .../main/scala/org/apache/pekko/stream/javadsl/Sink.scala | 15 +++++++++++++++
 .../scala/org/apache/pekko/stream/scaladsl/Sink.scala     | 15 +++++++++++++++
 4 files changed, 41 insertions(+)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java
 
b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java
index 054e33a6c4..e56d3ea4cd 100644
--- 
a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java
+++ 
b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java
@@ -44,4 +44,12 @@ public class SinkAsJavaSourceTest extends StreamTest {
     java.util.stream.Stream<Integer> javaStream = 
Source.from(list).runWith(streamSink, system);
     assertEquals(list, javaStream.collect(Collectors.toList()));
   }
+
+  @Test
+  public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception {
+    final List<Integer> list = Arrays.asList(1, 2, 3);
+    java.util.stream.Stream<Integer> javaStream =
+        Source.from(list).runWith(Sink.asJavaStream(), system);
+    assertEquals(list, javaStream.collect(Collectors.toList()));
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala
index a9ace43651..a15ee557d1 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala
@@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends 
StreamSpec(UnboundedMailboxConfig) {
     "work in happy case" in {
       val javaSource = Source(1 to 
100).runWith(StreamConverters.asJavaStream())
       javaSource.count() should ===(100L)
+      //
+      Source(1 to 100).runWith(Sink.asJavaStream())
+        .count() should ===(100L)
     }
 
     "fail if parent stream is failed" in {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 76db0ef98c..ca059f90c2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -212,6 +212,21 @@ object Sink {
   def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
     new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
 
+  /**
+   * Creates a sink which materializes into Java 8 ``Stream`` that can be run 
to trigger demand through the sink.
+   * Elements emitted through the stream will be available for reading through 
the Java 8 ``Stream``.
+   *
+   * The Java 8 ``Stream`` will be ended when the stream flowing into this 
``Sink`` completes, and closing the Java
+   * ``Stream`` will cancel the inflow of this ``Sink``.
+   *
+   * Java 8 ``Stream`` throws exception in case reactive stream failed.
+   *
+   * Be aware that Java ``Stream`` blocks current thread while waiting on next 
element from downstream.
+   * As it is interacting wit blocking API the implementation runs on a 
separate dispatcher
+   * configured through the ``pekko.stream.blocking-io-dispatcher``.
+   */
+  def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new 
Sink(scaladsl.StreamConverters.asJavaStream())
+
   /**
    * A `Sink` that will invoke the given procedure for each received element. 
The sink is materialized
    * into a [[java.util.concurrent.CompletionStage]] which will be completed 
with `Success` when reaching the
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 5112a69db2..c337f074f7 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -292,6 +292,21 @@ object Sink {
       if (fanout) new 
FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, 
shape("FanoutPublisherSink"))
       else new PublisherSink[T](DefaultAttributes.publisherSink, 
shape("PublisherSink")))
 
+  /**
+   * Creates a sink which materializes into Java 8 ``Stream`` that can be run 
to trigger demand through the sink.
+   * Elements emitted through the stream will be available for reading through 
the Java 8 ``Stream``.
+   *
+   * The Java 8 ``Stream`` will be ended when the stream flowing into this 
``Sink`` completes, and closing the Java
+   * ``Stream`` will cancel the inflow of this ``Sink``.
+   *
+   * If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
+   *
+   * Be aware that Java ``Stream`` blocks current thread while waiting on next 
element from downstream.
+   * As it is interacting wit blocking API the implementation runs on a 
separate dispatcher
+   * configured through the ``pekko.stream.blocking-io-dispatcher``.
+   */
+  def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = 
StreamConverters.asJavaStream()
+
   /**
    * A `Sink` that will consume the stream and discard the elements.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to