rabbah closed pull request #3310: Refactor activation finisher without actor.
URL: https://github.com/apache/incubator-openwhisk/pull/3310
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f77d9bfec1..b7d7737c3b 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,39 +17,27 @@
 
 package whisk.core.controller.actions
 
-import java.time.Clock
-import java.time.Instant
+import java.time.{Clock, Instant}
 
-import scala.collection.mutable.Buffer
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
-import akka.actor.Actor
-import akka.actor.ActorRef
 import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import akka.actor.Props
+import akka.event.Logging.InfoLevel
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.Scheduler
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.connector.ActivationMessage
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
-import whisk.core.entitlement._
-import whisk.core.entitlement.Resource
+import whisk.core.entitlement.{Resource, _}
 import whisk.core.entity._
 import whisk.core.entity.size.SizeInt
-import whisk.core.entity.types.ActivationStore
-import whisk.core.entity.types.EntityStore
+import whisk.core.entity.types.{ActivationStore, EntityStore}
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import akka.event.Logging.InfoLevel
+
+import scala.collection.mutable.Buffer
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 protected[actions] trait PrimitiveActions {
   /** The core collections require backend services to be injected in this 
trait. */
@@ -569,158 +557,57 @@ protected[actions] trait PrimitiveActions {
                                         totalWaitTime: FiniteDuration,
                                         activeAckResponse: 
Future[Either[ActivationId, WhiskActivation]])(
     implicit transid: TransactionId): Future[Either[ActivationId, 
WhiskActivation]] = {
-    // this is the promise which active ack or db polling will try to complete 
via:
-    // 1. active ack response, or
-    // 2. failing active ack (due to active ack timeout), fall over to db 
polling
-    // 3. timeout on db polling => converts activation to non-blocking 
(returns activation id only)
-    // 4. internal error message
-    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
-    val (promise, finisher) = ActivationFinisher.props({ () =>
-      WhiskActivation.get(activationStore, docid)
-    })
+    val result = Promise[Either[ActivationId, WhiskActivation]]
 
+    val docid = new DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
     logging.debug(this, s"action activation will block for result upto 
$totalWaitTime")
 
-    activeAckResponse map {
-      case result @ Right(_) =>
-        // activation complete, result is available
-        finisher ! ActivationFinisher.Finish(result)
-
-      case _ =>
-        // active ack received but it does not carry the response,
-        // no result available except by polling the db
-        logging.warn(this, "preemptively polling db because active ack is 
missing result")
-        finisher ! Scheduler.WorkOnceNow
+    // 1. Wait for the active-ack to happen. Either immediately resolve the 
promise or poll the database quickly
+    //    in case of an incomplete active-ack (record too large for example).
+    activeAckResponse.foreach {
+      case Right(activation) => result.trySuccess(Right(activation))
+      case _                 => pollActivation(docid, result, i => 1.seconds + 
(2.seconds * i), maxRetries = 4)
     }
 
-    // return the promise which is either fulfilled by active ack, polling 
from the database,
-    // or the timeout alternative when the allowed duration expires (i.e., the 
action took
-    // longer than the permitted, per totalWaitTime).
-    promise.withAlternativeAfterTimeout(
-      totalWaitTime, {
-        Future.successful(Left(activationId)).andThen {
-          // result no longer interesting; terminate the finisher/shut down db 
polling if necessary
-          case _ => actorSystem.stop(finisher)
-        }
-      })
-  }
-
-  /** Max atomic action count allowed for sequences */
-  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
-}
-
-/** Companion to the ActivationFinisher. */
-protected[actions] object ActivationFinisher {
-  case class Finish(activation: Right[ActivationId, WhiskActivation])
-
-  private type ActivationLookup = () => Future[WhiskActivation]
+    // 2. Poll the database slowly in case the active-ack never arrives
+    pollActivation(docid, result, _ => 15.seconds)
 
-  /** Periodically polls the db to cover missing active acks. */
-  private val datastorePollPeriodForActivation = 15.seconds
+    // 3. Timeout forces a fallback to activationId
+    val timeout = 
actorSystem.scheduler.scheduleOnce(totalWaitTime)(result.trySuccess(Left(activationId)))
 
-  /**
-   * In case of a partial active ack where it is know an activation completed
-   * but the result could not be sent over the bus, use this periodicity to 
poll
-   * for a result.
-   */
-  private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 
7.seconds)
-
-  def props(activationLookup: ActivationLookup)(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Future[Either[ActivationId, WhiskActivation]], 
ActorRef) = {
-
-    val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, 
datastorePreemptivePolling)
-    (p.future, f) // hides the polling actor
-  }
-
-  /**
-   * Creates the finishing actor.
-   * This is factored for testing.
-   */
-  protected[actions] def props(activationLookup: ActivationLookup,
-                               slowPoll: FiniteDuration,
-                               fastPolls: Seq[FiniteDuration])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], 
ActorRef, ActorRef) = {
-
-    // this is strictly completed by the finishing actor
-    val promise = Promise[Either[ActivationId, WhiskActivation]]
-    val dbpoller = poller(slowPoll, promise, activationLookup)
-    val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise))
-
-    (promise, dbpoller, actorSystem.actorOf(finisher))
+    result.future.andThen {
+      case _ => timeout.cancel()
+    }
   }
 
   /**
-   * An actor to complete a blocking activation request. It encapsulates a 
promise
-   * to be completed when the result is ready. This may happen in one of two 
ways.
-   * An active ack message is relayed to this actor to complete the promise 
when
-   * the active ack is received. Or in case of a partial/missing active ack, an
-   * explicitly scheduled datastore poll of the activation record, if found, 
will
-   * complete the transaction. When the promise is fulfilled, the actor self 
destructs.
+   * Polls the database for an activation.
+   *
+   * Does not use Future composition because an early exit is wanted, once any 
possible external source resolved the
+   * Promise.
+   *
+   * @param docid the docid to poll for
+   * @param result promise to resolve on result. Is also used to abort polling 
once completed.
    */
-  private class ActivationFinisher(poller: ActorRef, // the activation poller
-                                   fastPollPeriods: Seq[FiniteDuration],
-                                   promise: Promise[Either[ActivationId, 
WhiskActivation]])(
-    implicit transid: TransactionId,
-    actorSystem: ActorSystem,
-    executionContext: ExecutionContext,
-    logging: Logging)
-      extends Actor {
-
-    // when the future completes, self-destruct
-    promise.future.andThen { case _ => shutdown() }
-
-    var preemptiveMsgs = Vector.empty[Cancellable]
-
-    def receive = {
-      case ActivationFinisher.Finish(activation) =>
-        promise.trySuccess(activation)
-
-      case msg @ Scheduler.WorkOnceNow =>
-        // try up to three times when pre-emptying the schedule
-        fastPollPeriods.foreach { s =>
-          preemptiveMsgs = preemptiveMsgs :+ 
context.system.scheduler.scheduleOnce(s, poller, msg)
+  private def pollActivation(docid: DocId,
+                             result: Promise[Either[ActivationId, 
WhiskActivation]],
+                             wait: Int => FiniteDuration,
+                             retries: Int = 0,
+                             maxRetries: Int = Int.MaxValue)(implicit transid: 
TransactionId): Unit = {
+    if (!result.isCompleted && retries < maxRetries) {
+      val schedule = actorSystem.scheduler.scheduleOnce(wait(retries)) {
+        WhiskActivation.get(activationStore, docid).onComplete {
+          case Success(activation)             => 
result.trySuccess(Right(activation))
+          case Failure(_: NoDocumentException) => pollActivation(docid, 
result, wait, retries + 1, maxRetries)
+          case Failure(t: Throwable)           => result.tryFailure(t)
         }
-    }
-
-    def shutdown(): Unit = Option(context).foreach(_.stop(self))
+      }
 
-    override def postStop() = {
-      logging.debug(this, "finisher shutdown")
-      preemptiveMsgs.foreach(_.cancel())
-      preemptiveMsgs = Vector.empty
-      context.stop(poller)
+      // Halt the schedule if the result is provided during one execution
+      result.future.onComplete(_ => schedule.cancel())
     }
   }
 
-  /**
-   * This creates the inner datastore poller for the completed activation.
-   * It is a factory method to facilitate testing.
-   */
-  private def poller(slowPollPeriod: FiniteDuration,
-                     promise: Promise[Either[ActivationId, WhiskActivation]],
-                     activationLookup: ActivationLookup)(implicit transid: 
TransactionId,
-                                                         actorSystem: 
ActorSystem,
-                                                         executionContext: 
ExecutionContext,
-                                                         logging: Logging): 
ActorRef = {
-    Scheduler.scheduleWaitAtMost(slowPollPeriod, initialDelay = 
slowPollPeriod, name = "dbpoll")(() => {
-      if (!promise.isCompleted) {
-        activationLookup() map {
-          // complete the future, which in turn will poison pill this scheduler
-          activation =>
-            promise.trySuccess(Right(activation.withoutLogs)) // logs excluded 
on blocking calls
-        } andThen {
-          case Failure(e: NoDocumentException) => // do nothing, scheduler 
will reschedule another poll
-          case Failure(t: Throwable) => // something went wrong, abort
-            logging.error(this, s"failed while waiting on result: 
${t.getMessage}")
-            promise.tryFailure(t) // complete the future, which in turn will 
poison pill this scheduler
-        }
-      } else Future.successful({}) // the scheduler will be halted because the 
promise is now resolved
-    })
-  }
+  /** Max atomic action count allowed for sequences */
+  private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
 }
diff --git 
a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
 
b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
deleted file mode 100644
index 25f48ae130..0000000000
--- 
a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.controller.actions.test
-
-import java.time.Instant
-
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import common.StreamLogging
-import common.WskActorSystem
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.controller.actions.ActivationFinisher
-import whisk.core.entity._
-import whisk.core.entity.ActivationResponse
-import whisk.core.entity.size.SizeInt
-import whisk.core.database.NoDocumentException
-import akka.testkit.TestProbe
-import whisk.common.Scheduler
-import akka.actor.PoisonPill
-
-@RunWith(classOf[JUnitRunner])
-class ActivationFinisherTests
-    extends FlatSpec
-    with BeforeAndAfterEach
-    with Matchers
-    with WskActorSystem
-    with StreamLogging {
-
-  implicit val tid = TransactionId.testing
-
-  val activation = WhiskActivation(
-    namespace = EntityPath("ns"),
-    name = EntityName("a"),
-    Subject(),
-    activationId = ActivationId(),
-    start = Instant.now(),
-    end = Instant.now(),
-    response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(2)))),
-    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), 
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
-    duration = Some(123))
-
-  var activationLookupCounter = 0
-  @volatile var activationResult: Option[Throwable] = None
-
-  def activationLookup(): Future[WhiskActivation] = {
-    activationLookupCounter += 1
-    
activationResult.map(Future.failed(_)).getOrElse(Future.successful(activation))
-  }
-
-  override def beforeEach() = {
-    activationLookupCounter = 0
-    activationResult = None
-  }
-
-  behavior of "activation finisher"
-
-  val slowPoll = 200.milliseconds
-  val fastPoll = Seq()
-
-  it should "poll until promise is completed" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 2 and be <= 3)
-
-    // should terminate the parent finisher and child poller on completion
-    promise.trySuccess(Right(activation))
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }
-
-  it should "complete promise from poller" in {
-    val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should be(1)
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-
-    promise shouldBe 'completed
-  }
-
-  it should "finish when receiving corresponding message" in {
-    activationResult = Some(NoDocumentException(""))
-    val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
-    Thread.sleep(slowPollWorkWindow.toMillis)
-    activationLookupCounter should (be >= 1 and be <= 2)
-
-    // should terminate the parent finisher and child poller once message is 
received
-    finisher ! ActivationFinisher.Finish(Right(activation))
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-
-    promise shouldBe 'completed
-  }
-
-  it should "poll pre-emptively" in {
-    activationResult = Some(NoDocumentException(""))
-    val slowPoll = 600.milliseconds
-    val fastPoll = Seq(100.milliseconds, 200.milliseconds)
-    val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
-
-    val testProbePoller = TestProbe()
-    val testProbeFinisher = TestProbe()
-    testProbePoller.watch(poller)
-    testProbeFinisher.watch(finisher)
-
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(0)
-
-    // should cause polls
-    finisher ! Scheduler.WorkOnceNow
-    Thread.sleep(500.milliseconds.toMillis)
-    activationLookupCounter should be(3)
-
-    finisher ! PoisonPill
-
-    testProbePoller.expectTerminated(poller, 1.second)
-    testProbeFinisher.expectTerminated(finisher, 1.second)
-  }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to