This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a159aee6d8 +str Add combine seq method to Source and Sink.
a159aee6d8 is described below

commit a159aee6d84513e0a3d45a25702a047def829646
Author: He-Pin <[email protected]>
AuthorDate: Sun Dec 17 17:56:44 2023 +0800

    +str Add combine seq method to Source and Sink.
---
 .../org/apache/pekko/stream/javadsl/SinkTest.java  | 38 +++++++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 13 ++++++
 .../apache/pekko/stream/scaladsl/SinkSpec.scala    | 34 +++++++++++++--
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  |  8 ++++
 .../org/apache/pekko/stream/javadsl/Sink.scala     | 47 ++++++++++++++++++---
 .../org/apache/pekko/stream/javadsl/Source.scala   | 34 ++++++++++++---
 .../org/apache/pekko/stream/scaladsl/Sink.scala    | 49 +++++++++++++++++++---
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 45 +++++++++++++++-----
 8 files changed, 236 insertions(+), 32 deletions(-)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index 287a6d213e..cdcfd588bd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -26,12 +26,15 @@ import org.apache.pekko.NotUsed;
 import org.apache.pekko.japi.Pair;
 import org.apache.pekko.japi.function.Function;
 import org.apache.pekko.stream.*;
+import org.apache.pekko.stream.testkit.TestSubscriber;
+import org.apache.pekko.stream.testkit.javadsl.TestSink;
 import org.apache.pekko.testkit.javadsl.TestKit;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
 import org.apache.pekko.testkit.PekkoSpec;
 import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
+import org.reactivestreams.Subscription;
 
 import static org.junit.Assert.*;
 
@@ -127,6 +130,41 @@ public class SinkTest extends StreamTest {
     probe2.expectMsgEquals("done2");
   }
 
+  @Test
+  public void mustBeAbleToUseCombineMat() {
+    final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 = 
TestSink.create(system);
+    final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 = 
TestSink.create(system);
+    final Sink<Integer, Pair<TestSubscriber.Probe<Integer>, 
TestSubscriber.Probe<Integer>>> sink =
+        Sink.combineMat(sink1, sink2, Broadcast::create, Keep.both());
+
+    final Pair<TestSubscriber.Probe<Integer>, TestSubscriber.Probe<Integer>> 
subscribers =
+        Source.from(Arrays.asList(0, 1)).runWith(sink, system);
+    final TestSubscriber.Probe<Integer> subscriber1 = subscribers.first();
+    final TestSubscriber.Probe<Integer> subscriber2 = subscribers.second();
+    final Subscription sub1 = subscriber1.expectSubscription();
+    final Subscription sub2 = subscriber2.expectSubscription();
+    sub1.request(2);
+    sub2.request(2);
+    subscriber1.expectNext(0, 1).expectComplete();
+    subscriber2.expectNext(0, 1).expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToUseCombineMany() throws Exception {
+    final Sink<Long, CompletionStage<Long>> firstSink = Sink.head();
+    final Sink<Long, CompletionStage<Long>> secondSink = Sink.head();
+    final Sink<Long, CompletionStage<Long>> thirdSink = Sink.head();
+
+    final Sink<Long, List<CompletionStage<Long>>> combineSink =
+        Sink.combine(Arrays.asList(firstSink, secondSink, thirdSink), 
Broadcast::create);
+    final List<CompletionStage<Long>> results =
+        Source.single(1L).toMat(combineSink, Keep.right()).run(system);
+    for (CompletionStage<Long> result : results) {
+      final long value = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
+      assertEquals(1L, value);
+    }
+  }
+
   @Test
   public void mustBeAbleToUseContramap() throws Exception {
     List<Integer> out =
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index b316b96a9d..f385ecb807 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -968,6 +968,19 @@ public class SourceTest extends StreamTest {
     probe.expectMsgAllOf(0, 1, 2, 3);
   }
 
+  @Test
+  public void mustBeAbleToCombineN() throws Exception {
+    final Source<Integer, NotUsed> source1 = Source.single(1);
+    final Source<Integer, NotUsed> source2 = Source.single(2);
+    final Source<Integer, NotUsed> source3 = Source.single(3);
+    final List<Source<Integer, NotUsed>> sources = Arrays.asList(source1, 
source2, source3);
+    final CompletionStage<Integer> result =
+        Source.combine(sources, Concat::create)
+            .runWith(Sink.collect(Collectors.toList()), system)
+            .thenApply(list -> list.stream().mapToInt(l -> l).sum());
+    assertEquals(6, result.toCompletableFuture().get(3, 
TimeUnit.SECONDS).intValue());
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void mustBeAbleToZipN() throws Exception {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index c0b32c0116..e5f0202b5c 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -17,16 +17,16 @@ import scala.annotation.nowarn
 import scala.concurrent.{ Await, Future }
 import scala.concurrent.duration._
 
-import org.reactivestreams.Publisher
-import org.scalatest.concurrent.ScalaFutures
-
 import org.apache.pekko
+import org.scalatest.concurrent.ScalaFutures
 import pekko.Done
 import pekko.stream._
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
 import pekko.testkit.DefaultTimeout
 
+import org.reactivestreams.Publisher
+
 class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
 
   import GraphDSL.Implicits._
@@ -138,6 +138,34 @@ class SinkSpec extends StreamSpec with DefaultTimeout with 
ScalaFutures {
       }
     }
 
+    "combine many sinks to one" in {
+      val source = Source(List(0, 1, 2, 3, 4, 5))
+      implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
+      val sink = Sink
+        .combine(
+          List(
+            Sink.reduce[Int]((a, b) => a + b),
+            Sink.reduce[Int]((a, b) => a + b),
+            Sink.reduce[Int]((a, b) => a + b)))(Broadcast[Int](_))
+        .mapMaterializedValue(Future.reduceLeft(_)(_ + _))
+      val result = source.runWith(sink)
+      result.futureValue should be(45)
+    }
+
+    "combine two sinks with combineMat" in {
+      implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
+      Source(List(0, 1, 2, 3, 4, 5))
+        .toMat(Sink.combineMat(Sink.reduce[Int]((a, b) => a + b), 
Sink.reduce[Int]((a, b) => a + b))(Broadcast[Int](_))(
+          (f1, f2) => {
+            for {
+              r1 <- f1
+              r2 <- f2
+            } yield r1 + r2
+          }))(Keep.right)
+        .run()
+        .futureValue should be(30)
+    }
+
     "combine to two sinks with simplified API" in {
       val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]())
       val sink = Sink.combine(Sink.fromSubscriber(probes(0)), 
Sink.fromSubscriber(probes(1)))(Broadcast[Int](_))
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index a24e492269..eee8ce96ee 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -150,6 +150,14 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       out.expectComplete()
     }
 
+    "combine many sources into one" in {
+      val sources = Vector.tabulate(5)(_ => Source.maybe[Int])
+      val (promises, sub) = 
Source.combine(sources)(Concat(_)).toMat(TestSink.probe[Int])(Keep.both).run()
+      for ((promise, idx) <- promises.zipWithIndex)
+        promise.success(Some(idx))
+      sub.request(5).expectNextN(0 to 4).expectComplete()
+    }
+
     "combine from two inputs with simplified API" in {
       val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
       val source = Source.fromPublisher(probes(0)) :: 
Source.fromPublisher(probes(1)) :: Nil
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index b5b63060c5..7bbbcd782f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -19,21 +19,19 @@ import java.util.concurrent.CompletionStage
 import java.util.function.BiFunction
 import java.util.stream.Collector
 
+import scala.annotation.nowarn
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.util.Try
 
-import org.reactivestreams.Publisher
-import org.reactivestreams.Subscriber
-
 import org.apache.pekko
 import pekko._
 import pekko.actor.ActorRef
 import pekko.actor.ClassicActorSystemProvider
 import pekko.actor.Status
 import pekko.dispatch.ExecutionContexts
-import pekko.japi.function
+import pekko.japi.{ function, Util }
 import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.LinearTraversalBuilder
@@ -43,6 +41,9 @@ import pekko.stream.scaladsl.SinkToCompletionStage
 import pekko.util.FutureConverters._
 import pekko.util.OptionConverters._
 
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+
 /** Java API */
 object Sink {
 
@@ -372,10 +373,44 @@ object Sink {
       output1: Sink[U, _],
       output2: Sink[U, _],
       rest: java.util.List[Sink[U, _]],
-      strategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]]): Sink[T, NotUsed] = {
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanOutStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]])
+      : Sink[T, NotUsed] = {
     import pekko.util.ccompat.JavaConverters._
     val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else 
immutable.Seq()
-    new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: 
_*)(num => strategy.apply(num)))
+    new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: 
_*)(num => fanOutStrategy.apply(num)))
+  }
+
+  /**
+   * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and 
returns `Sink` with 2 outlets.
+   * @since 1.1.0
+   */
+  def combineMat[T, U, M1, M2, M](
+      first: Sink[U, M1],
+      second: Sink[U, M2],
+      fanOutStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]],
+      matF: function.Function2[M1, M2, M]): Sink[T, M] = {
+    new Sink(
+      scaladsl.Sink.combineMat(first.asScala, second.asScala)(size => 
fanOutStrategy(size))(combinerToScala(matF)))
+  }
+
+  /**
+   * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` 
and returns `Sink`.
+   * The fanoutGraph's outlets size must match the provides sinks'.
+   * @since 1.1.0
+   */
+  def combine[T, U, M](
+      sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
+      fanOutStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]])
+      : Sink[T, java.util.List[M]] = {
+    val seq = if (sinks != null) Util.immutableSeq(sinks).collect {
+      case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
+      case other                                  => other
+    }
+    else immutable.Seq()
+    import org.apache.pekko.util.ccompat.JavaConverters._
+    new Sink(scaladsl.Sink.combine(seq)(size => 
fanOutStrategy(size)).mapMaterializedValue(_.asJava))
   }
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index ae01282622..1f6e80d60d 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise }
 import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag
 
-import org.reactivestreams.{ Publisher, Subscriber }
-
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
@@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._
 import pekko.util.OptionConverters._
 import pekko.util.ccompat.JavaConverters._
 
+import org.reactivestreams.{ Publisher, Subscriber }
+
 /** Java API */
 object Source {
   private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
@@ -656,10 +656,12 @@ object Source {
       first: Source[T, _ <: Any],
       second: Source[T, _ <: Any],
       rest: java.util.List[Source[T, _ <: Any]],
-      strategy: function.Function[java.lang.Integer, _ <: 
Graph[UniformFanInShape[T, U], NotUsed]])
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanInStrategy: function.Function[java.lang.Integer, _ <: 
Graph[UniformFanInShape[T, U], NotUsed]])
       : Source[U, NotUsed] = {
     val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else 
immutable.Seq()
-    new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: 
_*)(num => strategy.apply(num)))
+    new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: 
_*)(num => fanInStrategy.apply(num)))
   }
 
   /**
@@ -668,10 +670,30 @@ object Source {
   def combineMat[T, U, M1, M2, M](
       first: Source[T, M1],
       second: Source[T, M2],
-      strategy: function.Function[java.lang.Integer, _ <: 
Graph[UniformFanInShape[T, U], NotUsed]],
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanInStrategy: function.Function[java.lang.Integer, _ <: 
Graph[UniformFanInShape[T, U], NotUsed]],
       combine: function.Function2[M1, M2, M]): Source[U, M] = {
     new Source(
-      scaladsl.Source.combineMat(first.asScala, second.asScala)(num => 
strategy.apply(num))(combinerToScala(combine)))
+      scaladsl.Source.combineMat(first.asScala, second.asScala)(num => 
fanInStrategy.apply(num))(
+        combinerToScala(combine)))
+  }
+
+  /**
+   * Combines several sources with fan-in strategy like [[Merge]] or 
[[Concat]] into a single [[Source]].
+   * @since 1.1.0
+   */
+  def combine[T, U, M](
+      sources: java.util.List[_ <: Graph[SourceShape[T], M]],
+      fanInStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanInShape[T, U], NotUsed]])
+      : Source[U, java.util.List[M]] = {
+    val seq = if (sources != null) Util.immutableSeq(sources).collect {
+      case source: Source[T @unchecked, M @unchecked] => source.asScala
+      case other                                      => other
+    }
+    else immutable.Seq()
+    import org.apache.pekko.util.ccompat.JavaConverters._
+    new Source(scaladsl.Source.combine(seq)(size => 
fanInStrategy(size)).mapMaterializedValue(_.asJava))
   }
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 7e379a350b..099c56425f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.stream.scaladsl
 
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
@@ -22,9 +22,6 @@ import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 
-import org.reactivestreams.Publisher
-import org.reactivestreams.Subscriber
-
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -40,6 +37,9 @@ import pekko.stream.javadsl
 import pekko.stream.stage._
 import pekko.util.ccompat._
 
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+
 /**
  * A `Sink` is a set of stream processing steps that has one open input.
  * Can be used as a `Subscriber`
@@ -339,10 +339,12 @@ object Sink {
    * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` 
and returns `Sink`.
    */
   def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(
-      strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, 
NotUsed] =
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): 
Sink[T, NotUsed] =
     Sink.fromGraph(GraphDSL.create() { implicit b =>
       import GraphDSL.Implicits._
-      val d = b.add(strategy(rest.size + 2))
+      val d = b.add(fanOutStrategy(rest.size + 2))
       d.out(0) ~> first
       d.out(1) ~> second
 
@@ -355,6 +357,41 @@ object Sink {
       combineRest(2, rest.iterator)
     })
 
+  /**
+   * Combine two sinks with fan-out strategy like `Broadcast` or `Balance` and 
returns `Sink` with 2 outlets.
+   * @since 1.1.0
+   */
+  def combineMat[T, U, M1, M2, M](first: Sink[U, M1], second: Sink[U, M2])(
+      fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed])(matF: 
(M1, M2) => M): Sink[T, M] = {
+    Sink.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => 
(shape1, shape2) =>
+      import GraphDSL.Implicits._
+      val d = b.add(fanOutStrategy(2))
+      d.out(0) ~> shape1
+      d.out(1) ~> shape2
+      new SinkShape[T](d.in)
+    })
+  }
+
+  /**
+   * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` 
and returns `Sink`.
+   * The fanoutGraph's outlets size must match the provides sinks'.
+   * @since 1.1.0
+   */
+  def combine[T, U, M](sinks: immutable.Seq[Graph[SinkShape[U], M]])(
+      fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): 
Sink[T, immutable.Seq[M]] =
+    sinks match {
+      case immutable.Seq()     => Sink.cancelled.mapMaterializedValue(_ => Nil)
+      case immutable.Seq(sink) => sink.asInstanceOf[Sink[T, 
M]].mapMaterializedValue(_ :: Nil)
+      case _ =>
+        Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
+          import GraphDSL.Implicits._
+          val c = b.add(fanOutStrategy(sinks.size))
+          for ((shape, idx) <- shapes.zipWithIndex)
+            c.out(idx) ~> shape
+          SinkShape(c.in)
+        })
+    }
+
   /**
    * A `Sink` that will invoke the given function to each of the elements
    * as they pass in. The sink is materialized into a 
[[scala.concurrent.Future]]
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 429276f36d..4fa134838e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.CompletionStage
 
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable
 import scala.concurrent.{ Future, Promise }
@@ -756,10 +756,12 @@ object Source {
    * Combines several sources with fan-in strategy like [[Merge]] or 
[[Concat]] into a single [[Source]].
    */
   def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, 
_]*)(
-      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, 
NotUsed] =
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): 
Source[U, NotUsed] =
     Source.fromGraph(GraphDSL.create() { implicit b =>
       import GraphDSL.Implicits._
-      val c = b.add(strategy(rest.size + 2))
+      val c = b.add(fanInStrategy(rest.size + 2))
       first  ~> c.in(0)
       second ~> c.in(1)
 
@@ -772,19 +774,40 @@ object Source {
       combineRest(2, rest.iterator)
     })
 
+  /**
+   * Combines several sources with fan-in strategy like [[Merge]] or 
[[Concat]] into a single [[Source]].
+   * @since 1.1.0
+   */
+  def combine[T, U, M](sources: immutable.Seq[Graph[SourceShape[T], M]])(
+      fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): 
Source[U, immutable.Seq[M]] =
+    sources match {
+      case immutable.Seq()       => Source.empty.mapMaterializedValue(_ => Nil)
+      case immutable.Seq(source) => source.asInstanceOf[Source[U, 
M]].mapMaterializedValue(_ :: Nil)
+      case _ =>
+        Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
+          import GraphDSL.Implicits._
+          val c = b.add(fanInStrategy(sources.size))
+          for ((shape, i) <- shapes.zipWithIndex) {
+            shape ~> c.in(i)
+          }
+          SourceShape(c.out)
+        })
+    }
+
   /**
    * Combines several sources with fan-in strategy like [[Merge]] or 
[[Concat]] into a single [[Source]] with a materialized value.
    */
   def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(
-      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) 
=> M): Source[U, M] = {
-    val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b => 
secondShape =>
+      @nowarn
+      @deprecatedName(Symbol("strategy"))
+      fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: 
(M1, M2) => M): Source[U, M] =
+    Source.fromGraph(GraphDSL.createGraph(first, second)(matF) { implicit b => 
(shape1, shape2) =>
       import GraphDSL.Implicits._
-      val c = b.add(strategy(2))
-      secondShape ~> c.in(1)
-      FlowShape(c.in(0), c.out)
-    }
-    first.viaMat(secondPartiallyCombined)(matF)
-  }
+      val c = b.add(fanInStrategy(2))
+      shape1 ~> c.in(0)
+      shape2 ~> c.in(1)
+      SourceShape(c.out)
+    })
 
   /**
    * Combine the elements of multiple streams into a stream of sequences.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to