This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 5ad70fffeb =str Add IterableSource.
5ad70fffeb is described below
commit 5ad70fffeb3935b6b1b2ccc7f44eb3bb35d956e7
Author: He-Pin <[email protected]>
AuthorDate: Tue Aug 22 12:54:36 2023 +0800
=str Add IterableSource.
Signed-off-by: He-Pin <[email protected]>
---
.../pekko/stream/scaladsl/ReverseArrowSpec.scala | 2 +-
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 52 ++++++++++++++-
.../pekko/stream/impl/fusing/IterableSource.scala | 76 ++++++++++++++++++++++
.../org/apache/pekko/stream/scaladsl/Source.scala | 7 +-
4 files changed, 130 insertions(+), 7 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
index 9d52f1a18e..9ca05e3b75 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ReverseArrowSpec.scala
@@ -227,7 +227,7 @@ class ReverseArrowSpec extends StreamSpec {
src ~> f
sink2 <~ f
(the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include(
- "[StatefulMapConcat.out] is already connected")
+ "[IterableSource.out] is already connected")
ClosedShape
})
.run(),
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 4e31769fcb..a24e492269 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -311,19 +311,67 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
"use decider when iterator throws" in {
+
+ // using stopping decider
+ Source
+ .fromIterator(() => (1 to 5).iterator.map(k => if (k != 3) k else throw TE("a")))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .grouped(10)
+ .runWith(Sink.head)
+ .failed
+ .futureValue shouldBe a[TE]
+
+ // using stopping decider with recover
+ Source
+ .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .recoverWithRetries(1, { case _ => Source.empty })
+ .grouped(10)
+ .runWith(Sink.head)
+ .futureValue shouldBe List(1, 2)
+
+ // failing on every elements, using stopping decider
+ Source
+ .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .grouped(10)
+ .runWith(Sink.headOption)
+ .failed
+ .futureValue shouldBe a[TE]
+
+ // failing on every elements, using stopping decider and recover
+ Source
+ .fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
+ .recoverWithRetries(1, { case _ => Source.empty })
+ .grouped(10)
+ .runWith(Sink.headOption)
+ .futureValue shouldBe None
+
+ // using resuming decider
+ Source
+ .fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
+ .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
+ .grouped(10)
+ .runWith(Sink.head)
+ .futureValue should ===(List(1, 2, 4, 5))
+
+ // using restarting decider
Source
.fromIterator(() => (1 to 5).toIterator.map(k => if (k != 3) k else throw TE("a")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.head)
- .futureValue should ===(List(1, 2))
+ .futureValue should ===(List(1, 2, 1, 2, 1, 2, 1, 2, 1, 2))
+ // with failing on every elements, using restarting decider
Source
.fromIterator(() => (1 to 5).toIterator.map(_ => throw TE("b")))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.grouped(10)
.runWith(Sink.headOption)
- .futureValue should ===(None)
+ .failed
+ .futureValue shouldBe a[TE]
}
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
new file mode 100644
index 0000000000..0bf99be6e3
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.impl.ReactiveStreamsCompliance
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
+
+private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
+ ReactiveStreamsCompliance.requireNonNullElement(elements)
+
+ override protected def initialAttributes: Attributes = DefaultAttributes.iterableSource
+
+ private val out = Outlet[T]("IterableSource.out")
+ override val shape: SourceShape[T] = SourceShape(out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with OutHandler {
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private var currentIterator: Iterator[T] = _
+
+ override def onPull(): Unit =
+ try {
+ if (currentIterator eq null) {
+ currentIterator = elements.iterator
+ }
+ tryPushNextOrComplete()
+ } catch {
+ case NonFatal(ex) =>
+ decider(ex) match {
+ case Supervision.Stop => failStage(ex)
+ case Supervision.Resume => tryPushNextOrComplete()
+ case Supervision.Restart =>
+ currentIterator = elements.iterator
+ tryPushNextOrComplete()
+ }
+ }
+
+ private def tryPushNextOrComplete(): Unit =
+ if (currentIterator.hasNext) {
+ if (isAvailable(out)) {
+ push(out, currentIterator.next())
+ if (!currentIterator.hasNext) {
+ completeStage()
+ }
+ }
+ } else {
+ completeStage()
+ }
+
+ setHandler(out, this)
+ }
+
+ override def toString: String = "IterableSource"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 1d935d990a..cd33cfb799 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -28,7 +28,7 @@ import pekko.annotation.InternalApi
import pekko.stream.{ Outlet, SourceShape, _ }
import pekko.stream.impl.{ PublisherSource, _ }
import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource }
import pekko.stream.impl.fusing.GraphStages._
import pekko.stream.stage.GraphStageWithMaterializedValue
import pekko.util.ConstantFun
@@ -356,7 +356,7 @@ object Source {
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
- single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)
+ fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
/**
* Starts a new `Source` from the given `Future`. The stream will consist of
@@ -419,8 +419,7 @@ object Source {
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, NotUsed] = {
- val next = Some((element, element))
- unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat)
+ fromIterator(() => Iterator.continually(element)).withAttributes(DefaultAttributes.repeat)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]