This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c3e9b43b6 chore: Add Source.fromArray operator for Java dsl. (#1248)
2c3e9b43b6 is described below

commit 2c3e9b43b67c2ed4c0ec4fe3c6ee39188852e726
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Apr 30 22:52:39 2024 +0800

    chore: Add Source.fromArray operator for Java dsl. (#1248)
---
 .../paradox/stream/operators/Source/fromArray.md   | 28 +++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../jdocs/stream/operators/SourceDocExamples.java  |  8 ++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 20 ++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |  1 +
 .../pekko/stream/impl/fusing/ArraySource.scala     | 55 ++++++++++++++++++++++
 .../pekko/stream/impl/fusing/IterableSource.scala  |  2 +
 .../org/apache/pekko/stream/javadsl/Source.scala   | 10 ++++
 8 files changed, 126 insertions(+)

diff --git a/docs/src/main/paradox/stream/operators/Source/fromArray.md 
b/docs/src/main/paradox/stream/operators/Source/fromArray.md
new file mode 100644
index 0000000000..0e0eb34e7a
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/fromArray.md
@@ -0,0 +1,28 @@
+# Source.fromArray
+
+Stream the values of an `array`.
+
+@ref[Source operators](../index.md#source-operators)
+
+## Signature
+
+@apidoc[Source.fromArray](Source$) { java="#fromArray(java.lang.Object[])" }
+
+## Description
+
+Stream the values of a Java `array`. 
+
+## Examples
+
+Java
+:  @@snip 
[from.java](/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) 
{ #imports #source-from-array }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the next value of the array
+
+**completes** when the last element of the array has been emitted
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index eff0593c8b..cc6d698a5b 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -16,6 +16,7 @@ These built-in sources are available from 
@scala[`org.apache.pekko.stream.scalad
 |Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away 
without ever emitting any elements.|
 |Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with 
a user specified exception.|
 |Source|<a 
name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the 
values of an @scala[`immutable.Seq`]@java[`Iterable`].|
+|Source|<a name="fromarray"></a>@ref[fromArray](Source/fromArray.md)|Stream 
the values of an `array`.|
 |Source|<a 
name="fromcompletionstage"></a>@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated
 by @ref[`Source.completionStage`](Source/completionStage.md).|
 |Source|<a 
name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by 
@ref[`Source.future`](Source/future.md).|
 |Source|<a 
name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated
 by @ref[`Source.futureSource`](Source/futureSource.md).|
@@ -468,6 +469,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [foreachAsync](Sink/foreachAsync.md)
 * [foreachParallel](Sink/foreachParallel.md)
 * [from](Source/from.md)
+* [fromArray](Source/fromArray.md)
 * [fromCompletionStage](Source/fromCompletionStage.md)
 * [fromFile](FileIO/fromFile.md)
 * [fromFuture](Source/fromFuture.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java 
b/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java
index 531d4f3877..c2afdff219 100644
--- a/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java
+++ b/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java
@@ -60,6 +60,14 @@ public class SourceDocExamples {
     // #source-from-example
   }
 
+  private static void fromArrayExample() { // Doc example: splits a string on whitespace and streams the resulting array.
+    final ActorSystem system = null; // NOTE(review): null placeholder — presumably this method is only compiled for the docs snippet and never run; confirm
+    // #source-from-array
+    Source<String, NotUsed> words = Source.fromArray("Hello 
world".split("\\s"));
+    words.runForeach(System.out::println, system);
+    // #source-from-array
+  }
+
   static void rangeExample() {
 
     final ActorSystem system = ActorSystem.create("Source");
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index ced206a4a6..8464688043 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -105,6 +105,26 @@ public class SourceTest extends StreamTest {
     probe.expectMsgEquals("de");
   }
 
+  @Test
+  public void mustBeAbleToCompleteWhenArrayIsEmpty() { // An empty array must yield a stream that completes without elements.
+    Source.fromArray(new String[] {})
+        .runWith(TestSink.probe(system), system)
+        .ensureSubscription()
+        .expectComplete(); // completion is expected even though no request() was issued
+  }
+
+  @Test
+  public void mustBeAbleToEmitEveryArrayElementSequentially() { // Elements must arrive in array order, followed by completion.
+    Source.fromArray(new String[] {"a", "b", "c"})
+        .runWith(TestSink.probe(system), system)
+        .ensureSubscription()
+        .request(3) // demand exactly as many elements as the array holds
+        .expectNext("a")
+        .expectNext("b")
+        .expectNext("c")
+        .expectComplete(); // completion must follow the last element without further demand
+  }
+
   @Test
   public void mustBeAbleToUseVoidTypeInForeach() {
     final TestKit probe = new TestKit(system);
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 0a18b880bb..1559a550dc 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -116,6 +116,7 @@ import pekko.stream.Attributes._
 
     val publisherSource = name("publisherSource")
     val iterableSource = name("iterableSource")
+    val arraySource = name("arraySource")
     val iterateSource = name("iterateSource")
     val cycledSource = name("cycledSource")
     val futureSource = name("futureSource")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala
new file mode 100644
index 0000000000..afbbbf2b3d
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+@InternalApi
+private[pekko] final class ArraySource[T](elements: Array[T]) extends 
GraphStage[SourceShape[T]] { // Source stage that pushes the entries of `elements` one per pull, in index order, then completes.
+  require(elements ne null, "array must not be null") // rejects a null array at construction time
+  override protected def initialAttributes: Attributes = 
DefaultAttributes.arraySource
+  private val out = Outlet[T]("ArraySource.out")
+  override val shape: SourceShape[T] = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
+      private var index: Int = 0 // position of the next element to push; one counter per created logic instance
+
+      override def preStart(): Unit = if (elements.isEmpty) completeStage() // empty array: complete at start instead of waiting for a pull
+
+      override def onPull(): Unit = {
+        if (index < elements.length) {
+          push(out, elements(index))
+          index += 1
+          if (index == elements.length) { // the element just pushed was the last one
+            complete(out) // complete right away rather than waiting for another pull
+          }
+        } else { // NOTE(review): looks unreachable — preStart completes the stage for an empty array and the branch above completes after the last push; confirm
+          complete(out)
+        }
+      }
+
+      setHandler(out, this) // this logic object is its own OutHandler for `out`
+    }
+
+  override def toString: String = "ArraySource"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
index 0bf99be6e3..801803e742 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
@@ -21,12 +21,14 @@ import scala.collection.immutable
 import scala.util.control.NonFatal
 
 import org.apache.pekko
+import pekko.annotation.InternalApi
 import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
 import pekko.stream.ActorAttributes.SupervisionStrategy
 import pekko.stream.impl.ReactiveStreamsCompliance
 import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 
+@InternalApi
 private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
   ReactiveStreamsCompliance.requireNonNullElement(elements)
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index dd3747a244..c53881266c 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag
 
 import org.apache.pekko
+import org.apache.pekko.stream.impl.fusing.ArraySource
 import pekko.{ Done, NotUsed }
 import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
 import pekko.annotation.ApiMayChange
@@ -166,6 +167,15 @@ object Source {
     new Source(scaladsl.Source(scalaIterable))
   }
 
+  /**
+   * Creates a `Source` from an array. If the array is empty, the stream is completed immediately;
+   * otherwise, every element of the array is emitted sequentially, in index order.
+   *
+   * @since 1.1.0
+   */
+  def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new 
Source(scaladsl.Source.fromGraph(
+    new ArraySource[T](array)))
+
   /**
    * Creates [[Source]] that represents integer values in range 
''[start;end]'', step equals to 1.
    * It allows to create `Source` out of range as simply as on Scala `Source(1 
to N)`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to