tysonnorris commented on a change in pull request #4326: Invoker backpressure
URL: https://github.com/apache/incubator-openwhisk/pull/4326#discussion_r298367413
##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
##########
@@ -227,48 +276,131 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool + (sender() -> newData)
freePool = freePool - sender()
}
+ updateUnused() //in case a previously in-use container is now unused
+ processBuffer()
// Container is prewarmed and ready to take work
case NeedWork(data: PreWarmedData) =>
+ //stop tracking via reserved
+ resourceManager.releaseReservation(sender())
prewarmedPool = prewarmedPool + (sender() -> data)
// Container got removed
case ContainerRemoved =>
+ //stop tracking via reserved (should already be removed, except in case of failure)
+ resourceManager.releaseReservation(sender())
+
// if container was in free pool, it may have been processing (but under capacity),
// so there is capacity to accept another job request
- freePool.get(sender()).foreach { f =>
+ val foundFree: Option[ActorRef] = freePool.get(sender()).map { f =>
freePool = freePool - sender()
if (f.activeActivationCount > 0) {
- feed ! MessageFeed.Processed
+ processBuffer()
}
+ updateUnused()
+ sender()
}
// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
- busyPool.get(sender()).foreach { _ =>
+ val foundBusy = busyPool.get(sender()).map { _ =>
busyPool = busyPool - sender()
- feed ! MessageFeed.Processed
+ processBuffer()
+ sender()
+ }
+ //if container was neither free nor busy,
+ if (foundFree.orElse(foundBusy).isEmpty) {
+ processBuffer()
}
// This message is received for one of these reasons:
// 1. Container errored while resuming a warm container, could not process the job, and sent the job back
// 2. The container aged, is destroying itself, and was assigned a job which it had to send back
// 3. The container aged and is destroying itself
+ // 4. The container is paused, and being removed to make room for new containers
// Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet
case RescheduleJob =>
+ resourceManager.releaseReservation(sender())
freePool = freePool - sender()
busyPool = busyPool - sender()
+ updateUnused()
+ case InitPrewarms =>
+ initPrewarms()
+ case ResourceUpdate => //we may have more resources - either process the buffer or request another feed message
+ processBuffer()
+ case ContainerStarted => //only used for receiving post-start from cold container
+ //stop tracking via reserved
+ resourceManager.releaseReservation(sender())
+ case NeedResources(size: ByteSize) => //this is the inverse of NeedWork
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_RESOURCE_ERROR)
+ //we probably are here because resources were not available even though we thought they were,
+ //so preemptively request more
+ resourceManager.requestSpace(size)
+ case ReleaseFree(refs) =>
logging.info(this, s"pool is trying to release ${refs.size} containers by request of invoker")
+ //remove each ref, IFF it is still not in use, and has not been used since the removal was requested
+ ContainerPool.findIdlesToRemove(freePool, refs).foreach(removeContainer)
+ case EmitMetrics =>
logging.info(
+ this,
+ s"metrics invoker (self) has ${runBuffer.size} buffered (${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)")
+
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
+ runBuffer.map(_.action.limits.memory.megabytes).sum)
+ val containersInUse = inUse
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
+ containersInUse.map(_._2.memoryLimit.toMB).sum)
+ MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
+ prewarmedPool.map(_._2.memoryLimit.toMB).sum)
+ }
+
+ /** Buffer processing in cluster managed resources means to send the first item in runBuffer;
+ * In the non-clustered case, it means signalling MessageFeed (since runBuffer is processed in a tight loop).
+ */
+ def processBuffer() = {
+ if (poolConfig.clusterManagedResources) {
Review comment:
Agreed - removed the restriction so that processBuffer() is called in all cases (cluster managed and not).
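
For reference, a simplified, self-contained sketch of what an unconditional processBuffer() could look like. This is an illustration of the dispatch logic only, under assumed names: Run, BufferedPool, resent, and feedSignals are hypothetical stand-ins for the actual ContainerPool internals (self ! run and feed ! MessageFeed.Processed in the real actor):

```scala
import scala.collection.immutable.Queue

// Hypothetical stand-in for the buffered activation; not the real Run message.
final case class Run(action: String, memoryMB: Int)

// Minimal model of the pool's buffer handling, outside of Akka for clarity.
final class BufferedPool(var runBuffer: Queue[Run]) {
  var resent: Option[Run] = None // head item re-sent to self, awaiting scheduling
  var feedSignals: Int = 0       // stands in for `feed ! MessageFeed.Processed`

  /** With the restriction removed, this runs for every pool (cluster managed
    * or not): resend the buffer head if nothing is already in flight,
    * otherwise, when the buffer is empty, signal the feed for more work. */
  def processBuffer(): Unit =
    runBuffer.headOption match {
      case Some(run) if resent.isEmpty =>
        resent = Some(run) // would be `self ! run` in the actor
      case Some(_) =>      // a resent item is already pending; do nothing
      case None =>
        feedSignals += 1   // would be `feed ! MessageFeed.Processed`
    }
}
```

The guard on resent mirrors the idea that only one buffered item should be re-dispatched at a time, so a burst of NeedWork/ResourceUpdate messages does not flood the pool with duplicates.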
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services