This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 09e370860a feat: Add Source#apply for Array (#2474)
09e370860a is described below
commit 09e370860a9b006ceda9b181e926d9e52d529da8
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Nov 11 01:01:17 2025 +0800
feat: Add Source#apply for Array (#2474)
---
.../pekko/stream/DslFactoriesConsistencySpec.scala | 1 +
.../org/apache/pekko/stream/scaladsl/SourceSpec.scala | 12 ++++++++++++
.../scala/org/apache/pekko/stream/javadsl/Source.scala | 5 ++---
.../scala/org/apache/pekko/stream/scaladsl/Source.scala | 17 ++++++++++++++++-
4 files changed, 31 insertions(+), 4 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 93defd9017..29565b52a4 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with
Matchers {
("apply" -> "fromGraph") ::
("apply" -> "fromIterator") ::
("apply" -> "fromFunctions") ::
+ ("apply" -> "fromArray") ::
Nil
// format: OFF
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 87c20154d2..780760f6a7 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
c.expectComplete()
}
+ "product elements with Array" in {
+ val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[Int]()
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+ sub.request(3)
+ c.expectNext(1)
+ c.expectNext(2)
+ c.expectNext(3)
+ c.expectComplete()
+ }
+
"reject later subscriber" in {
val p = Source.single(1).runWith(Sink.asPublisher(false))
val c1 = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 7135becfb1..5cc5401219 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -36,7 +36,7 @@ import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava
}
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat,
ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.{ unused, _ }
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
@@ -201,8 +201,7 @@ object Source {
*
* @since 1.1.0
*/
- def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new
Source(scaladsl.Source.fromGraph(
- new ArraySource[T](array)))
+ def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new
Source(scaladsl.Source(array))
/**
* Create a `Source` from an `Optional` value, emitting the value if it is
present.
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 255b38b86a..c1b512a7ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -29,7 +29,7 @@ import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ GraphStages, IterableSource,
LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource,
LazyFutureSource, LazySingleSource }
import pekko.stream.impl.fusing.GraphStages._
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.ConstantFun
@@ -423,6 +423,21 @@ object Source {
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
fromGraph(new
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ /**
+ * Creates a `Source` from an array, if the array is empty, the stream is
completed immediately,
+ * otherwise, every element of the array will be emitted sequentially.
+ *
+ * @since 1.3.0
+ */
+ def apply[T](array: Array[T]): Source[T, NotUsed] = {
+ if (array.length == 0)
+ empty
+ else if (array.length == 1)
+ single(array(0))
+ else
+ Source.fromGraph(new ArraySource[T](array))
+ }
+
/**
* Starts a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]