This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 55f065f257 chore: Refactor UnfoldResourceSourceAsync. (#1240)
55f065f257 is described below

commit 55f065f257b8083d9aa434e30af69c48d2ea11ad
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Apr 15 10:44:26 2024 +0800

    chore: Refactor UnfoldResourceSourceAsync. (#1240)
---
 .../stream/impl/UnfoldResourceSourceAsync.scala    | 178 +++++++++++----------
 1 file changed, 90 insertions(+), 88 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
index 7f783be6e8..6f6c186186 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala
@@ -23,8 +23,10 @@ import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts.parasitic
 import pekko.stream._
 import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.SourceLocation
 import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.stage._
+import pekko.util.OptionVal
 
 /**
  * INTERNAL API
@@ -36,64 +38,66 @@ import pekko.stream.stage._
     extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("UnfoldResourceSourceAsync.out")
   override val shape = SourceShape(out)
-  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync
-
-  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
-    lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
-    private implicit def ec: ExecutionContext = materializer.executionContext
-    private var state: Option[S] = None
-
-    private val createdCallback = getAsyncCallback[Try[S]] {
-      case Success(resource) =>
-        state = Some(resource)
-        if (isAvailable(out)) onPull()
-      case Failure(t) => failStage(t)
-    }.invokeWithFeedback _
-
-    private val errorHandler: PartialFunction[Throwable, Unit] = {
-      case NonFatal(ex) =>
-        decider(ex) match {
-          case Supervision.Stop =>
-            failStage(ex)
-          case Supervision.Restart =>
-            try {
-              restartResource()
-            } catch {
-              case NonFatal(ex) => failStage(ex)
-            }
-          case Supervision.Resume => onPull()
-        }
-    }
+  override def initialAttributes: Attributes =
+    DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create)
+
+  def createLogic(inheritedAttributes: Attributes) =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private implicit def ec: ExecutionContext = materializer.executionContext
+      private var maybeResource: OptionVal[S] = OptionVal.none
+
+      private val createdCallback = getAsyncCallback[Try[S]] {
+        case Success(resource) =>
+          require(resource != null, "`create` method should not return a null resource.")
+          maybeResource = OptionVal(resource)
+          if (isAvailable(out)) onPull()
+        case Failure(t) => failStage(t)
+      }.invokeWithFeedback _
+
+      private val errorHandler: PartialFunction[Throwable, Unit] = {
+        case NonFatal(ex) =>
+          decider(ex) match {
+            case Supervision.Stop =>
+              failStage(ex)
+            case Supervision.Restart =>
+              try {
+                restartResource()
+              } catch {
+                case NonFatal(ex) => failStage(ex)
+              }
+            case Supervision.Resume => onPull()
+          }
+      }
 
-    private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _
-
-    private def handle(result: Try[Option[T]]): Unit = result match {
-      case Success(data) =>
-        data match {
-          case Some(d) => push(out, d)
-          case None    =>
-            // end of resource reached, lets close it
-            state match {
-              case Some(resource) =>
-                close(resource).onComplete(getAsyncCallback[Try[Done]] {
-                  case Success(Done) => completeStage()
-                  case Failure(ex)   => failStage(ex)
-                }.invoke)(parasitic)
-                state = None
-
-              case None =>
-                // cannot happen, but for good measure
-                throw new IllegalStateException("Reached end of data but there is no open resource")
-            }
-        }
-      case Failure(t) => errorHandler(t)
-    }
+      private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _
+
+      private def handle(result: Try[Option[T]]): Unit = result match {
+        case Success(data) =>
+          data match {
+            case Some(d) => push(out, d)
+            case None    =>
+              // end of resource reached, lets close it
+              maybeResource match {
+                case OptionVal.Some(resource) =>
+                  close(resource).onComplete(getAsyncCallback[Try[Done]] {
+                    case Success(Done) => completeStage()
+                    case Failure(ex)   => failStage(ex)
+                  }.invoke)(parasitic)
+                  maybeResource = OptionVal.none
+
+                case _ =>
+                  // cannot happen, but for good measure
+                  throw new IllegalStateException("Reached end of data but there is no open resource")
+              }
+          }
+        case Failure(t) => errorHandler(t)
+      }
 
-    override def preStart(): Unit = createResource()
+      override def preStart(): Unit = createResource()
 
-    override def onPull(): Unit =
-      state match {
-        case Some(resource) =>
+      override def onPull(): Unit = maybeResource match {
+        case OptionVal.Some(resource) =>
           try {
             val future = readData(resource)
             future.value match {
@@ -101,51 +105,49 @@ import pekko.stream.stage._
               case None        => future.onComplete(readCallback)(parasitic)
             }
           } catch errorHandler
-        case None =>
+
+        case OptionVal.None =>
         // we got a pull but there is no open resource, we are either
         // currently creating/restarting then the read will be triggered when creating the
         // resource completes, or shutting down and then the pull does not matter anyway
       }
 
-    override def postStop(): Unit = {
-      state.foreach(r => close(r))
-    }
+      override def postStop(): Unit = maybeResource match {
+        case OptionVal.Some(resource) => close(resource)
+        case _                        => // do nothing
+      }
 
-    private def restartResource(): Unit = {
-      state match {
-        case Some(resource) =>
+      private def restartResource(): Unit = maybeResource match {
+        case OptionVal.Some(resource) =>
           // wait for the resource to close before restarting
           close(resource).onComplete(getAsyncCallback[Try[Done]] {
-            case Success(Done) =>
-              createResource()
-            case Failure(ex) => failStage(ex)
+            case Success(Done) => createResource()
+            case Failure(ex)   => failStage(ex)
           }.invoke)(parasitic)
-          state = None
-        case None =>
-          createResource()
+          maybeResource = OptionVal.none
+
+        case _ => createResource()
       }
-    }
 
-    private def createResource(): Unit = {
-      create().onComplete { resource =>
-        createdCallback(resource).failed.foreach {
-          case _: StreamDetachedException =>
-            // stream stopped before created callback could be invoked, we need
-            // to close the resource if it is was opened, to not leak it
-            resource match {
-              case Success(r) =>
-                close(r)
-              case Failure(ex) =>
-                throw ex // failed to open but stream is stopped already
-            }
-          case _ => // we don't care here
-        }
-      }(parasitic)
-    }
+      private def createResource(): Unit = {
+        create().onComplete { resource =>
+          createdCallback(resource).failed.foreach {
+            case _: StreamDetachedException =>
+              // stream stopped before created callback could be invoked, we need
+              // to close the resource if it is was opened, to not leak it
+              resource match {
+                case Success(r) =>
+                  close(r)
+                case Failure(ex) =>
+                  throw ex // failed to open but stream is stopped already
+              }
+            case _ => // we don't care here
+          }
+        }(parasitic)
+      }
 
-    setHandler(out, this)
+      setHandler(out, this)
+    }
 
-  }
   override def toString = "UnfoldResourceSourceAsync"
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to