This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f24e3887f3 =str Implement Source.never as a dedicated GraphStage.
f24e3887f3 is described below
commit f24e3887f3c6c8c7ad93805ba692e1c72f6d8902
Author: He-Pin <[email protected]>
AuthorDate: Sat Apr 1 17:44:01 2023 +0800
=str Implement Source.never as a dedicated GraphStage.
---
.../org/apache/pekko/stream/impl/fusing/GraphStages.scala | 15 +++++++++++++++
.../scala/org/apache/pekko/stream/scaladsl/Source.scala | 5 ++---
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index ea8ad1e36c..036976b238 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -457,6 +457,21 @@ import pekko.stream.stage._
}
}
+ @InternalApi
+ private[pekko] object NeverSource extends GraphStage[SourceShape[Nothing]] {
+ private val out = Outlet[Nothing]("NeverSource.out")
+ val shape: SourceShape[Nothing] = SourceShape(out)
+
+ override def initialAttributes: Attributes = DefaultAttributes.neverSource
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler =
+ new GraphStageLogic(shape) with OutHandler {
+ override def onPull(): Unit = ()
+
+ setHandler(out, this)
+ }
+ }
+
@InternalApi
private[pekko] object NeverSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
private val in = Inlet[Any]("NeverSink.in")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 593547d958..1d935d990a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -293,7 +293,7 @@ object Source {
*/
def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](
stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] =
- StreamConverters.fromJavaStream(stream);
+ StreamConverters.fromJavaStream(stream)
/**
* Creates [[Source]] that will continually produce given elements in specified order.
@@ -516,8 +516,7 @@ object Source {
* This stream could be useful in tests.
*/
def never[T]: Source[T, NotUsed] = _never
- private[this] val _never: Source[Nothing, NotUsed] =
- future(Future.never).withAttributes(DefaultAttributes.neverSource)
+ private[this] val _never: Source[Nothing, NotUsed] = fromGraph(GraphStages.NeverSource)
/**
* Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]