This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 611ce6f072 feat: Add Source#items (#2429) (#2472)
611ce6f072 is described below

commit 611ce6f072e119ff96f8292c67a06a82fcccc29a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Nov 10 22:57:43 2025 +0800

    feat: Add Source#items (#2429) (#2472)
    
    (cherry picked from commit f0db8f0c3344a74bd41d1c127540bf5d28f56bd1)
---
 .../main/paradox/stream/operators/Source/items.md  | 26 ++++++++++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 ++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 21 +++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 18 +++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 20 ++++++++++++++++-
 5 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Source/items.md 
b/docs/src/main/paradox/stream/operators/Source/items.md
new file mode 100644
index 0000000000..31cf9a0415
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/items.md
@@ -0,0 +1,26 @@
+# Source.items
+
+Create a `Source` from the given items.
+
+@ref[Source operators](../index.md#source-operators)
+
+## Signature
+
+@apidoc[Source.items](Source$) { 
scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]"
 java="#items[T](T)" }
+
+
+## Description
+
+Create a `Source` from the given items.
+
+## Examples
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the items one by one
+
+**completes** when the last item has been emitted
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 80ae73ef44..8bbf89a16f 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -27,6 +27,7 @@ These built-in sources are available from 
@scala[`org.apache.pekko.stream.scalad
 |Source|<a 
name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated
 by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
 |Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single 
value of the `Future` when it completes and there is demand.|
 |Source|<a 
name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the 
elements of the given future source once it successfully completes.|
+|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source` 
from the given items.|
 |Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a 
sequential `Source` by iterating with the given predicate, function and seed.|
 |Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by 
@ref[`Source.lazySource`](Source/lazySource.md).|
 |Source|<a 
name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by 
@ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).|
@@ -533,6 +534,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [interleave](Source-or-Flow/interleave.md)
 * [interleaveAll](Source-or-Flow/interleaveAll.md)
 * [intersperse](Source-or-Flow/intersperse.md)
+* [items](Source/items.md)
 * [iterate](Source/iterate.md)
 * [javaCollector](StreamConverters/javaCollector.md)
 * 
[javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 9ed206b3c3..bc4d67db7c 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -125,6 +125,27 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToUseItems() {
+    Source.items("a", "b", "c")
+        .runWith(TestSink.create(system), system)
+        .ensureSubscription()
+        .request(3)
+        .expectNext("a")
+        .expectNext("b")
+        .expectNext("c")
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToUseItemsWhenEmpty() {
+    Source.<String>items()
+        .runWith(TestSink.create(system), system)
+        .ensureSubscription()
+        .request(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToUseVoidTypeInForeach() {
     final TestKit probe = new TestKit(system);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index c97b5ad8f0..7135becfb1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -302,6 +302,24 @@ object Source {
   def single[T](element: T): Source[T, NotUsed] =
     new Source(scaladsl.Source.single(element))
 
+  /**
+   * Create a `Source` from the given elements.
+   *
+   * @since 1.3.0
+   */
+  @varargs
+  @SafeVarargs
+  @SuppressWarnings(Array("varargs"))
+  def items[T](items: T*): javadsl.Source[T, NotUsed] = {
+    if (items.isEmpty) {
+      empty()
+    } else if (items.length == 1) {
+      single(items.head)
+    } else {
+      new Source(scaladsl.Source(items.toIndexedSeq))
+    }
+  }
+
   /**
    * Create a `Source` that will continually emit the given element.
    */
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 306dde3ec3..255b38b86a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.CompletionStage
 
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.{ nowarn, tailrec, varargs }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.{ immutable, AbstractIterator }
 import scala.concurrent.{ Future, Promise }
@@ -480,6 +480,24 @@ object Source {
   def single[T](element: T): Source[T, NotUsed] =
     fromGraph(new GraphStages.SingleSource(element))
 
+  /**
+   * Create a `Source` from the given elements.
+   *
+   * @since 1.3.0
+   */
+  @varargs
+  @SafeVarargs
+  @SuppressWarnings(Array("varargs"))
+  def items[T](items: T*): Source[T, NotUsed] = {
+    if (items.isEmpty) {
+      empty[T]
+    } else if (items.length == 1) {
+      single(items.head)
+    } else {
+      Source(items.toIndexedSeq)
+    }
+  }
+
   /**
    * Create a `Source` from an `Option` value, emitting the value if it is 
defined.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to