This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ad70fffeb =str Add IterableSource.
5ad70fffeb is described below

commit 5ad70fffeb3935b6b1b2ccc7f44eb3bb35d956e7
Author: He-Pin <[email protected]>
AuthorDate: Tue Aug 22 12:54:36 2023 +0800

    =str Add IterableSource.
    
    Signed-off-by: He-Pin <[email protected]>
---
 .../pekko/stream/scaladsl/ReverseArrowSpec.scala   |  2 +-
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 52 ++++++++++++++-
 .../pekko/stream/impl/fusing/IterableSource.scala  | 76 ++++++++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Source.scala  |  7 +-
 4 files changed, 130 insertions(+), 7 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
index 9d52f1a18e..9ca05e3b75 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
@@ -227,7 +227,7 @@ class ReverseArrowSpec extends StreamSpec {
             src ~> f
             sink2 <~ f
             (the[IllegalArgumentException] thrownBy (s <~ f <~ 
src)).getMessage should include(
-              "[StatefulMapConcat.out] is already connected")
+              "[IterableSource.out] is already connected")
             ClosedShape
           })
           .run(),
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 4e31769fcb..a24e492269 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -311,19 +311,67 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
     }
 
     "use decider when iterator throws" in {
+
+      // using stopping decider
+      Source
+        .fromIterator(() => (1 to 5).iterator.map(k => if (k != 3) k else 
throw TE("a")))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+        .grouped(10)
+        .runWith(Sink.head)
+        .failed
+        .futureValue shouldBe a[TE]
+
+      // using stopping decider with recover
+      Source
+        .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else 
throw TE("a")))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+        .recoverWithRetries(1, { case _ => Source.empty })
+        .grouped(10)
+        .runWith(Sink.head)
+        .futureValue shouldBe List(1, 2)
+
+      // failing on every elements, using stopping decider
+      Source
+        .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+        .grouped(10)
+        .runWith(Sink.headOption)
+        .failed
+        .futureValue shouldBe a[TE]
+
+      // failing on every elements, using stopping decider and recover
+      Source
+        .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+        .recoverWithRetries(1, { case _ => Source.empty })
+        .grouped(10)
+        .runWith(Sink.headOption)
+        .futureValue shouldBe None
+
+      // using resuming decider
+      Source
+        .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else 
throw TE("a")))
+        
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+        .grouped(10)
+        .runWith(Sink.head)
+        .futureValue should ===(List(1, 2, 4, 5))
+
+      // using restarting decider
       Source
         .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else 
throw TE("a")))
         
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
         .grouped(10)
         .runWith(Sink.head)
-        .futureValue should ===(List(1, 2))
+        .futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2))
 
+      //  with failing on every elements, using restarting decider
       Source
         .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
         
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
         .grouped(10)
         .runWith(Sink.headOption)
-        .futureValue should ===(None)
+        .failed
+        .futureValue shouldBe a[TE]
     }
   }
 
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
new file mode 100644
index 0000000000..0bf99be6e3
--- /dev/null
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.impl.ReactiveStreamsCompliance
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+private[pekko] final class IterableSource[T](val elements: 
immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
+  ReactiveStreamsCompliance.requireNonNullElement(elements)
+
+  override protected def initialAttributes: Attributes = 
DefaultAttributes.iterableSource
+
+  private val out = Outlet[T]("IterableSource.out")
+  override val shape: SourceShape[T] = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = 
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var currentIterator: Iterator[T] = _
+
+      override def onPull(): Unit =
+        try {
+          if (currentIterator eq null) {
+            currentIterator = elements.iterator
+          }
+          tryPushNextOrComplete()
+        } catch {
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Stop   => failStage(ex)
+              case Supervision.Resume => tryPushNextOrComplete()
+              case Supervision.Restart =>
+                currentIterator = elements.iterator
+                tryPushNextOrComplete()
+            }
+        }
+
+      private def tryPushNextOrComplete(): Unit =
+        if (currentIterator.hasNext) {
+          if (isAvailable(out)) {
+            push(out, currentIterator.next())
+            if (!currentIterator.hasNext) {
+              completeStage()
+            }
+          }
+        } else {
+          completeStage()
+        }
+
+      setHandler(out, this)
+    }
+
+  override def toString: String = "IterableSource"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 1d935d990a..cd33cfb799 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -28,7 +28,7 @@ import pekko.annotation.InternalApi
 import pekko.stream.{ Outlet, SourceShape, _ }
 import pekko.stream.impl.{ PublisherSource, _ }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, 
LazySingleSource }
+import pekko.stream.impl.fusing.{ GraphStages, IterableSource, 
LazyFutureSource, LazySingleSource }
 import pekko.stream.impl.fusing.GraphStages._
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.ConstantFun
@@ -356,7 +356,7 @@ object Source {
    * beginning) regardless of when they subscribed.
    */
   def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
-    
single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)
+    fromGraph(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
 
   /**
    * Starts a new `Source` from the given `Future`. The stream will consist of
@@ -419,8 +419,7 @@ object Source {
    * Create a `Source` that will continually emit the given element.
    */
   def repeat[T](element: T): Source[T, NotUsed] = {
-    val next = Some((element, element))
-    unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat)
+    fromIterator(() => 
Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to