This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e510326 Make activation polling for blocking invocations configurable (#4088)
e510326 is described below
commit e5103265887e034aee6732e8776c3b95f7ba8742
Author: James Dubee <[email protected]>
AuthorDate: Wed Jan 9 14:04:07 2019 -0500
Make activation polling for blocking invocations configurable (#4088)
* Make activation polling for blocking invocations configurable
* Update Tests
---
ansible/roles/controller/tasks/deploy.yml | 2 +
common/scala/src/main/resources/application.conf | 6 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 2 +
.../core/controller/actions/PrimitiveActions.scala | 20 +++-
.../core/controller/test/ActionsApiTests.scala | 118 +++++++++++----------
5 files changed, 90 insertions(+), 58 deletions(-)
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 0fb2ebb..b1b3a0e 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -245,6 +245,8 @@
"CONFIG_whisk_transactions_header": "{{ transactions.header }}"
+ "CONFIG_whisk_controller_activation_pollingFromDb": "{{ controller_activation_pollingFromDb | default(true) }}"
+
- name: merge extra env variables
set_fact:
env: "{{ env | combine(controller.extraEnv) }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 40049f8..72f64fa 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -300,6 +300,12 @@ whisk {
# sample-rate = "0.01" // sample 1% of requests by default
#}
}
+
+ controller {
+ activation {
+ polling-from-db: true
+ }
+ }
}
#placeholder for test overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
test {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8dd7810..8061b2a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -241,4 +241,6 @@ object ConfigKeys {
val query = "whisk.query-limit"
val execSizeLimit = "whisk.exec-size-limit"
+ val controller = s"whisk.controller"
+ val controllerActivation = s"$controller.activation"
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index 1d2ca0c..b508442 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -22,6 +22,7 @@ import java.time.{Clock, Instant}
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
import spray.json._
+
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
import org.apache.openwhisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.entity.types.EntityStore
import org.apache.openwhisk.http.Messages._
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
+import org.apache.openwhisk.core.ConfigKeys
import scala.collection.mutable.Buffer
import scala.concurrent.duration._
@@ -42,6 +44,8 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success}
+import pureconfig.loadConfigOrThrow
+
protected[actions] trait PrimitiveActions {
/** The core collections require backend services to be injected in this trait. */
services: WhiskServices =>
@@ -593,11 +597,15 @@ protected[actions] trait PrimitiveActions {
// in case of an incomplete active-ack (record too large for example).
activeAckResponse.foreach {
case Right(activation) => result.trySuccess(Right(activation))
- case _ => pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+ case _ if (controllerActivationConfig.pollingFromDb) =>
+ pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+ case _ =>
}
- // 2. Poll the database slowly in case the active-ack never arrives
- pollActivation(docid, context, result, _ => 15.seconds)
+ if (controllerActivationConfig.pollingFromDb) {
+ // 2. Poll the database slowly in case the active-ack never arrives
+ pollActivation(docid, context, result, _ => 15.seconds)
+ }
// 3. Timeout forces a fallback to activationId
val timeout =
actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
@@ -644,4 +652,10 @@ protected[actions] trait PrimitiveActions {
/** Max atomic action count allowed for sequences */
private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
+
+ private val controllerActivationConfig =
+ loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
+
}
+
+case class ControllerActivationConfig(pollingFromDb: Boolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index ce8a1bf..b69536b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -36,9 +36,12 @@ import org.apache.openwhisk.core.entitlement.Collection
import org.apache.openwhisk.http.ErrorResponse
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
import akka.http.scaladsl.model.headers.RawHeader
import org.apache.commons.lang3.StringUtils
import org.apache.openwhisk.core.entity.Attachments.Inline
+import pureconfig.loadConfigOrThrow
/**
* Tests Actions API.
@@ -65,6 +68,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
def aname() = MakeName.next("action_tests")
val actionLimit = Exec.sizeLimit
val parametersLimit = Parameters.sizeLimit
+ val controllerActivationConfig = loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
//// GET /actions
it should "return empty list when no actions exist" in {
@@ -1360,39 +1364,41 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
}
}
- it should "invoke an action, blocking and retrieve result via db polling" in {
- implicit val tid = transid()
- val action = WhiskAction(namespace, aname(), jsDefault("??"))
- val activation = WhiskActivation(
- action.namespace,
- action.name,
- creds.subject,
- activationIdFactory.make(),
- start = Instant.now,
- end = Instant.now,
- response = ActivationResponse.success(Some(JsObject("test" ->
"yes".toJson))),
- logs = ActivationLogs(Vector("first line", "second line")))
- put(entityStore, action)
- // storing the activation in the db will allow the db polling to retrieve
it
- // the test harness makes sure the activation id observed by the test
matches
- // the one generated by the api handler
- storeActivation(activation, context)
- try {
- Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
- status should be(OK)
- val response = responseAs[JsObject]
- response should be(activation.withoutLogs.toExtendedJson)
- }
+ if (controllerActivationConfig.pollingFromDb) {
+ it should "invoke an action, blocking and retrieve result via db polling" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(), jsDefault("??"))
+ val activation = WhiskActivation(
+ action.namespace,
+ action.name,
+ creds.subject,
+ activationIdFactory.make(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.success(Some(JsObject("test" ->
"yes".toJson))),
+ logs = ActivationLogs(Vector("first line", "second line")))
+ put(entityStore, action)
+ // storing the activation in the db will allow the db polling to
retrieve it
+ // the test harness makes sure the activation id observed by the test
matches
+ // the one generated by the api handler
+ storeActivation(activation, context)
+ try {
+ Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.withoutLogs.toExtendedJson)
+ }
- // repeat invoke, get only result back
- Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~>
Route.seal(routes(creds)) ~> check {
- status should be(OK)
- val response = responseAs[JsObject]
- response should be(activation.resultAsJson)
- headers should contain(RawHeader(ActivationIdHeader,
activation.activationId.asString))
+ // repeat invoke, get only result back
+ Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.resultAsJson)
+ headers should contain(RawHeader(ActivationIdHeader,
activation.activationId.asString))
+ }
+ } finally {
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
- } finally {
- deleteActivation(ActivationId(activation.docid.asString), context)
}
}
@@ -1477,31 +1483,33 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
}
}
- it should "invoke a blocking action and return error response when activation fails" in {
- implicit val tid = transid()
- val action = WhiskAction(namespace, aname(), jsDefault("??"))
- val activation = WhiskActivation(
- action.namespace,
- action.name,
- creds.subject,
- activationIdFactory.make(),
- start = Instant.now,
- end = Instant.now,
- response = ActivationResponse.whiskError("test"))
- put(entityStore, action)
- // storing the activation in the db will allow the db polling to retrieve
it
- // the test harness makes sure the activation id observed by the test
matches
- // the one generated by the api handler
- storeActivation(activation, context)
- try {
- Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
- status should be(InternalServerError)
- val response = responseAs[JsObject]
- response should be(activation.withoutLogs.toExtendedJson)
- headers should contain(RawHeader(ActivationIdHeader,
response.fields("activationId").convertTo[String]))
+ if (controllerActivationConfig.pollingFromDb) {
+ it should "invoke a blocking action and return error response when activation fails" in {
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(), jsDefault("??"))
+ val activation = WhiskActivation(
+ action.namespace,
+ action.name,
+ creds.subject,
+ activationIdFactory.make(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.whiskError("test"))
+ put(entityStore, action)
+ // storing the activation in the db will allow the db polling to
retrieve it
+ // the test harness makes sure the activation id observed by the test
matches
+ // the one generated by the api handler
+ storeActivation(activation, context)
+ try {
+ Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(InternalServerError)
+ val response = responseAs[JsObject]
+ response should be(activation.withoutLogs.toExtendedJson)
+ headers should contain(RawHeader(ActivationIdHeader,
response.fields("activationId").convertTo[String]))
+ }
+ } finally {
+ deleteActivation(ActivationId(activation.docid.asString), context)
}
- } finally {
- deleteActivation(ActivationId(activation.docid.asString), context)
}
}