This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new f24e3887f3 =str Implement Source.never as a dedicated GraphStage.
f24e3887f3 is described below

commit f24e3887f3c6c8c7ad93805ba692e1c72f6d8902
Author: He-Pin <[email protected]>
AuthorDate: Sat Apr 1 17:44:01 2023 +0800

    =str Implement Source.never as a dedicated GraphStage.
---
 .../org/apache/pekko/stream/impl/fusing/GraphStages.scala | 15 +++++++++++++++
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala   |  5 ++---
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index ea8ad1e36c..036976b238 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -457,6 +457,21 @@ import pekko.stream.stage._
     }
   }
 
+  @InternalApi
+  private[pekko] object NeverSource extends GraphStage[SourceShape[Nothing]] {
+    private val out = Outlet[Nothing]("NeverSource.out")
+    val shape: SourceShape[Nothing] = SourceShape(out)
+
+    override def initialAttributes: Attributes = DefaultAttributes.neverSource
+
+    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler =
+      new GraphStageLogic(shape) with OutHandler {
+        override def onPull(): Unit = ()
+
+        setHandler(out, this)
+      }
+  }
+
   @InternalApi
   private[pekko] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
     private val in = Inlet[Any]("NeverSink.in")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 593547d958..1d935d990a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -293,7 +293,7 @@ object Source {
    */
   def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](
       stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] =
-    StreamConverters.fromJavaStream(stream);
+    StreamConverters.fromJavaStream(stream)
 
   /**
    * Creates [[Source]] that will continually produce given elements in specified order.
@@ -516,8 +516,7 @@ object Source {
    * This stream could be useful in tests.
    */
   def never[T]: Source[T, NotUsed] = _never
-  private[this] val _never: Source[Nothing, NotUsed] =
-    future(Future.never).withAttributes(DefaultAttributes.neverSource)
+  private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource)
 
   /**
    * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to