This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6130c5c Correctly recover from errors when fetching an action
6130c5c is described below
commit 6130c5ce94b7990f809a7da84126d98abffb02ca
Author: Markus Thoemmes <[email protected]>
AuthorDate: Wed May 17 08:12:36 2017 +0200
Correctly recover from errors when fetching an action
If the invoker fails to fetch an action from the database it needs to
inform the ActivationFeed that it still has resources (it didn't consume a
container after all).
Reporting of those errors needs some disambiguation, as an error on
fetching the action could also be caused by the user (for example, by
concurrently deleting the action while invoking it). The InvokerHealth protocol
would shut down the invoker if (and only if) the activation was reported as a WHISK_ERROR.
Therefore, only a missing document (DocumentNotFound) is reported as an application
error; all other errors are considered WHISK_ERRORs.
Also in this commit:
- Log level changed from error to warn for a missing revision, since the
InvokerHealth protocol would otherwise produce ERRORs.
- Some documentation and restructuring of the ContainerPool's setup.
---
.../src/main/scala/whisk/http/ErrorResponse.scala | 14 ++++++-----
.../scala/whisk/core/invoker/InvokerReactive.scala | 29 ++++++++++++++++++----
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 98b2490..d89d749 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -155,6 +155,8 @@ object Messages {
if (!init) "." else " during initialization."
}
}
+
+ val actionRemovedWhileInvoking = "Action could not be found or may have
been deleted."
}
/** Replaces rejections with Json object containing cause and transaction id.
*/
@@ -171,13 +173,13 @@ object ErrorResponse extends Directives {
} getOrElse None)
}
- def terminate(status: StatusCode, error: Option[ErrorResponse] = None,
asJson: Boolean = true)(implicit transid: TransactionId): StandardRoute = {
+ def terminate(status: StatusCode, error: Option[ErrorResponse] = None,
asJson: Boolean = true)(implicit transid: TransactionId): StandardRoute = {
val errorResponse = error getOrElse response(status)
- if (asJson) {
- complete(status, errorResponse)
- } else {
- complete(status, s"${errorResponse.error} (code:
${errorResponse.code})")
- }
+ if (asJson) {
+ complete(status, errorResponse)
+ } else {
+ complete(status, s"${errorResponse.error} (code:
${errorResponse.code})")
+ }
}
def response(status: StatusCode)(implicit transid: TransactionId):
ErrorResponse = status match {
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index dc050ee..f6ab973 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -46,6 +46,10 @@ import whisk.core.dispatcher.MessageHandler
import whisk.core.entity._
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
+import whisk.core.dispatcher.ActivationFeed.ContainerReleased
+import whisk.core.containerpool.ContainerPool
+import whisk.core.database.NoDocumentException
+import whisk.http.Messages
class InvokerReactive(
config: WhiskConfig,
@@ -67,6 +71,7 @@ class InvokerReactive(
val cleaning = docker.ps(Seq("name" ->
"wsk_"))(TransactionId.invokerNanny).flatMap { containers =>
val removals = containers.map { id =>
runc.resume(id)(TransactionId.invokerNanny).recoverWith {
+ // Ignore resume failures and try to remove anyway
case _ => Future.successful(())
}.flatMap {
_ => docker.rm(id)(TransactionId.invokerNanny)
@@ -80,6 +85,7 @@ class InvokerReactive(
cleanup()
sys.addShutdownHook(cleanup())
+ /** Factory used by the ContainerProxy to physically create a new
container. */
val containerFactory = (tid: TransactionId, name: String, actionImage:
ImageName, userProvidedImage: Boolean, memory: ByteSize) => {
val image = if (userProvidedImage) {
actionImage.publicImageName
@@ -99,6 +105,7 @@ class InvokerReactive(
name = Some(name))
}
+ /** Sends an active-ack. */
val ack = (tid: TransactionId, activation: WhiskActivation) => {
implicit val transid = tid
producer.send("completed", CompletionMessage(tid, activation,
s"invoker$instance")).andThen {
@@ -106,6 +113,7 @@ class InvokerReactive(
}
}
+ /** Stores an activation in the database. */
val store = (tid: TransactionId, activation: WhiskActivation) => {
implicit val transid = tid
logging.info(this, "recording the activation result to the data store")
@@ -115,18 +123,21 @@ class InvokerReactive(
}
}
+ /** Creates a ContainerProxy Actor when being called. */
+ val childFactory = (f: ActorRefFactory) =>
f.actorOf(ContainerProxy.props(containerFactory, ack, store))
+
val prewarmKind = "nodejs:6"
val prewarmExec =
ExecManifest.runtimesManifest.resolveDefaultRuntime(prewarmKind).map { manifest
=>
new CodeExecAsString(manifest, "", None)
}.get
- val childFactory = (f: ActorRefFactory) =>
f.actorOf(ContainerProxy.props(containerFactory, ack, store))
- val pool =
actorSystem.actorOf(whisk.core.containerpool.ContainerPool.props(
+ val pool = actorSystem.actorOf(ContainerPool.props(
childFactory,
OldContainerPool.getDefaultMaxActive(config),
activationFeed,
Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
+ /** Is called when an ActivationMessage is read from Kafka */
override def onMessage(msg: ActivationMessage)(implicit transid:
TransactionId): Future[Unit] = {
require(msg != null, "message undefined")
require(msg.action.version.isDefined, "action version undefined")
@@ -144,7 +155,7 @@ class InvokerReactive(
// action will not hit in the cache due to change in the revision id;
// if the doc revision is missing, then bypass cache
if (actionid.rev == DocRevision.empty) {
- logging.error(this, s"revision was not provided for
${actionid.id}")
+ logging.warn(this, s"revision was not provided for ${actionid.id}")
}
WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache =
actionid.rev != DocRevision.empty).flatMap { action =>
action.toExecutableWhiskAction match {
@@ -156,7 +167,14 @@ class InvokerReactive(
Future.failed(new IllegalStateException())
}
}.recover {
- case _ =>
+ case t =>
+ // If the action cannot be found, the user has concurrently
deleted it,
+ // making this an application error. All other errors are
considered system
+ // errors and should cause the invoker to be considered
unhealthy.
+ val response = t match {
+ case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+ case _ =>
ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
+ }
val interval = Interval.zero
val causedBy = if (msg.causedBySequence)
Parameters("causedBy", "sequence".toJson) else Parameters()
val activation = WhiskActivation(
@@ -169,11 +187,12 @@ class InvokerReactive(
start = interval.start,
end = interval.end,
duration = Some(interval.duration.toMillis),
- response = ActivationResponse.applicationError("action
could not be found"),
+ response = response,
annotations = {
Parameters("path", msg.action.toString.toJson) ++
causedBy
})
+ activationFeed ! ContainerReleased
ack(msg.transid, activation)
store(msg.transid, activation)
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].