This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f0db8f0c33 feat: Add Source#items (#2429)
f0db8f0c33 is described below
commit f0db8f0c3344a74bd41d1c127540bf5d28f56bd1
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Nov 10 21:47:40 2025 +0800
feat: Add Source#items (#2429)
---
.../main/paradox/stream/operators/Source/items.md | 26 ++++++++++++++++++++++
docs/src/main/paradox/stream/operators/index.md | 2 ++
.../apache/pekko/stream/javadsl/SourceTest.java | 21 +++++++++++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 18 +++++++++++++++
.../org/apache/pekko/stream/scaladsl/Source.scala | 20 ++++++++++++++++-
5 files changed, 86 insertions(+), 1 deletion(-)
diff --git a/docs/src/main/paradox/stream/operators/Source/items.md
b/docs/src/main/paradox/stream/operators/Source/items.md
new file mode 100644
index 0000000000..31cf9a0415
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/items.md
@@ -0,0 +1,26 @@
+# Source.items
+
+Create a `Source` from the given items.
+
+@ref[Source operators](../index.md#source-operators)
+
+## Signature
+
+@apidoc[Source.items](Source$) { scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#items[T](T)" }
+
+
+## Description
+
+Create a `Source` from the given items.
+
+## Examples
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the items one by one
+
+**completes** when the last item has been emitted
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 552402b2f9..68bc137888 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -23,6 +23,7 @@ These built-in sources are available from
@scala[`org.apache.pekko.stream.scalad
|Source|<a
name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration
with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single
value of the `Future` when it completes and there is demand.|
|Source|<a
name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the
elements of the given future source once it successfully completes.|
+|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source` from the given items.|
|Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a
sequential `Source` by iterating with the given predicate, function and seed.|
|Source|<a
name="lazycompletionstage"></a>@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers
creation of a future of a single element source until there is demand.|
|Source|<a
name="lazycompletionstagesource"></a>@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers
creation of a future source until there is demand.|
@@ -512,6 +513,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [interleave](Source-or-Flow/interleave.md)
* [interleaveAll](Source-or-Flow/interleaveAll.md)
* [intersperse](Source-or-Flow/intersperse.md)
+* [items](Source/items.md)
* [iterate](Source/iterate.md)
* [javaCollector](StreamConverters/javaCollector.md)
*
[javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 29d96bd1b3..f3d21d506a 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -126,6 +126,27 @@ public class SourceTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToUseItems() {
+ Source.items("a", "b", "c")
+ .runWith(TestSink.create(system), system)
+ .ensureSubscription()
+ .request(3)
+ .expectNext("a")
+ .expectNext("b")
+ .expectNext("c")
+ .expectComplete();
+ }
+
+ @Test
+ public void mustBeAbleToUseItemsWhenEmpty() {
+ Source.<String>items()
+ .runWith(TestSink.create(system), system)
+ .ensureSubscription()
+ .request(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToUseVoidTypeInForeach() {
final TestKit probe = new TestKit(system);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 100eb1d394..89bef8f91f 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -247,6 +247,24 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
new Source(scaladsl.Source.single(element))
+ /**
+ * Create a `Source` from the given elements.
+ *
+ * @since 1.3.0
+ */
+ @varargs
+ @SafeVarargs
+ @SuppressWarnings(Array("varargs"))
+ def items[T](items: T*): javadsl.Source[T, NotUsed] = {
+ if (items.isEmpty) {
+ empty()
+ } else if (items.length == 1) {
+ single(items.head)
+ } else {
+ new Source(scaladsl.Source(items))
+ }
+ }
+
/**
* Create a `Source` that will continually emit the given element.
*/
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 5198444a4b..96b0c55c0d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.CompletionStage
-import scala.annotation.tailrec
+import scala.annotation.{ tailrec, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, AbstractIterator }
import scala.concurrent.{ Future, Promise }
@@ -437,6 +437,24 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
fromGraph(new GraphStages.SingleSource(element))
+ /**
+ * Create a `Source` from the given elements.
+ *
+ * @since 1.3.0
+ */
+ @varargs
+ @SafeVarargs
+ @SuppressWarnings(Array("varargs"))
+ def items[T](items: T*): Source[T, NotUsed] = {
+ if (items.isEmpty) {
+ empty[T]
+ } else if (items.length == 1) {
+ single(items.head)
+ } else {
+ Source(items)
+ }
+ }
+
/**
* Create a `Source` from an `Option` value, emitting the value if it is defined.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]