This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch hepin-gather-statefulmap-coverage-v2
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 429a3e06797a3b34b1434b69fdb65a62da581fb8 Author: He-Pin <[email protected]> AuthorDate: Tue Apr 7 03:53:30 2026 +0800 stream: address deep review findings for gather operator - Add Gatherers.oneToOne() factory methods for Java DSL hot path access - Fix singleCollector.push to correctly handle 3+ outputs per gather call - Null out pendingOverflow on restart to prevent memory accumulation - Add null check on factory result to catch invalid factories early - Expand SubFlow/SubSource gather documentation for Java DSL - Align Scala/Java DSL documentation language - Add tests: materialization independence, empty upstream, onComplete null emission, multi-output backpressure 🤖 Generated with [Qoder](https://qoder.com) --- .../pekko/stream/scaladsl/FlowGatherSpec.scala | 78 ++++++++++++++++++++++ .../org/apache/pekko/stream/impl/fusing/Ops.scala | 44 ++++++++---- .../org/apache/pekko/stream/javadsl/Flow.scala | 2 +- .../org/apache/pekko/stream/javadsl/Gather.scala | 52 ++++++++++++++- .../org/apache/pekko/stream/javadsl/Source.scala | 2 +- .../org/apache/pekko/stream/javadsl/SubFlow.scala | 22 +++++- .../apache/pekko/stream/javadsl/SubSource.scala | 22 +++++- .../org/apache/pekko/stream/scaladsl/Gather.scala | 35 ++++++++++ 8 files changed, 235 insertions(+), 22 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala index e8f1e9ed32..6e2b1b8dcc 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala @@ -738,6 +738,84 @@ class FlowGatherSpec extends StreamSpec { } } + "create independent gatherer instances per materialization" in { + val stateCounter = new AtomicInteger(0) + val flow = Flow[Int] + .gather(() => { + stateCounter.incrementAndGet() + new Gatherer[Int, Int] { + private var acc = 0 + override def 
apply(elem: Int, collector: GatherCollector[Int]): Unit = { + acc += elem + collector.push(acc) + } + } + }) + + val source1 = Source(1 to 3).via(flow).runWith(TestSink[Int]()) + val source2 = Source(10 to 12).via(flow).runWith(TestSink[Int]()) + + source1.request(3) + .expectNext(1, 3, 6) + .expectComplete() + source2.request(3) + .expectNext(10, 21, 33) + .expectComplete() + + // Factory should be called once per materialization + stateCounter.get() shouldBe 2 + } + + "call onComplete for empty upstream" in { + val gate = BeenCalledTimesGate() + Source.empty[Int] + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = + collector.push(elem) + + override def onComplete(collector: GatherCollector[Int]): Unit = + gate.mark() + }) + .runWith(TestSink[Int]()) + .request(1) + .expectComplete() + gate.ensure() + } + + "fail when onComplete emits null" in { + Source.single(1) + .gather(() => + new Gatherer[Int, String] { + override def apply(elem: Int, collector: GatherCollector[String]): Unit = () + override def onComplete(collector: GatherCollector[String]): Unit = + collector.push(null.asInstanceOf[String]) + }) + .runWith(TestSink[String]()) + .request(1) + .expectError() shouldBe a[NullPointerException] + } + + "handle 3+ outputs with backpressure mid-drain" in { + Source.single(1) + .gather(() => + new Gatherer[Int, Int] { + override def apply(elem: Int, collector: GatherCollector[Int]): Unit = { + collector.push(elem) + collector.push(elem + 1) + collector.push(elem + 2) + collector.push(elem + 3) + } + }) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1, 2) + .expectNoMessage(200.millis) + .request(2) + .expectNext(3, 4) + .expectComplete() + } + "support junction output ports" in { val source = Source(List((1, 1), (2, 2))) val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink[(Int, Int)]()) { implicit b => sink => diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index a2e6d1d9db..69beb30715 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -55,6 +55,7 @@ import pekko.stream.scaladsl.{ StatefulMapConcatAccumulator, StatefulMapConcatAccumulatorFactory } +import pekko.stream.javadsl.{ Gatherers => JGatherers } import pekko.stream.stage._ import pekko.util.{ ConstantFun, OptionVal } @@ -2364,17 +2365,25 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext private val singleCollector = new GatherCollector[Out] { override def push(elem: Out): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) - val cb = callbackFirst - if (cb.asInstanceOf[AnyRef] eq null) { - callbackFirst = elem - } else { - pendingFirst = cb - hasPendingFirst = true - callbackFirst = null.asInstanceOf[Out] - multiMode = true + if (hasPendingFirst) { + // Already in multi mode: all pushes go directly to overflow queue. if (pendingOverflow eq null) pendingOverflow = new java.util.ArrayDeque[Out]() pendingOverflow.addLast(elem) + } else { + val cb = callbackFirst + if (cb.asInstanceOf[AnyRef] eq null) { + callbackFirst = elem + } else { + // Second output from this gather call: transition to multi mode. + pendingFirst = cb + hasPendingFirst = true + callbackFirst = null.asInstanceOf[Out] + multiMode = true + if (pendingOverflow eq null) + pendingOverflow = new java.util.ArrayDeque[Out]() + pendingOverflow.addLast(elem) + } } } } @@ -2391,7 +2400,8 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext private var hasPendingFirst = false private var multiMode = false private var gatherer: Gatherer[In, Out] = _ - private var oneToOneGatherer: OneToOneGatherer[In, Out] = _ + // Hot-path handle for one-to-one mappings. 
Supports both Scala and Java DSL implementations. + private var oneToOneGatherer: AnyRef = _ private var finalAction = FinalAction.None private var finalFailure: Throwable = null private var needInvokeOnCompleteCallback = false @@ -2466,7 +2476,10 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext } private def onPushOneToOne(): Unit = { - val elem = oneToOneGatherer.applyOne(grab(in)) + val elem = oneToOneGatherer match { + case s: OneToOneGatherer[In, Out] @unchecked => s.applyOne(grab(in)) + case j: JGatherers.OneToOneGatherer[In, Out] @unchecked => j.applyOne(grab(in)) + } ReactiveStreamsCompliance.requireNonNullElement(elem) if (isAvailable(out)) push(out, elem) @@ -2543,12 +2556,17 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext pull(in) private def restartGatherer(): Unit = { - gatherer = factory() + val newGatherer = factory() + if (newGatherer eq null) + throw new IllegalStateException("Gatherer factory must not return null") + gatherer = newGatherer oneToOneGatherer = gatherer match { - case specialized: OneToOneGatherer[In, Out] @unchecked => specialized - case _ => null + case _: OneToOneGatherer[?, ?] => gatherer + case _: JGatherers.OneToOneGatherer[?, ?] => gatherer + case _ => null } multiMode = false + pendingOverflow = null needInvokeOnCompleteCallback = true } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index fdd2dae31a..0bdd5de129 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -872,7 +872,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Transform each input element into zero or more output elements without requiring tuple or collection allocations * imposed by the operator API itself. 
* - * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. * The provided [[GatherCollector]] can emit zero or more output elements for each input element. * * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala index d5c3b1d091..d65aaeb411 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala @@ -24,6 +24,7 @@ import org.apache.pekko.japi.function * Collector passed to [[Gatherer]] for emitting output elements. * * The collector is only valid while the current [[Gatherer]] callback is running. + * Emitted elements MUST NOT be `null`. * * @since 1.3.0 */ @@ -38,7 +39,7 @@ trait GatherCollector[-Out] extends function.Procedure[Out] { * A stateful gatherer for the `gather` operator. * * A new gatherer instance is created for each materialization and on each supervision restart. - * It can keep mutable state in fields. + * It can keep mutable state in fields or via captured variables. * * @since 1.3.0 */ @@ -54,3 +55,52 @@ trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] { */ def onComplete(collector: GatherCollector[Out]): Unit = () } + +/** Factory methods for [[Gatherer]]. */ +object Gatherers { + + /** + * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input). + * + * This variant avoids the overhead of the `GatherCollector` indirection and achieves the + * same performance as the native `map` operator while still supporting mutable state and + * the `onComplete` callback. 
+ * + * @param f the one-to-one transformation function + * @since 1.3.0 + */ + def oneToOne[In, Out](f: function.Function[In, Out]): Gatherer[In, Out] = + new OneToOneGathererImpl[In, Out](f) + + /** + * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback. + * + * @param f the one-to-one transformation function + * @param onComplete callback invoked when the stage terminates or restarts + * @since 1.3.0 + */ + def oneToOne[In, Out](f: function.Function[In, Out], onComplete: function.Effect): Gatherer[In, Out] = + new OneToOneGathererImpl[In, Out](f, onComplete) + + /** + * A specialized [[Gatherer]] for one-to-one transformations that avoids the `GatherCollector` overhead. + * + * @since 1.3.0 + */ + @DoNotInherit + trait OneToOneGatherer[In, Out] extends Gatherer[In, Out] { + def applyOne(in: In): Out + + final override def apply(in: In, collector: GatherCollector[Out]): Unit = + collector.push(applyOne(in)) + } + + private final class OneToOneGathererImpl[In, Out]( + f: function.Function[In, Out], + onCompleteCallback: function.Effect = null) + extends OneToOneGatherer[In, Out] { + override def applyOne(in: In): Out = f.apply(in) + override def onComplete(@annotation.nowarn collector: GatherCollector[Out]): Unit = + if (onCompleteCallback != null) onCompleteCallback.apply() + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index cb4f2ae576..ee8d25a670 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2765,7 +2765,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * Transform each input element into zero or more output elements without requiring tuple or collection allocations * imposed by the operator API itself. 
* - * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. * The provided [[GatherCollector]] can emit zero or more output elements for each input element. * * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 80fe136d50..0ce6b0fa47 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -349,9 +349,25 @@ final class SubFlow[In, Out, Mat]( * Transform each input element into zero or more output elements without requiring tuple or collection allocations * imposed by the operator API itself. * - * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. - * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation, - * abrupt stage termination, and supervision restart. + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. 
+ * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels * * @since 1.3.0 */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index e5d80fb734..0513d06fe4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -340,9 +340,25 @@ final class SubSource[Out, Mat]( * Transform each input element into zero or more output elements without requiring tuple or collection allocations * imposed by the operator API itself. * - * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields. - * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation, - * abrupt stage termination, and supervision restart. + * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables. + * The provided [[GatherCollector]] can emit zero or more output elements for each input element. + * + * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`. + * + * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion, + * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart. + * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart, + * and are ignored on downstream cancellation and abrupt termination. 
+ * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the gatherer emits an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete` + * + * '''Cancels when''' downstream cancels * * @since 1.3.0 */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala index 88fa030657..85fa35a0ed 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala @@ -53,6 +53,41 @@ trait Gatherer[-In, +Out] { def onComplete(collector: GatherCollector[Out]): Unit = () } +/** Factory methods for [[Gatherer]]. */ +object Gatherer { + + /** + * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input). + * + * This variant avoids the overhead of the `GatherCollector` indirection and achieves the + * same performance as the native `map` operator while still supporting mutable state and + * the `onComplete` callback. + * + * @param f the one-to-one transformation function + * @since 1.3.0 + */ + def oneToOne[In, Out](f: In => Out): Gatherer[In, Out] = + new OneToOneGathererImpl(f) + + /** + * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback. 
+ * + * @param f the one-to-one transformation function + * @param onComplete callback invoked when the stage terminates or restarts + * @since 1.3.0 + */ + def oneToOne[In, Out](f: In => Out, onComplete: () => Unit): Gatherer[In, Out] = + new OneToOneGathererImpl(f, onComplete) + + private final class OneToOneGathererImpl[In, Out]( + f: In => Out, + onCompleteCallback: () => Unit = () => ()) + extends OneToOneGatherer[In, Out] { + override def applyOne(in: In): Out = f(in) + override def onComplete(collector: GatherCollector[Out]): Unit = onCompleteCallback() + } +} + /** * INTERNAL API */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
