This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a159aee6d8 +str Add combine seq method to Source and Sink.
a159aee6d8 is described below
commit a159aee6d84513e0a3d45a25702a047def829646
Author: He-Pin <[email protected]>
AuthorDate: Sun Dec 17 17:56:44 2023 +0800
+str Add combine seq method to Source and Sink.
---
.../org/apache/pekko/stream/javadsl/SinkTest.java | 38 +++++++++++++++++
.../apache/pekko/stream/javadsl/SourceTest.java | 13 ++++++
.../apache/pekko/stream/scaladsl/SinkSpec.scala | 34 +++++++++++++--
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 8 ++++
.../org/apache/pekko/stream/javadsl/Sink.scala | 47 ++++++++++++++++++---
.../org/apache/pekko/stream/javadsl/Source.scala | 34 ++++++++++++---
.../org/apache/pekko/stream/scaladsl/Sink.scala | 49 +++++++++++++++++++---
.../org/apache/pekko/stream/scaladsl/Source.scala | 45 +++++++++++++++-----
8 files changed, 236 insertions(+), 32 deletions(-)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index 287a6d213e..cdcfd588bd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -26,12 +26,15 @@ import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.*;
+import org.apache.pekko.stream.testkit.TestSubscriber;
+import org.apache.pekko.stream.testkit.javadsl.TestSink;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
+import org.reactivestreams.Subscription;
import static org.junit.Assert.*;
@@ -127,6 +130,41 @@ public class SinkTest extends StreamTest {
probe2.expectMsgEquals("done2");
}
+ @Test
+ public void mustBeAbleToUseCombineMat() {
+ final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 =
TestSink.create(system);
+ final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 =
TestSink.create(system);
+ final Sink<Integer, Pair<TestSubscriber.Probe<Integer>,
TestSubscriber.Probe<Integer>>> sink =
+ Sink.combineMat(sink1, sink2, Broadcast::create, Keep.both());
+
+ final Pair<TestSubscriber.Probe<Integer>, TestSubscriber.Probe<Integer>>
subscribers =
+ Source.from(Arrays.asList(0, 1)).runWith(sink, system);
+ final TestSubscriber.Probe<Integer> subscriber1 = subscribers.first();
+ final TestSubscriber.Probe<Integer> subscriber2 = subscribers.second();
+ final Subscription sub1 = subscriber1.expectSubscription();
+ final Subscription sub2 = subscriber2.expectSubscription();
+ sub1.request(2);
+ sub2.request(2);
+ subscriber1.expectNext(0, 1).expectComplete();
+ subscriber2.expectNext(0, 1).expectComplete();
+ }
+
+ @Test
+ public void mustBeAbleToUseCombineMany() throws Exception {
+ final Sink<Long, CompletionStage<Long>> firstSink = Sink.head();
+ final Sink<Long, CompletionStage<Long>> secondSink = Sink.head();
+ final Sink<Long, CompletionStage<Long>> thirdSink = Sink.head();
+
+ final Sink<Long, List<CompletionStage<Long>>> combineSink =
+ Sink.combine(Arrays.asList(firstSink, secondSink, thirdSink),
Broadcast::create);
+ final List<CompletionStage<Long>> results =
+ Source.single(1L).toMat(combineSink, Keep.right()).run(system);
+ for (CompletionStage<Long> result : results) {
+ final long value = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
+ assertEquals(1L, value);
+ }
+ }
+
@Test
public void mustBeAbleToUseContramap() throws Exception {
List<Integer> out =
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index b316b96a9d..f385ecb807 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -968,6 +968,19 @@ public class SourceTest extends StreamTest {
probe.expectMsgAllOf(0, 1, 2, 3);
}
+ @Test
+ public void mustBeAbleToCombineN() throws Exception {
+ final Source<Integer, NotUsed> source1 = Source.single(1);
+ final Source<Integer, NotUsed> source2 = Source.single(2);
+ final Source<Integer, NotUsed> source3 = Source.single(3);
+ final List<Source<Integer, NotUsed>> sources = Arrays.asList(source1,
source2, source3);
+ final CompletionStage<Integer> result =
+ Source.combine(sources, Concat::create)
+ .runWith(Sink.collect(Collectors.toList()), system)
+ .thenApply(list -> list.stream().mapToInt(l -> l).sum());
+ assertEquals(6, result.toCompletableFuture().get(3,
TimeUnit.SECONDS).intValue());
+ }
+
@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToZipN() throws Exception {
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index c0b32c0116..e5f0202b5c 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -17,16 +17,16 @@ import scala.annotation.nowarn
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
-import org.reactivestreams.Publisher
-import org.scalatest.concurrent.ScalaFutures
-
import org.apache.pekko
+import org.scalatest.concurrent.ScalaFutures
import pekko.Done
import pekko.stream._
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.DefaultTimeout
+import org.reactivestreams.Publisher
+
class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import GraphDSL.Implicits._
@@ -138,6 +138,34 @@ class SinkSpec extends StreamSpec with DefaultTimeout with
ScalaFutures {
}
}
+ "combine many sinks to one" in {
+ val source = Source(List(0, 1, 2, 3, 4, 5))
+ implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
+ val sink = Sink
+ .combine(
+ List(
+ Sink.reduce[Int]((a, b) => a + b),
+ Sink.reduce[Int]((a, b) => a + b),
+ Sink.reduce[Int]((a, b) => a + b)))(Broadcast[Int](_))
+ .mapMaterializedValue(Future.reduceLeft(_)(_ + _))
+ val result = source.runWith(sink)
+ result.futureValue should be(45)
+ }
+
+ "combine two sinks with combineMat" in {
+ implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
+ Source(List(0, 1, 2, 3, 4, 5))
+ .toMat(Sink.combineMat(Sink.reduce[Int]((a, b) => a + b),
Sink.reduce[Int]((a, b) => a + b))(Broadcast[Int](_))(
+ (f1, f2) => {
+ for {
+ r1 <- f1
+ r2 <- f2
+ } yield r1 + r2
+ }))(Keep.right)
+ .run()
+ .futureValue should be(30)
+ }
+
"combine to two sinks with simplified API" in {
val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]())
val sink = Sink.combine(Sink.fromSubscriber(probes(0)),
Sink.fromSubscriber(probes(1)))(Broadcast[Int](_))
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index a24e492269..eee8ce96ee 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -150,6 +150,14 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
out.expectComplete()
}
+ "combine many sources into one" in {
+ val sources = Vector.tabulate(5)(_ => Source.maybe[Int])
+ val (promises, sub) =
Source.combine(sources)(Concat(_)).toMat(TestSink.probe[Int])(Keep.both).run()
+ for ((promise, idx) <- promises.zipWithIndex)
+ promise.success(Some(idx))
+ sub.request(5).expectNextN(0 to 4).expectComplete()
+ }
+
"combine from two inputs with simplified API" in {
val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
val source = Source.fromPublisher(probes(0)) ::
Source.fromPublisher(probes(1)) :: Nil
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index b5b63060c5..7bbbcd782f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -19,21 +19,19 @@ import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.stream.Collector
+import scala.annotation.nowarn
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.util.Try
-import org.reactivestreams.Publisher
-import org.reactivestreams.Subscriber
-
import org.apache.pekko
import pekko._
import pekko.actor.ActorRef
import pekko.actor.ClassicActorSystemProvider
import pekko.actor.Status
import pekko.dispatch.ExecutionContexts
-import pekko.japi.function
+import pekko.japi.{ function, Util }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.LinearTraversalBuilder
@@ -43,6 +41,9 @@ import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+
/** Java API */
object Sink {
@@ -372,10 +373,44 @@ object Sink {
output1: Sink[U, _],
output2: Sink[U, _],
rest: java.util.List[Sink[U, _]],
- strategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]])
+ : Sink[T, NotUsed] = {
import pekko.util.ccompat.JavaConverters._
val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else
immutable.Seq()
- new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq:
_*)(num => strategy.apply(num)))
+ new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq:
_*)(num => fanOutStrategy.apply(num)))
+ }
+
+ /**
+ * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and
returns a `Sink` wired to both, whose materialized value is `matF` applied to theirs.
+ * @since 1.1.0
+ */
+ def combineMat[T, U, M1, M2, M](
+ first: Sink[U, M1],
+ second: Sink[U, M2],
+ fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]],
+ matF: function.Function2[M1, M2, M]): Sink[T, M] = {
+ new Sink(
+ scaladsl.Sink.combineMat(first.asScala, second.asScala)(size =>
fanOutStrategy(size))(combinerToScala(matF)))
+ }
+
+ /**
+ * Combine several sinks with fan-out strategy like `Broadcast` or `Balance`
and returns `Sink`.
+ * The number of outlets of the graph created by `fanOutStrategy` must match the number of provided sinks.
+ * @since 1.1.0
+ */
+ def combine[T, U, M](
+ sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
+ fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]])
+ : Sink[T, java.util.List[M]] = {
+ val seq = if (sinks != null) Util.immutableSeq(sinks).collect {
+ case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
+ case other => other
+ }
+ else immutable.Seq()
+ import org.apache.pekko.util.ccompat.JavaConverters._
+ new Sink(scaladsl.Sink.combine(seq)(size =>
fanOutStrategy(size)).mapMaterializedValue(_.asJava))
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index ae01282622..1f6e80d60d 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
-import org.reactivestreams.{ Publisher, Subscriber }
-
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
@@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._
+import org.reactivestreams.{ Publisher, Subscriber }
+
/** Java API */
object Source {
private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
@@ -656,10 +656,12 @@ object Source {
first: Source[T, _ <: Any],
second: Source[T, _ <: Any],
rest: java.util.List[Source[T, _ <: Any]],
- strategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]])
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanInStrategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else
immutable.Seq()
- new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq:
_*)(num => strategy.apply(num)))
+ new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq:
_*)(num => fanInStrategy.apply(num)))
}
/**
@@ -668,10 +670,30 @@ object Source {
def combineMat[T, U, M1, M2, M](
first: Source[T, M1],
second: Source[T, M2],
- strategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]],
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanInStrategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]],
combine: function.Function2[M1, M2, M]): Source[U, M] = {
new Source(
- scaladsl.Source.combineMat(first.asScala, second.asScala)(num =>
strategy.apply(num))(combinerToScala(combine)))
+ scaladsl.Source.combineMat(first.asScala, second.asScala)(num =>
fanInStrategy.apply(num))(
+ combinerToScala(combine)))
+ }
+
+ /**
+ * Combines several sources with fan-in strategy like [[Merge]] or
[[Concat]] into a single [[Source]].
+ * @since 1.1.0
+ */
+ def combine[T, U, M](
+ sources: java.util.List[_ <: Graph[SourceShape[T], M]],
+ fanInStrategy: function.Function[java.lang.Integer,
Graph[UniformFanInShape[T, U], NotUsed]])
+ : Source[U, java.util.List[M]] = {
+ val seq = if (sources != null) Util.immutableSeq(sources).collect {
+ case source: Source[T @unchecked, M @unchecked] => source.asScala
+ case other => other
+ }
+ else immutable.Seq()
+ import org.apache.pekko.util.ccompat.JavaConverters._
+ new Source(scaladsl.Source.combine(seq)(size =>
fanInStrategy(size)).mapMaterializedValue(_.asJava))
}
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 7e379a350b..099c56425f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.stream.scaladsl
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.ExecutionContext
@@ -22,9 +22,6 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
-import org.reactivestreams.Publisher
-import org.reactivestreams.Subscriber
-
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@@ -40,6 +37,9 @@ import pekko.stream.javadsl
import pekko.stream.stage._
import pekko.util.ccompat._
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+
/**
* A `Sink` is a set of stream processing steps that has one open input.
* Can be used as a `Subscriber`
@@ -339,10 +339,12 @@ object Sink {
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance`
and returns `Sink`.
*/
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(
- strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T,
NotUsed] =
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]):
Sink[T, NotUsed] =
Sink.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
- val d = b.add(strategy(rest.size + 2))
+ val d = b.add(fanOutStrategy(rest.size + 2))
d.out(0) ~> first
d.out(1) ~> second
@@ -355,6 +357,41 @@ object Sink {
combineRest(2, rest.iterator)
})
+ /**
+ * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and
returns a `Sink` wired to both, whose materialized value is `matF` applied to theirs.
+ * @since 1.1.0
+ */
+ def combineMat[T, U, M1, M2, M](first: Sink[U, M1], second: Sink[U, M2])(
+ fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed])(matF:
(M1, M2) => M): Sink[T, M] = {
+ Sink.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b =>
(shape1, shape2) =>
+ import GraphDSL.Implicits._
+ val d = b.add(fanOutStrategy(2))
+ d.out(0) ~> shape1
+ d.out(1) ~> shape2
+ new SinkShape[T](d.in)
+ })
+ }
+
+ /**
+ * Combine several sinks with fan-out strategy like `Broadcast` or `Balance`
and returns `Sink`.
+ * The number of outlets of the graph created by `fanOutStrategy` must match the number of provided sinks.
+ * @since 1.1.0
+ */
+ def combine[T, U, M](sinks: immutable.Seq[Graph[SinkShape[U], M]])(
+ fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]):
Sink[T, immutable.Seq[M]] =
+ sinks match {
+ case immutable.Seq() => Sink.cancelled.mapMaterializedValue(_ => Nil)
+ case immutable.Seq(sink) => sink.asInstanceOf[Sink[T,
M]].mapMaterializedValue(_ :: Nil)
+ case _ =>
+ Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
+ import GraphDSL.Implicits._
+ val c = b.add(fanOutStrategy(sinks.size))
+ for ((shape, idx) <- shapes.zipWithIndex)
+ c.out(idx) ~> shape
+ SinkShape(c.in)
+ })
+ }
+
/**
* A `Sink` that will invoke the given function to each of the elements
* as they pass in. The sink is materialized into a
[[scala.concurrent.Future]]
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 429276f36d..4fa134838e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.CompletionStage
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
@@ -756,10 +756,12 @@ object Source {
* Combines several sources with fan-in strategy like [[Merge]] or
[[Concat]] into a single [[Source]].
*/
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T,
_]*)(
- strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U,
NotUsed] =
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]):
Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
- val c = b.add(strategy(rest.size + 2))
+ val c = b.add(fanInStrategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@@ -772,19 +774,40 @@ object Source {
combineRest(2, rest.iterator)
})
+ /**
+ * Combines several sources with fan-in strategy like [[Merge]] or
[[Concat]] into a single [[Source]].
+ * @since 1.1.0
+ */
+ def combine[T, U, M](sources: immutable.Seq[Graph[SourceShape[T], M]])(
+ fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]):
Source[U, immutable.Seq[M]] =
+ sources match {
+ case immutable.Seq() => Source.empty.mapMaterializedValue(_ => Nil)
+ case immutable.Seq(source) => source.asInstanceOf[Source[U,
M]].mapMaterializedValue(_ :: Nil)
+ case _ =>
+ Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
+ import GraphDSL.Implicits._
+ val c = b.add(fanInStrategy(sources.size))
+ for ((shape, i) <- shapes.zipWithIndex) {
+ shape ~> c.in(i)
+ }
+ SourceShape(c.out)
+ })
+ }
+
/**
* Combines several sources with fan-in strategy like [[Merge]] or
[[Concat]] into a single [[Source]] with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(
- strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2)
=> M): Source[U, M] = {
- val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b =>
secondShape =>
+ @nowarn
+ @deprecatedName(Symbol("strategy"))
+ fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF:
(M1, M2) => M): Source[U, M] =
+ Source.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b =>
(shape1, shape2) =>
import GraphDSL.Implicits._
- val c = b.add(strategy(2))
- secondShape ~> c.in(1)
- FlowShape(c.in(0), c.out)
- }
- first.viaMat(secondPartiallyCombined)(matF)
- }
+ val c = b.add(fanInStrategy(2))
+ shape1 ~> c.in(0)
+ shape2 ~> c.in(1)
+ SourceShape(c.out)
+ })
/**
* Combine the elements of multiple streams into a stream of sequences.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]