markusthoemmes commented on a change in pull request #2602: Remove old invoker
code and refactor accordingly.
URL:
https://github.com/apache/incubator-openwhisk/pull/2602#discussion_r133458130
##########
File path: core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
##########
@@ -147,70 +156,74 @@ class InvokerReactive(
val pool = actorSystem.actorOf(ContainerPool.props(
childFactory,
- OldContainerPool.getDefaultMaxActive(config),
- OldContainerPool.getDefaultMaxActive(config),
+ maximumContainers,
+ maximumContainers,
activationFeed,
Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
/** Is called when an ActivationMessage is read from Kafka */
- override def onMessage(msg: ActivationMessage)(implicit transid:
TransactionId): Future[Unit] = {
- require(msg != null, "message undefined")
- require(msg.action.version.isDefined, "action version undefined")
-
- val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION)
- val namespace = msg.action.path
- val name = msg.action.name
- val actionid = FullyQualifiedEntityName(namespace,
name).toDocId.asDocInfo(msg.revision)
- val tran = Transaction(msg)
- val subject = msg.user.subject
-
- logging.info(this, s"${actionid.id} $subject ${msg.activationId}")
-
- // caching is enabled since actions have revision id and an updated
- // action will not hit in the cache due to change in the revision id;
- // if the doc revision is missing, then bypass cache
- if (actionid.rev == DocRevision.empty) {
- logging.warn(this, s"revision was not provided for ${actionid.id}")
- }
- WhiskAction.get(entityStore, actionid.id, actionid.rev, fromCache =
actionid.rev != DocRevision.empty).flatMap { action =>
- action.toExecutableWhiskAction match {
- case Some(executable) =>
- pool ! Run(executable, msg)
- Future.successful(())
- case None =>
- logging.error(this, s"non-executable action reached the
invoker ${action.fullyQualifiedName(false)}")
- Future.failed(new IllegalStateException())
- }
- }.recover {
- case t =>
- // If the action cannot be found, the user has concurrently
deleted it,
- // making this an application error. All other errors are
considered system
- // errors and should cause the invoker to be considered
unhealthy.
- val response = t match {
- case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
- case _ =>
ActivationResponse.whiskError(Messages.actionRemovedWhileInvoking)
+ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
+ Future(ActivationMessage.parse(new String(bytes,
StandardCharsets.UTF_8)))
+ .flatMap(Future.fromTry(_))
+ .filter(_.action.version.isDefined)
+ .flatMap { msg =>
+ implicit val transid = msg.transid
+
+ val start = transid.started(this,
LoggingMarkers.INVOKER_ACTIVATION)
+ val namespace = msg.action.path
+ val name = msg.action.name
+ val actionid = FullyQualifiedEntityName(namespace,
name).toDocId.asDocInfo(msg.revision)
+ val subject = msg.user.subject
+
+ logging.info(this, s"${actionid.id} $subject
${msg.activationId}")
+
+ // caching is enabled since actions have revision id and an
updated
+ // action will not hit in the cache due to change in the
revision id;
+ // if the doc revision is missing, then bypass cache
+ if (actionid.rev == DocRevision.empty) {
+ logging.warn(this, s"revision was not provided for
${actionid.id}")
}
- val interval = Interval.zero
- val causedBy = if (msg.causedBySequence)
Parameters("causedBy", "sequence".toJson) else Parameters()
- val activation = WhiskActivation(
- activationId = msg.activationId,
- namespace = msg.activationNamespace,
- subject = msg.user.subject,
- cause = msg.cause,
- name = msg.action.name,
- version = msg.action.version.getOrElse(SemVer()),
- start = interval.start,
- end = interval.end,
- duration = Some(interval.duration.toMillis),
- response = response,
- annotations = {
- Parameters("path", msg.action.toString.toJson) ++
causedBy
- })
-
- activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.rootControllerIndex)
- store(msg.transid, activation)
- }
+
+ WhiskAction.get(entityStore, actionid.id, actionid.rev,
fromCache = actionid.rev != DocRevision.empty).flatMap { action =>
+ action.toExecutableWhiskAction match {
+ case Some(executable) =>
+ pool ! Run(executable, msg)
+ Future.successful(())
+ case None =>
+ logging.error(this, s"non-executable action
reached the invoker ${action.fullyQualifiedName(false)}")
+ Future.failed(new IllegalStateException())
Review comment:
We could do that for good measure. Note though that the message wouldn't be
displayed anywhere (at least not today)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services