This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new adc1d669ce feat: Add materializeIntoSource
adc1d669ce is described below

commit adc1d669cef6ca724b18c17aa80be331b493508a
Author: Matthew de Detrich <[email protected]>
AuthorDate: Fri May 2 14:35:25 2025 +0200

    feat: Add materializeIntoSource
---
 .../Source-or-Flow/materializeIntoSource.md        | 15 +++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 ++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 12 ++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 10 ++++++++++
 .../apache/pekko/stream/scaladsl/FlowSpec.scala    |  6 ++++++
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  |  7 +++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 22 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 16 ++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 20 ++++++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 15 +++++++++++++++
 10 files changed, 125 insertions(+)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md
 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md
new file mode 100644
index 0000000000..7c43e4d3fb
--- /dev/null
+++ 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md
@@ -0,0 +1,15 @@
+# materializeIntoSource
+
+Materializes this Graph, immediately returning its materialized values into a 
new Source.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.materializeIntoSource](Source) { 
scala="#materializeIntoSource[Mat2](sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]"
 java="#materializeIntoSource(org.apache.pekko.stream.Graph)" }
+@apidoc[Flow.materializeIntoSource](Flow) { 
scala="#materializeIntoSource[Mat1,Mat2](source:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[In],Mat1],sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]"
 java="#materializeIntoSource(org.apache.pekko.stre [...]
+
+
+## Description
+
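+Runs this stream with the given `Sink` and emits the `Sink`'s materialized value, which must be a @scala[`Future`]@java[`CompletionStage`], from a new `Source`. For a `Flow`, the `Source` feeding it is passed as an additional argument. The stream is run when the returned `Source` is materialized, not when `materializeIntoSource` is called.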
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 120cc91df0..8f46341db3 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -172,6 +172,7 @@ depending on being backpressured by downstream or not.
 |Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each 
element in the stream by calling a mapping function with it and passing the 
returned value downstream.|
 |Source/Flow|<a 
name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform 
each element into zero or more elements that are individually passed 
downstream.|
 |Source/Flow|<a 
name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map
 elements with the help of a resource that can be opened, transform each 
element (in a blocking way) and closed.|
+|Source/Flow|<a 
name="materializeintosource"></a>@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes
 this Graph, immediately returning its materialized values into a new Source.|
 |Source/Flow|<a 
name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a 
stream containing optional elements, transforms each element by applying the 
given `viaFlow` and passing the value downstream as an optional value.|
 |Source/Flow|<a 
name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes
 this Graph, immediately returning (1) its materialized value, and (2) a new 
pre-materialized Graph.|
 |Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start 
with first element and then apply the current and next value to the given 
function, when upstream complete the current value is emitted downstream.|
@@ -529,6 +530,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [mapConcat](Source-or-Flow/mapConcat.md)
 * [mapError](Source-or-Flow/mapError.md)
 * [mapWithResource](Source-or-Flow/mapWithResource.md)
+* [materializeIntoSource](Source-or-Flow/materializeIntoSource.md)
 * [maybe](Source/maybe.md)
 * [merge](Source-or-Flow/merge.md)
 * [mergeAll](Source-or-Flow/mergeAll.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 0a45acc2bd..c8cb7f06fc 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1692,6 +1692,18 @@ public class FlowTest extends StreamTest {
         Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> 
"foo");
   }
 
+  @Test
+  public void mustBeAbleToUseMaterializeIntoSource() throws Exception {
+    final Flow<Integer, Integer, NotUsed> flow = Flow.create();
+
+    final Source<List<Integer>, CompletionStage<NotUsed>> source =
+        flow.map(i -> i * 
2).materializeIntoSource(Source.from(Arrays.asList(1, 2, 3)), Sink.seq()); // feed 1, 2, 3 through the doubling flow into Sink.seq
+
+    final CompletionStage<List<Integer>> resultList = 
source.runWith(Sink.head(), system); // the first element is expected to be the whole collected list
+
+    assertEquals(Arrays.asList(2, 4, 6), 
resultList.toCompletableFuture().get(1, TimeUnit.SECONDS));
+  }
+
   @Test
   public void mustBeAbleToUseLazyInit() throws Exception {
     final CompletionStage<Flow<Integer, Integer, NotUsed>> future = new 
CompletableFuture<>();
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index e6bbd01be0..ed9946234d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1441,6 +1441,16 @@ public class SourceTest extends StreamTest {
         Source.<Integer>empty().preMaterialize(system);
   }
 
+  @Test
+  public void mustBeAbleToUseMaterializeIntoSource() throws Exception {
+    final List<Integer> input = Arrays.asList(1, 2, 3);
+    final Source<List<Integer>, CompletionStage<NotUsed>> source =
+        Source.from(input).materializeIntoSource(Sink.seq()); // run the input into Sink.seq and wrap the result in a Source
+    final CompletionStage<List<Integer>> resultList = 
source.runWith(Sink.head(), system); // the first element is expected to be the whole collected list
+
+    assertEquals(input, resultList.toCompletableFuture().get(1, 
TimeUnit.SECONDS));
+  }
+
   @Test
   public void mustBeAbleToConvertToJavaInJava() {
     final org.apache.pekko.stream.scaladsl.Source<Integer, NotUsed> 
scalaSource =
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 3c918db1ec..0364f130cb 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -639,6 +639,12 @@ class FlowSpec extends 
StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
       counter.get() should (be(0))
     }
 
+    "materialize into source" in {
+      val source = Flow[Int].map(_ * 2) // doubles every element fed in from the source below
+        .materializeIntoSource(Source(List(1, 2, 3)), Sink.seq)
+
+      source.runWith(Sink.head).futureValue should ===(List(2, 4, 6)) // head is the whole collected Seq
+    }
   }
 
   object TestException extends RuntimeException with NoStackTrace
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index a54a17a4c7..cb37d951e6 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -526,6 +526,13 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
 
       a[RuntimeException] shouldBe 
thrownBy(matValPoweredSource.preMaterialize())
     }
+
+    "materialize into source" in {
+      val input = List(1, 2, 3)
+      val source = Source(input).materializeIntoSource(Sink.seq) // run the input into Sink.seq and wrap the result in a Source
+
+      source.runWith(Sink.head).futureValue should ===(input) // head is the whole collected Seq
+    }
   }
 
   "Source.futureSource" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 150096dc65..4eedf5f2ca 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -622,6 +622,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
     pekko.japi.Pair(som, sim)
   }
 
+  /**
+   * Connects the given [[Source]] to this [[Flow]], runs the result into the given
+   * [[Sink]] and exposes the sink's materialized [[CompletionStage]] as a new
+   * [[Source]] (via `Source.completionStage`).
+   *
+   * The wiring is done inside `Source.fromMaterializer`, using the materializer and
+   * attributes handed to that callback, so the stream is run as part of materializing
+   * the returned [[Source]] rather than when this method is called.
+   * Only the sink's materialized value is kept (`Keep.right`); the materialized values
+   * of `source` and of this [[Flow]] are not exposed.
+   *
+   * @tparam Mat1 the materialized value type of `source` (discarded)
+   * @tparam Mat2 the type the sink's materialized [[CompletionStage]] completes with
+   * @param source A source that connects to this flow
+   * @param sink A sink which needs to materialize into a [[CompletionStage]], typically one
+   *             that collects values such as [[Sink.head]] or [[Sink.seq]]
+   * @return A new [[Source]] built from the [[CompletionStage]] that the [[Sink]]
+   *         materializes once the provided [[Source]] has been run through this [[Flow]]
+   * @since 1.2.0
+   */
+  def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1],
+      sink: Graph[SinkShape[Out], CompletionStage[Mat2]])
+      : Source[Mat2, CompletionStage[NotUsed]] = {
+    Source.fromMaterializer { (mat, attr) =>
+      Source.completionStage(
+        Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink,
+          Keep.right[Mat1, CompletionStage[Mat2]]).run(mat)
+      )
+    }
+  }
+
   /**
    * Connect the `Source` to this `Flow` and then connect it to the `Sink` and 
run it.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 2673791b3c..13b473e554 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -876,6 +876,22 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
     Pair(mat, new Source(src))
   }
 
+  /**
+   * Runs this [[Source]] into the given [[Sink]] and exposes the sink's materialized
+   * [[CompletionStage]] as a new [[Source]] (via `Source.completionStage`).
+   *
+   * The wiring is done inside `Source.fromMaterializer`, using the materializer and
+   * attributes handed to that callback, so this stream is run as part of materializing
+   * the returned [[Source]] rather than when this method is called.
+   * Only the sink's materialized value is kept (`Keep.right`); the materialized value
+   * of this [[Source]] is discarded.
+   *
+   * @tparam Mat2 the type the sink's materialized [[CompletionStage]] completes with
+   * @param sink A sink which needs to materialize into a [[CompletionStage]], typically one
+   *             that collects values such as [[Sink.head]] or [[Sink.seq]]
+   * @return A new [[Source]] built from the [[CompletionStage]] that the [[Sink]]
+   *         materializes once this [[Source]] has been run with it
+   * @since 1.2.0
+   */
+  def materializeIntoSource[Mat2](
+      sink: Graph[SinkShape[Out], CompletionStage[Mat2]]): Source[Mat2, 
CompletionStage[NotUsed]] =
+    Source.fromMaterializer { (mat, attr) =>
+      Source.completionStage(this.withAttributes(attr).toMat(sink, 
Keep.right[Mat, CompletionStage[Mat2]]).run(mat))
+    }
+
   /**
    * Transform this [[Source]] by appending the given processing operators.
    * {{{
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 5ecbd5c81a..49c8ce7d07 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -172,6 +172,26 @@ final class Flow[-In, +Out, +Mat](
     (mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), 
Source.fromPublisher(pub)))
   }
 
+  /**
+   * Connects the given [[Source]] to this [[Flow]], runs the result into the given
+   * [[Sink]] and exposes the sink's materialized [[Future]] as a new [[Source]]
+   * (via `Source.future`).
+   *
+   * The wiring is done inside `Source.fromMaterializer`, using the materializer and
+   * attributes handed to that callback, so the stream is run as part of materializing
+   * the returned [[Source]] rather than when this method is called.
+   * Only the sink's materialized value is kept (`Keep.right`); the materialized values
+   * of `source` and of this [[Flow]] are not exposed.
+   *
+   * @tparam Mat1 the materialized value type of `source` (discarded)
+   * @tparam Mat2 the type the sink's materialized [[Future]] completes with
+   * @param source A source that connects to this flow
+   * @param sink A sink which needs to materialize into a [[Future]], typically one
+   *             that collects values such as [[Sink.head]] or [[Sink.seq]]
+   * @return A new [[Source]] built from the [[Future]] that the [[Sink]] materializes
+   *         once the provided [[Source]] has been run through this [[Flow]]
+   * @since 1.2.0
+   */
+  def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1],
+      sink: Graph[SinkShape[Out], Future[Mat2]])
+      : Source[Mat2, Future[NotUsed]] =
+    Source.fromMaterializer { (mat, attr) =>
+      Source.future(
+        
Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink)(Keep.right).run()(mat)
+      )
+    }
+
   /**
    * Transform this Flow by applying a function to each *incoming* upstream 
element before
    * it is passed to the [[Flow]]
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 54ace5a111..9dc14dd4f1 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -109,6 +109,21 @@ final class Source[+Out, +Mat](
     (mat, Source.fromPublisher(pub))
   }
 
+  /**
+   * Runs this [[Source]] into the given [[Sink]] and exposes the sink's materialized
+   * [[Future]] as a new [[Source]] (via `Source.future`).
+   *
+   * The wiring is done inside `Source.fromMaterializer`, using the materializer and
+   * attributes handed to that callback, so this stream is run as part of materializing
+   * the returned [[Source]] rather than when this method is called.
+   * Only the sink's materialized value is kept (`Keep.right`); the materialized value
+   * of this [[Source]] is discarded.
+   *
+   * @tparam Mat2 the type the sink's materialized [[Future]] completes with
+   * @param sink A sink which needs to materialize into a [[Future]], typically one
+   *             that collects values such as [[Sink.head]] or [[Sink.seq]]
+   * @return A new [[Source]] built from the [[Future]] that the [[Sink]] materializes
+   *         once this [[Source]] has been run with it
+   * @since 1.2.0
+   */
+  def materializeIntoSource[Mat2](sink: Graph[SinkShape[Out], Future[Mat2]]): 
Source[Mat2, Future[NotUsed]] =
+    Source.fromMaterializer { (mat, attr) =>
+      
Source.future(this.withAttributes(attr).toMat(sink)(Keep.right).run()(mat))
+    }
+
   /**
    * Connect this `Source` to the `Sink.ignore` and run it. Elements from the 
stream will be consumed and discarded.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to