This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new ba525f1 Mesos metrics (#4102) ba525f1 is described below commit ba525f1de5d253eafec467d8f9801d7636a976e2 Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Wed Nov 7 10:18:04 2018 -0800 Mesos metrics (#4102) * include launch/kill timing metrics for mesos containers; handle launch/kill timeouts * update mesos-actor version, include configurable healthcheck changes, make all timeouts configurable --- common/scala/build.gradle | 2 +- common/scala/src/main/resources/application.conf | 17 ++++- .../src/main/scala/whisk/common/Logging.scala | 4 ++ .../whisk/core/mesos/MesosContainerFactory.scala | 45 +++++++----- .../main/scala/whisk/core/mesos/MesosTask.scala | 82 +++++++++++++++++----- .../mesos/test/MesosContainerFactoryTest.scala | 24 ++++--- 6 files changed, 128 insertions(+), 46 deletions(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index aa62429..2d550ed 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -64,7 +64,7 @@ dependencies { compile 'io.kamon:kamon-core_2.12:0.6.7' compile 'io.kamon:kamon-statsd_2.12:0.6.7' //for mesos - compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.8_2.12' + compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13' //tracing support compile 'io.opentracing:opentracing-api:0.31.0' diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index b7afb041..6f168dd 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -231,12 +231,27 @@ whisk { master-url = "http://localhost:5050" //your mesos master master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url) role = "*" //see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles - 
failover-timeout = 0 seconds //Timeout allowed for framework to reconnect after disconnection. mesos-link-log-message = true //If true, display a link to mesos in the static log message, otherwise do not include a link to mesos. constraints = [] //placement constraint strings to use for managed containers e.g. ["att1 LIKE v1", "att2 UNLIKE v2"] blackbox-constraints = [] //placement constraints to use for blackbox containers constraint-delimiter = " "//used to parse constraint strings teardown-on-exit = true //set to true to disable the mesos framework on system exit; set for false for HA deployments + offer-refuse-duration = 5 seconds //minimum time until an offer will arrive again at a particular invoker + timeouts { + failover = 0 seconds //Timeout allowed for framework to reconnect after disconnection. + task-launch = 45 seconds //timeout for creating mesos tasks (containers) + task-delete = 30 seconds //timeout for destroying mesos tasks (containers) + subscribe = 10 seconds //timeout for framework subscription handshake + teardown = 30 seconds //timeout for framework teardown + } + health-check { #remove this health-check section to disable health checks on action containers + port-index = 0 //should always be index 0 (action containers only listen on 1 port) + delay = 0 seconds //the amount of time (in seconds) to wait before starting to check the task. + interval = 1 seconds //the interval (in seconds) between check attempts. + timeout = 1 seconds //the amount of time (in seconds) to wait for the check to complete + grace-period = 25 seconds //the amount of time after the task is launched during which health check failures are ignored. + max-consecutive-failures = 3 //the number of consecutive failures until the task is killed by the executor. 
+ } } logstore { diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index c61f059..c60efc3 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -302,6 +302,10 @@ object LoggingMarkers { LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd)) def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start, Some(cmd), Map("cmd" -> cmd)) def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd)) + def INVOKER_MESOS_CMD(cmd: String) = + LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd)) + def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) = + LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd)) def INVOKER_CONTAINER_START(containerState: String) = LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState)) val CONTAINER_CLIENT_RETRIES = diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala index b29bbd8..820718b 100644 --- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala +++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala @@ -51,22 +51,38 @@ import whisk.core.entity.InvokerInstanceId import whisk.core.entity.UUID /** + * Configuration for mesos timeouts + */ +case class MesosTimeoutConfig(failover: FiniteDuration, + taskLaunch: FiniteDuration, + taskDelete: FiniteDuration, + subscribe: FiniteDuration, + teardown: FiniteDuration) + +/** + * Configuration for mesos action container health checks + */ +case class MesosContainerHealthCheckConfig(portIndex: Int, + delay: FiniteDuration, + interval: FiniteDuration, + timeout: FiniteDuration, + gracePeriod: FiniteDuration, + maxConsecutiveFailures: Int) + +/** 
* Configuration for MesosClient - * @param masterUrl The mesos url e.g. http://leader.mesos:5050. - * @param masterPublicUrl A public facing mesos url (which may be different that the internal facing url) e.g. http://mymesos:5050. - * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles). - * @param failoverTimeout Timeout allowed for framework to reconnect after disconnection. - * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos. */ case class MesosConfig(masterUrl: String, masterPublicUrl: Option[String], role: String, - failoverTimeout: FiniteDuration, mesosLinkLogMessage: Boolean, constraints: Seq[String], constraintDelimiter: String, blackboxConstraints: Seq[String], - teardownOnExit: Boolean) {} + teardownOnExit: Boolean, + healthCheck: Option[MesosContainerHealthCheckConfig], + offerRefuseDuration: FiniteDuration, + timeouts: MesosTimeoutConfig) {} class MesosContainerFactory(config: WhiskConfig, actorSystem: ActorSystem, @@ -79,9 +95,6 @@ class MesosContainerFactory(config: WhiskConfig, taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator _) extends ContainerFactory { - val subscribeTimeout = 10.seconds - val teardownTimeout = 30.seconds - implicit val as: ActorSystem = actorSystem implicit val ec: ExecutionContext = actorSystem.dispatcher @@ -94,7 +107,7 @@ class MesosContainerFactory(config: WhiskConfig, private def subscribe(): Future[Unit] = { logging.info(this, s"subscribing to Mesos master at ${mesosConfig.masterUrl}") mesosClientActor - .ask(Subscribe)(subscribeTimeout) + .ask(Subscribe)(mesosConfig.timeouts.subscribe) .mapTo[SubscribeComplete] .map(complete => logging.info(this, s"subscribe completed successfully... 
$complete")) .recoverWith { @@ -122,7 +135,6 @@ class MesosContainerFactory(config: WhiskConfig, mesosConfig.constraints } - logging.info(this, s"using Mesos to create a container with image $image...") MesosTask.create( mesosClientActor, mesosConfig, @@ -165,8 +177,8 @@ class MesosContainerFactory(config: WhiskConfig, /** Cleanups any remaining Containers; should block until complete; should ONLY be run at shutdown. */ override def cleanup(): Unit = { - val complete: Future[Any] = mesosClientActor.ask(Teardown)(teardownTimeout) - Try(Await.result(complete, teardownTimeout)) + val complete: Future[Any] = mesosClientActor.ask(Teardown)(mesosConfig.timeouts.teardown) + Try(Await.result(complete, mesosConfig.timeouts.teardown)) .map(_ => logging.info(this, "Mesos framework teardown completed.")) .recover { case _: TimeoutException => logging.error(this, "Mesos framework teardown took too long.") @@ -184,8 +196,9 @@ object MesosContainerFactory { "whisk-containerfactory-framework", mesosConfig.masterUrl, mesosConfig.role, - mesosConfig.failoverTimeout, - taskStore = new LocalTaskStore)) + mesosConfig.timeouts.failover, + taskStore = new LocalTaskStore, + refuseSeconds = mesosConfig.offerRefuseDuration.toMillis / 1000.0)) //fractional seconds: keep sub-second precision of the configured duration val counter = new Counter() val startTime = Instant.now.getEpochSecond diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala index 9b21903..b28979c 100644 --- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala +++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala @@ -19,6 +19,9 @@ package whisk.core.mesos import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.event.Logging.ErrorLevel +import akka.event.Logging.InfoLevel +import akka.pattern.AskTimeoutException import akka.pattern.ask import akka.stream.scaladsl.Source import akka.util.ByteString @@ -27,6 +30,7 @@ import com.adobe.api.platform.runtime.mesos.Bridge import 
com.adobe.api.platform.runtime.mesos.CommandDef import com.adobe.api.platform.runtime.mesos.Constraint import com.adobe.api.platform.runtime.mesos.DeleteTask +import com.adobe.api.platform.runtime.mesos.HealthCheckConfig import com.adobe.api.platform.runtime.mesos.Host import com.adobe.api.platform.runtime.mesos.Running import com.adobe.api.platform.runtime.mesos.SubmitTask @@ -37,10 +41,12 @@ import org.apache.mesos.v1.Protos.TaskState import org.apache.mesos.v1.Protos.TaskStatus import scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.postfixOps +import scala.util.Failure +import scala.util.Success import spray.json._ import whisk.common.Logging +import whisk.common.LoggingMarkers +import whisk.common.MetricEmitter import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.containerpool.ContainerAddress @@ -61,8 +67,9 @@ case class CreateContainer(image: String, memory: String, cpuShare: String) object MesosTask { - val taskLaunchTimeout = Timeout(45 seconds) - val taskDeleteTimeout = Timeout(30 seconds) + + val LAUNCH_CMD = "launch" + val KILL_CMD = "kill" def create(mesosClientActor: ActorRef, mesosConfig: MesosConfig, @@ -82,8 +89,6 @@ object MesosTask { as: ActorSystem): Future[Container] = { implicit val tid = transid - log.info(this, s"creating task for image $image...") - val mesosCpuShares = cpuShares / 1024.0 // convert openwhisk (docker based) shares to mesos (cpu percentage) val mesosRam = memory.toMB.toInt @@ -96,6 +101,17 @@ object MesosTask { } val dnsOrEmpty = if (dnsServers.nonEmpty) Map("dns" -> dnsServers.toSet) else Map.empty + //transform our config to mesos-actor config (durations become fractional seconds, keeping sub-second precision): + val healthCheckConfig = mesosConfig.healthCheck.map( + c => + HealthCheckConfig( + c.portIndex, + c.delay.toMillis / 1000.0, + c.interval.toMillis / 1000.0, + c.timeout.toMillis / 1000.0, + c.gracePeriod.toMillis / 1000.0, +
c.maxConsecutiveFailures)) + //define task val task = new TaskDef( taskId, name.getOrElse(image), // task name either the indicated name, or else the image name @@ -103,24 +119,40 @@ object MesosTask { mesosCpuShares, mesosRam, List(8080), // all action containers listen on 8080 - Some(0), // port at index 0 used for health + healthCheckConfig, // optional health check settings (None disables health checks) false, taskNetwork, dnsOrEmpty ++ parameters, Some(CommandDef(environment)), constraints.toSet) + val taskLaunchTimeout = Timeout(mesosConfig.timeouts.taskLaunch) + val start = transid.started( + this, + LoggingMarkers.INVOKER_MESOS_CMD(LAUNCH_CMD), + s"launching mesos task for taskid $taskId (image:$image, mem: $mesosRam, cpu: $mesosCpuShares) (timeout: $taskLaunchTimeout)", + logLevel = InfoLevel) + val launched: Future[Running] = mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running] - launched.map(taskDetails => { - val taskHost = taskDetails.hostname - val taskPort = taskDetails.hostports(0) - log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}") - val containerIp = new ContainerAddress(taskHost, taskPort) - val containerId = new ContainerId(taskId); - new MesosTask(containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig) - }) + launched + .andThen { + case Success(taskDetails) => + transid.finished(this, start, s"launched task ${taskId} at ${taskDetails.hostname}:${taskDetails + .hostports(0)}", logLevel = InfoLevel) + case Failure(ate: AskTimeoutException) => + transid.failed(this, start, ate.getMessage, ErrorLevel) + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD)) + case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) + } + .map(taskDetails => { + val taskHost = taskDetails.hostname + val taskPort = taskDetails.hostports(0) + val containerIp = new ContainerAddress(taskHost, taskPort) + val containerId = new ContainerId(taskId) + new
MesosTask(containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig) + }) } @@ -132,13 +164,14 @@ object JsonFormatters extends DefaultJsonProtocol { class MesosTask(override protected val id: ContainerId, override protected val addr: ContainerAddress, - override protected val ec: ExecutionContext, - override protected val logging: Logging, + override protected implicit val ec: ExecutionContext, + override protected implicit val logging: Logging, override protected val as: ActorSystem, taskId: String, mesosClientActor: ActorRef, mesosConfig: MesosConfig) extends Container { + val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete) //kill requests are bounded by the configured task-delete timeout (not task-launch) /** Stops the container from consuming CPU cycles. */ override def suspend()(implicit transid: TransactionId): Future[Unit] = { @@ -154,9 +187,22 @@ class MesosTask(override protected val id: ContainerId, /** Completely destroys this instance of the container. */ override def destroy()(implicit transid: TransactionId): Future[Unit] = { + val start = transid.started( + this, + LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD), + s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})", + logLevel = InfoLevel) + mesosClientActor - .ask(DeleteTask(taskId))(MesosTask.taskDeleteTimeout) + .ask(DeleteTask(taskId))(taskDeleteTimeout) .mapTo[TaskStatus] + .andThen { + case Success(_) => transid.finished(this, start, logLevel = InfoLevel) + case Failure(ate: AskTimeoutException) => + transid.failed(this, start, ate.getMessage, ErrorLevel) + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD)) + case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) + } .map(taskStatus => { // verify that task ended in TASK_KILLED state (but don't fail if it didn't...) 
if (taskStatus.getState != TaskState.TASK_KILLED) { diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala index 6500859..695a84c 100644 --- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala +++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala @@ -57,6 +57,7 @@ import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ import whisk.core.mesos.MesosConfig import whisk.core.mesos.MesosContainerFactory +import whisk.core.mesos.MesosTimeoutConfig @RunWith(classOf[JUnitRunner]) class MesosContainerFactoryTest extends TestKit(ActorSystem("MesosActorSystem")) @@ -89,11 +90,16 @@ class MesosContainerFactoryTest override def beforeEach() = { stream.reset() } + + val timeouts = MesosTimeoutConfig(1.seconds, 1.seconds, 1.seconds, 1.seconds, 1.seconds) + + val mesosConfig = + MesosConfig("http://master:5050", None, "*", true, Seq.empty, " ", Seq.empty, true, None, 1.seconds, timeouts) + behavior of "MesosContainerFactory" it should "send Subscribe on init" in { val wskConfig = new WhiskConfig(Map.empty) - val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true, Seq.empty, " ", Seq.empty, true) new MesosContainerFactory( wskConfig, system, @@ -111,12 +117,14 @@ class MesosContainerFactoryTest "http://master:5050", None, "*", - 0.seconds, true, Seq("att1 LIKE v1", "att2 UNLIKE v2"), " ", Seq("bbatt1 LIKE v1", "bbatt2 UNLIKE v2"), - true) + true, + None, + 1.seconds, + timeouts) val factory = new MesosContainerFactory( @@ -146,7 +154,7 @@ class MesosContainerFactoryTest mesosCpus, actionMemory.toMB.toInt, List(8080), - Some(0), + None, false, User("net1"), Map( @@ -161,8 +169,6 @@ class MesosContainerFactoryTest } it should "send DeleteTask on destroy" in { - val mesosConfig = MesosConfig("http://master:5050", None, 
"*", 0.seconds, true, Seq.empty, " ", Seq.empty, true) - val probe = TestProbe() val factory = new MesosContainerFactory( @@ -195,7 +201,7 @@ class MesosContainerFactoryTest mesosCpus, actionMemory.toMB.toInt, List(8080), - Some(0), + None, false, User("net1"), Map( @@ -232,8 +238,6 @@ class MesosContainerFactoryTest } it should "return static message for logs" in { - val mesosConfig = MesosConfig("http://master:5050", None, "*", 0.seconds, true, Seq.empty, " ", Seq.empty, true) - val probe = TestProbe() val factory = new MesosContainerFactory( @@ -267,7 +271,7 @@ class MesosContainerFactoryTest mesosCpus, actionMemory.toMB.toInt, List(8080), - Some(0), + None, false, Bridge, Map(