This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ce3620fc1a chore: Fix leak in FlatMapPrefix operator. (#1622)
ce3620fc1a is described below

commit ce3620fc1a6a17b00ad29fe103c8c77cff796776
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Jan 4 00:56:50 2025 +0800

    chore: Fix leak in FlatMapPrefix operator. (#1622)
---
 .../pekko/stream/impl/fusing/FlatMapPrefix.scala   | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
index c3672ae1ea..1191e44eaf 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala
@@ -43,8 +43,10 @@ import pekko.util.OptionVal
         .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
         .propagateToNestedMaterialization
     val matPromise = Promise[M]()
-    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
-      val accumulated = collection.mutable.Buffer.empty[In]
+    object FlatMapPrefixLogic extends GraphStageLogic(shape) with InHandler 
with OutHandler {
+      private var left = n
+      private var builder = Vector.newBuilder[In]
+      builder.sizeHint(left)
 
       private var subSource = OptionVal.none[SubSourceOutlet[In]]
       private var subSink = OptionVal.none[SubSinkInlet[Out]]
@@ -65,11 +67,12 @@ import pekko.util.OptionVal
         subSource match {
           case OptionVal.Some(s) => s.push(grab(in))
           case _ =>
-            accumulated.append(grab(in))
-            if (accumulated.size == n) {
+            builder += grab(in)
+            left -= 1
+            if (left == 0) {
               materializeFlow()
             } else {
-              // gi'me some more!
+              // give me some more!
               pull(in)
             }
         }
@@ -98,12 +101,12 @@ import pekko.util.OptionVal
             // delegate to subSink
             s.pull()
           case _ =>
-            if (accumulated.size < n) pull(in)
-            else if (accumulated.size == n) {
+            if (left > 0) pull(in)
+            else if (left == 0) {
               // corner case for n = 0, can be handled in FlowOps
               materializeFlow()
             } else {
-              throw new IllegalStateException(s"Unexpected accumulated size: 
${accumulated.size} (n: $n)")
+              throw new IllegalStateException(s"Unexpected accumulated size, 
left : $left (n: $n)")
             }
         }
       }
@@ -114,7 +117,7 @@ import pekko.util.OptionVal
           case _ =>
             if (propagateToNestedMaterialization) {
               downstreamCause = OptionVal.Some(cause)
-              if (accumulated.size == n) {
+              if (left == 0) {
                 // corner case for n = 0, can be handled in FlowOps
                 materializeFlow()
               } else if (!hasBeenPulled(in)) { // if in was already closed, 
nested flow would have already been materialized
@@ -128,8 +131,8 @@ import pekko.util.OptionVal
 
       def materializeFlow(): Unit =
         try {
-          val prefix = accumulated.toVector
-          accumulated.clear()
+          val prefix = builder.result()
+          builder = null // free for GC
           subSource = OptionVal.Some(new 
SubSourceOutlet[In]("FlatMapPrefix.subSource"))
           val theSubSource = subSource.get
           subSink = OptionVal.Some(new 
SubSinkInlet[Out]("FlatMapPrefix.subSink"))
@@ -196,6 +199,6 @@ import pekko.util.OptionVal
           case NonFatal(ex) => failStage(ex)
         }
     }
-    (logic, matPromise.future)
+    (FlatMapPrefixLogic, matPromise.future)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to