This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6effb15ce Add Function Cache Refresh If Invoker Is Running Container For Function (#5327)
6effb15ce is described below

commit 6effb15ce6f415f8424c0094d67e64078f5c64c1
Author: Brendan Doyle <[email protected]>
AuthorDate: Wed Sep 28 14:16:06 2022 -0400

    Add Function Cache Refresh If Invoker Is Running Container For Function (#5327)
    
    * wip
    
    * wip
    
    * add cache refresh to container proxy
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 .../v2/FunctionPullingContainerProxy.scala         | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index eaa043679..dcb34266d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.containerpool.v2
 
 import java.net.InetSocketAddress
 import java.time.Instant
-
 import akka.actor.Status.{Failure => FailureMessage}
-import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, 
FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.io.{IO, Tcp}
 import akka.pattern.pipe
@@ -87,6 +86,7 @@ case class Initialized(data: InitializedData)
 case class Resumed(data: WarmData)
 case class ResumeFailed(data: WarmData)
 case class RecreateClient(action: ExecutableWhiskAction)
+case object PingCache
 case class DetermineKeepContainer(attempt: Int)
 
 // States
@@ -209,7 +209,8 @@ class FunctionPullingContainerProxy(
   private val KeepingTimeoutName = "KeepingTimeout"
   private val RunningActivationTimeoutName = "RunningActivationTimeout"
   private val runningActivationTimeout = 10.seconds
-
+  private val PingCacheName = "PingCache"
+  private val pingCacheInterval = 1.minute
   private var timedOut = false
 
   var healthPingActor: Option[ActorRef] = None //setup after prewarm starts
@@ -374,6 +375,7 @@ class FunctionPullingContainerProxy(
     case Event(initializedData: InitializedData, _) =>
       context.parent ! Initialized(initializedData)
       initializedData.clientProxy ! RequestActivation()
+      startTimerWithFixedDelay(PingCacheName, PingCache, pingCacheInterval)
       startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
       stay() using initializedData
 
@@ -469,7 +471,7 @@ class FunctionPullingContainerProxy(
         data.action.rev,
         None)
 
-    case _ => delay
+    case x: Event if x.event != PingCache => delay
   }
 
   when(Rescheduling, stateTimeout = 10.seconds) {
@@ -626,7 +628,7 @@ class FunctionPullingContainerProxy(
       data.clientProxy ! CloseClientProxy
       stay
 
-    case _ => delay
+    case x: Event if x.event != PingCache => delay
   }
 
   when(Pausing) {
@@ -653,7 +655,7 @@ class FunctionPullingContainerProxy(
         data.action.rev,
         Some(data.clientProxy))
 
-    case _ => delay
+    case x: Event if x.event != PingCache => delay
   }
 
   when(Paused) {
@@ -745,7 +747,8 @@ class FunctionPullingContainerProxy(
         data.action.fullyQualifiedName(false),
         data.action.rev,
         Some(data.clientProxy))
-    case _ => delay
+
+    case x: Event if x.event != PingCache => delay
   }
 
   when(Removing, unusedTimeout) {
@@ -768,6 +771,20 @@ class FunctionPullingContainerProxy(
       stay()
   }
 
+  whenUnhandled {
+    case Event(PingCache, data: WarmData) =>
+      val actionId = 
data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
+      get(entityStore, actionId.id, actionId.rev, true).map(_ => {
+        logging.debug(
+          this,
+          s"Refreshed function cache for action ${data.action} from container 
${data.container.containerId}.")
+      })
+      stay
+    case Event(PingCache, _) =>
+      logging.debug(this, "Container is not warm, ignore function cache ping.")
+      stay
+  }
+
   onTransition {
     case _ -> Uninitialized     => unstashAll()
     case _ -> CreatingContainer => unstashAll()
@@ -823,7 +840,7 @@ class FunctionPullingContainerProxy(
                       fqn: FullyQualifiedEntityName,
                       revision: DocRevision,
                       clientProxy: Option[ActorRef]): State = {
-
+    cancelTimer(PingCacheName)
     dataManagementService ! UnregisterData(
       s"${ContainerKeys.existingContainers(invocationNamespace, fqn, revision, 
Some(instance), Some(container.containerId))}")
 
@@ -831,7 +848,6 @@ class FunctionPullingContainerProxy(
   }
 
   private def cleanUp(container: Container, clientProxy: Option[ActorRef], 
replacePrewarm: Boolean = true): State = {
-
     context.parent ! ContainerRemoved(replacePrewarm)
     val unpause = stateName match {
       case Paused => container.resume()(TransactionId.invokerNanny)

Reply via email to