This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ce3620fc1a chore: Fix leak in FlatMapPrefix operator. (#1622)
ce3620fc1a is described below
commit ce3620fc1a6a17b00ad29fe103c8c77cff796776
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Jan 4 00:56:50 2025 +0800
chore: Fix leak in FlatMapPrefix operator. (#1622)
---
.../pekko/stream/impl/fusing/FlatMapPrefix.scala | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
index c3672ae1ea..1191e44eaf 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
.propagateToNestedMaterialization
val matPromise = Promise[M]()
- val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
- val accumulated = collection.mutable.Buffer.empty[In]
+ object FlatMapPrefixLogic extends GraphStageLogic(shape) with InHandler
with OutHandler {
+ private var left = n
+ private var builder = Vector.newBuilder[In]
+ builder.sizeHint(left)
private var subSource = OptionVal.none[SubSourceOutlet[In]]
private var subSink = OptionVal.none[SubSinkInlet[Out]]
@@ -65,11 +67,12 @@ import pekko.util.OptionVal
subSource match {
case OptionVal.Some(s) => s.push(grab(in))
case _ =>
- accumulated.append(grab(in))
- if (accumulated.size == n) {
+ builder += grab(in)
+ left -= 1
+ if (left == 0) {
materializeFlow()
} else {
- // gi'me some more!
+ // give me some more!
pull(in)
}
}
@@ -98,12 +101,12 @@ import pekko.util.OptionVal
// delegate to subSink
s.pull()
case _ =>
- if (accumulated.size < n) pull(in)
- else if (accumulated.size == n) {
+ if (left > 0) pull(in)
+ else if (left == 0) {
// corner case for n = 0, can be handled in FlowOps
materializeFlow()
} else {
- throw new IllegalStateException(s"Unexpected accumulated size:
${accumulated.size} (n: $n)")
+ throw new IllegalStateException(s"Unexpected accumulated size,
left : $left (n: $n)")
}
}
}
@@ -114,7 +117,7 @@ import pekko.util.OptionVal
case _ =>
if (propagateToNestedMaterialization) {
downstreamCause = OptionVal.Some(cause)
- if (accumulated.size == n) {
+ if (left == 0) {
// corner case for n = 0, can be handled in FlowOps
materializeFlow()
} else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized
@@ -128,8 +131,8 @@ import pekko.util.OptionVal
def materializeFlow(): Unit =
try {
- val prefix = accumulated.toVector
- accumulated.clear()
+ val prefix = builder.result()
+ builder = null // free for GC
subSource = OptionVal.Some(new
SubSourceOutlet[In]("FlatMapPrefix.subSource"))
val theSubSource = subSource.get
subSink = OptionVal.Some(new
SubSinkInlet[Out]("FlatMapPrefix.subSink"))
@@ -196,6 +199,6 @@ import pekko.util.OptionVal
case NonFatal(ex) => failStage(ex)
}
}
- (logic, matPromise.future)
+ (FlatMapPrefixLogic, matPromise.future)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]