rabbah closed pull request #3129: fix memory drag in PrimitiveActions, and
clean up withAlternateAfterTimeout
URL: https://github.com/apache/incubator-openwhisk/pull/3129
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index 88764d3f52..ea025cbf32 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -23,38 +23,63 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
object ExecutionContextFactory {
- // Future.firstCompletedOf has a memory drag bug
- // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
- def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit
executor: ExecutionContext): Future[T] = {
+ private type CancellableFuture[T] = (Cancellable, Future[T])
+
+ /**
+ * akka.pattern.after has a memory drag issue: it opaquely
+ * schedules an actor which consequently results in drag for the
+ * timeout duration
+ *
+ */
+ def expire[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
+ implicit ec: ExecutionContext): CancellableFuture[T] = {
val p = Promise[T]()
- val pref = new java.util.concurrent.atomic.AtomicReference(p)
- val completeFirst: Try[T] => Unit = { result: Try[T] =>
- val promise = pref.getAndSet(null)
- if (promise != null) {
- promise.tryComplete(result)
+ val cancellable = using.scheduleOnce(duration) {
+ p completeWith {
+ try value
+ catch { case NonFatal(t) => Future.failed(t) }
}
}
- futures foreach { _ onComplete completeFirst }
+ (cancellable, p.future)
+ }
+
+ /**
+ * Return the first of the two given futures to complete; if f1
+ * finishes first, we will cancel f2
+ *
+ */
+ def firstCompletedOf2[T](f1: Future[T], f2Cancellable: CancellableFuture[T])(
+ implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+ val (f2Killswitch, f2) = f2Cancellable
+
+ f1.onComplete { result =>
+ p.tryComplete(result)
+ f2Killswitch.cancel()
+ }
+ f2.onComplete(p.tryComplete)
+
p.future
}
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit
system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
- firstCompletedOf(Seq(f, expire(timeout,
system.scheduler)(Future.failed(msg))))
+ firstCompletedOf2(f, expire(timeout,
system.scheduler)(Future.failed(msg)))
}
def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: =>
Future[T])(
implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
- firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt)))
+ firstCompletedOf2(f, expire(timeout, system.scheduler)(alt))
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services