This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 611ce6f072 feat: Add Source#items (#2429) (#2472)
611ce6f072 is described below
commit 611ce6f072e119ff96f8292c67a06a82fcccc29a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Nov 10 22:57:43 2025 +0800
feat: Add Source#items (#2429) (#2472)
(cherry picked from commit f0db8f0c3344a74bd41d1c127540bf5d28f56bd1)
---
.../main/paradox/stream/operators/Source/items.md | 26 ++++++++++++++++++++++
docs/src/main/paradox/stream/operators/index.md | 2 ++
.../apache/pekko/stream/javadsl/SourceTest.java | 21 +++++++++++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 18 +++++++++++++++
.../org/apache/pekko/stream/scaladsl/Source.scala | 20 ++++++++++++++++-
5 files changed, 86 insertions(+), 1 deletion(-)
diff --git a/docs/src/main/paradox/stream/operators/Source/items.md
b/docs/src/main/paradox/stream/operators/Source/items.md
new file mode 100644
index 0000000000..31cf9a0415
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/items.md
@@ -0,0 +1,26 @@
+# Source.items
+
+Create a `Source` from the given items.
+
+@ref[Source operators](../index.md#source-operators)
+
+## Signature
+
+@apidoc[Source.items](Source$) {
scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]"
java="#items[T](T)" }
+
+
+## Description
+
+Create a `Source` from the given items.
+
+## Examples
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the items one by one
+
+**completes** when the last item has been emitted
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 80ae73ef44..8bbf89a16f 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -27,6 +27,7 @@ These built-in sources are available from
@scala[`org.apache.pekko.stream.scalad
|Source|<a
name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated
by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single
value of the `Future` when it completes and there is demand.|
|Source|<a
name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the
elements of the given future source once it successfully completes.|
+|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source`
from the given items.|
|Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a
sequential `Source` by iterating with the given predicate, function and seed.|
|Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by
@ref[`Source.lazySource`](Source/lazySource.md).|
|Source|<a
name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by
@ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).|
@@ -533,6 +534,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [interleave](Source-or-Flow/interleave.md)
* [interleaveAll](Source-or-Flow/interleaveAll.md)
* [intersperse](Source-or-Flow/intersperse.md)
+* [items](Source/items.md)
* [iterate](Source/iterate.md)
* [javaCollector](StreamConverters/javaCollector.md)
*
[javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 9ed206b3c3..bc4d67db7c 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -125,6 +125,27 @@ public class SourceTest extends StreamTest {
.expectComplete();
}
+ @Test
+ public void mustBeAbleToUseItems() {
+ Source.items("a", "b", "c")
+ .runWith(TestSink.create(system), system)
+ .ensureSubscription()
+ .request(3)
+ .expectNext("a")
+ .expectNext("b")
+ .expectNext("c")
+ .expectComplete();
+ }
+
+ @Test
+ public void mustBeAbleToUseItemsWhenEmpty() {
+ Source.<String>items()
+ .runWith(TestSink.create(system), system)
+ .ensureSubscription()
+ .request(1)
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToUseVoidTypeInForeach() {
final TestKit probe = new TestKit(system);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index c97b5ad8f0..7135becfb1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -302,6 +302,24 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
new Source(scaladsl.Source.single(element))
+ /**
+ * Create a `Source` from the given elements.
+ *
+ * @since 1.3.0
+ */
+ @varargs
+ @SafeVarargs
+ @SuppressWarnings(Array("varargs"))
+ def items[T](items: T*): javadsl.Source[T, NotUsed] = {
+ if (items.isEmpty) {
+ empty()
+ } else if (items.length == 1) {
+ single(items.head)
+ } else {
+ new Source(scaladsl.Source(items.toIndexedSeq))
+ }
+ }
+
/**
* Create a `Source` that will continually emit the given element.
*/
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 306dde3ec3..255b38b86a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.CompletionStage
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.{ nowarn, tailrec, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, AbstractIterator }
import scala.concurrent.{ Future, Promise }
@@ -480,6 +480,24 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
fromGraph(new GraphStages.SingleSource(element))
+ /**
+ * Create a `Source` from the given elements.
+ *
+ * @since 1.3.0
+ */
+ @varargs
+ @SafeVarargs
+ @SuppressWarnings(Array("varargs"))
+ def items[T](items: T*): Source[T, NotUsed] = {
+ if (items.isEmpty) {
+ empty[T]
+ } else if (items.length == 1) {
+ single(items.head)
+ } else {
+ Source(items.toIndexedSeq)
+ }
+ }
+
/**
* Create a `Source` from an `Option` value, emitting the value if it is
defined.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]