This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch alreadyCompletedFuture in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 2cc7584dcec62271b1e33c1f0a8ed7a67fb17547
Author: He-Pin <[email protected]>
AuthorDate: Sun Dec 7 18:25:46 2025 +0800

    chore: optimize Source#future and Source#futureSource
---
 .../apache/pekko/stream/scaladsl/SourceSpec.scala | 83 +++++++++++++++++++++-
 .../org/apache/pekko/stream/scaladsl/Source.scala | 18 +++--
 2 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..c36793220a 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -547,8 +547,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
     }
   }
 
-  "Source.futureSource" must {
+  "Source.future" must {
+    "work as empty source when the future source completes with null" in {
+      val source = Source.future(Future.successful(null.asInstanceOf[String]))
+      val probe = source.runWith(TestSink[String]())
+
+      probe.request(1)
+      probe.expectComplete()
+    }
+
+    "work with a successful future" in {
+      val source = Source.future(Future.successful(42))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
 
+    "work with a failed future" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.future(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future" in {
+      val promise = scala.concurrent.Promise[Int]()
+      val source = Source.future(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(42)
+
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
+  }
+
+  "Source.futureSource" must {
     "not cancel substream twice" in {
       val result = Source
         .futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
@@ -558,6 +599,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
 
       Await.result(result, 4.seconds) shouldBe Done
     }
+
+    "fail when the future completes with null" in {
+      val source = Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]]))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should include("futureSource completed with null")
+    }
+
+    "work with a successful future" in {
+      val source = Source.futureSource(Future.successful(Source(1 to 3)))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
+
+    "work with a failed future source" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.futureSource(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future source" in {
+      val promise = scala.concurrent.Promise[Source[Int, NotUsed]]()
+      val source = Source.futureSource(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(Source(1 to 3))
+
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
   }
 
   "Source of sources" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 96b0c55c0d..ab456b704e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -561,8 +561,12 @@ object Source {
    * Emits a single value when the given `Future` is successfully completed and then completes the stream.
    * The stream fails if the `Future` is completed with a failure.
    */
-  def future[T](futureElement: Future[T]): Source[T, NotUsed] =
-    fromGraph(new FutureSource[T](futureElement))
+  def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match {
+    case Some(scala.util.Success(null)) => empty[T]
+    case Some(scala.util.Success(elem)) => single(elem)
+    case Some(scala.util.Failure(ex)) => failed[T](ex)
+    case _ => fromGraph(new FutureSource[T](futureElement))
+  }
 
   /**
    * Never emits any elements, never completes and never fails.
@@ -584,8 +588,14 @@ object Source {
    * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
    * If the `Future` is completed with a failure the stream is failed.
    */
-  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] =
-    fromGraph(new FutureFlattenSource(futureSource))
+  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match {
+    case Some(scala.util.Success(null)) =>
+      val exception = new NullPointerException("futureSource completed with null")
+      Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception))
+    case Some(scala.util.Success(source)) => source.mapMaterializedValue(Future.successful)
+    case Some(scala.util.Failure(ex)) => Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex))
+    case _ => fromGraph(new FutureFlattenSource(futureSource))
+  }
 
   /**
    * Defers invoking the `create` function to create a single element until there is downstream demand.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
