This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 566a3470f7 fix(stream): Source.combine single source with
type-transforming fan-in strategies (#2723) (#2726)
566a3470f7 is described below
commit 566a3470f7942942c33479ca8e09409d8aaba370
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 16 11:21:02 2026 +0800
fix(stream): Source.combine single source with type-transforming fan-in
strategies (#2723) (#2726)
Motivation:
Source.combine with a single source bypassed the fan-in strategy using an unsafe
asInstanceOf cast. This worked for type-preserving strategies like Merge (T → T),
but silently produced incorrect results for type-transforming strategies like
MergeLatest (T → List[T]). For example:
Source.combine(Seq(Source.single(1)))(MergeLatest(_)) emitted 1 instead of List(1)
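The failure is silent because of type erasure: a cast between two instantiations of the same generic type is not checked at runtime. The following is a minimal sketch in plain Scala rather than the Pekko code itself; FakeSource and unsafeBypass are hypothetical stand-ins introduced only for illustration.

```scala
object ErasedCastSketch {
  // Hypothetical stand-in for a stream source; NOT a Pekko type.
  final case class FakeSource[A](elements: List[A])

  // Mimics the old single-source bypass: reinterpret a source of T as a source of U.
  def unsafeBypass[T, U](source: FakeSource[T]): FakeSource[U] =
    source.asInstanceOf[FakeSource[U]] // unchecked: type arguments are erased at runtime

  // Statically the element is a List[Int], as a MergeLatest-style strategy would promise.
  // Reading it as Any avoids a checked cast, so the wrong value flows through unnoticed.
  def bypassedHead(): Any =
    unsafeBypass[Int, List[Int]](FakeSource(List(1))).elements.head

  def main(args: Array[String]): Unit = {
    val first = bypassedHead()
    println(first)                       // the raw element, not a List
    println(first.isInstanceOf[List[_]]) // whether the promised wrapping happened
  }
}
```

A consumer that later treats the element as a List would fail far away from the cast, which is why the bypass has to be restricted to strategies known to leave the element type unchanged.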
Modification:
- Introduce TypePreservingFanIn marker trait for fan-in stages where T == U AND
single-input behavior is a no-op pass-through (Merge, Concat, Interleave,
MergePrioritized, OrElse)
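- MergeSequence is never bypassed: despite being T → T, it validates sequence
ordering (not a pure pass-through). The class carries the trait, but
MergeSequence.apply wraps it via withDetachedInputs, so the trait check does
not match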
- Source.combine single-source case: check TypePreservingFanIn trait before
bypassing. Strategies without the trait are routed through the fan-in graph.
- Relax Concat, Interleave, MergeSequence to accept inputPorts >= 1 (was > 1).
This eliminates the need for a try-catch fallback in Source.combine and allows
these stages to be used directly with a single input.
- Use Source.fromGraph so that non-Source Graph inputs are handled safely
- Add 14 regression tests (12 Scala + 1 Java + MergeSequence validation)
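The dispatch described in the list above can be sketched without Pekko. This is a simplified model under stated assumptions, not the actual implementation: TypePreserving, Strategy, MergeLike and MergeLatestLike are hypothetical names, sources are modelled as plain lists, and MergeLatestLike only reproduces the single-input wrapping behaviour.

```scala
object MarkerTraitSketch {
  // Hypothetical stand-ins; none of these are Pekko types.
  trait TypePreserving // plays the role of TypePreservingFanIn
  trait Strategy[T, U] { def apply(inputs: List[List[T]]): List[U] }

  // T => T: a single input passes through unchanged, so it carries the marker.
  final class MergeLike[T] extends Strategy[T, T] with TypePreserving {
    def apply(inputs: List[List[T]]): List[T] = inputs.flatten
  }

  // T => List[T]: no marker, so it must never be bypassed.
  final class MergeLatestLike[T] extends Strategy[T, List[T]] {
    def apply(inputs: List[List[T]]): List[List[T]] = inputs.flatten.map(List(_))
  }

  def combine[T, U](sources: List[List[T]])(strategy: Int => Strategy[T, U]): List[U] =
    sources match {
      case Nil => Nil
      case single :: Nil =>
        strategy(1) match {
          // Marker present: T == U is guaranteed by convention, so the cast is safe.
          case _: TypePreserving => single.asInstanceOf[List[U]]
          // Marker absent or unknown strategy: safe default, run the strategy.
          case s => s(sources)
        }
      case _ => strategy(sources.size)(sources)
    }

  def main(args: Array[String]): Unit = {
    println(combine(List(List(1, 2, 3)))(_ => new MergeLike[Int]))
    println(combine(List(List(1, 2, 3)))(_ => new MergeLatestLike[Int]))
  }
}
```

The marker is opt-in on purpose: a strategy that forgets it only loses the shortcut, whereas an opt-out design would leave every unknown strategy exposed to the unchecked cast.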
Result:
- MergeLatest/ZipWithN correctly apply their transformation even for a single source
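- Merge/MergePrioritized bypass the strategy (type-preserving optimization);
Concat/Interleave route through the fan-in graph because their factory methods
wrap the stage, and still pass elements through unchanged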
- MergeSequence correctly validates sequences even for single source
- Unknown/third-party strategies default to routing through the fan-in graph
(safe default for strategies that may transform types)
- Binary compatibility maintained (verified via MiMa)
References:
- https://github.com/apache/pekko/issues/2723
- https://github.com/apache/pekko/pull/2726
---
.../apache/pekko/stream/javadsl/SourceTest.java | 15 +++
.../apache/pekko/stream/scaladsl/SinkSpec.scala | 40 ++++++++
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 101 +++++++++++++++++++++
.../apache/pekko/stream/TypePreservingFanIn.scala | 48 ++++++++++
.../apache/pekko/stream/TypePreservingFanOut.scala | 44 +++++++++
.../org/apache/pekko/stream/scaladsl/Graph.scala | 43 ++++++---
.../org/apache/pekko/stream/scaladsl/Sink.scala | 22 ++++-
.../org/apache/pekko/stream/scaladsl/Source.scala | 41 ++++++++-
8 files changed, 339 insertions(+), 15 deletions(-)
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index dda5bb4bb7..58eb1147e7 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1156,6 +1156,21 @@ public class SourceTest extends StreamTest {
assertEquals(6, result.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue());
}
+ // Regression test for https://github.com/apache/pekko/issues/2723
+ // Verifies that Source.combine with a single source correctly applies
+ // type-transforming strategies (like MergeLatest), rather than bypassing
+ // them with an unsafe asInstanceOf cast.
+ @Test
+ public void mustBeAbleToCombineSingleSourceWithMergeLatest() throws Exception {
+ final List<Source<Integer, NotUsed>> sources = Collections.singletonList(Source.single(1));
+ final List<List<Integer>> result =
+ Source.<Integer, List<Integer>, NotUsed>combine(sources, MergeLatest::create)
+ .runWith(Sink.collect(Collectors.toList()), system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ assertEquals(Collections.singletonList(Collections.singletonList(1)), result);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToZipN() throws Exception {
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index a39ef09123..87ea68ce86 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -25,6 +25,8 @@ import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.DefaultTimeout
+import scala.collection.immutable
+
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
@@ -192,6 +194,44 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}
+ // Regression tests for Sink.combine single-sink case — mirrors Source.combine fix (#2723).
+ // The single-sink case previously used an unsafe asInstanceOf cast.
+
+ "combine single sink with Broadcast should work (type-preserving bypass)" in {
+ // Broadcast has TypePreservingFanOut, so the single-sink case is safely bypassed.
+ implicit val ex: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.parasitic
+ val result = Source(List(1, 2, 3))
+ .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Broadcast[Int](_)))
+ Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
+ }
+
+ "combine single sink with Balance should work (type-preserving bypass)" in {
+ // Balance has TypePreservingFanOut, so the single-sink case is safely bypassed.
+ implicit val ex: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.parasitic
+ val result = Source(List(1, 2, 3))
+ .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Balance[Int](_)))
+ Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
+ }
+
+ "combine single sink with Partition should route through strategy (not type-preserving)" in {
+ // Partition intentionally does NOT have TypePreservingFanOut — its partitioner function
+ // provides user-specified routing semantics that would be lost if bypassed.
+ // Single-sink Partition goes through the fan-out graph, honoring partitioner semantics.
+ implicit val ex: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.parasitic
+ val result = Source(List(1, 2, 3))
+ .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Partition[Int](_, _ => 0)))
+ Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
+ }
+
+ "combine single sink with wrapped Broadcast (.named) should still work" in {
+ // Even if the fan-out strategy loses the TypePreservingFanOut trait via wrapping
+ // (e.g., .named()), routing through the strategy is still correct.
+ implicit val ex: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.parasitic
+ val result = Source(List(1, 2, 3))
+ .runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(n => Broadcast[Int](n).named("myBroadcast")))
+ Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
+ }
+
"suitably override attribute handling methods" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name")
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index f9059aae07..22d3b55077 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -171,6 +171,107 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
sub.request(5).expectNextN(0 to 4).expectComplete()
}
+ // Regression tests for https://github.com/apache/pekko/issues/2723
+ // Source.combine with a single source must apply type-transforming fan-in strategies
+ // (like MergeLatest) correctly, rather than bypassing them with an unsafe cast.
+ // The TypePreservingFanIn trait marks strategies where T == U, enabling safe bypass.
+ // Strategies without this trait (MergeLatest, ZipWithN) are always routed through
+ // the fan-in graph even for a single source.
+
+ "combine single source with MergeLatest should emit wrapped elements" in {
+ Source
+ .combine(immutable.Seq(Source.single(1)))(MergeLatest(_))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(List(1)))
+ }
+
+ "combine single source with MergeLatest should emit all wrapped elements" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(MergeLatest(_))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+ }
+
+ "combine single source with ZipWithN should apply zipper function" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(n => ZipWithN[Int, Int](_.sum)(n))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "combine single source with Merge should still work (type-preserving)" in {
+ Source
+ .combine(immutable.Seq(Source.single(1)))(Merge(_))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1))
+ }
+
+ "combine single source with Concat should still work (type-preserving)" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(Concat(_))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "combine single source with Interleave should still work (type-preserving)" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(Interleave(_, 1))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "combine single source with wrapped Merge (.named) should still work" in {
+ // When Merge is wrapped via .named(), the TypePreservingFanIn trait is lost
+ // (GenericGraphWithChangedAttributes does not extend it). The code correctly
+ // routes through the fan-in graph instead of bypassing — functionally correct,
+ // just slightly less optimal.
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(n => Merge[Int](n).named("my-merge"))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "combine single source with wrapped MergeLatest (.named) should emit wrapped elements" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergeLatest[Int](n).named("my-merge-latest"))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+ }
+
+ "combine single source with MergeSequence should still work (type-preserving)" in {
+ // MergeSequence.apply wraps via withDetachedInputs, which loses the TypePreservingFanIn
+ // trait. This means single-source MergeSequence goes through the fan-in strategy (safe
+ // default). The test uses 0-based sequences to satisfy MergeSequence's ordering validation.
+ Source
+ .combine(immutable.Seq(Source(List(0L, 1L, 2L))))(n => MergeSequence[Long](n)(identity))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(0L, 1L, 2L))
+ }
+
+ "combine single source with MergePrioritized should still work (type-preserving)" in {
+ Source
+ .combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergePrioritized(Seq.fill(n)(1)))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq(1, 2, 3))
+ }
+
+ "combine single source materialized value should be a singleton list" in {
+ val (mat, result) = Source
+ .combine(immutable.Seq(Source.single(1).mapMaterializedValue(_ => "mat-value")))(MergeLatest(_))
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+ mat should ===(immutable.Seq("mat-value"))
+ result.futureValue should ===(immutable.Seq(List(1)))
+ }
+
+ "combine empty sources list should produce empty source" in {
+ val result = Source
+ .combine(immutable.Seq.empty[Source[Int, NotUsed]])(MergeLatest(_))
+ .runWith(Sink.seq)
+ .futureValue
+ result should ===(immutable.Seq.empty)
+ }
+
"combine from two inputs with simplified API" in {
val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil
diff --git a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
new file mode 100644
index 0000000000..a015f4d0e5
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+/**
+ * Marker trait for fan-in graph stages whose output element type is the same as
+ * their input element type (i.e., `T => T`).
+ *
+ * Built-in stages with this trait: [[scaladsl.Merge]], [[scaladsl.Concat]],
+ * [[scaladsl.Interleave]], [[scaladsl.MergePrioritized]], [[scaladsl.OrElse]],
+ * and [[scaladsl.MergeSequence]].
+ *
+ * Note: some of these stages (Concat, Interleave, MergeSequence) have factory methods
+ * that wrap the stage via `withDetachedInputs`, which loses this trait. In those cases,
+ * `Source.combine` routes through the fan-in graph instead of bypassing—functionally
+ * correct, just slightly less optimal. The bypass optimization fires for stages whose
+ * factory methods return the raw class (e.g., `Merge`, `MergePrioritized`).
+ *
+ * This trait is used by [[scaladsl.Source.combine]] (and its Java API counterpart)
+ * to safely optimize the single-source case. When only one source is provided,
+ * the fan-in strategy can be bypassed with a direct pass-through if and only if the
+ * strategy is type-preserving (output type equals input type). Without this marker,
+ * a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
+ * like `MergeLatest` (where `T => List[T]`) or `ZipWithN` (where `A => O`).
+ *
+ * This design uses a "safe default": strategies '''without''' this trait will always
+ * be routed through the fan-in graph, even for a single source. This ensures
+ * correct behavior for unknown or third-party fan-in strategies that may transform
+ * the element type.
+ *
+ * @since 1.2.0
+ */
+trait TypePreservingFanIn
diff --git a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala
new file mode 100644
index 0000000000..f66f15be41
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+/**
+ * Marker trait for fan-out graph stages whose output element type is the same as
+ * their input element type (i.e., `T => T`).
+ *
+ * Examples include [[scaladsl.Broadcast]] and [[scaladsl.Balance]].
+ *
+ * Note: [[scaladsl.Partition]] is intentionally NOT marked with this trait despite having
+ * `T => T` types, because its `partitioner` function provides user-specified routing
+ * semantics that would be lost if the stage were bypassed.
+ *
+ * This trait is used by [[scaladsl.Sink.combine]] (and its Java API counterpart)
+ * to safely optimize the single-sink case. When only one sink is provided,
+ * the fan-out strategy can be bypassed with a direct pass-through if and only if the
+ * strategy is type-preserving (output type equals input type). Without this marker,
+ * a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
+ * where `T` differs from `U`.
+ *
+ * This design uses a "safe default": strategies '''without''' this trait will always
+ * be routed through the fan-out graph, even for a single sink. This ensures
+ * correct behavior for unknown or third-party fan-out strategies that may transform
+ * the element type.
+ *
+ * @since 1.2.0
+ */
+trait TypePreservingFanOut
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
index 9ab3e76677..a8c4a2ad24 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
@@ -96,7 +96,9 @@ object Merge {
*
* '''Cancels when''' downstream cancels
*/
-final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
+final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean)
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
// one input might seem counter intuitive but saves us from special handling in other places
require(inputPorts >= 1, "A Merge must have one or more input ports")
@@ -338,7 +340,8 @@ object MergePrioritized {
* '''Cancels when''' downstream cancels
*/
final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean)
- extends GraphStage[UniformFanInShape[T, T]] {
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
require(priorities.nonEmpty, "A Merge must have one or more input ports")
require(priorities.forall(_ > 0), "Priorities should be positive integers")
@@ -463,8 +466,12 @@ object Interleave {
* '''Cancels when''' downstream cancels
*/
final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean)
- extends GraphStage[UniformFanInShape[T, T]] {
- require(inputPorts > 1, "input ports must be > 1")
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
+ // Relaxed from > 1 to >= 1: single-input Interleave is semantically valid (pass-through).
+ // This enables Source.combine to route single-source cases through the stage without
+ // needing a try-catch fallback. See #2723.
+ require(inputPorts >= 1, "input ports must be >= 1")
require(segmentSize > 0, "segmentSize must be > 0")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Interleave.in" + i))
@@ -617,7 +624,9 @@ object Broadcast {
* '''Cancels when'''
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
-final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
+final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean)
+ extends GraphStage[UniformFanOutShape[T, T]]
+ with pekko.stream.TypePreservingFanOut {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Broadcast must have one or more output ports")
val in: Inlet[T] = Inlet[T]("Broadcast.in")
@@ -937,7 +946,8 @@ object Balance {
* '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, val eagerCancel: Boolean)
- extends GraphStage[UniformFanOutShape[T, T]] {
+ extends GraphStage[UniformFanOutShape[T, T]]
+ with pekko.stream.TypePreservingFanOut {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Balance must have one or more output ports")
@@ -1313,8 +1323,13 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
-final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
- require(inputPorts > 1, "A Concat must have more than 1 input ports")
+final class Concat[T](val inputPorts: Int)
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
+ // Relaxed from > 1 to >= 1: single-input Concat is semantically valid (pass-through).
+ // This enables Source.combine to route single-source cases through the stage without
+ // needing a try-catch fallback. See #2723.
+ require(inputPorts >= 1, "A Concat must have at least 1 input port")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
override def initialAttributes = DefaultAttributes.concat
@@ -1386,7 +1401,9 @@ object OrElse {
* '''Cancels when''' downstream cancels
*/
@InternalApi
-private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] {
+private[stream] final class OrElse[T]
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
val primary = Inlet[T]("OrElse.primary")
val secondary = Inlet[T]("OrElse.secondary")
val out = Outlet[T]("OrElse.out")
@@ -1485,8 +1502,12 @@ object MergeSequence {
* '''Cancels when''' downstream cancels
*/
final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long)
- extends GraphStage[UniformFanInShape[T, T]] {
- require(inputPorts > 1, "A MergeSequence must have more than 1 input ports")
+ extends GraphStage[UniformFanInShape[T, T]]
+ with pekko.stream.TypePreservingFanIn {
+ // Relaxed from > 1 to >= 1: single-input MergeSequence is semantically valid.
+ // This enables Source.combine to route single-source cases through the stage without
+ // needing a try-catch fallback. See #2723.
+ require(inputPorts >= 1, "A MergeSequence must have at least 1 input port")
private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeSequence.in" + i))
private val out: Outlet[T] = Outlet("MergeSequence.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index a81c3ba97b..1a49530dfe 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -400,8 +400,26 @@ object Sink {
fanOutStrategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, immutable.Seq[M]] =
sinks match {
case immutable.Seq() => Sink.cancelled.mapMaterializedValue(_ => Nil)
- case immutable.Seq(sink) => sink.asInstanceOf[Sink[T, M]].mapMaterializedValue(_ :: Nil)
- case _ =>
+ case immutable.Seq(sink) =>
+ // Single-sink optimization: bypass the fan-out strategy if and only if the strategy
+ // is type-preserving (T == U), marked by the TypePreservingFanOut trait.
+ // For type-transforming strategies, we MUST route through the strategy even for a
+ // single sink. Same design as Source.combine — see #2723.
+ val strategyGraph = fanOutStrategy(1)
+ strategyGraph match {
+ case _: pekko.stream.TypePreservingFanOut =>
+ // Type-preserving (T == U): safe to bypass the strategy with a direct pass-through.
+ Sink.fromGraph(sink).asInstanceOf[Sink[T, M]].mapMaterializedValue(_ :: Nil)
+ case _ =>
+ // Not type-preserving or unknown: route through the fan-out strategy.
+ Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
+ import GraphDSL.Implicits._
+ val c = b.add(strategyGraph)
+ c.out(0) ~> shapes.head
+ SinkShape(c.in)
+ })
+ }
+ case _ =>
Sink.fromGraph(GraphDSL.create(sinks) { implicit b => shapes =>
import GraphDSL.Implicits._
val c = b.add(fanOutStrategy(sinks.size))
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e6c867ebc9..739382bf7e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -803,8 +803,45 @@ object Source {
fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, immutable.Seq[M]] =
sources match {
case immutable.Seq() => Source.empty.mapMaterializedValue(_ => Nil)
- case immutable.Seq(source) => source.asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil)
- case _ =>
+ case immutable.Seq(source) =>
+ // Single-source optimization: bypass the fan-in strategy if and only if the strategy
+ // is type-preserving (T == U), marked by the TypePreservingFanIn trait.
+ //
+ // For type-transforming strategies (e.g., MergeLatest: T => List[T], ZipWithN: A => O),
+ // we MUST route through the strategy even for a single source to ensure correct output
+ // types. Without this check, the asInstanceOf cast would silently produce incorrect
+ // results at runtime (see #2723).
+ //
+ // Design: "safe default" — strategies WITHOUT TypePreservingFanIn always go through
+ // the full fan-in graph. This correctly handles unknown or third-party strategies.
+ // Built-in type-preserving fan-in stages (Merge, MergePrioritized) directly return
+ // the stage class from their factory methods, so the trait check fires and enables
+ // the bypass. Other stages (Concat, Interleave, MergeSequence) are wrapped by
+ // withDetachedInputs in their factory methods, which loses the trait — they always
+ // route through the graph, which is correct because their require constraints have
+ // been relaxed to accept inputPorts >= 1.
+ //
+ // Note: fanInStrategy(1) is always invoked here to determine the strategy's trait.
+ // Third-party strategies that reject n=1 will surface their exception immediately,
+ // which is preferable to silently returning an incorrectly-typed stream.
+ val strategyGraph = fanInStrategy(1)
+ strategyGraph match {
+ case _: pekko.stream.TypePreservingFanIn =>
+ // Type-preserving (T == U): safe to bypass the strategy with a direct pass-through.
+ // Use Source.fromGraph to handle non-Source Graph inputs safely (the sources parameter
+ // accepts Graph[SourceShape[T], M], not just Source[T, M]).
+ Source.fromGraph(source).asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil)
+ case _ =>
+ // Not type-preserving or unknown: route through the fan-in strategy.
+ // This ensures type-transforming strategies correctly transform the output.
+ Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
+ import GraphDSL.Implicits._
+ val c = b.add(strategyGraph)
+ shapes.head ~> c.in(0)
+ SourceShape(c.out)
+ })
+ }
+ case _ =>
Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
import GraphDSL.Implicits._
val c = b.add(fanInStrategy(sources.size))