This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 996d001 Diagnostic info and metrics for Docker command failures and
timeouts (#4070)
996d001 is described below
commit 996d00190553a42b451167200885521ee0dc491b
Author: Sven Lange-Last <[email protected]>
AuthorDate: Fri Oct 26 08:33:19 2018 +0200
Diagnostic info and metrics for Docker command failures and timeouts (#4070)
This change improves diagnostic information for failing Docker commands as
well as timed out Docker commands:
* For all failures (including timeouts), a textual representation of the exit
status value is logged.
* Timeouts are explicitly detected and reported using a specialized
exception, allowing for better timeout handling in higher layers of the Docker
container implementation.
* Emit counter metric on Docker command timeout
This change introduces a set of new counter metrics that are emitted if a
Docker command is terminated because of a timeout. A high number of such
timeout occurrences is usually an indication of highly loaded invokers. The
new metrics help to identify such invokers.
---
.../src/main/scala/whisk/common/Logging.scala | 3 +
.../core/containerpool/docker/DockerClient.scala | 11 +--
.../core/containerpool/docker/ProcessRunner.scala | 84 ++++++++++++++++++++--
docs/metrics.md | 10 ++-
.../docker/test/DockerClientTests.scala | 51 +++++++++----
.../docker/test/DockerContainerTests.scala | 2 +-
.../docker/test/ProcessRunnerTests.scala | 50 ++++++++++---
.../kubernetes/test/KubernetesContainerTests.scala | 2 +-
8 files changed, 174 insertions(+), 39 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala
b/common/scala/src/main/scala/whisk/common/Logging.scala
index 19642f9..d0457f8 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -243,6 +243,7 @@ object LoggingMarkers {
val finish = "finish"
val error = "error"
val count = "count"
+ val timeout = "timeout"
private val controller = "controller"
private val invoker = "invoker"
@@ -296,6 +297,8 @@ object LoggingMarkers {
// Time in invoker
val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, "docker",
start, Some(cmd), Map("cmd" -> cmd))
+ def INVOKER_DOCKER_CMD_TIMEOUT(cmd: String) =
+ LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd))
def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start,
Some(cmd), Map("cmd" -> cmd))
def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl",
start, Some(cmd), Map("cmd" -> cmd))
def INVOKER_CONTAINER_START(containerState: String) =
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 8a87e37..371b38a 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -33,9 +33,7 @@ import scala.util.Success
import scala.util.Try
import akka.event.Logging.{ErrorLevel, InfoLevel}
import pureconfig.loadConfigOrThrow
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
@@ -127,11 +125,11 @@ class DockerClient(dockerHost: Option[String] = None,
.map(ContainerId.apply)
.recoverWith {
// https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
- // Exit code 125 means an error reported by the Docker daemon.
+ // Exit status 125 means an error reported by the Docker daemon.
// Examples:
// - Unrecognized option specified
// - Not enough disk space
- case pre: ProcessRunningException if pre.exitCode == 125 =>
+ case pre: ProcessUnsuccessfulException if pre.exitStatus ==
ExitStatus(125) =>
Future.failed(
DockerContainerId
.parse(pre.stdout)
@@ -189,6 +187,9 @@ class DockerClient(dockerHost: Option[String] = None,
logLevel = InfoLevel)
executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
+ case Failure(pte: ProcessTimeoutException) =>
+ transid.failed(this, start, pte.getMessage, ErrorLevel)
+
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_DOCKER_CMD_TIMEOUT(args.head))
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
index b27e62f..f943b80 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
@@ -32,7 +32,7 @@ trait ProcessRunner {
* Runs the specified command with arguments asynchronously and
* capture stdout as well as stderr.
*
- * If not set to infinite, after timeout is reached the process is killed.
+ * If not set to infinite, after timeout is reached the process is
terminated.
*
* Be cautious with the execution context you pass because the command
* is blocking.
@@ -52,17 +52,87 @@ trait ProcessRunner {
case _ => None
}
- (process.exitValue(), out.mkString("\n"), err.mkString("\n"), scheduled)
+ (ExitStatus(process.exitValue()), out.mkString("\n"),
err.mkString("\n"), scheduled)
}).flatMap {
- case (0, stdout, _, scheduled) =>
+ case (ExitStatus(0), stdout, _, scheduled) =>
scheduled.foreach(_.cancel())
Future.successful(stdout)
- case (code, stdout, stderr, scheduled) =>
+ case (exitStatus, stdout, stderr, scheduled) =>
scheduled.foreach(_.cancel())
- Future.failed(ProcessRunningException(code, stdout, stderr))
+ timeout match {
+ case _: FiniteDuration if exitStatus.terminatedBySIGTERM =>
+ Future.failed(ProcessTimeoutException(timeout, exitStatus, stdout,
stderr))
+ case _ => Future.failed(ProcessUnsuccessfulException(exitStatus,
stdout, stderr))
+ }
+ }
+}
+
+object ExitStatus {
+ // Based on The Open Group Base Specifications Issue 7, 2018 edition:
+ // Shell & Utilities - Shell Command Language - 2.8.2 Exit Status for
Commands
+ //
http://pubs.opengroup.org/onlinepubs/9699919799/utilities/V3_chap02.html#tag_18_08_02
+ val STATUS_SUCCESSFUL = 0
+ val STATUS_NOT_EXECUTABLE = 126
+ val STATUS_NOT_FOUND = 127
+ // When a command is stopped by a signal, the exit status is 128 + signal
number
+ val STATUS_SIGNAL = 128
+
+ // Based on The Open Group Base Specifications Issue 7, 2018 edition:
+ // Shell & Utilities - Utilities - kill
+ // http://pubs.opengroup.org/onlinepubs/9699919799/utilities/kill.html
+ val SIGHUP = 1
+ val SIGINT = 2
+ val SIGQUIT = 3
+ val SIGABRT = 6
+ val SIGKILL = 9
+ val SIGALRM = 14
+ val SIGTERM = 15
+}
+
+case class ExitStatus(statusValue: Int) {
+
+ import ExitStatus._
+
+ override def toString(): String = {
+ def signalAsString(signal: Int): String = {
+ signal match {
+ case SIGHUP => "SIGHUP"
+ case SIGINT => "SIGINT"
+ case SIGQUIT => "SIGQUIT"
+ case SIGABRT => "SIGABRT"
+ case SIGKILL => "SIGKILL"
+ case SIGALRM => "SIGALRM"
+ case SIGTERM => "SIGTERM"
+ case _ => signal.toString
+ }
+ }
+
+ val detail = statusValue match {
+ case STATUS_SUCCESSFUL => "successful"
+ case STATUS_NOT_EXECUTABLE => "not executable"
+ case STATUS_NOT_FOUND => "not found"
+ case _ if statusValue >= ExitStatus.STATUS_SIGNAL =>
+ "terminated by signal " + signalAsString(statusValue -
ExitStatus.STATUS_SIGNAL)
+ case _ => "unsuccessful"
}
+ s"$statusValue ($detail)"
+ }
+
+ val successful = statusValue == ExitStatus.STATUS_SUCCESSFUL
+ val terminatedBySIGTERM = (statusValue - ExitStatus.STATUS_SIGNAL) ==
ExitStatus.SIGTERM
}
-case class ProcessRunningException(exitCode: Int, stdout: String, stderr:
String)
- extends Exception(s"code: $exitCode ${if (exitCode == 143) "(killed)" else
""}, stdout: $stdout, stderr: $stderr")
+abstract class ProcessRunningException(info: String, val exitStatus:
ExitStatus, val stdout: String, val stderr: String)
+ extends Exception(s"info: $info, code: $exitStatus, stdout: $stdout,
stderr: $stderr")
+
+case class ProcessUnsuccessfulException(override val exitStatus: ExitStatus,
+ override val stdout: String,
+ override val stderr: String)
+ extends ProcessRunningException("command was unsuccessful", exitStatus,
stdout, stderr)
+
+case class ProcessTimeoutException(timeout: Duration,
+ override val exitStatus: ExitStatus,
+ override val stdout: String,
+ override val stderr: String)
+ extends ProcessRunningException(s"command was terminated, took longer than
$timeout", exitStatus, stdout, stderr)
diff --git a/docs/metrics.md b/docs/metrics.md
index 9cfd887..4bf3520 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -182,34 +182,40 @@ Metrics below are for invoker state as recorded within
load balancer monitoring.
Following metrics capture stats around various docker command executions.
-* Pause
+* pause
* `openwhisk.counter.invoker_docker.pause_start`
* `openwhisk.counter.invoker_docker.pause_error`
+ * `openwhisk.counter.invoker_docker.pause_timeout`
* `openwhisk.histogram.invoker_docker.pause_finish`
* `openwhisk.histogram.invoker_docker.pause_error`
-* Ps
+* ps
* `openwhisk.counter.invoker_docker.ps_start`
* `openwhisk.counter.invoker_docker.ps_error`
+ * `openwhisk.counter.invoker_docker.ps_timeout`
* `openwhisk.histogram.invoker_docker.ps_finish`
* `openwhisk.histogram.invoker_docker.ps_error`
* pull
* `openwhisk.counter.invoker_docker.pull_start`
* `openwhisk.counter.invoker_docker.pull_error`
+ * `openwhisk.counter.invoker_docker.pull_timeout`
* `openwhisk.histogram.invoker_docker.pull_finish`
* `openwhisk.histogram.invoker_docker.pull_error`
* rm
* `openwhisk.counter.invoker_docker.rm_start`
* `openwhisk.counter.invoker_docker.rm_error`
+ * `openwhisk.counter.invoker_docker.rm_timeout`
* `openwhisk.histogram.invoker_docker.rm_finish`
* `openwhisk.histogram.invoker_docker.rm_error`
* run
* `openwhisk.counter.invoker_docker.run_start`
* `openwhisk.counter.invoker_docker.run_error`
+ * `openwhisk.counter.invoker_docker.run_timeout`
* `openwhisk.histogram.invoker_docker.run_finish`
* `openwhisk.histogram.invoker_docker.run_error`
* unpause
* `openwhisk.counter.invoker_docker.unpause_start`
* `openwhisk.counter.invoker_docker.unpause_error`
+ * `openwhisk.counter.invoker_docker.unpause_timeout`
* `openwhisk.histogram.invoker_docker.unpause_finish`
* `openwhisk.histogram.invoker_docker.unpause_error`
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 200bc99..66637f8 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -18,7 +18,6 @@
package whisk.core.containerpool.docker.test
import akka.actor.ActorSystem
-
import java.util.concurrent.Semaphore
import scala.concurrent.Await
@@ -41,10 +40,7 @@ import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
import whisk.common.TransactionId
import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.ContainerId
-import whisk.core.containerpool.docker.BrokenDockerContainer
-import whisk.core.containerpool.docker.DockerClient
-import whisk.core.containerpool.docker.DockerContainerId
-import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.core.containerpool.docker._
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
@@ -246,7 +242,7 @@ class DockerClientTests
runCmdCount += 1
println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
runCmdCount match {
- case 1 => Future.failed(ProcessRunningException(1, "", ""))
+ case 1 => Future.failed(ProcessUnsuccessfulException(ExitStatus(1),
"", ""))
case 2 => Future.successful(secondContainerId.asString)
case _ => Future.failed(new Throwable())
}
@@ -337,11 +333,11 @@ class DockerClientTests
runAndVerify(dc.pull("image"), "pull")
}
- it should "fail with BrokenDockerContainer when run returns with exit code
125 and a container ID" in {
+ it should "fail with BrokenDockerContainer when run returns with exit status
125 and a container ID" in {
val dc = dockerClient {
Future.failed(
- ProcessRunningException(
- exitCode = 125,
+ ProcessUnsuccessfulException(
+ exitStatus = ExitStatus(125),
stdout = id.asString,
stderr =
"""/usr/bin/docker: Error response from daemon: mkdir
/var/run/docker.1.1/libcontainerd.1.1/55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52:
no space left on device."""))
@@ -350,19 +346,44 @@ class DockerClientTests
bdc.id shouldBe id
}
- it should "fail with ProcessRunningException when run returns with exit code
!=125 or no container ID" in {
+ it should "fail with ProcessRunningException when run returns with exit code
!=125, no container ID or timeout" in {
def runAndVerify(pre: ProcessRunningException, clue: String) = {
val dc = dockerClient { Future.failed(pre) }
- withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout =
'${pre.stdout}', stderr = '${pre.stderr}': ") {
+ withClue(s"${clue} - exitStatus = ${pre.exitStatus}, stdout =
'${pre.stdout}', stderr = '${pre.stderr}': ") {
the[ProcessRunningException] thrownBy await(dc.run("image",
Seq.empty)) shouldBe pre
}
}
Seq[(ProcessRunningException, String)](
- (ProcessRunningException(126, id.asString, "Unknown command"), "Exit
code not 125"),
- (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container
ID"),
- (ProcessRunningException(1, "", ""), "Exit code not 125 and no container
ID")).foreach {
- case (pre, clue) => runAndVerify(pre, clue)
+ (ProcessUnsuccessfulException(ExitStatus(127), id.asString, "Unknown
command"), "Exit code not 125"),
+ (ProcessUnsuccessfulException(ExitStatus(125), "", "Unknown flag:
--foo"), "No container ID"),
+ (ProcessUnsuccessfulException(ExitStatus(1), "", ""), "Exit code not 125
and no container ID"),
+ (ProcessTimeoutException(1.second, ExitStatus(125), id.asString, ""),
"Timeout instead of unsuccessful command"))
+ .foreach {
+ case (pre, clue) => runAndVerify(pre, clue)
+ }
+ }
+
+ it should "fail with ProcessTimeoutException when command times out" in {
+ val expectedPTE =
+ ProcessTimeoutException(timeout = 10.seconds, exitStatus =
ExitStatus(143), stdout = "stdout", stderr = "stderr")
+ val dc = dockerClient {
+ Future.failed(expectedPTE)
}
+ Seq[(Future[_], String)](
+ (dc.run("image", Seq.empty), "run"),
+ (dc.inspectIPAddress(id, "network"), "inspectIPAddress"),
+ (dc.pause(id), "pause"),
+ (dc.unpause(id), "unpause"),
+ (dc.rm(id), "rm"),
+ (dc.ps(), "ps"),
+ (dc.pull("image"), "pull"),
+ (dc.isOomKilled(id), "isOomKilled"))
+ .foreach {
+ case (cmd, clue) =>
+ withClue(s"command '$clue' - ") {
+ the[ProcessTimeoutException] thrownBy await(cmd) shouldBe
expectedPTE
+ }
+ }
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 6730e8c..c0e5e37 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -214,7 +214,7 @@ class DockerContainerTests
override def run(image: String,
args: Seq[String] = Seq.empty[String])(implicit
transid: TransactionId): Future[ContainerId] = {
runs += ((image, args))
- Future.failed(ProcessRunningException(1, "", ""))
+ Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", ""))
}
}
implicit val runc = stub[RuncApi]
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
index 4abe61f..4527373 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
@@ -26,13 +26,12 @@ import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import scala.concurrent.ExecutionContext.Implicits.global
-import whisk.core.containerpool.docker.ProcessRunner
+import whisk.core.containerpool.docker._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.Matchers
-import whisk.core.containerpool.docker.ProcessRunningException
import scala.language.reflectiveCalls // Needed to invoke run() method of
structural ProcessRunner extension
@@ -55,18 +54,53 @@ class ProcessRunnerTests extends FlatSpec with Matchers
with WskActorSystem {
}
it should "run an external command unsuccessfully and capture its output" in
{
- val exitCode = 1
+ val exitStatus = ExitStatus(1)
val stdout = "Output"
val stderr = "Error"
- val future = processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo
${stderr} 1>&2; exit ${exitCode}"))
+ val future =
+ processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo ${stderr}
1>&2; exit ${exitStatus.statusValue}"))
- the[ProcessRunningException] thrownBy await(future) shouldBe
ProcessRunningException(exitCode, stdout, stderr)
+ val exception = the[ProcessRunningException] thrownBy await(future)
+ exception shouldBe ProcessUnsuccessfulException(exitStatus, stdout, stderr)
+ exception.getMessage should startWith("info: command was unsuccessful")
}
it should "terminate an external command after the specified timeout is
reached" in {
- val future = processRunner.run(Seq("sleep", "1"), 100.milliseconds)
- val exception = the[ProcessRunningException] thrownBy await(future)
- exception.exitCode shouldBe 143
+ val timeout = 100.milliseconds
+ // Run "sleep" command for 1 second and make sure that stdout and stderr
are dropped
+ val future = processRunner.run(Seq("/bin/sh", "-c", "sleep 1 1>/dev/null
2>/dev/null"), timeout)
+ val exception = the[ProcessTimeoutException] thrownBy await(future)
+ exception shouldBe ProcessTimeoutException(timeout, ExitStatus(143), "",
"")
+ exception.getMessage should startWith(s"info: command was terminated, took
longer than $timeout")
+ }
+
+ behavior of "ExitStatus"
+
+ it should "provide a proper textual representation" in {
+ Seq[(Int, String)](
+ (0, "successful"),
+ (1, "unsuccessful"),
+ (125, "unsuccessful"),
+ (126, "not executable"),
+ (127, "not found"),
+ (128, "terminated by signal 0"),
+ (129, "terminated by signal SIGHUP"),
+ (130, "terminated by signal SIGINT"),
+ (131, "terminated by signal SIGQUIT"),
+ (134, "terminated by signal SIGABRT"),
+ (137, "terminated by signal SIGKILL"),
+ (142, "terminated by signal SIGALRM"),
+ (143, "terminated by signal SIGTERM"),
+ (144, "terminated by signal 16")).foreach {
+ case (statusValue, detailText) =>
+ ExitStatus(statusValue).toString shouldBe s"$statusValue ($detailText)"
+ }
+ }
+
+ it should "properly classify exit status" in {
+ withClue("Exit status 0 is successful - ") { ExitStatus(0).successful
shouldBe true }
+ withClue("Exit status 1 is not successful - ") { ExitStatus(1).successful
shouldBe false }
+ withClue("Exit status 143 means terminated by SIGTERM - ") {
ExitStatus(143).terminatedBySIGTERM shouldBe true }
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 227d3c9..573b581 100644
---
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -173,7 +173,7 @@ class KubernetesContainerTests
env: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid:
TransactionId): Future[KubernetesContainer] = {
runs += ((name, image, env, labels))
- Future.failed(ProcessRunningException(1, "", ""))
+ Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", ""))
}
}