This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push: new 4db501b Add metrics to monitor the overall memory consumed by usercontainers. (#3831) 4db501b is described below commit 4db501b5032f7e8251e77c4698e5d103bac9a779 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Tue Jul 3 09:09:10 2018 +0200 Add metrics to monitor the overall memory consumed by usercontainers. (#3831) --- common/scala/src/main/scala/whisk/common/Logging.scala | 2 ++ .../whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala | 9 ++++++++- .../loadBalancer/test/ShardingContainerPoolBalancerTests.scala | 4 +++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index cf8cb26..17bc2d3 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -278,6 +278,8 @@ object LoggingMarkers { def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) = LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count) + def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId) = + LogMarkerToken(loadbalancer + controllerInstance.asString, "memoryInflight", count) // Time that is needed to execute the action val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 72124ec..9ddbf0a 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -35,6 +35,7 @@ import whisk.common._ import whisk.core.WhiskConfig._ import whisk.core.connector._ import whisk.core.entity._ 
+import whisk.core.entity.size._ import whisk.core.{ConfigKeys, WhiskConfig} import whisk.spi.SpiLoader @@ -159,12 +160,14 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con private val activations = TrieMap[ActivationId, ActivationEntry]() private val activationsPerNamespace = TrieMap[UUID, LongAdder]() private val totalActivations = new LongAdder() + private val totalActivationMemory = new LongAdder() /** State needed for scheduling. */ private val schedulingState = ShardingContainerPoolBalancerState()() actorSystem.scheduler.schedule(0.seconds, 10.seconds) { MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue) + MetricEmitter.emitHistogramMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance), totalActivationMemory.longValue) } /** @@ -248,6 +251,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con instance: InvokerInstanceId): ActivationEntry = { totalActivations.increment() + totalActivationMemory.add(action.limits.memory.megabytes) activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment() val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute @@ -266,6 +270,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con msg.activationId, msg.user.namespace.uuid, instance, + action.limits.memory.megabytes.MB, timeoutHandler, Promise[Either[ActivationId, WhiskActivation]]()) }) @@ -347,6 +352,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con activations.remove(aid) match { case Some(entry) => totalActivations.decrement() + totalActivationMemory.add(entry.memory.toMB * (-1)) activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release()) @@ -427,7 +433,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { dispatched: 
IndexedSeq[ForcableSemaphore], index: Int, step: Int, - stepsDone: Int = 0)(implicit logging: Logging): Option[InvokerInstanceId] = { + stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): Option[InvokerInstanceId] = { val numInvokers = invokers.size if (numInvokers > 0) { @@ -588,5 +594,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invoker case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: InvokerInstanceId, + memory: ByteSize, timeoutHandler: Cancellable, promise: Promise[Either[ActivationId, WhiskActivation]]) diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala index 4c8d15f..9b2567e 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala @@ -21,7 +21,7 @@ import common.StreamLogging import org.junit.runner.RunWith import org.scalatest.{FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner -import whisk.common.ForcableSemaphore +import whisk.common.{ForcableSemaphore, TransactionId} import whisk.core.entity.InvokerInstanceId import whisk.core.loadBalancer._ @@ -153,6 +153,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str behavior of "schedule" + implicit val transId = TransactionId.testing + it should "return None on an empty invoker list" in { ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, index = 0, step = 2) shouldBe None }