This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new abc18a5cea feat: Add emitMulti with Spliterator support (#1776)
abc18a5cea is described below

commit abc18a5cea29a85e148357d3c86e18ea2f9dc059
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Feb 27 21:34:58 2025 +0800

    feat: Add emitMulti with Spliterator support (#1776)
---
 .../pekko/stream/impl/GraphStageLogicSpec.scala    | 20 ++++++++++++
 .../org/apache/pekko/stream/stage/GraphStage.scala | 36 +++++++++++++++++++++-
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala
index 787bbd029e..b62b92f45d 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/GraphStageLogicSpec.scala
@@ -121,6 +121,20 @@ class GraphStageLogicSpec extends StreamSpec with 
GraphInterpreterSpecKit with S
     override def toString = "GraphStageLogicSpec.emitEmptyIterable"
   }
 
+  object EmitSplitIterator extends GraphStage[SourceShape[Int]] {
+    val out = Outlet[Int]("out")
+    override val shape = SourceShape(out)
+    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
= new GraphStageLogic(shape) {
+      setHandler(out,
+        new OutHandler {
+          override def onPull(): Unit = emitMultiple(
+            out,
+            java.util.stream.Stream.of(1, 2, 3).spliterator(), () => emit(out, 
42, () => completeStage()))
+        })
+    }
+    override def toString = "GraphStageLogicSpec.emitEmptyIterable"
+  }
+
   private case class ReadNEmitN(n: Int) extends GraphStage[FlowShape[Int, 
Int]] {
     override val shape = FlowShape(Inlet[Int]("readN.in"), 
Outlet[Int]("readN.out"))
 
@@ -196,6 +210,12 @@ class GraphStageLogicSpec extends StreamSpec with 
GraphInterpreterSpecKit with S
 
     }
 
+    "emit properly when using split iterator" in {
+
+      Source.fromGraph(EmitSplitIterator).runWith(Sink.seq).futureValue should 
===(List(1, 2, 3, 42))
+
+    }
+
     "invoke lifecycle hooks in the right order" in {
       val g = new GraphStage[FlowShape[Int, Int]] {
         val in = Inlet[Int]("in")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala 
b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
index b18630843f..16ca4642fa 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
@@ -21,7 +21,6 @@ import scala.annotation.tailrec
 import scala.collection.{ immutable, mutable }
 import scala.concurrent.{ Future, Promise }
 import scala.concurrent.duration.FiniteDuration
-
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor._
@@ -37,6 +36,8 @@ import pekko.stream.stage.ConcurrentAsyncCallbackState.{ 
NoPendingEvents, State
 import pekko.util.OptionVal
 import pekko.util.unused
 
+import java.util.Spliterator
+
 /**
  * Scala API: A GraphStage represents a reusable graph stream processing 
operator.
  *
@@ -979,6 +980,26 @@ abstract class GraphStageLogic private[stream] (val 
inCount: Int, val outCount:
       }
     } else andThen()
 
+  /**
+   * Emit a sequence of elements through the given outlet and continue with 
the given thunk
+   * afterwards, suspending execution if necessary.
+   * This action replaces the [[OutHandler]] for the given outlet if suspension
+   * is needed and reinstalls the current handler upon receiving an `onPull()`
+   * signal (before invoking the `andThen` function).
+   */
+  final protected def emitMultiple[T](out: Outlet[T], elems: Spliterator[T], 
andThen: () => Unit): Unit = {
+    val iter = new EmittingSpliterator[T](out, elems, 
getNonEmittingHandler(out), andThen)
+    if (isAvailable(out)) {
+      if (!iter.tryPush()) {
+        andThen()
+      } else {
+        setOrAddEmitting(out, iter)
+      }
+    } else {
+      setOrAddEmitting(out, iter)
+    }
+  }
+
   /**
    * Emit a sequence of elements through the given outlet, suspending 
execution if necessary.
    * This action replaces the [[OutHandler]] for the given outlet if suspension
@@ -1118,6 +1139,19 @@ abstract class GraphStageLogic private[stream] (val 
inCount: Int, val outCount:
     }
   }
 
+  private final class EmittingSpliterator[T](_out: Outlet[T], elems: 
Spliterator[T], _previous: OutHandler,
+      _andThen: () => Unit)
+      extends Emitting[T](_out, _previous, _andThen) with 
java.util.function.Consumer[T] {
+
+    override def onPull(): Unit = if (!elems.tryAdvance(this)) {
+      followUp()
+    }
+
+    def tryPush(): Boolean = elems.tryAdvance(this)
+
+    override def accept(elem: T): Unit = push(out, elem)
+  }
+
   private class EmittingCompletion[T](_out: Outlet[T], _previous: OutHandler)
       extends Emitting[T](_out, _previous, DoNothing) {
     override def onPull(): Unit = complete(out)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to