This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ba525f1  Mesos metrics (#4102)
ba525f1 is described below

commit ba525f1de5d253eafec467d8f9801d7636a976e2
Author: tysonnorris <tysonnor...@gmail.com>
AuthorDate: Wed Nov 7 10:18:04 2018 -0800

    Mesos metrics (#4102)
    
    * include launch/kill timing metrics for mesos containers; handle 
launch/kill timeouts
    * update mesos-actor version, include configurable healthcheck changes, 
make all timeouts configurable
---
 common/scala/build.gradle                          |  2 +-
 common/scala/src/main/resources/application.conf   | 17 ++++-
 .../src/main/scala/whisk/common/Logging.scala      |  4 ++
 .../whisk/core/mesos/MesosContainerFactory.scala   | 45 +++++++-----
 .../main/scala/whisk/core/mesos/MesosTask.scala    | 82 +++++++++++++++++-----
 .../mesos/test/MesosContainerFactoryTest.scala     | 24 ++++---
 6 files changed, 128 insertions(+), 46 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index aa62429..2d550ed 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,7 +64,7 @@ dependencies {
     compile 'io.kamon:kamon-core_2.12:0.6.7'
     compile 'io.kamon:kamon-statsd_2.12:0.6.7'
     //for mesos
-    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.8_2.12'
+    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13'
 
     //tracing support
     compile 'io.opentracing:opentracing-api:0.31.0'
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index b7afb041..6f168dd 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -231,12 +231,27 @@ whisk {
         master-url = "http://localhost:5050"; //your mesos master
         master-public-url = "http://localhost:5050"; // if 
mesos-link-log-message == true, this link will be included with the static log 
message (may or may not be different from master-url)
         role = "*" //see 
http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles
-        failover-timeout = 0 seconds  //Timeout allowed for framework to 
reconnect after disconnection.
         mesos-link-log-message = true //If true, display a link to mesos in 
the static log message, otherwise do not include a link to mesos.
         constraints = [] //placement constraint strings to use for managed 
containers e.g. ["att1 LIKE v1", "att2 UNLIKE v2"]
         blackbox-constraints = [] //placement constraints to use for blackbox 
containers
         constraint-delimiter = " "//used to parse constraint strings
         teardown-on-exit = true //set to true to disable the mesos framework 
on system exit; set for false for HA deployments
+        offer-refuse-duration = 5 seconds //minimum time until an offer will 
arrive again at a particular invoker
+        timeouts {
+            failover = 0 seconds  //Timeout allowed for framework to reconnect 
after disconnection.
+            task-launch = 45 seconds //timeout for creating mesos tasks 
(containers)
+            task-delete = 30 seconds //timeout for destroying mesos tasks 
(containers)
+            subscribe = 10 seconds //timeout for framework subscription 
handshake
+            teardown = 30 seconds //timeout for framework teardown
+        }
+        health-check {#Remove health-section section to disable healthchecks 
at action containers
+            port-index = 0 //should always be port 0 (action container should 
only listen on 1 port)
+            delay = 0 seconds //the amount of time (in seconds) to wait until 
starting checking the task.
+            interval = 1 seconds //the interval (in seconds) between check 
attempts.
+            timeout = 1 seconds //the amount of time (in seconds) to wait for 
the check to complete
+            grace-period = 25 seconds //the amount of time after the task is 
launched during which health check failures are ignored.
+            max-consecutive-failures = 3 //the number of consecutive failures 
until the task is killed by the executor.
+        }
     }
 
     logstore {
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala 
b/common/scala/src/main/scala/whisk/common/Logging.scala
index c61f059..c60efc3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -302,6 +302,10 @@ object LoggingMarkers {
     LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd))
   def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start, 
Some(cmd), Map("cmd" -> cmd))
   def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", 
start, Some(cmd), Map("cmd" -> cmd))
+  def INVOKER_MESOS_CMD(cmd: String) =
+    LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))
+  def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) =
+    LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))
   def INVOKER_CONTAINER_START(containerState: String) =
     LogMarkerToken(invoker, "containerStart", count, Some(containerState), 
Map("containerState" -> containerState))
   val CONTAINER_CLIENT_RETRIES =
diff --git 
a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala 
b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
index b29bbd8..820718b 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -51,22 +51,38 @@ import whisk.core.entity.InvokerInstanceId
 import whisk.core.entity.UUID
 
 /**
+ * Configuration for mesos timeouts
+ */
+case class MesosTimeoutConfig(failover: FiniteDuration,
+                              taskLaunch: FiniteDuration,
+                              taskDelete: FiniteDuration,
+                              subscribe: FiniteDuration,
+                              teardown: FiniteDuration)
+
+/**
+ * Configuration for mesos action container health checks
+ */
+case class MesosContainerHealthCheckConfig(portIndex: Int,
+                                           delay: FiniteDuration,
+                                           interval: FiniteDuration,
+                                           timeout: FiniteDuration,
+                                           gracePeriod: FiniteDuration,
+                                           maxConsecutiveFailures: Int)
+
+/**
  * Configuration for MesosClient
- * @param masterUrl The mesos url e.g. http://leader.mesos:5050.
- * @param masterPublicUrl A public facing mesos url (which may be different 
that the internal facing url) e.g. http://mymesos:5050.
- * @param role The role used by this framework (see 
http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles).
- * @param failoverTimeout Timeout allowed for framework to reconnect after 
disconnection.
- * @param mesosLinkLogMessage If true, display a link to mesos in the static 
log message, otherwise do not include a link to mesos.
  */
 case class MesosConfig(masterUrl: String,
                        masterPublicUrl: Option[String],
                        role: String,
-                       failoverTimeout: FiniteDuration,
                        mesosLinkLogMessage: Boolean,
                        constraints: Seq[String],
                        constraintDelimiter: String,
                        blackboxConstraints: Seq[String],
-                       teardownOnExit: Boolean) {}
+                       teardownOnExit: Boolean,
+                       healthCheck: Option[MesosContainerHealthCheckConfig],
+                       offerRefuseDuration: FiniteDuration,
+                       timeouts: MesosTimeoutConfig) {}
 
 class MesosContainerFactory(config: WhiskConfig,
                             actorSystem: ActorSystem,
@@ -79,9 +95,6 @@ class MesosContainerFactory(config: WhiskConfig,
                             taskIdGenerator: () => String = 
MesosContainerFactory.taskIdGenerator _)
     extends ContainerFactory {
 
-  val subscribeTimeout = 10.seconds
-  val teardownTimeout = 30.seconds
-
   implicit val as: ActorSystem = actorSystem
   implicit val ec: ExecutionContext = actorSystem.dispatcher
 
@@ -94,7 +107,7 @@ class MesosContainerFactory(config: WhiskConfig,
   private def subscribe(): Future[Unit] = {
     logging.info(this, s"subscribing to Mesos master at 
${mesosConfig.masterUrl}")
     mesosClientActor
-      .ask(Subscribe)(subscribeTimeout)
+      .ask(Subscribe)(mesosConfig.timeouts.subscribe)
       .mapTo[SubscribeComplete]
       .map(complete => logging.info(this, s"subscribe completed 
successfully... $complete"))
       .recoverWith {
@@ -122,7 +135,6 @@ class MesosContainerFactory(config: WhiskConfig,
       mesosConfig.constraints
     }
 
-    logging.info(this, s"using Mesos to create a container with image 
$image...")
     MesosTask.create(
       mesosClientActor,
       mesosConfig,
@@ -165,8 +177,8 @@ class MesosContainerFactory(config: WhiskConfig,
 
   /** Cleanups any remaining Containers; should block until complete; should 
ONLY be run at shutdown. */
   override def cleanup(): Unit = {
-    val complete: Future[Any] = mesosClientActor.ask(Teardown)(teardownTimeout)
-    Try(Await.result(complete, teardownTimeout))
+    val complete: Future[Any] = 
mesosClientActor.ask(Teardown)(mesosConfig.timeouts.teardown)
+    Try(Await.result(complete, mesosConfig.timeouts.teardown))
       .map(_ => logging.info(this, "Mesos framework teardown completed."))
       .recover {
         case _: TimeoutException => logging.error(this, "Mesos framework 
teardown took too long.")
@@ -184,8 +196,9 @@ object MesosContainerFactory {
           "whisk-containerfactory-framework",
           mesosConfig.masterUrl,
           mesosConfig.role,
-          mesosConfig.failoverTimeout,
-          taskStore = new LocalTaskStore))
+          mesosConfig.timeouts.failover,
+          taskStore = new LocalTaskStore,
+          refuseSeconds = mesosConfig.offerRefuseDuration.toSeconds.toDouble))
 
   val counter = new Counter()
   val startTime = Instant.now.getEpochSecond
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala 
b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 9b21903..b28979c 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -19,6 +19,9 @@ package whisk.core.mesos
 
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
+import akka.event.Logging.ErrorLevel
+import akka.event.Logging.InfoLevel
+import akka.pattern.AskTimeoutException
 import akka.pattern.ask
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
@@ -27,6 +30,7 @@ import com.adobe.api.platform.runtime.mesos.Bridge
 import com.adobe.api.platform.runtime.mesos.CommandDef
 import com.adobe.api.platform.runtime.mesos.Constraint
 import com.adobe.api.platform.runtime.mesos.DeleteTask
+import com.adobe.api.platform.runtime.mesos.HealthCheckConfig
 import com.adobe.api.platform.runtime.mesos.Host
 import com.adobe.api.platform.runtime.mesos.Running
 import com.adobe.api.platform.runtime.mesos.SubmitTask
@@ -37,10 +41,12 @@ import org.apache.mesos.v1.Protos.TaskState
 import org.apache.mesos.v1.Protos.TaskStatus
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import scala.util.Failure
+import scala.util.Success
 import spray.json._
 import whisk.common.Logging
+import whisk.common.LoggingMarkers
+import whisk.common.MetricEmitter
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.containerpool.ContainerAddress
@@ -61,8 +67,9 @@ case object Environment
 case class CreateContainer(image: String, memory: String, cpuShare: String)
 
 object MesosTask {
-  val taskLaunchTimeout = Timeout(45 seconds)
-  val taskDeleteTimeout = Timeout(30 seconds)
+
+  val LAUNCH_CMD = "launch"
+  val KILL_CMD = "kill"
 
   def create(mesosClientActor: ActorRef,
              mesosConfig: MesosConfig,
@@ -82,8 +89,6 @@ object MesosTask {
                                                        as: ActorSystem): 
Future[Container] = {
     implicit val tid = transid
 
-    log.info(this, s"creating task for image $image...")
-
     val mesosCpuShares = cpuShares / 1024.0 // convert openwhisk (docker 
based) shares to mesos (cpu percentage)
     val mesosRam = memory.toMB.toInt
 
@@ -96,6 +101,17 @@ object MesosTask {
     }
     val dnsOrEmpty = if (dnsServers.nonEmpty) Map("dns" -> dnsServers.toSet) 
else Map.empty
 
+    //transform our config to mesos-actor config:
+    val healthCheckConfig = mesosConfig.healthCheck.map(
+      c =>
+        HealthCheckConfig(
+          c.portIndex,
+          c.delay.toSeconds.toDouble,
+          c.interval.toSeconds.toDouble,
+          c.timeout.toSeconds.toDouble,
+          c.gracePeriod.toSeconds.toDouble,
+          c.maxConsecutiveFailures))
+    //define task
     val task = new TaskDef(
       taskId,
       name.getOrElse(image), // task name either the indicated name, or else 
the image name
@@ -103,24 +119,40 @@ object MesosTask {
       mesosCpuShares,
       mesosRam,
       List(8080), // all action containers listen on 8080
-      Some(0), // port at index 0 used for health
+      healthCheckConfig, // port at index 0 used for health
       false,
       taskNetwork,
       dnsOrEmpty ++ parameters,
       Some(CommandDef(environment)),
       constraints.toSet)
 
+    val taskLaunchTimeout = Timeout(mesosConfig.timeouts.taskLaunch)
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_MESOS_CMD(LAUNCH_CMD),
+      s"launching mesos task for taskid $taskId (image:$image, mem: $mesosRam, 
cpu: $mesosCpuShares) (timeout: $taskLaunchTimeout)",
+      logLevel = InfoLevel)
+
     val launched: Future[Running] =
       mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running]
 
-    launched.map(taskDetails => {
-      val taskHost = taskDetails.hostname
-      val taskPort = taskDetails.hostports(0)
-      log.info(this, s"launched task with state 
${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}")
-      val containerIp = new ContainerAddress(taskHost, taskPort)
-      val containerId = new ContainerId(taskId);
-      new MesosTask(containerId, containerIp, ec, log, as, taskId, 
mesosClientActor, mesosConfig)
-    })
+    launched
+      .andThen {
+        case Success(taskDetails) =>
+          transid.finished(this, start, s"launched task ${taskId} at 
${taskDetails.hostname}:${taskDetails
+            .hostports(0)}", logLevel = InfoLevel)
+        case Failure(ate: AskTimeoutException) =>
+          transid.failed(this, start, ate.getMessage, ErrorLevel)
+          
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD))
+        case Failure(t) => transid.failed(this, start, t.getMessage, 
ErrorLevel)
+      }
+      .map(taskDetails => {
+        val taskHost = taskDetails.hostname
+        val taskPort = taskDetails.hostports(0)
+        val containerIp = new ContainerAddress(taskHost, taskPort)
+        val containerId = new ContainerId(taskId);
+        new MesosTask(containerId, containerIp, ec, log, as, taskId, 
mesosClientActor, mesosConfig)
+      })
 
   }
 
@@ -132,13 +164,14 @@ object JsonFormatters extends DefaultJsonProtocol {
 
 class MesosTask(override protected val id: ContainerId,
                 override protected val addr: ContainerAddress,
-                override protected val ec: ExecutionContext,
-                override protected val logging: Logging,
+                override protected implicit val ec: ExecutionContext,
+                override protected implicit val logging: Logging,
                 override protected val as: ActorSystem,
                 taskId: String,
                 mesosClientActor: ActorRef,
                 mesosConfig: MesosConfig)
     extends Container {
+  val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch)
 
   /** Stops the container from consuming CPU cycles. */
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
@@ -154,9 +187,22 @@ class MesosTask(override protected val id: ContainerId,
 
   /** Completely destroys this instance of the container. */
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
+      s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
+      logLevel = InfoLevel)
+
     mesosClientActor
-      .ask(DeleteTask(taskId))(MesosTask.taskDeleteTimeout)
+      .ask(DeleteTask(taskId))(taskDeleteTimeout)
       .mapTo[TaskStatus]
+      .andThen {
+        case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
+        case Failure(ate: AskTimeoutException) =>
+          transid.failed(this, start, ate.getMessage, ErrorLevel)
+          
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
+        case Failure(t) => transid.failed(this, start, t.getMessage, 
ErrorLevel)
+      }
       .map(taskStatus => {
         // verify that task ended in TASK_KILLED state (but don't fail if it 
didn't...)
         if (taskStatus.getState != TaskState.TASK_KILLED) {
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
 
b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 6500859..695a84c 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -57,6 +57,7 @@ import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
 import whisk.core.mesos.MesosConfig
 import whisk.core.mesos.MesosContainerFactory
+import whisk.core.mesos.MesosTimeoutConfig
 @RunWith(classOf[JUnitRunner])
 class MesosContainerFactoryTest
     extends TestKit(ActorSystem("MesosActorSystem"))
@@ -89,11 +90,16 @@ class MesosContainerFactoryTest
   override def beforeEach() = {
     stream.reset()
   }
+
+  val timeouts = MesosTimeoutConfig(1.seconds, 1.seconds, 1.seconds, 
1.seconds, 1.seconds)
+
+  val mesosConfig =
+    MesosConfig("http://master:5050";, None, "*", true, Seq.empty, " ", 
Seq.empty, true, None, 1.seconds, timeouts)
+
   behavior of "MesosContainerFactory"
 
   it should "send Subscribe on init" in {
     val wskConfig = new WhiskConfig(Map.empty)
-    val mesosConfig = MesosConfig("http://master:5050";, None, "*", 0.seconds, 
true, Seq.empty, " ", Seq.empty, true)
     new MesosContainerFactory(
       wskConfig,
       system,
@@ -111,12 +117,14 @@ class MesosContainerFactoryTest
       "http://master:5050";,
       None,
       "*",
-      0.seconds,
       true,
       Seq("att1 LIKE v1", "att2 UNLIKE v2"),
       " ",
       Seq("bbatt1 LIKE v1", "bbatt2 UNLIKE v2"),
-      true)
+      true,
+      None,
+      1.seconds,
+      timeouts)
 
     val factory =
       new MesosContainerFactory(
@@ -146,7 +154,7 @@ class MesosContainerFactoryTest
         mesosCpus,
         actionMemory.toMB.toInt,
         List(8080),
-        Some(0),
+        None,
         false,
         User("net1"),
         Map(
@@ -161,8 +169,6 @@ class MesosContainerFactoryTest
   }
 
   it should "send DeleteTask on destroy" in {
-    val mesosConfig = MesosConfig("http://master:5050";, None, "*", 0.seconds, 
true, Seq.empty, " ", Seq.empty, true)
-
     val probe = TestProbe()
     val factory =
       new MesosContainerFactory(
@@ -195,7 +201,7 @@ class MesosContainerFactoryTest
         mesosCpus,
         actionMemory.toMB.toInt,
         List(8080),
-        Some(0),
+        None,
         false,
         User("net1"),
         Map(
@@ -232,8 +238,6 @@ class MesosContainerFactoryTest
   }
 
   it should "return static message for logs" in {
-    val mesosConfig = MesosConfig("http://master:5050";, None, "*", 0.seconds, 
true, Seq.empty, " ", Seq.empty, true)
-
     val probe = TestProbe()
     val factory =
       new MesosContainerFactory(
@@ -267,7 +271,7 @@ class MesosContainerFactoryTest
         mesosCpus,
         actionMemory.toMB.toInt,
         List(8080),
-        Some(0),
+        None,
         false,
         Bridge,
         Map(

Reply via email to