This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new f95f219 At most 10 `docker run` commands are allowed in parallel.
(#2995)
f95f219 is described below
commit f95f219f371002507ddd57291d56dbc8bf8f22ab
Author: Sven Lange-Last <[email protected]>
AuthorDate: Fri Nov 24 11:30:41 2017 +0100
At most 10 `docker run` commands are allowed in parallel. (#2995)
Docker < 1.13.1 has a known problem: if more than 10 containers are created
(`docker run`) concurrently, there is a good chance that some of them will
fail. See https://github.com/moby/moby/issues/29369
Use a semaphore to make sure that at most 10 `docker run` commands are
active at the same time.
---
.../core/containerpool/docker/DockerClient.scala | 59 +++++++----
.../docker/test/DockerClientTests.scala | 109 +++++++++++++++++++--
2 files changed, 144 insertions(+), 24 deletions(-)
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index bf94325..e6b3dab 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,18 +20,20 @@ package whisk.core.containerpool.docker
import java.io.FileNotFoundException
import java.nio.file.Files
import java.nio.file.Paths
+import java.util.concurrent.Semaphore
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.blocking
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.event.Logging.ErrorLevel
+
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
-
-import scala.collection.concurrent.TrieMap
import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
@@ -75,25 +77,46 @@ class DockerClient(dockerHost: Option[String] =
None)(executionContext: Executio
Seq(dockerBin) ++ host
}
+ protected val maxParallelRuns = 10
+ protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns,
/* fair= */ true)
+
+ // Docker < 1.13.1 has a known problem: if more than 10 containers are
created (docker run)
+ // concurrently, there is a good chance that some of them will fail.
+ // See https://github.com/moby/moby/issues/29369
+ // Use a semaphore to make sure that at most 10 `docker run` commands are
active
+ // at the same time.
def run(image: String, args: Seq[String] = Seq.empty[String])(
implicit transid: TransactionId): Future[ContainerId] = {
- runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
- .map {
- ContainerId(_)
- }
- .recoverWith {
- // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
- // Exit code 125 means an error reported by the Docker daemon.
- // Examples:
- // - Unrecognized option specified
- // - Not enough disk space
- case pre: ProcessRunningException if pre.exitCode == 125 =>
- Future.failed(
- DockerContainerId
- .parse(pre.stdout)
- .map(BrokenDockerContainer(_, s"Broken container:
${pre.getMessage}"))
- .getOrElse(pre))
+ Future {
+ blocking {
+ // Acquires a permit from this semaphore, blocking until one is
available, or the thread is interrupted.
+ // Throws InterruptedException if the current thread is interrupted
+ runSemaphore.acquire()
}
+ }.flatMap { _ =>
+ // Iff the semaphore was acquired successfully
+ runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+ .andThen {
+ // Release the semaphore as quickly as possible regardless of the
runCmd() result
+ case _ => runSemaphore.release()
+ }
+ .map {
+ ContainerId(_)
+ }
+ .recoverWith {
+ // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
+ // Exit code 125 means an error reported by the Docker daemon.
+ // Examples:
+ // - Unrecognized option specified
+ // - Not enough disk space
+ case pre: ProcessRunningException if pre.exitCode == 125 =>
+ Future.failed(
+ DockerContainerId
+ .parse(pre.stdout)
+ .map(BrokenDockerContainer(_, s"Broken container:
${pre.getMessage}"))
+ .getOrElse(pre))
+ }
+ }
}
def inspectIPAddress(id: ContainerId, network: String)(implicit transid:
TransactionId): Future[ContainerAddress] =
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 8f447cf..9c2b5b3 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,6 +17,8 @@
package whisk.core.containerpool.docker.test
+import java.util.concurrent.Semaphore
+
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
@@ -27,26 +29,31 @@ import scala.concurrent.Promise
import scala.util.Success
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
+import org.scalatest.time.{Seconds, Span}
import common.StreamLogging
+
import whisk.common.LogMarker
import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
import whisk.common.TransactionId
-import whisk.core.containerpool.docker.DockerClient
-import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.ContainerAddress
-import whisk.utils.retry
-import whisk.core.containerpool.docker.ProcessRunningException
-import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.ContainerId
import whisk.core.containerpool.docker.BrokenDockerContainer
+import whisk.core.containerpool.docker.DockerClient
+import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with
BeforeAndAfterEach {
+class DockerClientTests extends FlatSpec with Matchers with StreamLogging with
BeforeAndAfterEach with Eventually {
override def beforeEach = stream.reset()
+ implicit override val patienceConfig = PatienceConfig(timeout =
scaled(Span(5, Seconds)))
+
implicit val transid = TransactionId.testing
val id =
ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
@@ -169,6 +176,96 @@ class DockerClientTests extends FlatSpec with Matchers
with StreamLogging with B
}
}
+ it should "limit the number of concurrent docker run invocations" in {
+ // Delay execution of Docker run command
+ val firstRunPromise = Promise[String]()
+
+ val firstContainerId = ContainerId("1" * 64)
+ val secondContainerId = ContainerId("2" * 64)
+
+ var runCmdCount = 0
+ val dc = new DockerClient()(global) {
+ override val dockerCmd = Seq(dockerCommand)
+ override def executeProcess(args: String*)(implicit ec:
ExecutionContext) = {
+ runCmdCount += 1
+ runCmdCount match {
+ case 1 => firstRunPromise.future
+ case 2 => Future.successful(secondContainerId.asString)
+ case _ => Future.failed(new Throwable())
+ }
+ }
+ // Need to override the semaphore, otherwise the tested code will still
+ // create the semaphore with the original value of maxParallelRuns.
+ override val maxParallelRuns = 1
+ override val runSemaphore = new Semaphore( /* permits= */
maxParallelRuns, /* fair= */ true)
+ }
+
+ val image = "image"
+ val args = Seq("args")
+
+ val firstRunResult = dc.run(image, args)
+ val secondRunResult = dc.run(image, args)
+
+ // The tested code won't reach the mocked executeProcess() and thus,
increase runCmdCount,
+ // until at least one Future is successfully completed. For this reason,
it takes
+ // some time until the following matcher is successful.
+ eventually { runCmdCount shouldBe 1 }
+
+ // Complete the first Docker run command so that the second is eligible to
run
+ firstRunPromise.success(firstContainerId.asString)
+
+ // Cannot assert that the first Docker run always obtains the first
container because
+ // the tested code uses Futures so that sequence may differ from test run
to test run.
+ val firstResultContainerId = await(firstRunResult)
+
+ // Now, second command should be complete
+ eventually { runCmdCount shouldBe 2 }
+
+ val secondResultContainerId = await(secondRunResult)
+ Set(firstResultContainerId, secondResultContainerId) should contain
theSameElementsAs Set(
+ firstContainerId,
+ secondContainerId)
+ }
+
+ it should "tolerate docker run errors when limiting the number of concurrent
docker run invocations" in {
+ val secondContainerId = ContainerId("2" * 64)
+
+ var runCmdCount = 0
+ val dc = new DockerClient()(global) {
+ override val dockerCmd = Seq(dockerCommand)
+ override def executeProcess(args: String*)(implicit ec:
ExecutionContext) = {
+ runCmdCount += 1
+ println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
+ runCmdCount match {
+ case 1 => Future.failed(ProcessRunningException(1, "", ""))
+ case 2 => Future.successful(secondContainerId.asString)
+ case _ => Future.failed(new Throwable())
+ }
+ }
+ // Need to override the semaphore, otherwise the tested code will still
+ // create the semaphore with the original value of maxParallelRuns.
+ override val maxParallelRuns = 1
+ override val runSemaphore = new Semaphore( /* permits= */
maxParallelRuns, /* fair= */ true)
+ }
+
+ val image = "image"
+ val args = Seq("args")
+
+ // Kick off the first Docker run command - it will fail.
+ val firstRunResult = dc.run(image, args)
+
+ an[Exception] should be thrownBy await(firstRunResult)
+ runCmdCount shouldBe 1
+
+ // Now kick off the second Docker run command - it is expected to succeed.
+ // If this command completes without timeout, the concurrency limit
properly
+ // deals with errors.
+ val secondRunResult = dc.run(image, args)
+
+ await(secondRunResult) shouldBe secondContainerId
+ runCmdCount shouldBe 2
+ }
+
it should "write proper log markers on a successful command" in {
// a dummy string works here as we do not assert any output
// from the methods below
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].