This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch fix-source-combine-single in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 45f7cbda003529623e9236a656c116f5e935d430 Author: He-Pin <[email protected]> AuthorDate: Mon Mar 16 00:53:11 2026 +0800 fix(stream): Source.combine single source with type-transforming fan-in strategies (#2723) Motivation: Source.combine with a single source bypassed the fan-in strategy using an unsafe asInstanceOf cast. This worked for type-preserving strategies like Merge (T → T), but silently produced incorrect results for type-transforming strategies like MergeLatest (T → List[T]). For example: Source.combine(Seq(Source.single(1)))(MergeLatest(_)) emitted 1 instead of List(1) Modification: - Introduce TypePreservingFanIn marker trait for fan-in stages where T == U AND single-input behavior is a no-op pass-through (Merge, Concat, Interleave, MergePrioritized, OrElse) - MergeSequence intentionally NOT marked: despite being T → T, it validates sequence ordering (not a pure pass-through) - Source.combine single-source case: check TypePreservingFanIn trait before bypassing. Strategies without the trait are routed through the fan-in graph. - Relax Concat, Interleave, MergeSequence to accept inputPorts >= 1 (was > 1). This eliminates the need for a try-catch fallback in Source.combine and allows these stages to be used directly with a single input. 
- Use Source.fromGraph so that non-Source Graph inputs are handled safely - Add 13 regression tests (12 Scala, including MergeSequence validation, + 1 Java) Result: - MergeLatest/ZipWithN correctly apply their transformation even for a single source - Merge/Concat/Interleave correctly bypass the strategy (type-preserving optimization) - MergeSequence correctly validates sequences even for a single source - Unknown/third-party strategies default to routing through the fan-in graph (safe default for strategies that may transform types) - Binary compatibility maintained (verified via MiMa) References: - https://github.com/apache/pekko/issues/2723 - https://github.com/apache/pekko/pull/2726 --- .../apache/pekko/stream/javadsl/SourceTest.java | 16 ++++ .../apache/pekko/stream/scaladsl/SourceSpec.scala | 100 +++++++++++++++++++++ .../apache/pekko/stream/TypePreservingFanIn.scala | 43 +++++++++ .../org/apache/pekko/stream/scaladsl/Graph.scala | 38 ++++++-- .../org/apache/pekko/stream/scaladsl/Source.scala | 39 +++++++- 5 files changed, 226 insertions(+), 10 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index dda5bb4bb7..f6c148afaa 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -41,6 +41,7 @@ import org.apache.pekko.japi.pf.PFBuilder; // #imports import org.apache.pekko.stream.*; // #imports +import org.apache.pekko.stream.javadsl.MergeLatest; import org.apache.pekko.stream.scaladsl.FlowSpec; import org.apache.pekko.stream.stage.AbstractInHandler; import org.apache.pekko.stream.stage.AbstractOutHandler; @@ -1156,6 +1157,21 @@ public class SourceTest extends StreamTest { assertEquals(6, result.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue()); } + // Regression test for https://github.com/apache/pekko/issues/2723 + // Verifies that Source.combine with a
single source correctly applies + // type-transforming strategies (like MergeLatest), rather than bypassing + // them with an unsafe asInstanceOf cast. + @Test + public void mustBeAbleToCombineSingleSourceWithMergeLatest() throws Exception { + final List<Source<Integer, NotUsed>> sources = Collections.singletonList(Source.single(1)); + final List<List<Integer>> result = + Source.<Integer, List<Integer>, NotUsed>combine(sources, MergeLatest::create) + .runWith(Sink.collect(Collectors.toList()), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + assertEquals(Collections.singletonList(Collections.singletonList(1)), result); + } + @SuppressWarnings("unchecked") @Test public void mustBeAbleToZipN() throws Exception { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index f9059aae07..dcc6aafae4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -171,6 +171,106 @@ class SourceSpec extends StreamSpec with DefaultTimeout { sub.request(5).expectNextN(0 to 4).expectComplete() } + // Regression tests for https://github.com/apache/pekko/issues/2723 + // Source.combine with a single source must apply type-transforming fan-in strategies + // (like MergeLatest) correctly, rather than bypassing them with an unsafe cast. + // The TypePreservingFanIn trait marks strategies where T == U, enabling safe bypass. + // Strategies without this trait (MergeLatest, ZipWithN) are always routed through + // the fan-in graph even for a single source. 
+ + "combine single source with MergeLatest should emit wrapped elements" in { + Source + .combine(immutable.Seq(Source.single(1)))(MergeLatest(_)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(List(1))) + } + + "combine single source with MergeLatest should emit all wrapped elements" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(MergeLatest(_)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(List(1), List(2), List(3))) + } + + "combine single source with ZipWithN should apply zipper function" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(n => ZipWithN[Int, Int](_.sum)(n)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "combine single source with Merge should still work (type-preserving)" in { + Source + .combine(immutable.Seq(Source.single(1)))(Merge(_)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1)) + } + + "combine single source with Concat should still work (type-preserving)" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(Concat(_)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "combine single source with Interleave should still work (type-preserving)" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(Interleave(_, 1)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "combine single source with wrapped Merge (.named) should still work" in { + // When Merge is wrapped via .named(), the TypePreservingFanIn trait is lost + // (GenericGraphWithChangedAttributes does not extend it). The code correctly + // routes through the fan-in graph instead of bypassing — functionally correct, + // just slightly less optimal. 
+ Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(n => Merge[Int](n).named("my-merge")) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "combine single source with wrapped MergeLatest (.named) should emit wrapped elements" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergeLatest[Int](n).named("my-merge-latest")) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(List(1), List(2), List(3))) + } + + "combine single source with MergeSequence should route through strategy (validates sequences)" in { + // MergeSequence does NOT have TypePreservingFanIn because it validates sequence ordering + // (not a pure pass-through). With a single source, it still runs and validates sequences. + Source + .combine(immutable.Seq(Source(List(0L, 1L, 2L))))(n => MergeSequence[Long](n)(identity)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(0L, 1L, 2L)) + } + + "combine single source with MergePrioritized should still work (type-preserving)" in { + Source + .combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergePrioritized(Seq.fill(n)(1))) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(1, 2, 3)) + } + + "combine single source materialized value should be a singleton list" in { + val (mat, result) = Source + .combine(immutable.Seq(Source.single(1).mapMaterializedValue(_ => "mat-value")))(MergeLatest(_)) + .toMat(Sink.seq)(Keep.both) + .run() + mat should ===(immutable.Seq("mat-value")) + result.futureValue should ===(immutable.Seq(List(1))) + } + + "combine empty sources list should produce empty source" in { + val result = Source + .combine(immutable.Seq.empty[Source[Int, NotUsed]])(MergeLatest(_)) + .runWith(Sink.seq) + .futureValue + result should ===(immutable.Seq.empty) + } + "combine from two inputs with simplified API" in { val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]()) val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala new file mode 100644 index 0000000000..765d6a1439 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.stream + +/** + * Marker trait for fan-in graph stages whose output element type is the same as + * their input element type (i.e., `T => T`) AND whose single-input behavior is + * semantically equivalent to a no-op pass-through. + * + * Examples include [[scaladsl.Merge]], [[scaladsl.Concat]], [[scaladsl.Interleave]], + * [[scaladsl.MergePrioritized]], and [[scaladsl.OrElse]]. + * + * Note: [[scaladsl.MergeSequence]] is intentionally '''not''' marked with this trait + * despite being type-preserving (`T => T`), because it validates sequence ordering + * even for a single input—its single-input behavior is NOT a no-op pass-through. + * + * This trait is used by [[scaladsl.Source.combine]] (and its Java API counterpart) + * to safely optimize the single-source case. When only one source is provided, + * the fan-in strategy can be bypassed with a direct pass-through if and only if the + * strategy is type-preserving AND semantically a no-op for single input. Without this + * marker, a bypass via `asInstanceOf` would be unsafe for type-transforming strategies + * like `MergeLatest` (where `T => List[T]`) or `ZipWithN` (where `A => O`). + * + * This design uses a "safe default": strategies '''without''' this trait will always + * be routed through the fan-in graph, even for a single source. 
This ensures + * correct behavior for unknown or third-party fan-in strategies that may transform + * the element type or have semantic side effects beyond type preservation. + * + * Note: if a stage with this trait is wrapped (e.g., via `.withAttributes()` or + * `.named()`), the trait may be lost and the stage will be routed through the + * fan-in graph instead of being bypassed. This is functionally correct—just + * slightly less optimal. + * + * @since 1.2.0 + */ +trait TypePreservingFanIn diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 9ab3e76677..d3d45a51ab 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -96,7 +96,9 @@ object Merge { * * '''Cancels when''' downstream cancels */ -final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) + extends GraphStage[UniformFanInShape[T, T]] + with pekko.stream.TypePreservingFanIn { // one input might seem counter intuitive but saves us from special handling in other places require(inputPorts >= 1, "A Merge must have one or more input ports") @@ -338,7 +340,8 @@ object MergePrioritized { * '''Cancels when''' downstream cancels */ final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean) - extends GraphStage[UniformFanInShape[T, T]] { + extends GraphStage[UniformFanInShape[T, T]] + with pekko.stream.TypePreservingFanIn { require(priorities.nonEmpty, "A Merge must have one or more input ports") require(priorities.forall(_ > 0), "Priorities should be positive integers") @@ -463,8 +466,12 @@ object Interleave { * '''Cancels when''' downstream cancels */ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) - 
extends GraphStage[UniformFanInShape[T, T]] { - require(inputPorts > 1, "input ports must be > 1") + extends GraphStage[UniformFanInShape[T, T]] + with pekko.stream.TypePreservingFanIn { + // Relaxed from > 1 to >= 1: single-input Interleave is semantically valid (pass-through). + // This enables Source.combine to route single-source cases through the stage without + // needing a try-catch fallback. See #2723. + require(inputPorts >= 1, "input ports must be >= 1") require(segmentSize > 0, "segmentSize must be > 0") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Interleave.in" + i)) @@ -1313,8 +1320,13 @@ object Concat { * * '''Cancels when''' downstream cancels */ -final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] { - require(inputPorts > 1, "A Concat must have more than 1 input ports") +final class Concat[T](val inputPorts: Int) + extends GraphStage[UniformFanInShape[T, T]] + with pekko.stream.TypePreservingFanIn { + // Relaxed from > 1 to >= 1: single-input Concat is semantically valid (pass-through). + // This enables Source.combine to route single-source cases through the stage without + // needing a try-catch fallback. See #2723. 
+ require(inputPorts >= 1, "A Concat must have at least 1 input port") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Concat.in" + i)) val out: Outlet[T] = Outlet[T]("Concat.out") override def initialAttributes = DefaultAttributes.concat @@ -1386,7 +1398,9 @@ object OrElse { * '''Cancels when''' downstream cancels */ @InternalApi -private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] { +private[stream] final class OrElse[T] + extends GraphStage[UniformFanInShape[T, T]] + with pekko.stream.TypePreservingFanIn { val primary = Inlet[T]("OrElse.primary") val secondary = Inlet[T]("OrElse.secondary") val out = Outlet[T]("OrElse.out") @@ -1486,7 +1500,15 @@ object MergeSequence { */ final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long) extends GraphStage[UniformFanInShape[T, T]] { - require(inputPorts > 1, "A MergeSequence must have more than 1 input ports") + // Note: MergeSequence is type-preserving (T → T) but does NOT extend TypePreservingFanIn + // because it has semantic side effects beyond type preservation: it validates sequence ordering + // (expects elements starting from 0, incrementing by 1). For Source.combine with a single source, + // MergeSequence should still run to apply its sequence validation logic. See #2723. + // + // Relaxed from > 1 to >= 1: single-input MergeSequence is semantically valid. + // This enables Source.combine to route single-source cases through the stage without + // needing a try-catch fallback. 
+ require(inputPorts >= 1, "A MergeSequence must have at least 1 input port") private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeSequence.in" + i)) private val out: Outlet[T] = Outlet("MergeSequence.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index e6c867ebc9..c2336ca26f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -803,8 +803,43 @@ object Source { fanInStrategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, immutable.Seq[M]] = sources match { case immutable.Seq() => Source.empty.mapMaterializedValue(_ => Nil) - case immutable.Seq(source) => source.asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil) - case _ => + case immutable.Seq(source) => + // Single-source optimization: bypass the fan-in strategy if and only if the strategy + // is type-preserving (T == U), marked by the TypePreservingFanIn trait. + // + // For type-transforming strategies (e.g., MergeLatest: T => List[T], ZipWithN: A => O), + // we MUST route through the strategy even for a single source to ensure correct output + // types. Without this check, the asInstanceOf cast would silently produce incorrect + // results at runtime (see #2723). + // + // Design: "safe default" — strategies WITHOUT TypePreservingFanIn always go through + // the full fan-in graph. This correctly handles unknown or third-party strategies. + // All built-in type-preserving fan-in stages (Concat, Interleave, Merge, MergePrioritized, + // OrElse) have been relaxed to accept inputPorts >= 1, so fanInStrategy(1) will succeed + // and the TypePreservingFanIn trait can be checked. 
MergeSequence also accepts >= 1 but + // is intentionally NOT marked TypePreservingFanIn because it validates sequence ordering. + // + // Note: fanInStrategy(1) is always invoked here to determine the strategy's trait. + // Third-party strategies that reject n=1 will surface their exception immediately, + // which is preferable to silently returning an incorrectly-typed stream. + val strategyGraph = fanInStrategy(1) + strategyGraph match { + case _: pekko.stream.TypePreservingFanIn => + // Type-preserving (T == U): safe to bypass the strategy with a direct pass-through. + // Use Source.fromGraph to handle non-Source Graph inputs safely (the sources parameter + // accepts Graph[SourceShape[T], M], not just Source[T, M]). + Source.fromGraph(source).asInstanceOf[Source[U, M]].mapMaterializedValue(_ :: Nil) + case _ => + // Not type-preserving or unknown: route through the fan-in strategy. + // This ensures type-transforming strategies correctly transform the output. + Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes => + import GraphDSL.Implicits._ + val c = b.add(strategyGraph) + shapes.head ~> c.in(0) + SourceShape(c.out) + }) + } + case _ => Source.fromGraph(GraphDSL.create(sources) { implicit b => shapes => import GraphDSL.Implicits._ val c = b.add(fanInStrategy(sources.size)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
