dgrove-oss commented on a change in pull request #3338: implement
suspend/resume for KubernetesContainer
URL:
https://github.com/apache/incubator-openwhisk/pull/3338#discussion_r171450061
##########
File path:
core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
##########
@@ -99,43 +117,144 @@ class KubernetesClient(
}
protected val kubectlCmd = Seq(findKubectlCmd)
- def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
- implicit transid: TransactionId): Future[ContainerId] = {
- runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
- .map(_ => ContainerId(name))
- }
+ def run(name: String,
+ image: String,
+ memory: ByteSize = 256.MB,
+ environment: Map[String, String] = Map.empty,
+ labels: Map[String, String] = Map.empty)(implicit transid:
TransactionId): Future[KubernetesContainer] = {
+
+ val envVars = environment.map {
+ case (key, value) => new
EnvVarBuilder().withName(key).withValue(value).build()
+ }.toSeq
+
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName(name)
+ .addToLabels("name", name)
+ .addToLabels(labels.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withRestartPolicy("Always")
+ .addNewContainer()
+ .withNewResources()
+ .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+ .endResources()
+ .withName("user-action")
+ .withImage(image)
+ .withEnv(envVars.asJava)
+ .addNewPort()
+ .withContainerPort(8080)
+ .withName("action")
+ .endPort()
+ .endContainer()
+ .endSpec()
+ .build()
+
+ kubeRestClient.pods.inNamespace(config.namespace).create(pod)
- def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId):
Future[ContainerAddress] = {
Future {
blocking {
- val pod =
-
kubeRestClient.pods().withName(id.asString).waitUntilReady(timeouts.inspect.length,
timeouts.inspect.unit)
- ContainerAddress(pod.getStatus().getPodIP())
+ val createdPod = kubeRestClient.pods
+ .inNamespace(config.namespace)
+ .withName(name)
+ .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
+ toContainer(createdPod)
}
}.recoverWith {
case e =>
- log.error(this, s"Failed to get IP of Pod '${id.asString}' within
timeout: ${e.getClass} - ${e.getMessage}")
- Future.failed(new Exception(s"Failed to get IP of Pod
'${id.asString}'"))
+ log.error(this, s"Failed create pod for '$name': ${e.getClass} -
${e.getMessage}")
+ Future.failed(new Exception(s"Failed to create pod '$name'"))
+ }
+ }
+
+ def rm(container: KubernetesContainer)(implicit transid: TransactionId):
Future[Unit] = {
+ runCmd(Seq("delete", "--now", "pod", container.id.asString),
config.timeouts.rm).map(_ => ())
+ }
+
+ def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit
transid: TransactionId): Future[Unit] = {
+ if (ensureUnpaused && config.invokerAgent.enabled) {
+ // The caller can't guarantee that every container with the label
key=value is already unpaused.
+ // Therefore we must enumerate them and ensure they are unpaused before
we attempt to delete them.
+ Future {
+ blocking {
+ kubeRestClient
+ .inNamespace(config.namespace)
+ .pods()
+ .withLabel(key, value)
+ .list()
+ .getItems
+ .asScala
+ .map { pod =>
+ val container = toContainer(pod)
+ container
+ .resume()
+ .recover { case _ => () } // Ignore errors; it is possible the
container was not actually suspended.
+ .map(_ => rm(container))
+ }
+ }
+ }.flatMap(futures =>
+ Future
+ .sequence(futures)
+ .map(_ => ()))
+ } else {
+ runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"),
config.timeouts.rm).map(_ => ())
}
}
- def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm).map(_ =>
())
+ def suspend(container: KubernetesContainer)(implicit transid:
TransactionId): Future[Unit] = {
+ if (config.invokerAgent.enabled) {
+ agentCommand("suspend", container)
+ .map { response =>
+ response.discardEntityBytes()
+ }
+ } else {
+ Future.successful({})
+ }
+ }
- def rm(key: String, value: String)(implicit transid: TransactionId):
Future[Unit] =
- runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"),
timeouts.rm).map(_ => ())
+ def resume(container: KubernetesContainer)(implicit transid: TransactionId):
Future[Unit] = {
+ if (config.invokerAgent.enabled) {
+ agentCommand("resume", container)
+ .map { response =>
+ response.discardEntityBytes()
+ }
+ } else {
+ Future.successful({})
+ }
+ }
- def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel:
Boolean = false)(
+ def logs(container: KubernetesContainer, sinceTime: Option[Instant],
waitForSentinel: Boolean = false)(
implicit transid: TransactionId): Source[TypedLogLine, Any] = {
log.debug(this, "Parsing logs from Kubernetes Graph Stage?")
Source
- .fromGraph(new KubernetesRestLogSourceStage(id, sinceTime,
waitForSentinel))
+ .fromGraph(new KubernetesRestLogSourceStage(container.id, sinceTime,
waitForSentinel))
.log("foobar")
}
+ private def toContainer(pod: Pod): KubernetesContainer = {
+ val id = ContainerId(pod.getMetadata.getName)
+ val addr = ContainerAddress(pod.getStatus.getPodIP)
+ val workerIP = pod.getStatus.getHostIP
+ // Extract the native (docker or containerd) containerId for the container
+ // By convention, kubernetes adds a docker:// prefix when using docker as
the low-level container engine
+ val nativeContainerId =
pod.getStatus.getContainerStatuses.get(0).getContainerID.stripPrefix("docker://")
Review comment:
Not a bad idea, but let's defer that to a later PR. I'd like to get this
one merged quickly, so I can submit the logging implementation I have that
builds on it. Then we can rethink how we pass the nativeContainerId to the
invokerAgent.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services