This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch elements in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 3634569526f1d0c5566e5db6df5acbe7995bda70 Author: He-Pin <[email protected]> AuthorDate: Sun Nov 2 02:42:34 2025 +0800 feat: Add Source#elements --- .../paradox/stream/operators/Source/elements.md | 26 ++++++++++++++++++++++ docs/src/main/paradox/stream/operators/index.md | 2 ++ .../apache/pekko/stream/javadsl/SourceTest.java | 21 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 17 ++++++++++++++ .../org/apache/pekko/stream/scaladsl/Source.scala | 19 +++++++++++++++- 5 files changed, 84 insertions(+), 1 deletion(-) diff --git a/docs/src/main/paradox/stream/operators/Source/elements.md b/docs/src/main/paradox/stream/operators/Source/elements.md new file mode 100644 index 0000000000..87fa35f89c --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/elements.md @@ -0,0 +1,26 @@ +# Source.elements + +Create a `Source` from the given elements. + +@ref[Source operators](../index.md#source-operators) + +## Signature + +@apidoc[Source.elements](Source$) { scala="#elements[T](elements:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#elements[T](T)" } + + +## Description + +Create a `Source` from the given elements. + +## Examples + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the elements one by one + +**completes** when the last element has been emitted + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index ad676d0551..672f585554 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -13,6 +13,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| |Source|<a name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| |Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.| +|Source|<a name="elements"></a>@ref[elements](Source/elements.md)|Create a `Source` from the given elements.| |Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.| |Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with a user specified exception.| |Source|<a name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].| @@ -453,6 +454,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [dropRepeated](Source-or-Flow/dropRepeated.md) * [dropWhile](Source-or-Flow/dropWhile.md) * [dropWithin](Source-or-Flow/dropWithin.md) +* [elements](Source/elements.md) * [empty](Source/empty.md) * [exists](Sink/exists.md) * [expand](Source-or-Flow/expand.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 2787a4f22e..991229d809 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -126,6 +126,27 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToUseElements() { + Source.elements("a", "b", "c") + .runWith(TestSink.probe(system), system) + .ensureSubscription() + .request(3) + .expectNext("a") + .expectNext("b") + .expectNext("c") + .expectComplete(); + } + + @Test + public void mustBeAbleToUseElementsEmpty() { + Source.<String>elements() + .runWith(TestSink.probe(system), system) + .ensureSubscription() + .request(1) + .expectComplete(); + } + @Test public void mustBeAbleToUseVoidTypeInForeach() { final TestKit probe = new TestKit(system); diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 1f5468961f..f8dbe68eaf 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -247,6 +247,23 @@ object Source { def single[T](element: T): Source[T, NotUsed] = new Source(scaladsl.Source.single(element)) + /** + * Create a `Source` from the given elements. + * + * @since 1.3.0 + */ + @varargs + @SafeVarargs + def elements[T](elements: T*): javadsl.Source[T, NotUsed] = { + if (elements.isEmpty) { + empty() + } else if (elements.length == 1) { + single(elements.head) + } else { + new Source(scaladsl.Source(elements)) + } + } + /** * Create a `Source` that will continually emit the given element. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a50d7bcd8e..2ebbd6207a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.tailrec +import scala.annotation.{ tailrec, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } @@ -422,6 +422,23 @@ object Source { def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) + /** + * Create a `Source` from the given elements. + * + * @since 1.3.0 + */ + @varargs + @SafeVarargs + def elements[T](elements: T*): Source[T, NotUsed] = { + if (elements.isEmpty) { + empty[T] + } else if (elements.length == 1) { + single(elements.head) + } else { + Source(elements) + } + } + /** * Create a `Source` from an `Option` value, emitting the value if it is defined. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
