cbickel closed pull request #3094: Bound docker/runc commands in their allowed
runtime.
URL: https://github.com/apache/incubator-openwhisk/pull/3094
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
new file mode 100644
index 0000000000..e7ed0a64ea
--- /dev/null
+++ b/core/invoker/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+# common logging configuration see common scala
+include "logging"
+include "akka-http-version"
+
+whisk {
+ # Timeouts for docker commands. Set to "Inf" to disable timeout.
+ docker.timeouts {
+ run: 1 minute
+ rm: 1 minute
+ pull: 10 minutes
+ ps: 1 minute
+ inspect: 1 minute
+ pause: 10 seconds
+ unpause: 10 seconds
+ }
+
+ # Timeouts for runc commands. Set to "Inf" to disable timeout.
+ runc.timeouts {
+ pause: 10 seconds
+ resume: 10 seconds
+ }
+}
\ No newline at end of file
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index e6b3dabe95..4ed631d8ed 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -22,6 +22,8 @@ import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.Semaphore
+import akka.actor.ActorSystem
+
import scala.collection.concurrent.TrieMap
import scala.concurrent.blocking
import scala.concurrent.ExecutionContext
@@ -30,13 +32,15 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.event.Logging.ErrorLevel
-
+import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
+import scala.concurrent.duration.Duration
+
object DockerContainerId {
val containerIdRegex = """^([0-9a-f]{64})$""".r
@@ -49,6 +53,17 @@ object DockerContainerId {
}
}
+/**
+ * Configuration for docker client command timeouts.
+ */
+case class DockerClientTimeoutConfig(run: Duration,
+ rm: Duration,
+ pull: Duration,
+ ps: Duration,
+ pause: Duration,
+ unpause: Duration,
+ inspect: Duration)
+
/**
* Serves as interface to the docker CLI tool.
*
@@ -57,7 +72,10 @@ object DockerContainerId {
*
* You only need one instance (and you shouldn't get more).
*/
-class DockerClient(dockerHost: Option[String] = None)(executionContext:
ExecutionContext)(implicit log: Logging)
+class DockerClient(dockerHost: Option[String] = None,
+ timeouts: DockerClientTimeoutConfig =
+
loadConfigOrThrow[DockerClientTimeoutConfig]("whisk.docker.timeouts"))(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
extends DockerApi
with ProcessRunner {
implicit private val ec = executionContext
@@ -95,14 +113,12 @@ class DockerClient(dockerHost: Option[String] =
None)(executionContext: Executio
}
}.flatMap { _ =>
// Iff the semaphore was acquired successfully
- runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+ runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
.andThen {
// Release the semaphore as quick as possible regardless of the
runCmd() result
case _ => runSemaphore.release()
}
- .map {
- ContainerId(_)
- }
+ .map(ContainerId.apply)
.recoverWith {
// https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
// Exit code 125 means an error reported by the Docker daemon.
@@ -120,28 +136,28 @@ class DockerClient(dockerHost: Option[String] =
None)(executionContext: Executio
}
def inspectIPAddress(id: ContainerId, network: String)(implicit transid:
TransactionId): Future[ContainerAddress] =
- runCmd("inspect", "--format",
s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString).flatMap {
- _ match {
- case "<no value>" => Future.failed(new NoSuchElementException)
- case stdout => Future.successful(ContainerAddress(stdout))
- }
+ runCmd(
+ Seq("inspect", "--format",
s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
+ timeouts.inspect).flatMap {
+ case "<no value>" => Future.failed(new NoSuchElementException)
+ case stdout => Future.successful(ContainerAddress(stdout))
}
def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("pause", id.asString).map(_ => ())
+ runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("unpause", id.asString).map(_ => ())
+ runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("rm", "-f", id.asString).map(_ => ())
+ runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
def ps(filters: Seq[(String, String)] = Seq(), all: Boolean = false)(
implicit transid: TransactionId): Future[Seq[ContainerId]] = {
- val filterArgs = filters.map { case (attr, value) => Seq("--filter",
s"$attr=$value") }.flatten
+ val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter",
s"$attr=$value") }
val allArg = if (all) Seq("--all") else Seq.empty[String]
val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
- runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply))
+ runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
}
/**
@@ -152,16 +168,19 @@ class DockerClient(dockerHost: Option[String] =
None)(executionContext: Executio
private val pullsInFlight = TrieMap[String, Future[Unit]]()
def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
pullsInFlight.getOrElseUpdate(image, {
- runCmd("pull", image).map(_ => ()).andThen { case _ =>
pullsInFlight.remove(image) }
+ runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _
=> pullsInFlight.remove(image) }
})
def isOomKilled(id: ContainerId)(implicit transid: TransactionId):
Future[Boolean] =
- runCmd("inspect", id.asString, "--format",
"{{.State.OOMKilled}}").map(_.toBoolean)
+ runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"),
timeouts.inspect).map(_.toBoolean)
- private def runCmd(args: String*)(implicit transid: TransactionId):
Future[String] = {
+ private def runCmd(args: Seq[String], timeout: Duration)(implicit transid:
TransactionId): Future[String] = {
val cmd = dockerCmd ++ args
- val start = transid.started(this,
LoggingMarkers.INVOKER_DOCKER_CMD(args.head), s"running ${cmd.mkString(" ")}")
- executeProcess(cmd: _*).andThen {
+ val start = transid.started(
+ this,
+ LoggingMarkers.INVOKER_DOCKER_CMD(args.head),
+ s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+ executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 1644e2e183..5487f5d3a9 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -20,6 +20,7 @@ package whisk.core.containerpool.docker
import java.io.File
import java.nio.file.Paths
+import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.{FileIO, Source => AkkaSource}
import akka.util.ByteString
@@ -37,10 +38,10 @@ import whisk.core.containerpool.ContainerAddress
import scala.io.Source
import scala.concurrent.duration.FiniteDuration
-class DockerClientWithFileAccess(
- dockerHost: Option[String] = None,
- containersDirectory: File =
Paths.get("containers").toFile)(executionContext: ExecutionContext)(implicit
log: Logging)
- extends DockerClient(dockerHost)(executionContext)(log)
+class DockerClientWithFileAccess(dockerHost: Option[String] = None,
+ containersDirectory: File =
Paths.get("containers").toFile)(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ extends DockerClient(dockerHost)(executionContext)
with DockerApiWithFileAccess {
implicit private val ec = executionContext
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index af486f277e..ffd18c9885 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -41,7 +41,7 @@ class DockerContainerFactory(config: WhiskConfig, instance:
InstanceId, paramete
/** Initialize container clients */
implicit val docker = new DockerClientWithFileAccess()(ec)
- implicit val runc = new RuncClient(ec)
+ implicit val runc = new RuncClient()(ec)
/** Create a container using docker cli */
override def createContainer(tid: TransactionId,
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
index f8139b4cdc..b27e62fde9 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/ProcessRunner.scala
@@ -17,10 +17,13 @@
package whisk.core.containerpool.docker
+import akka.actor.ActorSystem
+
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
+import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.sys.process._
trait ProcessRunner {
@@ -29,26 +32,37 @@ trait ProcessRunner {
* Runs the specified command with arguments asynchronously and
* capture stdout as well as stderr.
*
+ * If not set to infinite, after timeout is reached the process is killed.
+ *
* Be cautious with the execution context you pass because the command
* is blocking.
*
* @param args command to be run including arguments
+ * @param timeout maximum time the command is allowed to take
* @return a future completing according to the command's exit code
*/
- protected def executeProcess(args: String*)(implicit ec: ExecutionContext) =
+ protected def executeProcess(args: Seq[String], timeout: Duration)(implicit
ec: ExecutionContext, as: ActorSystem) =
Future(blocking {
val out = new mutable.ListBuffer[String]
val err = new mutable.ListBuffer[String]
- val exitCode = args ! ProcessLogger(o => out += o, e => err += e)
+ val process = args.run(ProcessLogger(o => out += o, e => err += e))
- (exitCode, out.mkString("\n"), err.mkString("\n"))
+ val scheduled = timeout match {
+ case t: FiniteDuration =>
Some(as.scheduler.scheduleOnce(t)(process.destroy()))
+ case _ => None
+ }
+
+ (process.exitValue(), out.mkString("\n"), err.mkString("\n"), scheduled)
}).flatMap {
- case (0, stdout, _) =>
+ case (0, stdout, _, scheduled) =>
+ scheduled.foreach(_.cancel())
Future.successful(stdout)
- case (code, stdout, stderr) =>
+ case (code, stdout, stderr, scheduled) =>
+ scheduled.foreach(_.cancel())
Future.failed(ProcessRunningException(code, stdout, stderr))
}
+
}
case class ProcessRunningException(exitCode: Int, stdout: String, stderr:
String)
- extends Exception(s"code: $exitCode, stdout: $stdout, stderr: $stderr")
+ extends Exception(s"code: $exitCode ${if (exitCode == 143) "(killed)" else
""}, stdout: $stdout, stderr: $stderr")
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
index c398765590..526bfc4702 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala
@@ -17,16 +17,27 @@
package whisk.core.containerpool.docker
+import akka.actor.ActorSystem
+
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.util.Failure
import whisk.common.TransactionId
+
import scala.util.Success
import whisk.common.LoggingMarkers
import whisk.common.Logging
import akka.event.Logging.ErrorLevel
+import pureconfig.loadConfigOrThrow
import whisk.core.containerpool.ContainerId
+import scala.concurrent.duration.Duration
+
+/**
+ * Configuration for runc client command timeouts.
+ */
+case class RuncClientTimeouts(pause: Duration, resume: Duration)
+
/**
* Serves as interface to the docker CLI tool.
*
@@ -35,22 +46,29 @@ import whisk.core.containerpool.ContainerId
*
* You only need one instance (and you shouldn't get more).
*/
-class RuncClient(executionContext: ExecutionContext)(implicit log: Logging)
extends RuncApi with ProcessRunner {
+class RuncClient(timeouts: RuncClientTimeouts =
loadConfigOrThrow[RuncClientTimeouts]("whisk.runc.timeouts"))(
+ executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ extends RuncApi
+ with ProcessRunner {
implicit private val ec = executionContext
// Determines how to run docker. Failure to find a Docker binary implies
// a failure to initialize this instance of DockerClient.
protected val runcCmd: Seq[String] = Seq("/usr/bin/docker-runc")
- def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
runCmd("pause", id.asString).map(_ => ())
+ def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
+ runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
def resume(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd("resume", id.asString).map(_ => ())
+ runCmd(Seq("resume", id.asString), timeouts.resume).map(_ => ())
- private def runCmd(args: String*)(implicit transid: TransactionId):
Future[String] = {
+ private def runCmd(args: Seq[String], timeout: Duration)(implicit transid:
TransactionId): Future[String] = {
val cmd = runcCmd ++ args
- val start = transid.started(this,
LoggingMarkers.INVOKER_RUNC_CMD(args.head), s"running ${cmd.mkString(" ")}")
- executeProcess(cmd: _*).andThen {
+ val start = transid.started(
+ this,
+ LoggingMarkers.INVOKER_RUNC_CMD(args.head),
+ s"running ${cmd.mkString(" ")} (timeout: $timeout)")
+ executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 9c2b5b3239..267865d329 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,14 +17,15 @@
package whisk.core.containerpool.docker.test
+import akka.actor.ActorSystem
+
import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.util.Success
import org.junit.runner.RunWith
@@ -34,8 +35,7 @@ import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
import org.scalatest.time.{Seconds, Span}
-import common.StreamLogging
-
+import common.{StreamLogging, WskActorSystem}
import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
import whisk.common.TransactionId
@@ -48,7 +48,13 @@ import
whisk.core.containerpool.docker.ProcessRunningException
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with
BeforeAndAfterEach with Eventually {
+class DockerClientTests
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with Eventually
+ with WskActorSystem {
override def beforeEach = stream.reset()
@@ -65,7 +71,8 @@ class DockerClientTests extends FlatSpec with Matchers with
StreamLogging with B
/** Returns a DockerClient with a mocked result for 'executeProcess' */
def dockerClient(execResult: => Future[String]) = new DockerClient()(global)
{
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext)
= execResult
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit
ec: ExecutionContext, as: ActorSystem) =
+ execResult
}
behavior of "DockerContainerId"
@@ -186,7 +193,8 @@ class DockerClientTests extends FlatSpec with Matchers with
StreamLogging with B
var runCmdCount = 0
val dc = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec:
ExecutionContext) = {
+ override def executeProcess(args: Seq[String], timeout:
Duration)(implicit ec: ExecutionContext,
+ as:
ActorSystem) = {
runCmdCount += 1
runCmdCount match {
case 1 => firstRunPromise.future
@@ -233,7 +241,8 @@ class DockerClientTests extends FlatSpec with Matchers with
StreamLogging with B
var runCmdCount = 0
val dc = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec:
ExecutionContext) = {
+ override def executeProcess(args: Seq[String], timeout:
Duration)(implicit ec: ExecutionContext,
+ as:
ActorSystem) = {
runCmdCount += 1
println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
runCmdCount match {
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 895461d2e1..2918c8e5e7 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -19,21 +19,20 @@ package whisk.core.containerpool.docker.test
import java.io.File
+import akka.actor.ActorSystem
+
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import scala.language.reflectiveCalls // Needed to invoke
publicIpAddressFromFile() method of structural dockerClientForIp extension
-
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.language.reflectiveCalls
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
-
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionId
@@ -42,7 +41,12 @@ import whisk.core.containerpool.ContainerAddress
import whisk.core.containerpool.docker.DockerClientWithFileAccess
@RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with
StreamLogging with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsIp
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with WskActorSystem {
override def beforeEach = stream.reset()
@@ -69,7 +73,8 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with
Matchers with Stre
readResult: Future[JsObject] =
Future.successful(dockerConfig)) =
new DockerClientWithFileAccess()(global) {
override val dockerCmd = Seq(dockerCommand)
- override def executeProcess(args: String*)(implicit ec:
ExecutionContext) = execResult
+ override def executeProcess(args: Seq[String], timeout:
Duration)(implicit ec: ExecutionContext,
+ as:
ActorSystem) = execResult
override def configFileContents(configFile: File) = readResult
// Make protected ipAddressFromFile available for testing - requires
reflectiveCalls
def publicIpAddressFromFile(id: ContainerId, network: String):
Future[ContainerAddress] =
@@ -108,7 +113,12 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec
with Matchers with Stre
}
@RunWith(classOf[JUnitRunner])
-class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with
StreamLogging with BeforeAndAfterEach {
+class DockerClientWithFileAccessTestsOom
+ extends FlatSpec
+ with Matchers
+ with StreamLogging
+ with BeforeAndAfterEach
+ with WskActorSystem {
override def beforeEach = stream.reset()
implicit val transid = TransactionId.testing
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
index b2ac40fe19..4abe61fd5a 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ProcessRunnerTests.scala
@@ -17,35 +17,41 @@
package whisk.core.containerpool.docker.test
-import scala.concurrent.Future
+import akka.actor.ActorSystem
+import common.WskActorSystem
+import scala.concurrent.Future
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import scala.concurrent.ExecutionContext.Implicits.global
import whisk.core.containerpool.docker.ProcessRunner
+
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.Matchers
import whisk.core.containerpool.docker.ProcessRunningException
+
import scala.language.reflectiveCalls // Needed to invoke run() method of
structural ProcessRunner extension
@RunWith(classOf[JUnitRunner])
-class ProcessRunnerTests extends FlatSpec with Matchers {
+class ProcessRunnerTests extends FlatSpec with Matchers with WskActorSystem {
- def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) =
Await.result(f, timeout)
+ def await[A](f: Future[A], timeout: FiniteDuration = 2.seconds) =
Await.result(f, timeout)
val processRunner = new ProcessRunner {
- def run(args: String*)(implicit ec: ExecutionContext) =
executeProcess(args: _*)
+ def run(args: Seq[String], timeout: FiniteDuration =
100.milliseconds)(implicit ec: ExecutionContext,
+ as:
ActorSystem) =
+ executeProcess(args, timeout)(ec, as)
}
behavior of "ProcessRunner"
it should "run an external command successfully and capture its output" in {
val stdout = "Output"
- await(processRunner.run("echo", stdout)) shouldBe stdout
+ await(processRunner.run(Seq("echo", stdout))) shouldBe stdout
}
it should "run an external command unsuccessfully and capture its output" in
{
@@ -53,8 +59,14 @@ class ProcessRunnerTests extends FlatSpec with Matchers {
val stdout = "Output"
val stderr = "Error"
- val future = processRunner.run("/bin/sh", "-c", s"echo ${stdout}; echo
${stderr} 1>&2; exit ${exitCode}")
+ val future = processRunner.run(Seq("/bin/sh", "-c", s"echo ${stdout}; echo
${stderr} 1>&2; exit ${exitCode}"))
the[ProcessRunningException] thrownBy await(future) shouldBe
ProcessRunningException(exitCode, stdout, stderr)
}
+
+ it should "terminate an external command after the specified timeout is
reached" in {
+ val future = processRunner.run(Seq("sleep", "1"), 100.milliseconds)
+ val exception = the[ProcessRunningException] thrownBy await(future)
+ exception.exitCode shouldBe 143
+ }
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
index b4e2e47515..cf5990b208 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala
@@ -17,8 +17,9 @@
package whisk.core.containerpool.docker.test
-import scala.concurrent.Future
+import akka.actor.ActorSystem
+import scala.concurrent.Future
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
@@ -29,7 +30,7 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.Matchers
import whisk.core.containerpool.docker.RuncClient
-import common.StreamLogging
+import common.{StreamLogging, WskActorSystem}
import whisk.core.containerpool.ContainerId
import whisk.common.TransactionId
import org.scalatest.BeforeAndAfterEach
@@ -37,7 +38,7 @@ import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_RUNC_CMD
@RunWith(classOf[JUnitRunner])
-class RuncClientTests extends FlatSpec with Matchers with StreamLogging with
BeforeAndAfterEach {
+class RuncClientTests extends FlatSpec with Matchers with StreamLogging with
BeforeAndAfterEach with WskActorSystem {
override def beforeEach = stream.reset()
@@ -49,9 +50,10 @@ class RuncClientTests extends FlatSpec with Matchers with
StreamLogging with Bef
val runcCommand = "docker-runc"
/** Returns a RuncClient with a mocked result for 'executeProcess' */
- def runcClient(result: Future[String]) = new RuncClient(global) {
+ def runcClient(result: Future[String]) = new RuncClient()(global) {
override val runcCmd = Seq(runcCommand)
- override def executeProcess(args: String*)(implicit ec: ExecutionContext)
= result
+ override def executeProcess(args: Seq[String], timeout: Duration)(implicit
ec: ExecutionContext, as: ActorSystem) =
+ result
}
/** Calls a runc method based on the name of the method. */
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services