This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 55f065f257 chore: Refactor UnfoldResourceSourceAsync. (#1240)
55f065f257 is described below
commit 55f065f257b8083d9aa434e30af69c48d2ea11ad
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Apr 15 10:44:26 2024 +0800
chore: Refactor UnfoldResourceSourceAsync. (#1240)
---
.../stream/impl/UnfoldResourceSourceAsync.scala | 178 +++++++++++----------
1 file changed, 90 insertions(+), 88 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
index 7f783be6e8..6f6c186186 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
@@ -23,8 +23,10 @@ import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts.parasitic
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._
+import pekko.util.OptionVal
/**
* INTERNAL API
@@ -36,64 +38,66 @@ import pekko.stream.stage._
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSourceAsync.out")
override val shape = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync
-
- def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private implicit def ec: ExecutionContext = materializer.executionContext
- private var state: Option[S] = None
-
- private val createdCallback = getAsyncCallback[Try[S]] {
- case Success(resource) =>
- state = Some(resource)
- if (isAvailable(out)) onPull()
- case Failure(t) => failStage(t)
- }.invokeWithFeedback _
-
- private val errorHandler: PartialFunction[Throwable, Unit] = {
- case NonFatal(ex) =>
- decider(ex) match {
- case Supervision.Stop =>
- failStage(ex)
- case Supervision.Restart =>
- try {
- restartResource()
- } catch {
- case NonFatal(ex) => failStage(ex)
- }
- case Supervision.Resume => onPull()
- }
- }
+ override def initialAttributes: Attributes =
+ DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create)
+
+ def createLogic(inheritedAttributes: Attributes) =
+ new GraphStageLogic(shape) with OutHandler {
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private implicit def ec: ExecutionContext = materializer.executionContext
+ private var maybeResource: OptionVal[S] = OptionVal.none
+
+ private val createdCallback = getAsyncCallback[Try[S]] {
+ case Success(resource) =>
+ require(resource != null, "`create` method should not return a null resource.")
+ maybeResource = OptionVal(resource)
+ if (isAvailable(out)) onPull()
+ case Failure(t) => failStage(t)
+ }.invokeWithFeedback _
+
+ private val errorHandler: PartialFunction[Throwable, Unit] = {
+ case NonFatal(ex) =>
+ decider(ex) match {
+ case Supervision.Stop =>
+ failStage(ex)
+ case Supervision.Restart =>
+ try {
+ restartResource()
+ } catch {
+ case NonFatal(ex) => failStage(ex)
+ }
+ case Supervision.Resume => onPull()
+ }
+ }
- private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _
-
- private def handle(result: Try[Option[T]]): Unit = result match {
- case Success(data) =>
- data match {
- case Some(d) => push(out, d)
- case None =>
- // end of resource reached, lets close it
- state match {
- case Some(resource) =>
- close(resource).onComplete(getAsyncCallback[Try[Done]] {
- case Success(Done) => completeStage()
- case Failure(ex) => failStage(ex)
- }.invoke)(parasitic)
- state = None
-
- case None =>
- // cannot happen, but for good measure
- throw new IllegalStateException("Reached end of data but there is no open resource")
- }
- }
- case Failure(t) => errorHandler(t)
- }
+ private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _
+
+ private def handle(result: Try[Option[T]]): Unit = result match {
+ case Success(data) =>
+ data match {
+ case Some(d) => push(out, d)
+ case None =>
+ // end of resource reached, lets close it
+ maybeResource match {
+ case OptionVal.Some(resource) =>
+ close(resource).onComplete(getAsyncCallback[Try[Done]] {
+ case Success(Done) => completeStage()
+ case Failure(ex) => failStage(ex)
+ }.invoke)(parasitic)
+ maybeResource = OptionVal.none
+
+ case _ =>
+ // cannot happen, but for good measure
+ throw new IllegalStateException("Reached end of data but there is no open resource")
+ }
+ }
+ case Failure(t) => errorHandler(t)
+ }
- override def preStart(): Unit = createResource()
+ override def preStart(): Unit = createResource()
- override def onPull(): Unit =
- state match {
- case Some(resource) =>
+ override def onPull(): Unit = maybeResource match {
+ case OptionVal.Some(resource) =>
try {
val future = readData(resource)
future.value match {
@@ -101,51 +105,49 @@ import pekko.stream.stage._
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
- case None =>
+
+ case OptionVal.None =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
}
- override def postStop(): Unit = {
- state.foreach(r => close(r))
- }
+ override def postStop(): Unit = maybeResource match {
+ case OptionVal.Some(resource) => close(resource)
+ case _ => // do nothing
+ }
- private def restartResource(): Unit = {
- state match {
- case Some(resource) =>
+ private def restartResource(): Unit = maybeResource match {
+ case OptionVal.Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
- case Success(Done) =>
- createResource()
- case Failure(ex) => failStage(ex)
+ case Success(Done) => createResource()
+ case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
- state = None
- case None =>
- createResource()
+ maybeResource = OptionVal.none
+
+ case _ => createResource()
}
- }
- private def createResource(): Unit = {
- create().onComplete { resource =>
- createdCallback(resource).failed.foreach {
- case _: StreamDetachedException =>
- // stream stopped before created callback could be invoked, we need
- // to close the resource if it is was opened, to not leak it
- resource match {
- case Success(r) =>
- close(r)
- case Failure(ex) =>
- throw ex // failed to open but stream is stopped already
- }
- case _ => // we don't care here
- }
- }(parasitic)
- }
+ private def createResource(): Unit = {
+ create().onComplete { resource =>
+ createdCallback(resource).failed.foreach {
+ case _: StreamDetachedException =>
+ // stream stopped before created callback could be invoked, we need
+ // to close the resource if it is was opened, to not leak it
+ resource match {
+ case Success(r) =>
+ close(r)
+ case Failure(ex) =>
+ throw ex // failed to open but stream is stopped already
+ }
+ case _ => // we don't care here
+ }
+ }(parasitic)
+ }
- setHandler(out, this)
+ setHandler(out, this)
+ }
- }
override def toString = "UnfoldResourceSourceAsync"
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]