dubee closed pull request #4033: Remove Artifact Store Polling for Blocking Invocations
URL: https://github.com/apache/incubator-openwhisk/pull/4033
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index b924d931da..332f16aaa1 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -26,7 +26,7 @@ import whisk.common.tracing.WhiskTracerProvider
 import whisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
 import whisk.core.connector.{ActivationMessage, EventMessage, 
MessagingProvider}
 import whisk.core.controller.WhiskServices
-import whisk.core.database.{ActivationStore, NoDocumentException, UserContext}
+import whisk.core.database.{ActivationStore, UserContext}
 import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity.ActivationResponse.ERROR_FIELD
 import whisk.core.entity._
@@ -574,10 +574,6 @@ protected[actions] trait PrimitiveActions {
    * Waits for a response from the message bus (e.g., Kafka) containing the 
result of the activation. This is the fast path
    * used for blocking calls where only the result of the activation is 
needed. This path is called active acknowledgement
    * or active ack.
-   *
-   * While waiting for the active ack, periodically poll the datastore in case 
there is a failure in the fast path delivery
-   * which could happen if the connection from an invoker to the message bus 
is disrupted, or if the publishing of the response
-   * fails because the message is too large.
    */
   private def waitForActivationResponse(user: Identity,
                                         activationId: ActivationId,
@@ -589,17 +585,13 @@ protected[actions] trait PrimitiveActions {
     val docid = new 
DocId(WhiskEntity.qualifiedName(user.namespace.name.toPath, activationId))
     logging.debug(this, s"action activation will block for result upto 
$totalWaitTime")
 
-    // 1. Wait for the active-ack to happen. Either immediately resolve the 
promise or poll the database quickly
-    //    in case of an incomplete active-ack (record too large for example).
+    // 1. Wait for the active-ack to happen
     activeAckResponse.foreach {
       case Right(activation) => result.trySuccess(Right(activation))
-      case _                 => pollActivation(docid, context, result, i => 
1.seconds + (2.seconds * i), maxRetries = 4)
+      case _                 =>
     }
 
-    // 2. Poll the database slowly in case the active-ack never arrives
-    pollActivation(docid, context, result, _ => 15.seconds)
-
-    // 3. Timeout forces a fallback to activationId
+    // 2. Timeout forces a fallback to activationId
     val timeout = 
actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
 
     result.future.andThen {
@@ -607,41 +599,6 @@ protected[actions] trait PrimitiveActions {
     }
   }
 
-  /**
-   * Polls the database for an activation.
-   *
-   * Does not use Future composition because an early exit is wanted, once any 
possible external source resolved the
-   * Promise.
-   *
-   * @param docid the docid to poll for
-   * @param result promise to resolve on result. Is also used to abort polling 
once completed.
-   */
-  private def pollActivation(docid: DocId,
-                             context: UserContext,
-                             result: Promise[Either[ActivationId, 
WhiskActivation]],
-                             wait: Int => FiniteDuration,
-                             retries: Int = 0,
-                             maxRetries: Int = Int.MaxValue)(implicit transid: 
TransactionId): Unit = {
-    if (!result.isCompleted && retries < maxRetries) {
-      val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
-        activationStore.get(ActivationId(docid.asString), context).onComplete {
-          case Success(activation) =>
-            transid.mark(
-              this,
-              LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING_DATABASE_RETRIEVAL,
-              s"retrieved activation for blocking invocation via DB polling",
-              logLevel = InfoLevel)
-            result.trySuccess(Right(activation))
-          case Failure(_: NoDocumentException) => pollActivation(docid, 
context, result, wait, retries + 1, maxRetries)
-          case Failure(t: Throwable)           => result.tryFailure(t)
-        }
-      }
-
-      // Halt the schedule if the result is provided during one execution
-      result.future.onComplete(_ => schedule.cancel())
-    }
-  }
-
   /** Max atomic action count allowed for sequences */
   private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
 }
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index fe4d2171e2..a17a4ff453 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -1261,41 +1261,6 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
     }
   }
 
-  it should "invoke an action, blocking and retrieve result via db polling" in 
{
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.success(Some(JsObject("test" -> 
"yes".toJson))))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve 
it
-    // the test harness makes sure the activation id observed by the test 
matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> 
Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-      }
-
-      // repeat invoke, get only result back
-      Post(s"$collectionPath/${action.name}?blocking=true&result=true") ~> 
Route.seal(routes(creds)) ~> check {
-        status should be(OK)
-        val response = responseAs[JsObject]
-        response should be(activation.resultAsJson)
-        headers should contain(RawHeader(ActivationIdHeader, 
activation.activationId.asString))
-      }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
-    }
-  }
-
   it should "invoke an action, blocking and retrieve result via active ack" in 
{
     implicit val tid = transid()
     val action = WhiskAction(namespace, aname(), jsDefault("??"))
@@ -1377,34 +1342,6 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
     }
   }
 
-  it should "invoke a blocking action and return error response when 
activation fails" in {
-    implicit val tid = transid()
-    val action = WhiskAction(namespace, aname(), jsDefault("??"))
-    val activation = WhiskActivation(
-      action.namespace,
-      action.name,
-      creds.subject,
-      activationIdFactory.make(),
-      start = Instant.now,
-      end = Instant.now,
-      response = ActivationResponse.whiskError("test"))
-    put(entityStore, action)
-    // storing the activation in the db will allow the db polling to retrieve 
it
-    // the test harness makes sure the activation id observed by the test 
matches
-    // the one generated by the api handler
-    storeActivation(activation, context)
-    try {
-      Post(s"$collectionPath/${action.name}?blocking=true") ~> 
Route.seal(routes(creds)) ~> check {
-        status should be(InternalServerError)
-        val response = responseAs[JsObject]
-        response should be(activation.withoutLogs.toExtendedJson)
-        headers should contain(RawHeader(ActivationIdHeader, 
response.fields("activationId").convertTo[String]))
-      }
-    } finally {
-      deleteActivation(ActivationId(activation.docid.asString), context)
-    }
-  }
-
   it should "ensure WhiskActionMetadata is used to invoke an action" in {
     implicit val tid = transid()
     val action = WhiskAction(namespace, aname(), jsDefault("??"))
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index e32cd9d747..0316d3a6fb 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -248,9 +248,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers with WskActorSys
 
     // Needs some bytes grace since activation message is not only the payload.
     val args = Map("p" -> ("a" * (allowedSize - 750).toInt).toJson)
-    val start = Instant.now
     val rr = wsk.action.invoke(name, args, blocking = true, expectedExitCode = 
TestUtils.SUCCESS_EXIT)
-    Instant.now.toEpochMilli - start.toEpochMilli should be < 15000L // Ensure 
activation was not retrieved via DB polling
     val activation = 
wsk.parseJsonString(rr.respData).convertTo[ActivationResult]
 
     activation.response.success shouldBe true
@@ -287,9 +285,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers with WskActorSys
       val args = Map("size" -> (allowedSize + 1).toJson, "char" -> "a".toJson)
       val code = if (blocking) BadGateway.intValue else TestUtils.ACCEPTED
       if (blocking) {
-        val start = Instant.now
         val rr = wsk.action.invoke(name, args, blocking = blocking, 
expectedExitCode = code)
-        Instant.now.toEpochMilli - start.toEpochMilli should be < 15000L // 
Ensure activation was not retrieved via DB polling
         
checkResponse(wsk.parseJsonString(rr.respData).convertTo[ActivationResult])
       } else {
         val rr = wsk.action.invoke(name, args, blocking = blocking, 
expectedExitCode = code)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to