This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 995387f change the wait logic to use akka after pattern instead of
fabric8 client waitUntilReady. (#4857)
995387f is described below
commit 995387f78c07be3da11bd2be34f7dd14f386e0c7
Author: tysonnorris <[email protected]>
AuthorDate: Thu Mar 12 09:41:02 2020 -0700
change the wait logic to use akka after pattern instead of fabric8 client
waitUntilReady. (#4857)
---
.../kubernetes/KubernetesClient.scala | 46 ++++++++++++++++------
1 file changed, 35 insertions(+), 11 deletions(-)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 4154166..9637d2d 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -27,6 +27,7 @@ import akka.event.Logging.ErrorLevel
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.pattern.after
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
@@ -69,6 +70,12 @@ case class KubernetesClientTimeoutConfig(run:
FiniteDuration, logs: FiniteDurati
case class KubernetesCpuScalingConfig(millicpus: Int, memory: ByteSize,
maxMillicpus: Int)
/**
+ * Exception to indicate a pod took too long to become ready.
+ */
+case class KubernetesPodReadyTimeoutException(timeout: FiniteDuration)
+ extends Exception(s"Pod readiness timed out after ${timeout.toSeconds}s")
+
+/**
* Configuration for node affinity for the pods that execute user action
containers
* The key,value pair should match the <key,value> pair with which the invoker
worker nodes
* are labeled in the Kubernetes cluster. The default pair is
<openwhisk-role,invoker>,
@@ -102,6 +109,7 @@ class KubernetesClient(
with ProcessRunner {
implicit protected val ec = executionContext
implicit protected val am = ActorMaterializer()
+ implicit protected val scheduler = as.scheduler
implicit protected val kubeRestClient = {
val configBuilder = new ConfigBuilder()
.withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
@@ -130,7 +138,7 @@ class KubernetesClient(
logLevel = akka.event.Logging.InfoLevel)
//create the pod; catch any failure to end the transaction timer
- try {
+ val createdPod = try {
kubeRestClient.pods.inNamespace(namespace).create(pod)
pdb.map(
p =>
@@ -144,17 +152,10 @@ class KubernetesClient(
throw e
}
//wait for the pod to become ready; catch any failure to end the
transaction timer
- Future {
- blocking {
- val createdPod = kubeRestClient.pods
- .inNamespace(namespace)
- .withName(name)
- .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
- toContainer(createdPod)
- }
- }.map { container =>
+ waitForPod(namespace, createdPod, start.start, config.timeouts.run)
+ .map { readyPod =>
transid.finished(this, start, logLevel = InfoLevel)
- container
+ toContainer(readyPod)
}
.recoverWith {
case e =>
@@ -278,6 +279,29 @@ class KubernetesClient(
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId, portFwd)
}
+ // check for ready status every 1 second until timeout (minus the start
time, which is the time for the pod create call) has passed
+ private def waitForPod(namespace: String,
+ pod: Pod,
+ start: Instant,
+ timeout: FiniteDuration,
+ deadlineOpt: Option[Deadline] = None): Future[Pod] = {
+ val readyPod = kubeRestClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(pod.getMetadata.getName)
+ val deadline = deadlineOpt.getOrElse((timeout -
(System.currentTimeMillis() - start.toEpochMilli).millis).fromNow)
+ if (!readyPod.isReady) {
+ if (deadline.isOverdue()) {
+ Future.failed(KubernetesPodReadyTimeoutException(timeout))
+ } else {
+ after(1.seconds, scheduler) {
+ waitForPod(namespace, pod, start, timeout, Some(deadline))
+ }
+ }
+ } else {
+ Future.successful(readyPod.get())
+ }
+ }
}
object KubernetesClient {