This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 01f2265fb2 chore: optimize Source#future and Source#futureSource
(#2560)
01f2265fb2 is described below
commit 01f2265fb2d69e4e82924680740d1b0e3e212f10
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Dec 14 19:13:03 2025 +0800
chore: optimize Source#future and Source#futureSource (#2560)
* chore: optimize Source#future and Source#futureSource
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../pekko/stream/impl/TraversalBuilderSpec.scala | 5 +-
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 83 +++++++++++++++++++++-
.../org/apache/pekko/stream/scaladsl/Source.scala | 18 +++--
3 files changed, 99 insertions(+), 7 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index ae0a154061..2af4b9b0a7 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.stream.impl
-import scala.concurrent.Future
+import scala.concurrent.Promise
import org.apache.pekko
import pekko.NotUsed
@@ -508,7 +508,8 @@ class TraversalBuilderSpec extends PekkoSpec {
}
"find Source.future via TraversalBuilder with getValuePresentedSource" in {
- val future = Future.successful("a")
+ val promise = Promise[String]()
+ val future = promise.future
TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[
String]].future should ===(
future)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 72fd8af97c..9cef219a9b 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -556,8 +556,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
}
- "Source.futureSource" must {
+ "Source.future" must {
+ "work as empty source when the future source completes with null" in {
+ val source = Source.future(Future.successful(null.asInstanceOf[String]))
+ val probe = source.runWith(TestSink[String]())
+
+ probe.request(1)
+ probe.expectComplete()
+ }
+
+ "work with a successful future" in {
+ val source = Source.future(Future.successful(42))
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(1)
+ probe.expectNext(42)
+ probe.expectComplete()
+ }
+ "work with a failed future" in {
+ val ex = new RuntimeException("boom")
+ val source = Source.future(Future.failed(ex))
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(1)
+ probe.expectError().getMessage should ===("boom")
+ }
+
+ "work with a delayed future" in {
+ val promise = scala.concurrent.Promise[Int]()
+ val source = Source.future(promise.future)
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(1)
+ probe.expectNoMessage(500.millis)
+
+ promise.success(42)
+
+ probe.expectNext(42)
+ probe.expectComplete()
+ }
+ }
+
+ "Source.futureSource" must {
"not cancel substream twice" in {
val result = Source
.futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
@@ -567,6 +608,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
Await.result(result, 4.seconds) shouldBe Done
}
+
+ "fail when the future completes with null" in {
+ val source =
Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]]))
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(1)
+ probe.expectError().getMessage should include("futureSource completed
with null")
+ }
+
+ "work with a successful future" in {
+ val source = Source.futureSource(Future.successful(Source(1 to 3)))
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(3)
+ probe.expectNext(1, 2, 3)
+ probe.expectComplete()
+ }
+
+ "work with a failed future source" in {
+ val ex = new RuntimeException("boom")
+ val source = Source.futureSource(Future.failed(ex))
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(1)
+ probe.expectError().getMessage should ===("boom")
+ }
+
+ "work with a delayed future source" in {
+ val promise = scala.concurrent.Promise[Source[Int, NotUsed]]()
+ val source = Source.futureSource(promise.future)
+ val probe = source.runWith(TestSink[Int]())
+
+ probe.request(3)
+ probe.expectNoMessage(500.millis)
+
+ promise.success(Source(1 to 3))
+
+ probe.expectNext(1, 2, 3)
+ probe.expectComplete()
+ }
}
"Source of sources" must {
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 1cc0b59eef..e6c867ebc9 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -589,8 +589,12 @@ object Source {
* Emits a single value when the given `Future` is successfully completed
and then completes the stream.
* The stream fails if the `Future` is completed with a failure.
*/
- def future[T](futureElement: Future[T]): Source[T, NotUsed] =
- fromGraph(new FutureSource[T](futureElement))
+ def future[T](futureElement: Future[T]): Source[T, NotUsed] =
futureElement.value match {
+ case None => fromGraph(new
FutureSource[T](futureElement))
+ case Some(scala.util.Success(null)) => empty[T]
+ case Some(scala.util.Success(elem)) => single(elem)
+ case Some(scala.util.Failure(ex)) => failed[T](ex)
+ }
/**
* Never emits any elements, never completes and never fails.
@@ -612,8 +616,14 @@ object Source {
* Turn a `Future[Source]` into a source that will emit the values of the
source when the future completes successfully.
* If the `Future` is completed with a failure the stream is failed.
*/
- def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T,
Future[M]] =
- fromGraph(new FutureFlattenSource(futureSource))
+ def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T,
Future[M]] = futureSource.value match {
+ case None => fromGraph(new
FutureFlattenSource(futureSource))
+ case Some(scala.util.Success(null)) =>
+ val exception = new NullPointerException("futureSource completed with
null")
+ Source.failed(exception).mapMaterializedValue(_ =>
Future.failed[M](exception))
+ case Some(scala.util.Success(source)) =>
source.mapMaterializedValue(Future.successful)
+ case Some(scala.util.Failure(ex)) =>
Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex))
+ }
/**
* Defers invoking the `create` function to create a single element until
there is downstream demand.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]