starpit commented on a change in pull request #3129: fix memory drag in 
PrimitiveActions, and clean up withAlternateAfterTimeout
URL: 
https://github.com/apache/incubator-openwhisk/pull/3129#discussion_r159322291
 
 

 ##########
 File path: 
common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
 ##########
 @@ -23,38 +23,61 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
 
 import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
 
 object ExecutionContextFactory {
 
-  // Future.firstCompletedOf has a memory drag bug
-  // 
https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
-  def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit 
executor: ExecutionContext): Future[T] = {
+  private type CancellableFuture[T] = (Cancellable, Future[T])
+
+  /**
+   * akka.pattern.after has a memory drag issue: it opaquely
+   * schedules an actor which consequently results in drag for the
+   * timeout duration
+   *
+   */
+  def expire[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])(
+    implicit ec: ExecutionContext): CancellableFuture[T] = {
     val p = Promise[T]()
-    val pref = new java.util.concurrent.atomic.AtomicReference(p)
-    val completeFirst: Try[T] => Unit = { result: Try[T] =>
-      val promise = pref.getAndSet(null)
-      if (promise != null) {
-        promise.tryComplete(result)
+    val cancellable = using.scheduleOnce(duration) {
+      p completeWith {
+        try value
+        catch { case NonFatal(t) => Future.failed(t) }
       }
     }
-    futures foreach { _ onComplete completeFirst }
+    (cancellable, p.future)
+  }
+
+  /**
+   * Return the first of the two given futures to complete; if f1
+   * finishes first, we will cancel f2
+   *
+   */
+  def firstCompletedOf2[T](f1: Future[T], f2: CancellableFuture[T])(implicit 
executor: ExecutionContext): Future[T] = {
+    val p = Promise[T]()
+
+    f1 onComplete { result =>
+      p.tryComplete(result)
+      f2._1.cancel()
+    }
+    f2._2 onComplete p.tryComplete
 
 Review comment:
   I'm happy to oblige. Update pushed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to