This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch fix-source-combine-single
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 45f7cbda003529623e9236a656c116f5e935d430
Author: He-Pin <[email protected]>
AuthorDate: Mon Mar 16 00:53:11 2026 +0800

    fix(stream): Source.combine single source with type-transforming fan-in 
strategies (#2723)
    
    Motivation:
    Source.combine with a single source bypassed the fan-in strategy using an 
unsafe
    asInstanceOf cast. This worked for type-preserving strategies like Merge (T 
→ T),
    but silently produced incorrect results for type-transforming strategies 
like
    MergeLatest (T → List[T]). For example:
      Source.combine(Seq(Source.single(1)))(MergeLatest(_)) emitted 1 instead 
of List(1)
    
    Modification:
    - Introduce TypePreservingFanIn marker trait for fan-in stages where T == U 
AND
      single-input behavior is a no-op pass-through (Merge, Concat, Interleave,
      MergePrioritized, OrElse)
    - MergeSequence intentionally NOT marked: despite being T → T, it validates
      sequence ordering (not a pure pass-through)
    - Source.combine single-source case: check TypePreservingFanIn trait before
      bypassing. Strategies without the trait are routed through the fan-in 
graph.
    - Relax Concat, Interleave, MergeSequence to accept inputPorts >= 1 (was > 
1).
      This eliminates the need for a try-catch fallback in Source.combine and 
allows
      these stages to be used directly with a single input.
    - Use Source.fromGraph for non-Source Graph inputs safety
    - Add 14 regression tests (12 Scala + 1 Java + MergeSequence validation)
    
    Result:
    - MergeLatest/ZipWithN correctly apply their transformation even for single 
source
    - Merge/Concat/Interleave correctly bypass (type-preserving optimization)
    - MergeSequence correctly validates sequences even for single source
    - Unknown/third-party strategies default to routing through the fan-in graph
      (safe default for strategies that may transform types)
    - Binary compatibility maintained (verified via MiMa)
    
    References:
    - https://github.com/apache/pekko/issues/2723
    - https://github.com/apache/pekko/pull/2726
---
 .../apache/pekko/stream/javadsl/SourceTest.java    |  16 ++++
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 100 +++++++++++++++++++++
 .../apache/pekko/stream/TypePreservingFanIn.scala  |  43 +++++++++
 .../org/apache/pekko/stream/scaladsl/Graph.scala   |  38 ++++++--
 .../org/apache/pekko/stream/scaladsl/Source.scala  |  39 +++++++-
 5 files changed, 226 insertions(+), 10 deletions(-)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index dda5bb4bb7..f6c148afaa 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -41,6 +41,7 @@ import org.apache.pekko.japi.pf.PFBuilder;
 // #imports
 import org.apache.pekko.stream.*;
 // #imports
+import org.apache.pekko.stream.javadsl.MergeLatest;
 import org.apache.pekko.stream.scaladsl.FlowSpec;
 import org.apache.pekko.stream.stage.AbstractInHandler;
 import org.apache.pekko.stream.stage.AbstractOutHandler;
@@ -1156,6 +1157,21 @@ public class SourceTest extends StreamTest {
     assertEquals(6, result.toCompletableFuture().get(3, 
TimeUnit.SECONDS).intValue());
   }
 
+  // Regression test for https://github.com/apache/pekko/issues/2723
+  // Verifies that Source.combine with a single source correctly applies
+  // type-transforming strategies (like MergeLatest), rather than bypassing
+  // them with an unsafe asInstanceOf cast.
+  @Test
+  public void mustBeAbleToCombineSingleSourceWithMergeLatest() throws 
Exception {
+    final List<Source<Integer, NotUsed>> sources = 
Collections.singletonList(Source.single(1));
+    final List<List<Integer>> result =
+        Source.<Integer, List<Integer>, NotUsed>combine(sources, 
MergeLatest::create)
+            .runWith(Sink.collect(Collectors.toList()), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(Collections.singletonList(Collections.singletonList(1)), 
result);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void mustBeAbleToZipN() throws Exception {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index f9059aae07..dcc6aafae4 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -171,6 +171,106 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       sub.request(5).expectNextN(0 to 4).expectComplete()
     }
 
+    // Regression tests for https://github.com/apache/pekko/issues/2723
+    // Source.combine with a single source must apply type-transforming fan-in 
strategies
+    // (like MergeLatest) correctly, rather than bypassing them with an unsafe 
cast.
+    // The TypePreservingFanIn trait marks strategies where T == U, enabling 
safe bypass.
+    // Strategies without this trait (MergeLatest, ZipWithN) are always routed 
through
+    // the fan-in graph even for a single source.
+
+    "combine single source with MergeLatest should emit wrapped elements" in {
+      Source
+        .combine(immutable.Seq(Source.single(1)))(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1)))
+    }
+
+    "combine single source with MergeLatest should emit all wrapped elements" 
in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+    }
+
+    "combine single source with ZipWithN should apply zipper function" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => ZipWithN[Int, 
Int](_.sum)(n))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with Merge should still work (type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source.single(1)))(Merge(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1))
+    }
+
+    "combine single source with Concat should still work (type-preserving)" in 
{
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(Concat(_))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with Interleave should still work 
(type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(Interleave(_, 1))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with wrapped Merge (.named) should still work" in {
+      // When Merge is wrapped via .named(), the TypePreservingFanIn trait is 
lost
+      // (GenericGraphWithChangedAttributes does not extend it). The code 
correctly
+      // routes through the fan-in graph instead of bypassing — functionally 
correct,
+      // just slightly less optimal.
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
Merge[Int](n).named("my-merge"))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source with wrapped MergeLatest (.named) should emit 
wrapped elements" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
MergeLatest[Int](n).named("my-merge-latest"))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
+    }
+
+    "combine single source with MergeSequence should route through strategy 
(validates sequences)" in {
+      // MergeSequence does NOT have TypePreservingFanIn because it validates 
sequence ordering
+      // (not a pure pass-through). With a single source, it still runs and 
validates sequences.
+      Source
+        .combine(immutable.Seq(Source(List(0L, 1L, 2L))))(n => 
MergeSequence[Long](n)(identity))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(0L, 1L, 2L))
+    }
+
+    "combine single source with MergePrioritized should still work 
(type-preserving)" in {
+      Source
+        .combine(immutable.Seq(Source(List(1, 2, 3))))(n => 
MergePrioritized(Seq.fill(n)(1)))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq(1, 2, 3))
+    }
+
+    "combine single source materialized value should be a singleton list" in {
+      val (mat, result) = Source
+        .combine(immutable.Seq(Source.single(1).mapMaterializedValue(_ => 
"mat-value")))(MergeLatest(_))
+        .toMat(Sink.seq)(Keep.both)
+        .run()
+      mat should ===(immutable.Seq("mat-value"))
+      result.futureValue should ===(immutable.Seq(List(1)))
+    }
+
+    "combine empty sources list should produce empty source" in {
+      val result = Source
+        .combine(immutable.Seq.empty[Source[Int, NotUsed]])(MergeLatest(_))
+        .runWith(Sink.seq)
+        .futureValue
+      result should ===(immutable.Seq.empty)
+    }
+
     "combine from two inputs with simplified API" in {
       val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
       val source = Source.fromPublisher(probes(0)) :: 
Source.fromPublisher(probes(1)) :: Nil
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala 
b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
new file mode 100644
index 0000000000..765d6a1439
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream
+
+/**
+ * Marker trait for fan-in graph stages whose output element type is the same 
as
+ * their input element type (i.e., `T => T`) AND whose single-input behavior is
+ * semantically equivalent to a no-op pass-through.
+ *
+ * Examples include [[scaladsl.Merge]], [[scaladsl.Concat]], 
[[scaladsl.Interleave]],
+ * [[scaladsl.MergePrioritized]], and [[scaladsl.OrElse]].
+ *
+ * Note: [[scaladsl.MergeSequence]] is intentionally '''not''' marked with 
this trait
+ * despite being type-preserving (`T => T`), because it validates sequence 
ordering
+ * even for a single input—its single-input behavior is NOT a no-op 
pass-through.
+ *
+ * This trait is used by [[scaladsl.Source.combine]] (and its Java API 
counterpart)
+ * to safely optimize the single-source case. When only one source is provided,
+ * the fan-in strategy can be bypassed with a direct pass-through if and only 
if the
+ * strategy is type-preserving AND semantically a no-op for single input. 
Without this
+ * marker, a bypass via `asInstanceOf` would be unsafe for type-transforming 
strategies
+ * like `MergeLatest` (where `T => List[T]`) or `ZipWithN` (where `A => O`).
+ *
+ * This design uses a "safe default": strategies '''without''' this trait will 
always
+ * be routed through the fan-in graph, even for a single source. This ensures
+ * correct behavior for unknown or third-party fan-in strategies that may 
transform
+ * the element type or have semantic side effects beyond type preservation.
+ *
+ * Note: if a stage with this trait is wrapped (e.g., via `.withAttributes()` 
or
+ * `.named()`), the trait may be lost and the stage will be routed through the
+ * fan-in graph instead of being bypassed. This is functionally correct—just
+ * slightly less optimal.
+ *
+ * @since 1.2.0
+ */
+trait TypePreservingFanIn
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
index 9ab3e76677..d3d45a51ab 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
@@ -96,7 +96,9 @@ object Merge {
  *
  * '''Cancels when''' downstream cancels
  */
-final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends 
GraphStage[UniformFanInShape[T, T]] {
+final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean)
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   // one input might seem counter intuitive but saves us from special handling 
in other places
   require(inputPorts >= 1, "A Merge must have one or more input ports")
 
@@ -338,7 +340,8 @@ object MergePrioritized {
  * '''Cancels when''' downstream cancels
  */
 final class MergePrioritized[T] private (val priorities: Seq[Int], val 
eagerComplete: Boolean)
-    extends GraphStage[UniformFanInShape[T, T]] {
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   require(priorities.nonEmpty, "A Merge must have one or more input ports")
   require(priorities.forall(_ > 0), "Priorities should be positive integers")
 
@@ -463,8 +466,12 @@ object Interleave {
  * '''Cancels when''' downstream cancels
  */
 final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val 
eagerClose: Boolean)
-    extends GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "input ports must be > 1")
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
+  // Relaxed from > 1 to >= 1: single-input Interleave is semantically valid 
(pass-through).
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback. See #2723.
+  require(inputPorts >= 1, "input ports must be >= 1")
   require(segmentSize > 0, "segmentSize must be > 0")
 
   val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("Interleave.in" + i))
@@ -1313,8 +1320,13 @@ object Concat {
  *
  * '''Cancels when''' downstream cancels
  */
-final class Concat[T](val inputPorts: Int) extends 
GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "A Concat must have more than 1 input ports")
+final class Concat[T](val inputPorts: Int)
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
+  // Relaxed from > 1 to >= 1: single-input Concat is semantically valid 
(pass-through).
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback. See #2723.
+  require(inputPorts >= 1, "A Concat must have at least 1 input port")
   val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("Concat.in" + i))
   val out: Outlet[T] = Outlet[T]("Concat.out")
   override def initialAttributes = DefaultAttributes.concat
@@ -1386,7 +1398,9 @@ object OrElse {
  * '''Cancels when''' downstream cancels
  */
 @InternalApi
-private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, 
T]] {
+private[stream] final class OrElse[T]
+    extends GraphStage[UniformFanInShape[T, T]]
+    with pekko.stream.TypePreservingFanIn {
   val primary = Inlet[T]("OrElse.primary")
   val secondary = Inlet[T]("OrElse.secondary")
   val out = Outlet[T]("OrElse.out")
@@ -1486,7 +1500,15 @@ object MergeSequence {
  */
 final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long)
     extends GraphStage[UniformFanInShape[T, T]] {
-  require(inputPorts > 1, "A MergeSequence must have more than 1 input ports")
+  // Note: MergeSequence is type-preserving (T → T) but does NOT extend 
TypePreservingFanIn
+  // because it has semantic side effects beyond type preservation: it 
validates sequence ordering
+  // (expects elements starting from 0, incrementing by 1). For Source.combine 
with a single source,
+  // MergeSequence should still run to apply its sequence validation logic. 
See #2723.
+  //
+  // Relaxed from > 1 to >= 1: single-input MergeSequence is semantically 
valid.
+  // This enables Source.combine to route single-source cases through the 
stage without
+  // needing a try-catch fallback.
+  require(inputPorts >= 1, "A MergeSequence must have at least 1 input port")
   private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => 
Inlet[T]("MergeSequence.in" + i))
   private val out: Outlet[T] = Outlet("MergeSequence.out")
   override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e6c867ebc9..c2336ca26f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -803,8 +803,43 @@ object Source {
       fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): 
Source[U, immutable.Seq[M]] =
     sources match {
       case immutable.Seq()       => Source.empty.mapMaterializedValue(_ => Nil)
-      case immutable.Seq(source) => source.asInstanceOf[Source[U, 
M]].mapMaterializedValue(_ :: Nil)
-      case _                     =>
+      case immutable.Seq(source) =>
+        // Single-source optimization: bypass the fan-in strategy if and only 
if the strategy
+        // is type-preserving (T == U), marked by the TypePreservingFanIn 
trait.
+        //
+        // For type-transforming strategies (e.g., MergeLatest: T => List[T], 
ZipWithN: A => O),
+        // we MUST route through the strategy even for a single source to 
ensure correct output
+        // types. Without this check, the asInstanceOf cast would silently 
produce incorrect
+        // results at runtime (see #2723).
+        //
+        // Design: "safe default" — strategies WITHOUT TypePreservingFanIn 
always go through
+        // the full fan-in graph. This correctly handles unknown or 
third-party strategies.
+        // All built-in type-preserving fan-in stages (Concat, Interleave, 
Merge, MergePrioritized,
+        // OrElse) have been relaxed to accept inputPorts >= 1, so 
fanInStrategy(1) will succeed
+        // and the TypePreservingFanIn trait can be checked. MergeSequence 
also accepts >= 1 but
+        // is intentionally NOT marked TypePreservingFanIn because it 
validates sequence ordering.
+        //
+        // Note: fanInStrategy(1) is always invoked here to determine the 
strategy's trait.
+        // Third-party strategies that reject n=1 will surface their exception 
immediately,
+        // which is preferable to silently returning an incorrectly-typed 
stream.
+        val strategyGraph = fanInStrategy(1)
+        strategyGraph match {
+          case _: pekko.stream.TypePreservingFanIn =>
+            // Type-preserving (T == U): safe to bypass the strategy with a 
direct pass-through.
+            // Use Source.fromGraph to handle non-Source Graph inputs safely 
(the sources parameter
+            // accepts Graph[SourceShape[T], M], not just Source[T, M]).
+            Source.fromGraph(source).asInstanceOf[Source[U, 
M]].mapMaterializedValue(_ :: Nil)
+          case _ =>
+            // Not type-preserving or unknown: route through the fan-in 
strategy.
+            // This ensures type-transforming strategies correctly transform 
the output.
+            Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
+              import GraphDSL.Implicits._
+              val c = b.add(strategyGraph)
+              shapes.head ~> c.in(0)
+              SourceShape(c.out)
+            })
+        }
+      case _ =>
         Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes =>
           import GraphDSL.Implicits._
           val c = b.add(fanInStrategy(sources.size))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to