This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 7f571c3 Ensure ResultMessage is processed. (#4135)
7f571c3 is described below
commit 7f571c32bb8f3155c89f1d96fda4320909e097fd
Author: jiangpch <[email protected]>
AuthorDate: Thu Nov 29 15:08:48 2018 +0800
Ensure ResultMessage is processed. (#4135)
---
.../ShardingContainerPoolBalancer.scala | 29 ++++++++++------------
1 file changed, 13 insertions(+), 16 deletions(-)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 35a4547..4010cc1 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer(
/** State related to invocations and throttling */
protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
+ protected[loadBalancer] val blockingPromises =
+ TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
private val totalActivations = new LongAdder()
private val totalActivationMemory = new LongAdder()
@@ -262,9 +264,13 @@ class ShardingContainerPoolBalancer(
chosen
.map { invoker =>
- val entry = setupActivation(msg, action, invoker)
+ setupActivation(msg, action, invoker)
sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
- entry.promise.future
+ if (msg.blocking) {
+ blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+ } else {
+ Future.successful(Left(msg.activationId))
+ }
}
}
.getOrElse {
@@ -313,8 +319,7 @@ class ShardingContainerPoolBalancer(
action.limits.memory.megabytes.MB,
action.limits.concurrency.maxConcurrent,
action.fullyQualifiedName(true),
- timeoutHandler,
- Promise[Either[ActivationId, WhiskActivation]]())
+ timeoutHandler)
})
}
@@ -387,9 +392,7 @@ class ShardingContainerPoolBalancer(
// Resolve the promise to send the result back to the user
// The activation will be removed from `activations`-map later, when we receive the completion message, because the
// slot of the invoker is not yet free for new activations.
- activations.get(aid).map { entry =>
- entry.promise.trySuccess(response)
- }
+ blockingPromises.remove(aid).map(_.trySuccess(response))
logging.info(this, s"received result ack for '$aid'")(tid)
}
@@ -422,13 +425,9 @@ class ShardingContainerPoolBalancer(
.foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName,
entry.maxConcurrent, entry.memory.toMB.toInt))
if (!forced) {
entry.timeoutHandler.cancel()
- // If the action was blocking and the Resultmessage has been received before nothing will happen here.
- // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
- // the controller will get the result out of the database.
- // If the action was non-blocking, we will close the promise here.
- entry.promise.trySuccess(Left(aid))
} else {
- entry.promise.tryFailure(new Throwable("no completion ack received"))
+ // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
+ blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
}
logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
@@ -717,7 +716,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeout
* @param namespaceId namespace that invoked the action
* @param invokerName invoker the action is scheduled to
* @param timeoutHandler times out completion of this activation, should be canceled on good paths
- * @param promise the promise to be completed by the activation
*/
case class ActivationEntry(id: ActivationId,
namespaceId: UUID,
@@ -725,5 +723,4 @@ case class ActivationEntry(id: ActivationId,
memory: ByteSize,
maxConcurrent: Int,
fullyQualifiedEntityName: FullyQualifiedEntityName,
- timeoutHandler: Cancellable,
- promise: Promise[Either[ActivationId, WhiskActivation]])
+ timeoutHandler: Cancellable)