This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new f05058a Collapse multiple concurrent pulls of the same image. (#2626) f05058a is described below commit f05058a959848d6524601f6814ac527561c32d46 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Thu Aug 17 03:27:58 2017 +0200 Collapse multiple concurrent pulls of the same image. (#2626) Multiple concurrent calls of `docker pull` to the same image create unnecessary network traffic. Instead, all of those calls are collapsed into the first call. After that call has finished, a subsequent pull will result in a `docker pull` again to enable updates to the same image, without having to work with tags for convenience. --- .../core/containerpool/docker/DockerClient.scala | 11 +++++++- .../docker/test/DockerClientTests.scala | 33 +++++++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 166a618..bf94b8e 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -31,6 +31,7 @@ import akka.event.Logging.ErrorLevel import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId +import scala.collection.concurrent.TrieMap /** * Serves as interface to the docker CLI tool. @@ -86,8 +87,16 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply)) } + /** + * Stores pulls that are currently being executed and collapses multiple + * pulls into just one. After a pull is finished, the cached future is removed + * to enable constant updates of an image without changing its tag. 
+ */ + private val pullsInFlight = TrieMap[String, Future[Unit]]() def pull(image: String)(implicit transid: TransactionId): Future[Unit] = - runCmd("pull", image).map(_ => ()) + pullsInFlight.getOrElseUpdate(image, { + runCmd("pull", image).map(_ => pullsInFlight.remove(image)) + }) private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = { val cmd = dockerCmd ++ args diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala index 09843c4..07e31ce 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala @@ -37,6 +37,8 @@ import whisk.common.TransactionId import whisk.core.containerpool.docker.ContainerId import whisk.core.containerpool.docker.ContainerIp import whisk.core.containerpool.docker.DockerClient +import scala.concurrent.Promise +import whisk.utils.retry @RunWith(classOf[JUnitRunner]) class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach { @@ -51,7 +53,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B val dockerCommand = "docker" /** Returns a DockerClient with a mocked result for 'executeProcess' */ - def dockerClient(execResult: Future[String]) = new DockerClient()(global) { + def dockerClient(execResult: => Future[String]) = new DockerClient()(global) { override val dockerCmd = Seq(dockerCommand) override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult } @@ -81,6 +83,35 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B a[NoSuchElementException] should be thrownBy await(dc.inspectIPAddress(id, "foo network")) } + it should "collapse multiple parallel pull calls into just one" in { + // Delay execution of the pull command + val pullPromise = 
Promise[String]() + var commandsRun = 0 + val dc = dockerClient { + commandsRun += 1 + pullPromise.future + } + + val image = "testimage" + + // Pull first, command should be run + dc.pull(image) + commandsRun shouldBe 1 + + // Pull again, command should not be run + dc.pull(image) + commandsRun shouldBe 1 + + // Finish the pulls above + pullPromise.success("pulled") + + retry { + // Pulling again should execute the command again + await(dc.pull(image)) + commandsRun shouldBe 2 + } + } + it should "write proper log markers on a successful command" in { // a dummy string works here as we do not assert any output // from the methods below -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.