This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b803c64 Ensure that Result-ack is sent before Completion-ack. (#4115)
b803c64 is described below
commit b803c64366e273a0747c25c55873eb733a42f793
Author: Christian Bickel <[email protected]>
AuthorDate: Fri Nov 23 22:29:26 2018 +0100
Ensure that Result-ack is sent before Completion-ack. (#4115)
Improves comments to clarify the ordering of result and completion messages.
Adds a type alias for the active ack messages, and documents the interface.
Co-authored-by: Christian Bickel <[email protected]>
Co-authored-by: Rodric Rabbah <[email protected]>
---
.../core/containerpool/ContainerProxy.scala | 29 +++++++++++++++++-----
.../openwhisk/core/invoker/InvokerReactive.scala | 29 +++++++++++++++++-----
2 files changed, 46 insertions(+), 12 deletions(-)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0c126a2..0fdea5e 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.pattern.pipe
import pureconfig.loadConfigOrThrow
+
import scala.collection.immutable
import spray.json.DefaultJsonProtocol._
import spray.json._
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
import org.apache.openwhisk.http.Messages
import scala.concurrent.Future
@@ -127,7 +129,7 @@ case object RunCompleted
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID, Boolean) => Future[Any],
+ sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, UserContext) =>
Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
@@ -496,10 +498,16 @@ class ContainerProxy(
ActivationResponse.whiskError(Messages.abnormalRun))
}
- // Sending active ack. Entirely asynchronous and not waited upon.
- if (job.msg.blocking) {
- activation.foreach(
+ // Sending an active ack is an asynchronous operation. The result is
forwarded as soon as
+ // possible for blocking activations so that dependent activations can be
scheduled. The
+ // completion message which frees a load balancer slot is sent after the
active ack future
+ // completes to ensure proper ordering.
+ val sendResult = if (job.msg.blocking) {
+ activation.map(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex,
job.msg.user.namespace.uuid, false))
+ } else {
+ // For non-blocking requests, do not forward the result.
+ Future.successful(())
}
val context = UserContext(job.msg.user)
@@ -530,8 +538,17 @@ class ContainerProxy(
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
- // Sending the completionMessage to the controller asynchronously.
- sendActiveAck(tid, activation, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+ // Sending the completion message to the controller after the active
ack ensures proper ordering
+ // (result is received before the completion message for blocking
invokes).
+ sendResult.onComplete(
+ _ =>
+ sendActiveAck(
+ tid,
+ activation,
+ job.msg.blocking,
+ job.msg.rootControllerIndex,
+ job.msg.user.namespace.uuid,
+ true))
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, context)
}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index da86843..ab30510 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -43,6 +43,23 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
+object InvokerReactive {
+
+ /**
+ * A method for sending Active Acknowledgements (aka "active ack") messages
to the load balancer. These messages
+ * are either completion messages for an activation to indicate a resource
slot is free, or result-forwarding
+ * messages for continuations (e.g., sequences and conductor actions).
+ *
+ * @param TransactionId the transaction id for the activation
+ * @param WhiskActivation is the activation result
+ * @param Boolean is true iff the activation was a blocking request
+ * @param ControllerInstanceId the originating controller/loadbalancer id
+ * @param UUID is the UUID for the namespace owning the activation
+ * @param Boolean is true if this is a resource free message and false if this is
a result forwarding message
+ */
+ type ActiveAck = (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID, Boolean) => Future[Any]
+}
+
class InvokerReactive(
config: WhiskConfig,
instance: InvokerInstanceId,
@@ -115,12 +132,12 @@ class InvokerReactive(
})
/** Sends an active-ack. */
- private val ack = (tid: TransactionId,
- activationResult: WhiskActivation,
- blockingInvoke: Boolean,
- controllerInstance: ControllerInstanceId,
- userId: UUID,
- isSlotFree: Boolean) => {
+ private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
+ activationResult:
WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance:
ControllerInstanceId,
+ userId: UUID,
+ isSlotFree: Boolean) => {
implicit val transid: TransactionId = tid
def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean =
false) = {