This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch elements
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 8b48db64e420a75c893d8c9c91b315eb270d4363
Author: He-Pin <[email protected]>
AuthorDate: Sun Nov 2 02:42:34 2025 +0800

    feat: Add Source#elements
---
 .../paradox/stream/operators/Source/elements.md    | 26 ++++++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 ++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 21 +++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 18 +++++++++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 36 +++++++++++++++++++++-
 5 files changed, 102 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Source/elements.md 
b/docs/src/main/paradox/stream/operators/Source/elements.md
new file mode 100644
index 0000000000..87fa35f89c
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/elements.md
@@ -0,0 +1,26 @@
+# Source.elements
+
+Create a `Source` from the given elements.
+
+@ref[Source operators](../index.md#source-operators)
+
+## Signature
+
+@apidoc[Source.elements](Source$) { 
scala="#elements[T](elements:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]"
 java="#elements[T](T)" }
+
+
+## Description
+
+Create a `Source` from the given elements.
+
+## Examples
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the elements one by one
+
+**completes** when the last element has been emitted
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index a728b8b8ae..539632a278 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -13,6 +13,7 @@ These built-in sources are available from 
@scala[`org.apache.pekko.stream.scalad
 |Source|<a 
name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send
 the single value of the `CompletionStage` when it completes and there is 
demand.|
 |Source|<a 
name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams
 the elements of an asynchronous source once its given *completion* operator 
completes.|
 |Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in 
cycled manner.|
+|Source|<a name="elements"></a>@ref[elements](Source/elements.md)|Create a 
`Source` from the given elements.|
 |Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away 
without ever emitting any elements.|
 |Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with 
a user specified exception.|
 |Source|<a 
name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the 
values of an @scala[`immutable.Seq`]@java[`Iterable`].|
@@ -454,6 +455,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [dropRepeated](Source-or-Flow/dropRepeated.md)
 * [dropWhile](Source-or-Flow/dropWhile.md)
 * [dropWithin](Source-or-Flow/dropWithin.md)
+* [elements](Source/elements.md)
 * [empty](Source/empty.md)
 * [exists](Sink/exists.md)
 * [expand](Source-or-Flow/expand.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 27db166b6c..32a56fc667 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -126,6 +126,27 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToUseElements() {
+    Source.elements("a", "b", "c")
+        .runWith(TestSink.probe(system), system)
+        .ensureSubscription()
+        .request(3)
+        .expectNext("a")
+        .expectNext("b")
+        .expectNext("c")
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToUseElementsEmpty() {
+    Source.<String>elements()
+        .runWith(TestSink.probe(system), system)
+        .ensureSubscription()
+        .request(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToUseVoidTypeInForeach() {
     final TestKit probe = new TestKit(system);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 100eb1d394..56db74790c 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -247,6 +247,24 @@ object Source {
   def single[T](element: T): Source[T, NotUsed] =
     new Source(scaladsl.Source.single(element))
 
+  /**
+   * Create a `Source` from the given elements.
+   *
+   * @since 1.3.0
+   */
+  @varargs
+  @SafeVarargs
+  @SuppressWarnings(Array("varargs"))
+  def elements[T](elements: T*): javadsl.Source[T, NotUsed] = {
+    if (elements.isEmpty) {
+      empty()
+    } else if (elements.length == 1) {
+      single(elements.head)
+    } else {
+      new Source(scaladsl.Source(elements))
+    }
+  }
+
   /**
    * Create a `Source` that will continually emit the given element.
    */
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 5198444a4b..66b23b0700 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.CompletionStage
 
-import scala.annotation.tailrec
+import scala.annotation.{ tailrec, varargs }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.{ immutable, AbstractIterator }
 import scala.concurrent.{ Future, Promise }
@@ -437,6 +437,40 @@ object Source {
   def single[T](element: T): Source[T, NotUsed] =
     fromGraph(new GraphStages.SingleSource(element))
 
+  /**
+   * Create a `Source` from the given elements.
+   *
+   * @since 1.3.0
+   */
+  @varargs
+  @SafeVarargs
+  @SuppressWarnings(Array("varargs"))
+  def elements[T](elements: T*): Source[T, NotUsed] = {
+    if (elements.isEmpty) {
+      empty[T]
+    } else if (elements.length == 1) {
+      single(elements.head)
+    } else {
+      Source(elements)
+    }
+  }
+
+  /**
+   * Creates a `Source` from an array, if the array is empty, the stream is 
completed immediately,
+   * otherwise, every element of the array will be emitted sequentially.
+   *
+   * @since 1.3.0
+   */
+  def fromArray[T](array: Array[T]): Source[T, NotUsed] = {
+    if (array.length == 0) {
+      empty
+    } else if (array.length == 1) {
+      single(array(0))
+    } else {
+      Source.fromGraph(new ArraySource[T](array))
+    }
+  }
+
   /**
    * Create a `Source` from an `Option` value, emitting the value if it is 
defined.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to