chetanmeh commented on a change in pull request #4326: Invoker backpressure
URL: 
https://github.com/apache/incubator-openwhisk/pull/4326#discussion_r289349415
 
 

 ##########
 File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
 ##########
 @@ -227,48 +276,131 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
         busyPool = busyPool + (sender() -> newData)
         freePool = freePool - sender()
       }
+      updateUnused() //in case a previously in-use is now unused
+      processBuffer()
 
     // Container is prewarmed and ready to take work
     case NeedWork(data: PreWarmedData) =>
+      //stop tracking via reserved
+      resourceManager.releaseReservation(sender())
       prewarmedPool = prewarmedPool + (sender() -> data)
 
     // Container got removed
     case ContainerRemoved =>
+      //stop tracking via reserved (should already be removed, except in case 
of failure)
+      resourceManager.releaseReservation(sender())
+
       // if container was in free pool, it may have been processing (but under 
capacity),
       // so there is capacity to accept another job request
-      freePool.get(sender()).foreach { f =>
+      val foundFree: Option[ActorRef] = freePool.get(sender()).map { f =>
         freePool = freePool - sender()
         if (f.activeActivationCount > 0) {
-          feed ! MessageFeed.Processed
+          processBuffer()
         }
+        updateUnused()
+        sender()
       }
       // container was busy (busy indicates at full capacity), so there is 
capacity to accept another job request
-      busyPool.get(sender()).foreach { _ =>
+      val foundBusy = busyPool.get(sender()).map { _ =>
         busyPool = busyPool - sender()
-        feed ! MessageFeed.Processed
+        processBuffer()
+        sender()
+      }
+      //if container was neither free or busy,
+      if (foundFree.orElse(foundBusy).isEmpty) {
+        processBuffer()
       }
 
     // This message is received for one of these reasons:
     // 1. Container errored while resuming a warm container, could not process 
the job, and sent the job back
     // 2. The container aged, is destroying itself, and was assigned a job 
which it had to send back
     // 3. The container aged and is destroying itself
+    // 4. The container is paused, and being removed to make room for new 
containers
     // Update the free/busy lists but no message is sent to the feed since 
there is no change in capacity yet
     case RescheduleJob =>
+      resourceManager.releaseReservation(sender())
       freePool = freePool - sender()
       busyPool = busyPool - sender()
+      updateUnused()
+    case InitPrewarms =>
+      initPrewarms()
+    case ResourceUpdate => //we may have more resources - either process the 
buffer or request another feed message
+      processBuffer()
+    case ContainerStarted => //only used for receiving post-start from cold 
container
+      //stop tracking via reserved
+      resourceManager.releaseReservation(sender())
+    case NeedResources(size: ByteSize) => //this is the inverse of NeedWork
+      
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_RESOURCE_ERROR)
+      //we probably are here because resources were not available even though 
we thought they were,
+      //so preemptively request more
+      resourceManager.requestSpace(size)
+    case ReleaseFree(refs) =>
+      logging.info(this, s"pool is trying to release ${refs.size} containers 
by request of invoker")
+      //remove each ref, IFF it is still not in use, and has not been used 
since the removal was requested
+      ContainerPool.findIdlesToRemove(freePool, refs).foreach(removeContainer)
+    case EmitMetrics =>
+      logging.info(
+        this,
+        s"metrics invoker (self) has ${runBuffer.size} buffered 
(${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)")
+
+      
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, 
runBuffer.size)
+      MetricEmitter.emitGaugeMetric(
+        LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
+        runBuffer.map(_.action.limits.memory.megabytes).sum)
+      val containersInUse = inUse
+      
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, 
containersInUse.size)
+      MetricEmitter.emitGaugeMetric(
+        LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
+        containersInUse.map(_._2.memoryLimit.toMB).sum)
+      
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, 
prewarmedPool.size)
+      MetricEmitter.emitGaugeMetric(
+        LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
+        prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+  }
+
+  /** Buffer processing in cluster managed resources means to send the first 
item in runBuffer;
+   *  In non-clustered case, it means signalling MessageFeed (since runBuffer 
is processed in tight loop).
+   * */
+  def processBuffer() = {
+    if (poolConfig.clusterManagedResources) {
 
 Review comment:
   Any reason for this logic to be restricted to the cluster-managed resources 
flow only (unless the intention is to be strictly backward compatible with the 
previous flow for the non-cluster-managed case)? 
   
   So far, a job from `runBuffer` was sent back only when the current job was 
sent for processing. In the `NeedWork` and `ContainerRemoved` cases, `runBuffer` 
was not touched. But logically it should be OK to check `runBuffer` first and 
only then ask for more work.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to