This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e510326  Make activation polling for blocking invocations configurable (#4088)
e510326 is described below

commit e5103265887e034aee6732e8776c3b95f7ba8742
Author: James Dubee <[email protected]>
AuthorDate: Wed Jan 9 14:04:07 2019 -0500

    Make activation polling for blocking invocations configurable (#4088)
    
    * Make activation polling for blocking invocations configurable
    
    * Update Tests
---
 ansible/roles/controller/tasks/deploy.yml          |   2 +
 common/scala/src/main/resources/application.conf   |   6 ++
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   2 +
 .../core/controller/actions/PrimitiveActions.scala |  20 +++-
 .../core/controller/test/ActionsApiTests.scala     | 118 +++++++++++----------
 5 files changed, 90 insertions(+), 58 deletions(-)

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 0fb2ebb..b1b3a0e 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -245,6 +245,8 @@
 
       "CONFIG_whisk_transactions_header": "{{ transactions.header }}"
 
+      "CONFIG_whisk_controller_activation_pollingFromDb": "{{ controller_activation_pollingFromDb | default(true) }}"
+
 - name: merge extra env variables
   set_fact:
     env: "{{ env | combine(controller.extraEnv) }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 40049f8..72f64fa 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -300,6 +300,12 @@ whisk {
         #   sample-rate = "0.01" // sample 1% of requests by default
         #}
     }
+
+    controller {
+        activation {
+            polling-from-db: true
+        }
+    }
 }
 #placeholder for test overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
 test {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8dd7810..8061b2a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -241,4 +241,6 @@ object ConfigKeys {
   val query = "whisk.query-limit"
   val execSizeLimit = "whisk.exec-size-limit"
 
+  val controller = s"whisk.controller"
+  val controllerActivation = s"$controller.activation"
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index 1d2ca0c..b508442 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -22,6 +22,7 @@ import java.time.{Clock, Instant}
 import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
 import spray.json._
+
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
 import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
 import org.apache.openwhisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.entity.types.EntityStore
 import org.apache.openwhisk.http.Messages._
 import org.apache.openwhisk.spi.SpiLoader
 import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
+import org.apache.openwhisk.core.ConfigKeys
 
 import scala.collection.mutable.Buffer
 import scala.concurrent.duration._
@@ -42,6 +44,8 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
+import pureconfig.loadConfigOrThrow
+
 protected[actions] trait PrimitiveActions {
   /** The core collections require backend services to be injected in this trait. */
   services: WhiskServices =>
@@ -593,11 +597,15 @@ protected[actions] trait PrimitiveActions {
     //    in case of an incomplete active-ack (record too large for example).
     activeAckResponse.foreach {
       case Right(activation) => result.trySuccess(Right(activation))
-      case _                 => pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+      case _ if (controllerActivationConfig.pollingFromDb) =>
+        pollActivation(docid, context, result, i => 1.seconds + (2.seconds * i), maxRetries = 4)
+      case _ =>
     }
 
-    // 2. Poll the database slowly in case the active-ack never arrives
-    pollActivation(docid, context, result, _ => 15.seconds)
+    if (controllerActivationConfig.pollingFromDb) {
+      // 2. Poll the database slowly in case the active-ack never arrives
+      pollActivation(docid, context, result, _ => 15.seconds)
+    }
 
     // 3. Timeout forces a fallback to activationId
     val timeout = actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
@@ -644,4 +652,10 @@ protected[actions] trait PrimitiveActions {
 
   /** Max atomic action count allowed for sequences */
   private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
+
+  private val controllerActivationConfig =
+    loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
+
 }
+
+case class ControllerActivationConfig(pollingFromDb: Boolean)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index ce8a1bf..b69536b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -36,9 +36,12 @@ import org.apache.openwhisk.core.entitlement.Collection
 import org.apache.openwhisk.http.ErrorResponse
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
 import akka.http.scaladsl.model.headers.RawHeader
 import org.apache.commons.lang3.StringUtils
 import org.apache.openwhisk.core.entity.Attachments.Inline
+import pureconfig.loadConfigOrThrow
 
 /**
  * Tests Actions API.
@@ -65,6 +68,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
   def aname() = MakeName.next("action_tests")
   val actionLimit = Exec.sizeLimit
   val parametersLimit = Parameters.sizeLimit
+  val controllerActivationConfig = loadConfigOrThrow[ControllerActivationConfig](ConfigKeys.controllerActivation)
 
   //// GET /actions
   it should "return empty list when no actions exist" in {
@@ -1360,39 +1364,41 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
-  it should "invoke an action, blocking and retrieve result via db polling" in {
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))),
-      logs = ActivationLogs(Vector("first line", "second line")))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve it
-    // the test harness makes sure the activation id observed by the test matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-      }
+  if (controllerActivationConfig.pollingFromDb) {
+    it should "invoke an action, blocking and retrieve result via db polling" in {
+      implicit val tid = transid()
+      val action = WhiskAction(namespace, aname(), jsDefault("??"))
+      val activation = WhiskActivation(
+        action.namespace,
+        action.name,
+        creds.subject,
+        activationIdFactory.make(),
+        start = Instant.now,
+        end = Instant.now,
+        response = ActivationResponse.success(Some(JsObject("test" -> "yes".toJson))),
+        logs = ActivationLogs(Vector("first line", "second line")))
+      put(entityStore, action)
+      // storing the activation in the db will allow the db polling to retrieve it
+      // the test harness makes sure the activation id observed by the test matches
+      // the one generated by the api handler
+      storeActivation(activation, context)
+      try {
+        Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[JsObject]
+          response should be(activation.withoutLogs.toExtendedJson)
+        }
 
-      // repeat invoke, get only result back
-      Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.resultAsJson)
-        headers should contain(RawHeader(ActivationIdHeader, activation.activationId.asString))
+        // repeat invoke, get only result back
+        Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(OK)
+          val response = responseAs[JsObject]
+          response should be(activation.resultAsJson)
+          headers should contain(RawHeader(ActivationIdHeader, activation.activationId.asString))
+        }
+      } finally {
+        deleteActivation(ActivationId(activation.docid.asString), context)
       }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
     }
   }
 
@@ -1477,31 +1483,33 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
     }
   }
 
-  it should "invoke a blocking action and return error response when activation fails" in {
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.whiskError("test"))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve it
-    // the test harness makes sure the activation id observed by the test matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
-        status should be(InternalServerError)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-        headers should contain(RawHeader(ActivationIdHeader, response.fields("activationId").convertTo[String]))
+  if (controllerActivationConfig.pollingFromDb) {
+    it should "invoke a blocking action and return error response when activation fails" in {
+      implicit val tid = transid()
+      val action = WhiskAction(namespace, aname(), jsDefault("??"))
+      val activation = WhiskActivation(
+        action.namespace,
+        action.name,
+        creds.subject,
+        activationIdFactory.make(),
+        start = Instant.now,
+        end = Instant.now,
+        response = ActivationResponse.whiskError("test"))
+      put(entityStore, action)
+      // storing the activation in the db will allow the db polling to retrieve it
+      // the test harness makes sure the activation id observed by the test matches
+      // the one generated by the api handler
+      storeActivation(activation, context)
+      try {
+        Post(s"$collectionPath/${action.name}?blocking=true") ~> Route.seal(routes(creds)) ~> check {
+          status should be(InternalServerError)
+          val response = responseAs[JsObject]
+          response should be(activation.withoutLogs.toExtendedJson)
+          headers should contain(RawHeader(ActivationIdHeader, response.fields("activationId").convertTo[String]))
+        }
+      } finally {
+        deleteActivation(ActivationId(activation.docid.asString), context)
       }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
     }
   }
 

Reply via email to