This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6effb15ce Add Function Cache Refresh If Invoker Is Running Container
For Function (#5327)
6effb15ce is described below
commit 6effb15ce6f415f8424c0094d67e64078f5c64c1
Author: Brendan Doyle <[email protected]>
AuthorDate: Wed Sep 28 14:16:06 2022 -0400
Add Function Cache Refresh If Invoker Is Running Container For Function
(#5327)
* wip
* wip
* add cache refresh to container proxy
Co-authored-by: Brendan Doyle <[email protected]>
---
.../v2/FunctionPullingContainerProxy.scala | 34 ++++++++++++++++------
1 file changed, 25 insertions(+), 9 deletions(-)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index eaa043679..dcb34266d 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.containerpool.v2
import java.net.InetSocketAddress
import java.time.Instant
-
import akka.actor.Status.{Failure => FailureMessage}
-import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem,
FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.io.{IO, Tcp}
import akka.pattern.pipe
@@ -87,6 +86,7 @@ case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
+case object PingCache
case class DetermineKeepContainer(attempt: Int)
// States
@@ -209,7 +209,8 @@ class FunctionPullingContainerProxy(
private val KeepingTimeoutName = "KeepingTimeout"
private val RunningActivationTimeoutName = "RunningActivationTimeout"
private val runningActivationTimeout = 10.seconds
-
+ private val PingCacheName = "PingCache"
+ private val pingCacheInterval = 1.minute
private var timedOut = false
var healthPingActor: Option[ActorRef] = None //setup after prewarm starts
@@ -374,6 +375,7 @@ class FunctionPullingContainerProxy(
case Event(initializedData: InitializedData, _) =>
context.parent ! Initialized(initializedData)
initializedData.clientProxy ! RequestActivation()
+ startTimerWithFixedDelay(PingCacheName, PingCache, pingCacheInterval)
startSingleTimer(UnusedTimeoutName, StateTimeout, unusedTimeout)
stay() using initializedData
@@ -469,7 +471,7 @@ class FunctionPullingContainerProxy(
data.action.rev,
None)
- case _ => delay
+ case x: Event if x.event != PingCache => delay
}
when(Rescheduling, stateTimeout = 10.seconds) {
@@ -626,7 +628,7 @@ class FunctionPullingContainerProxy(
data.clientProxy ! CloseClientProxy
stay
- case _ => delay
+ case x: Event if x.event != PingCache => delay
}
when(Pausing) {
@@ -653,7 +655,7 @@ class FunctionPullingContainerProxy(
data.action.rev,
Some(data.clientProxy))
- case _ => delay
+ case x: Event if x.event != PingCache => delay
}
when(Paused) {
@@ -745,7 +747,8 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))
- case _ => delay
+
+ case x: Event if x.event != PingCache => delay
}
when(Removing, unusedTimeout) {
@@ -768,6 +771,20 @@ class FunctionPullingContainerProxy(
stay()
}
+ whenUnhandled {
+ case Event(PingCache, data: WarmData) =>
+ val actionId =
data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
+ get(entityStore, actionId.id, actionId.rev, true).map(_ => {
+ logging.debug(
+ this,
+ s"Refreshed function cache for action ${data.action} from container
${data.container.containerId}.")
+ })
+ stay
+ case Event(PingCache, _) =>
+ logging.debug(this, "Container is not warm, ignore function cache ping.")
+ stay
+ }
+
onTransition {
case _ -> Uninitialized => unstashAll()
case _ -> CreatingContainer => unstashAll()
@@ -823,7 +840,7 @@ class FunctionPullingContainerProxy(
fqn: FullyQualifiedEntityName,
revision: DocRevision,
clientProxy: Option[ActorRef]): State = {
-
+ cancelTimer(PingCacheName)
dataManagementService ! UnregisterData(
s"${ContainerKeys.existingContainers(invocationNamespace, fqn, revision,
Some(instance), Some(container.containerId))}")
@@ -831,7 +848,6 @@ class FunctionPullingContainerProxy(
}
private def cleanUp(container: Container, clientProxy: Option[ActorRef],
replacePrewarm: Boolean = true): State = {
-
context.parent ! ContainerRemoved(replacePrewarm)
val unpause = stateName match {
case Paused => container.resume()(TransactionId.invokerNanny)