This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push:
       new  6130c5c   Correctly recover from errors when fetching an action
6130c5c is described below

commit 6130c5ce94b7990f809a7da84126d98abffb02ca
Author: Markus Thoemmes <[email protected]>
AuthorDate: Wed May 17 08:12:36 2017 +0200

    Correctly recover from errors when fetching an action
    
    If the invoker fails to fetch an action from the database, it needs to 
inform the ActivationFeed that it still has resources (it didn't consume a 
container after all).
    
    Reporting of those errors needs some disambiguation, as an error while 
fetching the action could also be caused by the user (for example by 
concurrently deleting the action while invoking it). The InvokerHealth protocol 
would shut down the invoker iff the activation was reported as a WHISK_ERROR. 
All errors but DocumentNotFound are considered WHISK_ERRORs though.
    
    Also in this commit:
    - Log level change from error to warn for a missing revision. The 
InvokerHealth protocol produces ERROR otherwise.
    - Some documentation and restructuring of the ContainerPool's setup.
---
 .../src/main/scala/whisk/http/ErrorResponse.scala  | 14 ++++++-----
 .../scala/whisk/core/invoker/InvokerReactive.scala | 29 ++++++++++++++++++----
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 98b2490..d89d749 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -155,6 +155,8 @@ object Messages {
             if (!init) "." else " during initialization."
         }
     }
+
+    val actionRemovedWhileInvoking = "Action could not be found or may have 
been deleted."
 }
 
 /** Replaces rejections with Json object containing cause and transaction id. 
*/
@@ -171,13 +173,13 @@ object ErrorResponse extends Directives {
         } getOrElse None)
     }
 
-     def terminate(status: StatusCode, error: Option[ErrorResponse] = None, 
asJson: Boolean = true)(implicit transid: TransactionId): StandardRoute = {
+    def terminate(status: StatusCode, error: Option[ErrorResponse] = None, 
asJson: Boolean = true)(implicit transid: TransactionId): StandardRoute = {
         val errorResponse = error getOrElse response(status)
-            if (asJson) {
-                complete(status, errorResponse)
-            } else {
-                complete(status, s"${errorResponse.error} (code: 
${errorResponse.code})")
-            }
+        if (asJson) {
+            complete(status, errorResponse)
+        } else {
+            complete(status, s"${errorResponse.error} (code: 
${errorResponse.code})")
+        }
     }
 
     def response(status: StatusCode)(implicit transid: TransactionId): 
ErrorResponse = status match {
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index dc050ee..f6ab973 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -46,6 +46,10 @@ import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
+import whisk.core.dispatcher.ActivationFeed.ContainerReleased
+import whisk.core.containerpool.ContainerPool
+import whisk.core.database.NoDocumentException
+import whisk.http.Messages
 
 class InvokerReactive(
     config: WhiskConfig,
@@ -67,6 +71,7 @@ class InvokerReactive(
         val cleaning = docker.ps(Seq("name" -> 
"wsk_"))(TransactionId.invokerNanny).flatMap { containers =>
             val removals = containers.map { id =>
                 runc.resume(id)(TransactionId.invokerNanny).recoverWith {
+                    // Ignore resume failures and try to remove anyway
                     case _ => Future.successful(())
                 }.flatMap {
                     _ => docker.rm(id)(TransactionId.invokerNanny)
@@ -80,6 +85,7 @@ class InvokerReactive(
     cleanup()
     sys.addShutdownHook(cleanup())
 
+    /** Factory used by the ContainerProxy to physically create a new 
container. */
     val containerFactory = (tid: TransactionId, name: String, actionImage: 
ImageName, userProvidedImage: Boolean, memory: ByteSize) => {
         val image = if (userProvidedImage) {
             actionImage.publicImageName
@@ -99,6 +105,7 @@ class InvokerReactive(
             name = Some(name))
     }
 
+    /** Sends an active-ack. */
     val ack = (tid: TransactionId, activation: WhiskActivation) => {
         implicit val transid = tid
         producer.send("completed", CompletionMessage(tid, activation, 
s"invoker$instance")).andThen {
@@ -106,6 +113,7 @@ class InvokerReactive(
         }
     }
 
+    /** Stores an activation in the database. */
     val store = (tid: TransactionId, activation: WhiskActivation) => {
         implicit val transid = tid
         logging.info(this, "recording the activation result to the data store")
@@ -115,18 +123,21 @@ class InvokerReactive(
         }
     }
 
+    /** Creates a ContainerProxy Actor when being called. */
+    val childFactory = (f: ActorRefFactory) => 
f.actorOf(ContainerProxy.props(containerFactory, ack, store))
+
     val prewarmKind = "nodejs:6"
     val prewarmExec = 
ExecManifest.runtimesManifest.resolveDefaultRuntime(prewarmKind).map { manifest 
=>
         new CodeExecAsString(manifest, "", None)
     }.get
 
-    val childFactory = (f: ActorRefFactory) => 
f.actorOf(ContainerProxy.props(containerFactory, ack, store))
-    val pool = 
actorSystem.actorOf(whisk.core.containerpool.ContainerPool.props(
+    val pool = actorSystem.actorOf(ContainerPool.props(
         childFactory,
         OldContainerPool.getDefaultMaxActive(config),
         activationFeed,
         Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
 
+    /** Is called when an ActivationMessage is read from Kafka */
     override def onMessage(msg: ActivationMessage)(implicit transid: 
TransactionId): Future[Unit] = {
         require(msg != null, "message undefined")
         require(msg.action.version.isDefined, "action version undefined")
@@ -144,7 +155,7 @@ class InvokerReactive(
         // action will not hit in the cache due to change in the revision id;
         // if the doc revision is missing, then bypass cache
         if (actionid.rev == DocRevision.empty) {
-            logging.error(this, s"revision was not provided for 
${actionid.id}")
+            logging.warn(this, s"revision was not provided for ${actionid.id}")
         }
         WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty).flatMap { action =>
             action.toExecutableWhiskAction match {
@@ -156,7 +167,14 @@ class InvokerReactive(
                     Future.failed(new IllegalStateException())
             }
         }.recover {
-            case _ =>
+            case t =>
+                // If the action cannot be found, the user has concurrently 
deleted it,
+                // making this an application error. All other errors are 
considered system
+                // errors and should cause the invoker to be considered 
unhealthy.
+                val response = t match {
+                    case _: NoDocumentException => 
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+                    case _                      => 
ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
+                }
                 val interval = Interval.zero
                 val causedBy = if (msg.causedBySequence) 
Parameters("causedBy", "sequence".toJson) else Parameters()
                 val activation = WhiskActivation(
@@ -169,11 +187,12 @@ class InvokerReactive(
                     start = interval.start,
                     end = interval.end,
                     duration = Some(interval.duration.toMillis),
-                    response = ActivationResponse.applicationError("action 
could not be found"),
+                    response = response,
                     annotations = {
                         Parameters("path", msg.action.toString.toJson) ++ 
causedBy
                     })
 
+                activationFeed ! ContainerReleased
                 ack(msg.transid, activation)
                 store(msg.transid, activation)
         }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to