This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 7429728805 optimize: FlattenMerge avoids substream materialization for
value-presented sources (#2977)
7429728805 is described below
commit 7429728805bc775f2326dde675a1e66318dd6393
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed May 20 10:59:23 2026 +0800
optimize: FlattenMerge avoids substream materialization for value-presented
sources (#2977)
* optimize: FlattenMerge avoids substream materialization for
value-presented sources
Motivation:
FlattenMerge previously only fast-pathed `SingleSource`. For all other
inner sources -- `Source.empty`, `Source(List)`, `Source.fromJavaStream`,
`Source.future(Future.successful(...))`, range/iterator/repeat sources --
each one paid the cost of materializing a `SubSinkInlet` and
`subFusingMaterializer.materialize(...)`. FlattenConcat already does this
optimization via `TraversalBuilder.getValuePresentedSource`; FlattenMerge
should benefit too, especially because the single-arg `flatMapConcat(f)`
internally uses `FlattenMerge(1)` and so depends on FlattenMerge for its
hot path.
Modification:
- Generalize FlattenMerge to dispatch on `getValuePresentedSource`
(instead of `getSingleSource`) and consume `SingleSource`,
`IterableSource`, `IteratorSource`, `RangeSource`, `RepeatSource`,
`JavaStreamSource`, `FutureSource`, `FailedSource`, and empty sources
in-place without materialization, mirroring FlattenConcat.
- Add an `InflightSource[T]` family inside the new `FlattenMerge`
companion to occupy a breadth slot for multi-element value-presented
sources. Track them via a new `pendingInflightSources` counter so
`activeSources` correctly bounds the breadth budget.
- Preserve merge semantics: when an inflight source still has more
elements after a push, re-enqueue it so other concurrent sources keep
interleaving (instead of draining one source first, which is the
concat behaviour).
- Fold completed `Future`s and `FailedSource` directly: success pushes
or queues a single element, failure calls `failStage`.
- Pending `Future`s register a callback via `getAsyncCallback` and
occupy a breadth slot until completion.
- Empty inner sources are discarded in place (no slot consumed).
Result:
- `flatMapMerge(breadth, ...)` and the default `flatMapConcat(...)`
(which routes through `FlattenMerge(1)`) skip substream materialization
for value-presented inner sources, reducing per-source GC and stage
overhead.
- All existing FlattenMerge / flatMapConcat tests pass; new tests cover
empty / single / iterable / range / java stream / completed and
delayed future / failed inner sources across breadth = 1..128.
- Internal API only (`@InternalApi private[pekko]`); MiMa is clean.
References:
- The optimization mirrors FlattenConcat's value-presented-source
handling introduced in 1.2.0.
* refactor: share InflightSource family between FlattenConcat and
FlattenMerge
Motivation:
After the previous commit, FlattenMerge grew its own copy of the
`InflightSource[T]` hierarchy (Iterator/Range/Repeat/CompletedFuture/
PendingFuture) duplicating what FlattenConcat already had in its
companion object. Two near-identical families across two files is a
maintenance hazard: any future tweak to the value-presented optimization
(e.g. adding a new source type, fixing a Java-stream cleanup leak) would
have to be mirrored, and the families had already drifted in small ways
(e.g. `tryPull`/`cancel`/`materialize` declared abstract in concat with
no-op overrides on every subclass; concat used `isClosed = true` for the
completed-future variant while merge used `!_hasNext`).
Modification:
- Extract the common `InflightSource[T]` base and the five value-presented
subclasses (Iterator/Range/Repeat/CompletedFuture/PendingFuture) into
a new `pekko.stream.impl.fusing.InflightSources` package-private object.
- Promote `tryPull` / `cancel` / `materialize` from abstract to concrete
no-op defaults, so the value-presented subclasses no longer carry empty
overrides. Stages that wrap a real `SubSinkInlet` (only FlattenConcat's
`attachAndMaterializeSource` does this) override what they need.
- Align `InflightCompletedFutureSource.isClosed` to FlattenConcat's
`true` semantics — behaviorally equivalent in both stages, but more
faithful to the source being a one-shot cached value.
- Drop the `sealed` modifier on `InflightSource` so FlattenConcat's
attached-substream anonymous subclass can still extend it from another
file in the same package.
- Remove the duplicate definitions from FlattenConcat's companion (now
unused, drop the empty companion entirely) and from FlattenMerge's
companion. Both stages import from the shared object instead.
Result:
- Net -176 lines of duplication; one canonical home for the
optimization's data types.
- Future additions (e.g. extending the optimization to other stream-of-
streams stages such as `MergeMany`-style operators) only need to
reference `InflightSources`.
- All FlattenConcat / FlattenMerge / flatMapConcat parallelism tests
remain green; MiMa is clean (`@InternalApi private[fusing]`).
* fix: address PR #2977 review feedback for value-presented sources
Motivation:
Copilot review on PR #2977 flagged two issues: (1) Java streams obtained
via Source.fromJavaStream were converted to Scala iterators and fed into
InflightIteratorSource, dropping the BaseStream close contract and leaking
onClose handlers and underlying resources; (2) the test purporting to
verify "no pre-materialization for value-presented sources" actually
counted lazySingle materializations and was misleading because
Source.lazySingle
is itself non-VP.
Modification:
- Add InflightJavaStreamSource in InflightSources.scala that wraps the
BaseStream directly, eagerly closes empty streams, closes on exhaustion,
and closes on cancel.
- Wire the new wrapper through FlattenConcat.addJavaStreamSource and
FlattenMerge.addInflightJavaStreamSource so both stages honor the
close contract on the value-presented fast path.
- Cancel queued inflight sources in FlattenMerge.postStop so JavaStream
resources held in the queue (not yet promoted to active SubSinkInlets)
are released on stage termination.
- Replace the misleading test with one that mixes value-presented and
non-VP inner sources via lazySingle().buffer() and asserts the counter
equals only the non-VP count, proving the VP fast path skips
materialization.
- Add two regression tests for the close contract: exhaustion of
finite Java streams and downstream cancel against infinite ones.
Result:
Java streams routed through the VP fast path now close deterministically
on exhaustion, cancel, and stage termination. The materialization-skip
property is demonstrated by a meaningful counter test rather than a
tautology. All 39 FlowFlattenMergeSpec tests pass.
* fix: drop recursive S bound from InflightJavaStreamSource for Scala 3
Motivation:
PR #2977 CI failed on Scala 3.3.7 with a Type Mismatch in FlattenConcat
and FlattenMerge: the dispatch pattern matched JavaStreamSource[T, _] and
forwarded the value to a helper requiring [S <: BaseStream[T, S]]. Scala 2
implicitly skolemized the existential, but Scala 3 does not, causing the
stream module to fail to compile on Scala 3.
Modification:
- Drop the recursive S type parameter from InflightJavaStreamSource;
internally only iterator() and close() are invoked, both of which work
on BaseStream[T, _].
- Drop the matching S type parameter from FlattenConcat.addJavaStreamSource
and FlattenMerge.addInflightJavaStreamSource, accepting
JavaStreamSource[T, _]
directly so the existential never needs to be opened.
Result:
Scala 3.3.7 cross-compile is clean, Scala 2.13 still compiles, MiMa is
green, scalafmt/headerCheck pass, and FlowFlattenMergeSpec (39/39) plus
FlowFlattenConcatSpec / FlowFlatMapConcatSpec all pass on both Scala
versions.
* fix: treat Success(null) future inner sources as completion in flatten ops
Motivation:
GraphStages.FutureSource treats `Success(null)` as
completion-without-element
(see FutureSource.handle). The optimized value-presented fast paths in
FlattenConcat and FlattenMerge added in #2977 forwarded `null` downstream,
violating Reactive Streams' no-null rule and diverging from the materialized
FutureSource. The InflightJavaStreamSource also failed to close the
underlying
BaseStream when user code in `iterator.next()` threw, leaking the resource.
Modification:
- InflightSources: gate value emission via `hasFutureElement` so both
completed and pending future wrappers report `hasNext = false` on
`Success(null)`; close the BaseStream when iterator.next() throws.
- FlattenConcat / FlattenMerge `addCompletedFutureElem`: special-case
`Success(null)` as discard, mirroring FutureSource.
- FlattenConcat `tryPullNextSourceInQueue`: when a head InflightSource
completes without emitting (Success(null)) and demand is still pending
for a following SingleSource at the new head, push directly. Without
this, the stage stalled when `in` was closed.
- Tests: directional `Success(null)` coverage for both operators.
Result:
Inflight wrappers stay consistent with FutureSource semantics; no null is
ever forwarded; resource leak on iterator failure is plugged; FlattenConcat
no longer stalls when a pending future resolves to `Success(null)` ahead of
a queued SingleSource.
Tests: stream-tests/testOnly *FlatMap* *Flatten* *PrefixAndTail*
*Concat*Spec
References: PR #2977 review comments
* fix: surface settled head source in FlattenConcat tryPullNextSourceInQueue
Motivation:
When the previous head source completed without emitting (e.g. a pending
future resolving to Success(null), which the optimized fast path treats as
completion-without-element to mirror GraphStages.FutureSource), a settled
InflightSource at the new head — InflightCompletedFutureSource(Failure(_))
or InflightCompletedFutureSource(Success(_)) — was stranded in the queue:
tryPullNextSourceInQueue called src.tryPull(), which is a no-op for
already-settled sources. With `in` already closed, no further onPull would
fire and the stage stalled.
Modification:
Drive the new InflightSource head via pushOut when demand is pending, so a
queued failure surfaces via handleCurrentSourceClosed and a queued
completed-future value gets emitted. Add directional tests covering both
paths in FlowFlatMapConcatParallelismSpec.
Result:
Failures and completed-future values queued behind a Success(null) pending
future propagate instead of stalling; existing flatten suites stay green.
Tests: stream-tests/testOnly FlowFlatMapConcatParallelismSpec
FlowFlattenMergeSpec
---
.../FlowFlatMapConcatParallelismSpec.scala | 47 ++++++
.../stream/scaladsl/FlowFlattenMergeSpec.scala | 175 +++++++++++++++++++-
.../pekko/stream/impl/fusing/FlattenConcat.scala | 174 +++++++-------------
.../pekko/stream/impl/fusing/InflightSources.scala | 181 +++++++++++++++++++++
.../pekko/stream/impl/fusing/StreamOfStreams.scala | 178 ++++++++++++++++----
5 files changed, 608 insertions(+), 147 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index 483838aece..59c61b8ff9 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -149,6 +149,53 @@ class FlowFlatMapConcatParallelismSpec extends
StreamSpec("""
.futureValue should ===(1 to 4)
}
+ "treat Success(null) future inner sources as completion-without-element"
in {
+ // Mirrors GraphStages.FutureSource semantics: Success(null) completes
the inner
+ // source without emitting. The optimized fast path must match this.
+ val toIntegerSeq =
Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right)
+ Source(
+ List[Source[Integer, NotUsed]](
+ Source.future(Future.successful[Integer](null)),
+ Source.single[Integer](1),
+ Source.future(after(1.millis)(Future.successful[Integer](null))),
+ Source.lazyFuture(() => Future.successful[Integer](null)),
+ Source.single[Integer](2)))
+ .flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity)
+ .runWith(toIntegerSeq)
+ .futureValue should ===(Seq[Integer](1, 2))
+ }
+
+ "propagate inner-source failure queued behind a Success(null) pending
future" in {
+ // Regression: tryPullNextSourceInQueue used to call src.tryPull() for
+ // InflightSource heads, which is a no-op for already-settled sources
+ // (e.g. InflightCompletedFutureSource(Failure(_))). Once the pending
+ // Success(null) head was removed, the failure sat in the queue forever
+ // because nothing surfaced it. Drive the head via pushOut instead.
+ val ex = new BoomException
+ val probe = Source(
+ List[Source[Integer, NotUsed]](
+ Source.future(after(1.millis)(Future.successful[Integer](null))),
+ Source.failed[Integer](ex)))
+ .flatMapConcat(parallelism = 4, identity)
+ .runWith(TestSink())
+ probe.request(1)
+ probe.expectError() should ===(ex)
+ }
+
+ "emit completed future queued behind a Success(null) pending future" in {
+ // Same root cause as above: an InflightCompletedFutureSource(Success(_))
+ // queued behind a Success(null) pending future was stranded because
+ // tryPull() is a no-op on already-settled sources.
+ val toIntegerSeq =
Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right)
+ Source(
+ List[Source[Integer, NotUsed]](
+ Source.future(after(1.millis)(Future.successful[Integer](null))),
+ Source.future(Future.successful[Integer](42))))
+ .flatMapConcat(parallelism = 4, identity)
+ .runWith(toIntegerSeq)
+ .futureValue should ===(Seq[Integer](42))
+ }
+
"work with value presented sources when demands slow" in {
val prob = Source(
List(Source.empty[Int], Source.single(1), Source(List(2, 3, 4)),
Source.lazyFuture(() => Future.successful(5))))
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
index 9ae6a231dc..28d44ecaca 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
@@ -13,11 +13,18 @@
package org.apache.pekko.stream.scaladsl
+import java.util.Collections
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.annotation.switch
import scala.concurrent._
import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.NotUsed
+import pekko.pattern.FutureTimeoutSupport
import pekko.stream._
import pekko.stream.stage.GraphStage
import pekko.stream.stage.GraphStageLogic
@@ -30,9 +37,11 @@ import pekko.testkit.TestLatch
import org.scalatest.exceptions.TestFailedException
-class FlowFlattenMergeSpec extends StreamSpec {
+class FlowFlattenMergeSpec extends StreamSpec with FutureTimeoutSupport {
import system.dispatcher
+ class BoomException extends RuntimeException("BOOM~~") with NoStackTrace
+
def src10(i: Int) = Source(i until (i + 10))
def blocked = Source.future(Promise[Int]().future)
@@ -280,5 +289,169 @@ class FlowFlattenMergeSpec extends StreamSpec {
probe.expectComplete()
}
+ val checkBreadths = List(1, 2, 4, 8, 16, 32, 64, 128)
+
+ for (b <- checkBreadths) {
+ s"work with value presented sources with breadth: $b" in {
+ Source(
+ List(
+ Source.empty[Int],
+ Source.single(1),
+ Source.empty[Int],
+ Source(List(2, 3, 4)),
+ Source.future(Future.successful(5)),
+ Source.lazyFuture(() => Future.successful(6)),
+ Source.future(after(1.millis)(Future.successful(7)))))
+ .flatMapMerge(b, identity)
+ .runWith(toSet)
+ .futureValue should ===((1 to 7).toSet)
+ }
+ }
+
+ def generateRandomValuePresentedSources(nums: Int): (Int, List[Source[Int,
NotUsed]]) = {
+ val seq = List.tabulate(nums) { _ =>
+ val random = ThreadLocalRandom.current().nextInt(1, 10)
+ (random: @switch) match {
+ case 1 => Source.single(1)
+ case 2 => Source(List(1))
+ case 3 => Source.fromJavaStream(() =>
Collections.singleton(1).stream())
+ case 4 => Source.future(Future.successful(1))
+ case 5 => Source.future(after(1.millis)(Future.successful(1)))
+ case _ => Source.empty[Int]
+ }
+ }
+ val sum = seq.filterNot(_.eq(Source.empty[Int])).size
+ (sum, seq)
+ }
+
+ for (b <- checkBreadths) {
+ s"work with generated value presented sources with breadth: $b " in {
+ val (sum, sources @ _) = generateRandomValuePresentedSources(10000)
+ Source(sources)
+ .flatMapMerge(b, identity(_))
+ .runWith(Sink.seq)
+ .map(_.sum)(scala.concurrent.ExecutionContext.parasitic)
+ .futureValue shouldBe sum
+ }
+ }
+
+ "work with value presented failed sources" in {
+ val ex = new BoomException
+ Source(
+ List(
+ Source.empty[Int],
+ Source.single(1),
+ Source.empty[Int],
+ Source(List(2, 3, 4)),
+ Source.future(Future.failed(ex)),
+ Source.lazyFuture(() => Future.successful(5))))
+ .flatMapMerge(ThreadLocalRandom.current().nextInt(1, 129), identity)
+ .onErrorComplete[BoomException]()
+ .runWith(toSet)
+ .futureValue.subsetOf((1 to 5).toSet) should ===(true)
+ }
+
+ "treat Success(null) future inner sources as completion-without-element"
in {
+ // Mirrors GraphStages.FutureSource semantics: Success(null) completes
the inner
+ // source without emitting. The optimized fast path must match this.
+ val toIntegerSet =
Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right).mapMaterializedValue(_.map(_.toSet))
+ Source(
+ List[Source[Integer, NotUsed]](
+ Source.future(Future.successful[Integer](null)),
+ Source.single[Integer](1),
+ Source.future(after(1.millis)(Future.successful[Integer](null))),
+ Source.lazyFuture(() => Future.successful[Integer](null)),
+ Source.single[Integer](2)))
+ .flatMapMerge(ThreadLocalRandom.current().nextInt(1, 129), identity)
+ .runWith(toIntegerSet)
+ .futureValue should ===(Set[Integer](1, 2))
+ }
+
+ val breadth = ThreadLocalRandom.current().nextInt(4, 65)
+ s"avoid pre-materialization for value-presented sources, breadth =
$breadth" in {
+ val materializationCounter = new AtomicInteger(0)
+ val n = breadth * 3
+ val probe = Source(1 to n)
+ .flatMapMerge(
+ breadth,
+ value =>
+ Source.lazySingle(() => {
+ materializationCounter.incrementAndGet()
+ value
+ }))
+ .runWith(TestSink())
+
+ probe.request(n.toLong)
+ probe.expectNextN(n.toLong).toSet should ===((1 to n).toSet)
+ probe.expectComplete()
+ // Source.lazySingle is not a value-presented source, so each is
materialized.
+ materializationCounter.get() shouldBe n
+ }
+
+ s"only materialize non-value-presented inner sources, breadth = $breadth"
in {
+ val materializationCounter = new AtomicInteger(0)
+ val n = breadth * 3
+ // Mix value-presented (Source.single, fast path) with
non-value-presented
+ // (lazySingle.buffer, slow path). The counter sits inside the lazySingle
+ // factory and only fires when the inner source is materialized as a
substream.
+ val probe = Source(1 to (n * 2))
+ .flatMapMerge(
+ breadth,
+ value =>
+ if (value % 2 == 0) Source.single(value)
+ else
+ Source
+ .lazySingle(() => {
+ materializationCounter.incrementAndGet()
+ value
+ })
+ .buffer(1, overflowStrategy = OverflowStrategy.backpressure))
+ .runWith(TestSink())
+
+ probe.request(n.toLong * 2)
+ probe.expectNextN(n.toLong * 2).toSet should ===((1 to (n * 2)).toSet)
+ probe.expectComplete()
+ // Only odd values (non-VP) take the substream materialization path.
+ materializationCounter.get() shouldBe n
+ }
+
+ "close JavaStream-backed inner sources on exhaustion" in {
+ val closeCount = new AtomicInteger(0)
+ val streams = (1 to 4).toList
+ Source(streams)
+ .flatMapMerge(
+ 4,
+ (n: Int) =>
+ Source.fromJavaStream(() =>
+ java.util.stream.Stream.of((1 to n).map(Integer.valueOf):
_*).onClose(() =>
+ closeCount.incrementAndGet())))
+ .runWith(Sink.ignore)
+ .futureValue
+ closeCount.get() shouldBe streams.size
+ }
+
+ "close JavaStream-backed inner sources on downstream cancel" in {
+ val closeCount = new AtomicInteger(0)
+ // Endless inner streams; when downstream cancels, the inflight wrappers
+ // queued in FlattenMerge must close their underlying Java streams.
+ val probe = Source
+ .repeat(())
+ .flatMapMerge(
+ 4,
+ _ =>
+ Source.fromJavaStream(() =>
+ java.util.stream.Stream
+ .generate[Integer](() => 1)
+ .onClose(() => closeCount.incrementAndGet())))
+ .runWith(TestSink())
+
+ probe.request(8)
+ probe.expectNextN(8)
+ probe.cancel()
+ awaitAssert {
+ closeCount.get() should be >= 1
+ }
+ }
+
}
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
index 7592733d35..2aefcc9d57 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
@@ -27,103 +27,11 @@ import pekko.stream.{ Attributes, FlowShape, Graph, Inlet,
Outlet, SourceShape,
import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource,
JavaStreamSource, TraversalBuilder }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource,
SingleSource }
+import pekko.stream.impl.fusing.InflightSources._
import pekko.stream.scaladsl.Source
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler
}
import pekko.util.OptionVal
-/**
- * INTERNAL API
- */
-@InternalApi
-private[pekko] object FlattenConcat {
- private sealed abstract class InflightSource[T] {
- def hasNext: Boolean
- def next(): T
- def tryPull(): Unit
- def cancel(cause: Throwable): Unit
- def isClosed: Boolean
- def hasFailed: Boolean = failure.isDefined
- def failure: Option[Throwable] = None
- def materialize(): Unit = ()
- }
-
- private final class InflightIteratorSource[T](iterator: Iterator[T]) extends
InflightSource[T] {
- override def hasNext: Boolean = iterator.hasNext
- override def next(): T = iterator.next()
- override def tryPull(): Unit = ()
- override def cancel(cause: Throwable): Unit = ()
- override def isClosed: Boolean = !hasNext
- }
-
- private final class InflightRangeSource[T](range: immutable.Range) extends
InflightSource[T] {
- private val isEmptyRange = range.isEmpty
- private val rangeLast = if (isEmptyRange) 0 else range.last
- private val rangeStep = range.step
- private var nextElement = range.start
- private var closed = isEmptyRange
-
- override def hasNext: Boolean = !closed
- override def next(): T =
- if (closed) throw new NoSuchElementException("next called after
completion")
- else {
- val current = nextElement
- if (current == rangeLast) closed = true
- else nextElement = current + rangeStep
- current.asInstanceOf[T]
- }
- override def tryPull(): Unit = ()
- override def cancel(cause: Throwable): Unit = ()
- override def isClosed: Boolean = closed
- }
-
- private final class InflightRepeatSource[T](elem: T) extends
InflightSource[T] {
- override def hasNext: Boolean = true
- override def next(): T = elem
- override def tryPull(): Unit = ()
- override def cancel(cause: Throwable): Unit = ()
- override def isClosed: Boolean = false
- }
-
- private final class InflightCompletedFutureSource[T](result: Try[T]) extends
InflightSource[T] {
- private var _hasNext = result.isSuccess
- override def hasNext: Boolean = _hasNext
- override def next(): T = {
- if (_hasNext) {
- _hasNext = false
- result.get
- } else throw new NoSuchElementException("next called after completion")
- }
- override def hasFailed: Boolean = result.isFailure
- override def failure: Option[Throwable] = result.failed.toOption
- override def tryPull(): Unit = ()
- override def cancel(cause: Throwable): Unit = ()
- override def isClosed: Boolean = true
- }
-
- private final class InflightPendingFutureSource[T](cb: InflightSource[T] =>
Unit)
- extends InflightSource[T]
- with (Try[T] => Unit) {
- private var result: Try[T] = MapAsync.NotYetThere
- private var consumed = false
- override def apply(result: Try[T]): Unit = {
- this.result = result
- cb(this)
- }
- override def hasNext: Boolean = (result ne MapAsync.NotYetThere) &&
!consumed && result.isSuccess
- override def next(): T = {
- if (!consumed) {
- consumed = true
- result.get
- } else throw new NoSuchElementException("next called after completion")
- }
- override def hasFailed: Boolean = (result ne MapAsync.NotYetThere) &&
result.isFailure
- override def failure: Option[Throwable] = if (result eq
MapAsync.NotYetThere) None else result.failed.toOption
- override def tryPull(): Unit = ()
- override def cancel(cause: Throwable): Unit = ()
- override def isClosed: Boolean = consumed || hasFailed
- }
-}
-
/**
* INTERNAL API
*/
@@ -138,7 +46,6 @@ private[pekko] final class FlattenConcat[T, M](parallelism:
Int)
override val shape: FlowShape[Graph[SourceShape[T], M], T] = FlowShape(in,
out)
override def createLogic(enclosingAttributes: Attributes) = {
object FlattenConcatLogic extends GraphStageLogic(shape) with InHandler
with OutHandler {
- import FlattenConcat._
// InflightSource[T] or SingleSource[T]
// AnyRef here to avoid lift the SingleSource[T] to InflightSource[T]
private var queue: BufferImpl[AnyRef] = _
@@ -150,7 +57,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism:
Int)
private def futureSourceCompleted(futureSource: InflightSource[T]): Unit
= {
if (queue.peek() eq futureSource) {
if (isAvailable(out) && futureSource.hasNext) {
- push(out, futureSource.next()) // TODO should filter out the
`null` here?
+ // Success(null) is filtered out via
InflightPendingFutureSource.hasNext to stay
+ // consistent with GraphStages.FutureSource (which treats
Success(null) as completion).
+ push(out, futureSource.next())
if (futureSource.isClosed) {
handleCurrentSourceClosed(futureSource)
}
@@ -269,17 +178,43 @@ private[pekko] final class FlattenConcat[T,
M](parallelism: Int)
queue.enqueue(inflightSource)
}
- private def addCompletedFutureElem(elem: Try[T]): Unit = {
+ private def addJavaStreamSource(javaStream: JavaStreamSource[T, _]):
Unit = {
+ val inflightSource = new InflightJavaStreamSource[T](javaStream.open)
if (isAvailable(out) && queue.isEmpty) {
- elem match {
- case scala.util.Success(value) => push(out, value)
- case scala.util.Failure(ex) => onUpstreamFailure(ex)
+ if (inflightSource.hasNext) {
+ push(out, inflightSource.next())
+ if (inflightSource.hasNext) {
+ queue.enqueue(inflightSource)
+ }
}
- } else {
- queue.enqueue(new InflightCompletedFutureSource(elem))
+ } else if (inflightSource.hasNext) {
+ queue.enqueue(inflightSource)
}
}
+ private def addCompletedFutureElem(elem: Try[T]): Unit = elem match {
+ // DO NOT CHANGE
+ // WHY: GraphStages.FutureSource treats Success(null) as
completion-without-element
+ // (see FutureSource.handle: `case Success(null) => completeStage()`).
The fast path
+ // here MUST mirror that — pushing null would violate Reactive
Streams' no-null rule
+ // and diverge from the materialized FutureSource behaviour.
+ // How to apply: keep this branch in sync with FutureSource semantics;
if FutureSource
+ // ever changes how it treats Success(null), update here too.
+ case scala.util.Success(null) => // empty inner source: discard, slot
is freed when caller dequeues
+ case scala.util.Success(value) =>
+ if (isAvailable(out) && queue.isEmpty) {
+ push(out, value)
+ } else {
+ queue.enqueue(new InflightCompletedFutureSource(elem))
+ }
+ case scala.util.Failure(ex) =>
+ if (isAvailable(out) && queue.isEmpty) {
+ onUpstreamFailure(ex)
+ } else {
+ queue.enqueue(new InflightCompletedFutureSource(elem))
+ }
+ }
+
private def addPendingFutureElem(future: Future[T]): Unit = {
val inflightSource = new InflightPendingFutureSource[T](invokeCb)
future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic)
@@ -336,13 +271,11 @@ private[pekko] final class FlattenConcat[T,
M](parallelism: Int)
case Some(elem) => addCompletedFutureElem(elem)
case None => addPendingFutureElem(future)
}
- case iterable: IterableSource[T] @unchecked =>
addSourceElements(iterable.elements.iterator)
- case iterator: IteratorSource[T] @unchecked =>
addSourceElements(iterator.createIterator())
- case range: RangeSource[T] @unchecked =>
addRangeSource(range.range)
- case repeat: RepeatSource[T] @unchecked =>
addRepeatSource(repeat.elem)
- case javaStream: JavaStreamSource[T, _] @unchecked =>
- import scala.jdk.CollectionConverters._
- addSourceElements(javaStream.open().iterator.asScala)
+ case iterable: IterableSource[T] @unchecked =>
addSourceElements(iterable.elements.iterator)
+ case iterator: IteratorSource[T] @unchecked =>
addSourceElements(iterator.createIterator())
+ case range: RangeSource[T] @unchecked =>
addRangeSource(range.range)
+ case repeat: RepeatSource[T] @unchecked =>
addRepeatSource(repeat.elem)
+ case javaStream: JavaStreamSource[T, _] @unchecked =>
addJavaStreamSource(javaStream)
case failed: FailedSource[T] @unchecked =>
addCompletedFutureElem(Failure(failed.failure))
case maybeEmpty if TraversalBuilder.isEmptySource(maybeEmpty) =>
// Empty source is discarded
case _ =>
attachAndMaterializeSource(source)
@@ -382,12 +315,25 @@ private[pekko] final class FlattenConcat[T,
M](parallelism: Int)
}
}
- private def tryPullNextSourceInQueue(): Unit = {
- // pull the new emitting source
- val nextSource = queue.peek()
- if (nextSource.isInstanceOf[InflightSource[T] @unchecked]) {
- nextSource.asInstanceOf[InflightSource[T]].tryPull()
- }
+ private def tryPullNextSourceInQueue(): Unit = queue.peek() match {
+ // DO NOT CHANGE
+ // WHY: The previous head source may complete without emitting (e.g. a
pending
+ // future resolving to Success(null), which we treat as
completion-without-
+ // element to mirror GraphStages.FutureSource). When that happens with
demand
+ // still pending and `in` already closed, no further onPull will fire
— the
+ // stage would stall on whatever sits next in the queue. Drive the new
head
+ // directly: push SingleSource, or pushOut for InflightSource (which
also
+ // surfaces a queued InflightCompletedFutureSource failure, otherwise
the
+ // failure would never propagate).
+ case src: SingleSource[T] @unchecked =>
+ if (isAvailable(out)) {
+ push(out, src.elem)
+ removeSource()
+ }
+ case src: InflightSource[T] @unchecked =>
+ if (isAvailable(out)) pushOut(src)
+ else src.tryPull()
+ case _ => // queue empty or unexpected: nothing to pull
}
setHandlers(in, out, this)
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala
new file mode 100644
index 0000000000..3d4a11c039
--- /dev/null
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+import scala.util.{ Success, Try }
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ *
+ * Lightweight in-memory representations of value-presented `Source`s that can
+ * be consumed without paying for substream materialization. Shared between
+ * [[FlattenConcat]] and [[FlattenMerge]].
+ */
+@InternalApi
+private[fusing] object InflightSources {
+
+ /**
+ * Common base. The optimized value-presented variants below have no
+ * upstream to pull from or cancel, so `tryPull` / `cancel` / `materialize`
+ * default to no-ops. Stages that wrap a real `SubSinkInlet` (for sources
+ * that still require materialization) override these as needed.
+ */
+ private[fusing] abstract class InflightSource[T] {
+ def hasNext: Boolean
+ def next(): T
+ def isClosed: Boolean
+ def tryPull(): Unit = ()
+ def cancel(cause: Throwable): Unit = ()
+ def materialize(): Unit = ()
+ def hasFailed: Boolean = failure.isDefined
+ def failure: Option[Throwable] = None
+ }
+
+ private[fusing] final class InflightIteratorSource[T](iterator: Iterator[T])
extends InflightSource[T] {
+ override def hasNext: Boolean = iterator.hasNext
+ override def next(): T = iterator.next()
+ override def isClosed: Boolean = !hasNext
+ }
+
+ /**
+ * Wraps a [[java.util.stream.BaseStream]] so it can be consumed without
+ * substream materialization while still honoring the close contract: the
+ * underlying stream is closed on exhaustion, on `cancel`, and eagerly when
+ * the spliterator advertises it as empty.
+ *
+ * The recursive `S <: BaseStream[T, S]` bound that [[JavaStreamSource]]
+ * carries is intentionally dropped here: only `iterator()` and `close()`
+ * are invoked internally, and both are available on `BaseStream[T, _]`.
+ * Keeping the bound would force callers to skolemize the existential
+ * captured by pattern matching `JavaStreamSource[T, _]`, which Scala 3
+ * does not do implicitly.
+ */
+ private[fusing] final class InflightJavaStreamSource[T](
+ open: () => java.util.stream.BaseStream[T, _]) extends InflightSource[T]
{
+ private val stream: java.util.stream.BaseStream[T, _] = open()
+ private val iterator: java.util.Iterator[T] = stream.iterator()
+ private var closed: Boolean = false
+ // Eagerly close empty streams so we don't leak the resource for empty
inner sources.
+ if (!iterator.hasNext) closeStream()
+
+ private def closeStream(): Unit =
+ if (!closed) {
+ closed = true
+ try stream.close()
+ catch { case NonFatal(_) => () }
+ }
+
+ override def hasNext: Boolean = !closed
+ override def next(): T =
+ if (closed) throw new NoSuchElementException("next called after
completion")
+ else {
+ try {
+ val elem = iterator.next()
+ if (!iterator.hasNext) closeStream()
+ elem
+ } catch {
+ case NonFatal(ex) =>
+ // If user code in iterator.next() throws, ensure the BaseStream is
+ // closed before propagating the failure: postStop on the enclosing
+ // FlattenMerge/FlattenConcat may not have a chance to cancel us.
+ closeStream()
+ throw ex
+ }
+ }
+ override def isClosed: Boolean = closed
+ override def cancel(cause: Throwable): Unit = closeStream()
+ }
+
+ private[fusing] final class InflightRangeSource[T](range: immutable.Range)
extends InflightSource[T] {
+ private val isEmptyRange = range.isEmpty
+ private val rangeLast = if (isEmptyRange) 0 else range.last
+ private val rangeStep = range.step
+ private var nextElement = range.start
+ private var closed = isEmptyRange
+
+ override def hasNext: Boolean = !closed
+ override def next(): T =
+ if (closed) throw new NoSuchElementException("next called after
completion")
+ else {
+ val current = nextElement
+ if (current == rangeLast) closed = true
+ else nextElement = current + rangeStep
+ current.asInstanceOf[T]
+ }
+ override def isClosed: Boolean = closed
+ }
+
+ private[fusing] final class InflightRepeatSource[T](elem: T) extends
InflightSource[T] {
+ override def hasNext: Boolean = true
+ override def next(): T = elem
+ override def isClosed: Boolean = false
+ }
+
+ // DO NOT CHANGE
+ // WHY: GraphStages.FutureSource treats Success(null) as
completion-without-element
+ // (see FutureSource.handle: `case Success(null) => completeStage()`). The
inflight
+ // wrappers below MUST stay consistent with that behaviour, otherwise the
optimized
+ // value-presented fast path would emit null — violating Reactive Streams'
no-null
+ // rule and diverging from the materialized FutureSource. If FutureSource
semantics
+ // are ever changed, these wrappers must be updated in lock-step.
+ private def hasFutureElement[T](result: Try[T]): Boolean = result match {
+ case Success(v) => v != null
+ case _ => false
+ }
+
+ private[fusing] final class InflightCompletedFutureSource[T](result: Try[T])
extends InflightSource[T] {
+ private var _hasNext = hasFutureElement(result)
+ override def hasNext: Boolean = _hasNext
+ override def next(): T =
+ if (_hasNext) {
+ _hasNext = false
+ result.get
+ } else throw new NoSuchElementException("next called after completion")
+ override def hasFailed: Boolean = result.isFailure
+ override def failure: Option[Throwable] = result.failed.toOption
+ // The future has already produced its value (or failure); the source is
+ // fundamentally one-shot and reports as closed even before consumption.
+ override def isClosed: Boolean = true
+ }
+
+ private[fusing] final class InflightPendingFutureSource[T](cb:
InflightSource[T] => Unit)
+ extends InflightSource[T]
+ with (Try[T] => Unit) {
+ private var result: Try[T] = MapAsync.NotYetThere
+ private var consumed = false
+ override def apply(result: Try[T]): Unit = {
+ this.result = result
+ cb(this)
+ }
+ override def hasNext: Boolean = (result ne MapAsync.NotYetThere) &&
!consumed && hasFutureElement(result)
+ override def next(): T =
+ if (!consumed) {
+ consumed = true
+ result.get
+ } else throw new NoSuchElementException("next called after completion")
+ override def hasFailed: Boolean = (result ne MapAsync.NotYetThere) &&
result.isFailure
+ override def failure: Option[Throwable] = if (result eq
MapAsync.NotYetThere) None else result.failed.toOption
+ override def isClosed: Boolean = consumed || hasFailed ||
+ ((result ne MapAsync.NotYetThere) && !hasFutureElement(result))
+ }
+}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
index 5b6c82745f..bf9dd86433 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
@@ -18,8 +18,10 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.{ nowarn, tailrec }
import scala.collection.immutable
+import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
+import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import org.apache.pekko
@@ -30,13 +32,14 @@ import
pekko.stream.ActorAttributes.StreamSubscriptionTimeout
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl }
+import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource,
JavaStreamSource }
import pekko.stream.impl.ActorSubscriberMessage
import pekko.stream.impl.ActorSubscriberMessage.OnError
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.SubscriptionTimeoutException
import pekko.stream.impl.TraversalBuilder
-import pekko.stream.impl.fusing.GraphStages.SingleSource
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource,
SingleSource }
+import pekko.stream.impl.fusing.InflightSources._
import pekko.stream.scaladsl._
import pekko.stream.stage._
import pekko.util.OptionVal
@@ -57,14 +60,33 @@ import pekko.util.OptionVal
new GraphStageLogic(shape) with OutHandler with InHandler {
var sources = Set.empty[SubSinkInlet[T]]
var pendingSingleSources = 0
- def activeSources = sources.size + pendingSingleSources
+ var pendingInflightSources = 0
+ def activeSources = sources.size + pendingSingleSources +
pendingInflightSources
- // To be able to optimize for SingleSource without materializing them
the queue may hold either
- // SubSinkInlet[T] or SingleSource
+ // To be able to optimize for value-presented sources without
materializing them, the queue may hold
+ // SubSinkInlet[T], SingleSource, or InflightSource[T]
var queue: BufferImpl[AnyRef] = _
override def preStart(): Unit = queue = BufferImpl(breadth,
enclosingAttributes)
+ private val invokeCb: InflightSource[T] => Unit =
+ getAsyncCallback[InflightSource[T]](inflightFutureCompleted).invoke
+
+ private def inflightFutureCompleted(source: InflightSource[T]): Unit = {
+ if (source.hasFailed) {
+ failStage(source.failure.get)
+ } else if (source.hasNext) {
+ if (isAvailable(out) && queue.isEmpty) {
+ push(out, source.next())
+ removeSource(source)
+ } else {
+ queue.enqueue(source)
+ }
+ } else {
+ removeSource(source)
+ }
+ }
+
def pushOut(): Unit = {
queue.dequeue() match {
case src: SubSinkInlet[T] @unchecked =>
@@ -74,6 +96,10 @@ import pekko.util.OptionVal
case single: SingleSource[T] @unchecked =>
push(out, single.elem)
removeSource(single)
+ case inflight: InflightSource[T] @unchecked =>
+ push(out, inflight.next())
+ if (inflight.isClosed) removeSource(inflight)
+ else queue.enqueue(inflight)
case other =>
throw new IllegalStateException(s"Unexpected source type in queue:
'${other.getClass}'")
}
@@ -100,36 +126,111 @@ import pekko.util.OptionVal
setHandlers(in, out, this)
def addSource(source: Graph[SourceShape[T], M]): Unit = {
- // If it's a SingleSource or wrapped such we can push the element
directly instead of materializing it.
- // Have to use AnyRef because of OptionVal null value.
-
TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef],
M]]) match {
- case OptionVal.Some(single) =>
- if (isAvailable(out) && queue.isEmpty) {
- push(out, single.elem.asInstanceOf[T])
- } else {
- queue.enqueue(single)
- pendingSingleSources += 1
- }
- case _ =>
- val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
- sinkIn.setHandler(new InHandler {
- override def onPush(): Unit = {
- if (isAvailable(out)) {
- push(out, sinkIn.grab())
- sinkIn.pull()
- } else {
- queue.enqueue(sinkIn)
+ // If it's a value-presented source (or wrapped such) we can avoid
substream materialization.
+ TraversalBuilder.getValuePresentedSource(source) match {
+ case OptionVal.Some(graph) =>
+ graph match {
+ case single: SingleSource[T] @unchecked =>
addSingleSource(single)
+ case futureSource: FutureSource[T] @unchecked =>
+ val future = futureSource.future
+ future.value match {
+ case Some(elem) => addCompletedFutureElem(elem)
+ case None => addPendingFutureElem(future)
}
- }
- override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable)
removeSource(sinkIn)
- })
- sinkIn.pull()
- sources += sinkIn
- val graph = Source.fromGraph(source).to(sinkIn.sink)
- interpreter.subFusingMaterializer.materialize(graph,
defaultAttributes = enclosingAttributes)
+ case iterable: IterableSource[T] @unchecked =>
addInflightIteratorSource(iterable.elements.iterator)
+ case iterator: IteratorSource[T] @unchecked =>
addInflightIteratorSource(iterator.createIterator())
+ case range: RangeSource[T] @unchecked =>
addInflightRangeSource(range.range)
+ case repeat: RepeatSource[T] @unchecked =>
addInflightRepeatSource(repeat.elem)
+ case javaStream: JavaStreamSource[T, _] @unchecked =>
addInflightJavaStreamSource(javaStream)
+ case failed: FailedSource[T] @unchecked =>
addCompletedFutureElem(Failure(failed.failure))
+ case maybeEmpty if TraversalBuilder.isEmptySource(maybeEmpty) =>
// Empty source is discarded
+ case _ =>
attachAndMaterializeSource(source)
+ }
+ case _ => attachAndMaterializeSource(source)
+ }
+ }
+
+ private def addSingleSource(single: SingleSource[T]): Unit = {
+ if (isAvailable(out) && queue.isEmpty) {
+ push(out, single.elem)
+ } else {
+ queue.enqueue(single)
+ pendingSingleSources += 1
+ }
+ }
+
+ private def addInflightSource(inflight: InflightSource[T]): Unit = {
+ if (isAvailable(out) && queue.isEmpty) {
+ push(out, inflight.next())
+ if (!inflight.isClosed) {
+ queue.enqueue(inflight)
+ pendingInflightSources += 1
+ }
+ } else {
+ queue.enqueue(inflight)
+ pendingInflightSources += 1
}
}
+ private def addInflightIteratorSource(iterator: Iterator[T]): Unit =
+ if (iterator.hasNext) addInflightSource(new
InflightIteratorSource[T](iterator))
+
+ private def addInflightRangeSource(range: immutable.Range): Unit =
+ if (range.nonEmpty) addInflightSource(new
InflightRangeSource[T](range))
+
+ private def addInflightRepeatSource(elem: T): Unit =
+ addInflightSource(new InflightRepeatSource[T](elem))
+
+ private def addInflightJavaStreamSource(javaStream: JavaStreamSource[T,
_]): Unit = {
+ val inflight = new InflightJavaStreamSource[T](javaStream.open)
+ if (inflight.hasNext) addInflightSource(inflight)
+ }
+
+ private def addCompletedFutureElem(elem: Try[T]): Unit = elem match {
+ // DO NOT CHANGE
+ // WHY: GraphStages.FutureSource treats Success(null) as
completion-without-element
+ // (see FutureSource.handle: `case Success(null) => completeStage()`).
The fast path
+ // here MUST mirror that — pushing null would violate Reactive
Streams' no-null rule
+ // and diverge from the materialized FutureSource behaviour.
+ // How to apply: keep this branch in sync with FutureSource semantics;
if FutureSource
+ // ever changes how it treats Success(null), update here too.
+ case Success(null) => // empty inner source: discard, slot is freed
by caller
+ case Success(value) =>
+ if (isAvailable(out) && queue.isEmpty) {
+ push(out, value)
+ } else {
+ queue.enqueue(new InflightCompletedFutureSource[T](elem))
+ pendingInflightSources += 1
+ }
+ case Failure(ex) => failStage(ex)
+ }
+
+ private def addPendingFutureElem(future: Future[T]): Unit = {
+ val inflightSource = new InflightPendingFutureSource[T](invokeCb)
+
future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic)
+ // Future is not yet ready; occupy a breadth slot until completion
+ pendingInflightSources += 1
+ }
+
+ private def attachAndMaterializeSource(source: Graph[SourceShape[T],
M]): Unit = {
+ val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
+ sinkIn.setHandler(new InHandler {
+ override def onPush(): Unit = {
+ if (isAvailable(out)) {
+ push(out, sinkIn.grab())
+ sinkIn.pull()
+ } else {
+ queue.enqueue(sinkIn)
+ }
+ }
+ override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable)
removeSource(sinkIn)
+ })
+ sinkIn.pull()
+ sources += sinkIn
+ val graph = Source.fromGraph(source).to(sinkIn.sink)
+ interpreter.subFusingMaterializer.materialize(graph, defaultAttributes
= enclosingAttributes)
+ }
+
def removeSource(src: AnyRef): Unit = {
val pullSuppressed = activeSources == breadth
src match {
@@ -137,13 +238,26 @@ import pekko.util.OptionVal
sources -= sub
case _: SingleSource[_] =>
pendingSingleSources -= 1
+ case _: InflightSource[_] =>
+ pendingInflightSources -= 1
case other => throw new IllegalArgumentException(s"Unexpected source
type: '${other.getClass}'")
}
if (pullSuppressed) tryPull(in)
if (activeSources == 0 && isClosed(in)) completeStage()
}
- override def postStop(): Unit = sources.foreach(_.cancel())
+ override def postStop(): Unit = {
+ sources.foreach(_.cancel())
+ // Cancel any queued inflight sources so close-sensitive resources
(e.g. JavaStream
+ // backed by IO) are released even if downstream cancels mid-flight.
+ while (queue.nonEmpty) {
+ queue.dequeue() match {
+ case inflight: InflightSource[T] @unchecked =>
+
inflight.cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)
+ case _ => // SubSinkInlet already cancelled above; SingleSource
needs no cleanup
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]