This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b803c64 Ensure that Result-ack is sent before Completion-ack. (#4115) b803c64 is described below commit b803c64366e273a0747c25c55873eb733a42f793 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Fri Nov 23 22:29:26 2018 +0100 Ensure that Result-ack is sent before Completion-ack. (#4115) Improves comments to clarify the ordering of result and completion messages. Adds a type alias for the active ack messages, and document the interface. Co-authored-by: Christian Bickel <cbic...@de.ibm.com> Co-authored-by: Rodric Rabbah <rod...@gmail.com> --- .../core/containerpool/ContainerProxy.scala | 29 +++++++++++++++++----- .../openwhisk/core/invoker/InvokerReactive.scala | 29 +++++++++++++++++----- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index 0c126a2..0fdea5e 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash} import akka.event.Logging.InfoLevel import akka.pattern.pipe import pureconfig.loadConfigOrThrow + import scala.collection.immutable import spray.json.DefaultJsonProtocol._ import spray.json._ @@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck import org.apache.openwhisk.http.Messages import scala.concurrent.Future @@ -127,7 +129,7 @@ case object RunCompleted */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], + sendActiveAck: ActiveAck, storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, @@ -496,10 +498,16 @@ class ContainerProxy( ActivationResponse.whiskError(Messages.abnormalRun)) } - // Sending active ack. Entirely asynchronous and not waited upon. - if (job.msg.blocking) { - activation.foreach( + // Sending an active ack is an asynchronous operation. The result is forwarded as soon as + // possible for blocking activations so that dependent activations can be scheduled. The + // completion message which frees a load balancer slot is sent after the active ack future + // completes to ensure proper ordering. + val sendResult = if (job.msg.blocking) { + activation.map( sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false)) + } else { + // For non-blocking request, do not forward the result. + Future.successful(()) } val context = UserContext(job.msg.user) @@ -530,8 +538,17 @@ class ContainerProxy( activationWithLogs .map(_.fold(_.activation, identity)) .foreach { activation => - // Sending the completionMessage to the controller asynchronously. - sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true) + // Sending the completion message to the controller after the active ack ensures proper ordering + // (result is received before the completion message for blocking invokes). + sendResult.onComplete( + _ => + sendActiveAck( + tid, + activation, + job.msg.blocking, + job.msg.rootControllerIndex, + job.msg.user.namespace.uuid, + true)) // Storing the record. Entirely asynchronous and not waited upon. 
storeActivation(tid, activation, context) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index da86843..ab30510 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -43,6 +43,23 @@ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +object InvokerReactive { + + /** + * A method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages + * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding + * messages for continuations (e.g., sequences and conductor actions). + * + * @param TransactionId the transaction id for the activation + * @param WhiskActivation is the activation result + * @param Boolean is true iff the activation was a blocking request + * @param ControllerInstanceId the originating controller/loadbalancer id + * @param UUID is the UUID for the namespace owning the activation + * @param Boolean is true if this is a resource free message and false if this is a result forwarding message + */ + type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any] +} + class InvokerReactive( config: WhiskConfig, instance: InvokerInstanceId, @@ -115,12 +132,12 @@ class InvokerReactive( }) /** Sends an active-ack. 
*/ - private val ack = (tid: TransactionId, - activationResult: WhiskActivation, - blockingInvoke: Boolean, - controllerInstance: ControllerInstanceId, - userId: UUID, - isSlotFree: Boolean) => { + private val ack: InvokerReactive.ActiveAck = (tid: TransactionId, + activationResult: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: ControllerInstanceId, + userId: UUID, + isSlotFree: Boolean) => { implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {