kpavel closed pull request #4082: Rename prewarmed containers
URL: https://github.com/apache/incubator-openwhisk/pull/4082
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 3167176abd..37a2fbafef 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -64,6 +64,7 @@ object Container {
trait Container {
+ protected var _name: String
implicit protected val as: ActorSystem
protected val id: ContainerId
protected val addr: ContainerAddress
@@ -73,6 +74,8 @@ trait Container {
/** HTTP connection to the container, will be lazily established by
callContainer */
protected var httpConnection: Option[ContainerClient] = None
+ def name = _name
+
/** Stops the container from consuming CPU cycles. */
def suspend()(implicit transid: TransactionId): Future[Unit] = {
//close connection first, then close connection pool
@@ -199,6 +202,10 @@ trait Container {
RunResult(Interval(started, finished), response)
}
}
+
+ /** Rename container. */
+ def rename(name: String)(implicit transid: TransactionId): Future[Unit]
+
private def closeConnections(toClose: Option[ContainerClient]): Future[Unit]
= {
toClose.map(_.close()).getOrElse(Future.successful(()))
}
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
index 9b21903a24..36f459fcf3 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala
@@ -119,7 +119,7 @@ object MesosTask {
log.info(this, s"launched task with state
${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}")
val containerIp = new ContainerAddress(taskHost, taskPort)
val containerId = new ContainerId(taskId);
- new MesosTask(containerId, containerIp, ec, log, as, taskId,
mesosClientActor, mesosConfig)
+ new MesosTask(name.get, containerId, containerIp, ec, log, as, taskId,
mesosClientActor, mesosConfig)
})
}
@@ -130,7 +130,8 @@ object JsonFormatters extends DefaultJsonProtocol {
implicit val createContainerJson = jsonFormat3(CreateContainer)
}
-class MesosTask(override protected val id: ContainerId,
+class MesosTask(override protected var _name: String,
+ override protected val id: ContainerId,
override protected val addr: ContainerAddress,
override protected val ec: ExecutionContext,
override protected val logging: Logging,
@@ -182,4 +183,9 @@ class MesosTask(override protected val id: ContainerId,
override def logs(limit: ByteSize, waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[ByteString, Any] =
Source.single(ByteString(LogLine(logMsg, "stdout",
Instant.now.toString).toJson.compactPrint))
+
+ /** Rename container. */
+ override def rename(name: String)(implicit transid: TransactionId):
Future[Unit] =
+ // rename currently not supported
+ Future.successful(Unit)
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b34ce587d4..99699b6ece 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -330,6 +330,19 @@ class ContainerProxy(
goto(Removing)
}
+ /**
+ * Checks whether container name is "prewarmed"
+ *
+ * @param suffix the container name's suffix
+ * @return a unique container name
+ */
+ def isPrewarmed(name: String, instance: InvokerInstanceId, suffix: String):
Boolean = {
+ val sanitizedSuffix = suffix.filter(ContainerProxy.isAllowed)
+
+ return
name.startsWith(s"${ContainerFactory.containerNamePrefix(instance)}_") &&
name.endsWith(
+ s"_prewarm_${sanitizedSuffix}")
+ }
+
/**
* Runs the job, initialize first if necessary.
* Completes the job by:
@@ -349,7 +362,15 @@ class ContainerProxy(
// Only initialize iff we haven't yet warmed the container
val initialize = stateData match {
case data: WarmedData => Future.successful(None)
- case _ =>
container.initialize(job.action.containerInitializer,
actionTimeout).map(Some(_))
+ case _ =>
+ // check if container has generic "prewarm" name
+ if (isPrewarmed(container.name, instance, job.action.exec.kind)) {
+ // now rename container to action based name
+ val newName =
+ ContainerProxy.containerName(instance,
job.msg.user.namespace.name.asString, job.action.name.asString)
+ container.rename(newName)
+ }
+ container.initialize(job.action.containerInitializer,
actionTimeout).map(Some(_))
}
val activation: Future[WhiskActivation] = initialize
@@ -450,6 +471,8 @@ object ContainerProxy {
val timeouts =
loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
+ def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
+
/**
* Generates a unique container name.
*
@@ -458,8 +481,6 @@ object ContainerProxy {
* @return a unique container name
*/
def containerName(instance: InvokerInstanceId, prefix: String, suffix:
String): String = {
- def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
-
val sanitizedPrefix = prefix.filter(isAllowed)
val sanitizedSuffix = suffix.filter(isAllowed)
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 371b38a40b..2cd54f4047 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -193,6 +193,9 @@ class DockerClient(dockerHost: Option[String] = None,
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
}
+
+ def rename(id: ContainerId, name: String)(implicit transid: TransactionId):
Future[Unit] =
+ runCmd(Seq("rename", id.asString, name), config.timeouts.pause).map(_ =>
())
}
trait DockerApi {
@@ -269,6 +272,15 @@ trait DockerApi {
* @return a Future containing whether the container was killed or not
*/
def isOomKilled(id: ContainerId)(implicit transid: TransactionId):
Future[Boolean]
+
+ /**
+ * Rename container.
+ *
+ * @param id the id of the container to rename
+ * @param name new container name
+ * @return a Future completing once the rename is complete
+ */
+ def rename(id: ContainerId, name: String)(implicit transid: TransactionId):
Future[Unit]
}
/** Indicates any error while starting a container that leaves a broken
container behind that needs to be removed */
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 47b6d2439f..cd810851eb 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -39,6 +39,7 @@ import spray.json._
import whisk.core.containerpool.logging.LogLine
import whisk.core.entity.ExecManifest.ImageName
import whisk.http.Messages
+import scala.util.{Failure, Success}
object DockerContainer {
@@ -141,7 +142,7 @@ object DockerContainer {
docker.rm(id)
Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
}
- } yield new DockerContainer(id, ip, useRunc)
+ } yield new DockerContainer(name.get, id, ip, useRunc)
}
}
@@ -155,7 +156,8 @@ object DockerContainer {
* @param id the id of the container
* @param addr the ip of the container
*/
-class DockerContainer(protected val id: ContainerId,
+class DockerContainer(protected var _name: String,
+ protected val id: ContainerId,
protected val addr: ContainerAddress,
protected val useRunc: Boolean)(implicit docker:
DockerApiWithFileAccess,
runc: RuncApi,
@@ -181,6 +183,15 @@ class DockerContainer(protected val id: ContainerId,
docker.rm(id)
}
+ def rename(newName: String)(implicit transid: TransactionId): Future[Unit] =
{
+ docker.rename(id, newName).andThen {
+ case Success(_) =>
+ _name = newName
+ case Failure(e) =>
+ logging.error(this, s"Failed to rename container: $e")
+ }
+ }
+
/**
* Was the container killed due to memory exhaustion?
*
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index c6e5233062..5f08020615 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -224,7 +224,7 @@ class KubernetesClient(
// By convention, kubernetes adds a docker:// prefix when using docker as
the low-level container engine
val nativeContainerId =
pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
implicit val kubernetes = this
- new KubernetesContainer(id, addr, workerIP, nativeContainerId)
+ new KubernetesContainer(id.asString, id, addr, workerIP, nativeContainerId)
}
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 5ccec760f9..7898c86921 100644
---
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -86,7 +86,8 @@ object KubernetesContainer {
* @param workerIP the ip of the workernode on which the container is executing
* @param nativeContainerId the docker/containerd lowlevel id for the container
*/
-class KubernetesContainer(protected[core] val id: ContainerId,
+class KubernetesContainer(protected[core] var _name: String,
+ protected[core] val id: ContainerId,
protected[core] val addr: ContainerAddress,
protected[core] val workerIP: String,
protected[core] val nativeContainerId:
String)(implicit kubernetes: KubernetesApi,
@@ -138,4 +139,9 @@ class KubernetesContainer(protected[core] val id:
ContainerId,
.takeWithin(waitForLogs)
.map { _.toByteString }
}
+
+ /** Rename container. */
+ override def rename(name: String)(implicit transid: TransactionId):
Future[Unit] =
+ // rename currently not supported
+ Future.successful(Unit)
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index c0e5e376d8..c5105019f7 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -106,7 +106,7 @@ class DockerContainerTests
Future.successful(RunResult(intervalOf(1.millisecond),
Right(ContainerResponse(true, "", None)))),
awaitLogs: FiniteDuration = 2.seconds)(implicit docker:
DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
- new DockerContainer(id, addr, true) {
+ new DockerContainer("test", id, addr, true) {
override protected def callContainer(
path: String,
body: JsObject,
@@ -181,7 +181,11 @@ class DockerContainerTests
implicit val runc = stub[RuncApi]
val container =
- DockerContainer.create(transid = transid, image =
Left(ImageName("image")), dockerRunParameters = parameters)
+ DockerContainer.create(
+ transid = transid,
+ image = Left(ImageName("image")),
+ name = Some("myContainer"),
+ dockerRunParameters = parameters)
await(container)
docker.pulls should have size 1
@@ -303,6 +307,7 @@ class DockerContainerTests
DockerContainer.create(
transid = transid,
image = Left(ImageName("image", tag = Some("prod"))),
+ name = Some("myContainer"),
dockerRunParameters = parameters)
noException should be thrownBy await(container)
@@ -347,7 +352,7 @@ class DockerContainerTests
implicit val runc = new TestRuncClient
val id = ContainerId("id")
- val container = new DockerContainer(id, ContainerAddress("ip"), true)
+ val container = new DockerContainer("test", id, ContainerAddress("ip"),
true)
val suspend = container.suspend()
val resume = container.resume()
@@ -367,7 +372,7 @@ class DockerContainerTests
implicit val runc = new TestRuncClient
val id = ContainerId("id")
- val container = new DockerContainer(id, ContainerAddress("ip"), false)
+ val container = new DockerContainer("test", id, ContainerAddress("ip"),
false)
val suspend = container.suspend()
val resume = container.resume()
@@ -387,7 +392,7 @@ class DockerContainerTests
implicit val runc = stub[RuncApi]
val id = ContainerId("id")
- val container = new DockerContainer(id, ContainerAddress("ip"), true)
+ val container = new DockerContainer("test", id, ContainerAddress("ip"),
true)
container.destroy()
@@ -849,5 +854,7 @@ class DockerContainerTests
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(ByteString.empty)
}
+
+ def rename(id: ContainerId, name: String)(implicit transid:
TransactionId): Future[Unit] = Future.successful(())
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 3336e831b1..d13a3f6093 100644
---
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -87,7 +87,7 @@ class KubernetesClientTests
}
def kubernetesContainer(id: ContainerId) =
- new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" +
id.asString)(kubernetesClient {
+ new KubernetesContainer("test", id, ContainerAddress("ip"), "ip",
"docker://" + id.asString)(kubernetesClient {
Future.successful("")
}, actorSystem, global, logging)
@@ -117,7 +117,7 @@ class KubernetesClientTests
it should "forward suspend commands to the client" in {
implicit val kubernetes = new TestKubernetesClient
val id = ContainerId("id")
- val container = new KubernetesContainer(id, ContainerAddress("ip"),
"127.0.0.1", "docker://foo")
+ val container = new KubernetesContainer("test", id,
ContainerAddress("ip"), "127.0.0.1", "docker://foo")
await(container.suspend())
kubernetes.suspends should have size 1
kubernetes.suspends(0) shouldBe id
@@ -126,7 +126,7 @@ class KubernetesClientTests
it should "forward resume commands to the client" in {
implicit val kubernetes = new TestKubernetesClient
val id = ContainerId("id")
- val container = new KubernetesContainer(id, ContainerAddress("ip"),
"127.0.0.1", "docker://foo")
+ val container = new KubernetesContainer("test", id,
ContainerAddress("ip"), "127.0.0.1", "docker://foo")
await(container.resume())
kubernetes.resumes should have size 1
kubernetes.resumes(0) shouldBe id
@@ -204,7 +204,7 @@ object KubernetesClientTests {
val addr: ContainerAddress = ContainerAddress("ip")
val workerIP: String = "127.0.0.1"
val nativeContainerId: String = "docker://" + containerId.asString
- Future.successful(new KubernetesContainer(containerId, addr, workerIP,
nativeContainerId))
+ Future.successful(new KubernetesContainer("test", containerId, addr,
workerIP, nativeContainerId))
}
def rm(container: KubernetesContainer)(implicit transid: TransactionId):
Future[Unit] = {
diff --git
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 573b581dfe..bd49e4af6b 100644
---
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -102,7 +102,7 @@ class KubernetesContainerTests
Future.successful(RunResult(intervalOf(1.millisecond),
Right(ContainerResponse(true, "", None)))),
awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes:
KubernetesApi): KubernetesContainer = {
- new KubernetesContainer(id, addr, addr.host, "docker://" + id.asString) {
+ new KubernetesContainer("test", id, addr, addr.host, "docker://" +
id.asString) {
override protected def callContainer(
path: String,
body: JsObject,
@@ -192,7 +192,7 @@ class KubernetesContainerTests
implicit val kubernetes = stub[KubernetesApi]
val id = ContainerId("id")
- val container = new KubernetesContainer(id, ContainerAddress("ip"),
"127.0.0.1", "docker://foo")
+ val container = new KubernetesContainer("test", id,
ContainerAddress("ip"), "127.0.0.1", "docker://foo")
container.destroy()
diff --git
a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 287c20b61a..6fb0754c8a 100644
---
a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -98,6 +98,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with
Matchers with WskAct
}
class TestContainer(lines: Source[ByteString, Any],
+ var _name: String = "test",
val id: ContainerId = ContainerId("test"),
val addr: ContainerAddress = ContainerAddress("test",
1234))(implicit val ec: ExecutionContext,
val logging: Logging)
@@ -108,5 +109,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with
Matchers with WskAct
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId) = lines
override implicit protected val as: ActorSystem = actorSystem
+
+ def rename(name: String)(implicit transid: TransactionId): Future[Unit] =
???
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 920d5358b6..d2d9497aa8 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -908,6 +908,7 @@ class ContainerProxyTests
* Implements all the good cases of a perfect run to facilitate error case
overriding.
*/
class TestContainer extends Container {
+ protected var _name: String = "test"
protected val id = ContainerId("testcontainer")
protected val addr = ContainerAddress("0.0.0.0")
protected implicit val logging: Logging = log
@@ -957,5 +958,8 @@ class ContainerProxyTests
Future.successful((runInterval, ActivationResponse.success()))
}
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId): Source[ByteString, Any] = ???
+ override def rename(name: String)(implicit transid: TransactionId):
Future[Unit] =
+ // rename currently not supported
+ Future.successful(Unit)
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services