This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 995387f  change the wait logic to use akka after pattern instead of 
fabric8 client waitUntilReady. (#4857)
995387f is described below

commit 995387f78c07be3da11bd2be34f7dd14f386e0c7
Author: tysonnorris <[email protected]>
AuthorDate: Thu Mar 12 09:41:02 2020 -0700

    change the wait logic to use akka after pattern instead of fabric8 client 
waitUntilReady. (#4857)
---
 .../kubernetes/KubernetesClient.scala              | 46 ++++++++++++++++------
 1 file changed, 35 insertions(+), 11 deletions(-)

diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 4154166..9637d2d 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -27,6 +27,7 @@ import akka.event.Logging.ErrorLevel
 import akka.event.Logging.InfoLevel
 import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.pattern.after
 import akka.stream.scaladsl.Source
 import akka.stream.stage._
 import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
@@ -69,6 +70,12 @@ case class KubernetesClientTimeoutConfig(run: 
FiniteDuration, logs: FiniteDurati
 case class KubernetesCpuScalingConfig(millicpus: Int, memory: ByteSize, 
maxMillicpus: Int)
 
 /**
+ * Exception to indicate a pod took too long to become ready.
+ */
+case class KubernetesPodReadyTimeoutException(timeout: FiniteDuration)
+    extends Exception(s"Pod readiness timed out after ${timeout.toSeconds}s")
+
+/**
  * Configuration for node affinity for the pods that execute user action 
containers
  * The key,value pair should match the <key,value> pair with which the invoker 
worker nodes
  * are labeled in the Kubernetes cluster.  The default pair is 
<openwhisk-role,invoker>,
@@ -102,6 +109,7 @@ class KubernetesClient(
     with ProcessRunner {
   implicit protected val ec = executionContext
   implicit protected val am = ActorMaterializer()
+  implicit protected val scheduler = as.scheduler
   implicit protected val kubeRestClient = {
     val configBuilder = new ConfigBuilder()
       .withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
@@ -130,7 +138,7 @@ class KubernetesClient(
       logLevel = akka.event.Logging.InfoLevel)
 
     //create the pod; catch any failure to end the transaction timer
-    try {
+    val createdPod = try {
       kubeRestClient.pods.inNamespace(namespace).create(pod)
       pdb.map(
         p =>
@@ -144,17 +152,10 @@ class KubernetesClient(
         throw e
     }
     //wait for the pod to become ready; catch any failure to end the 
transaction timer
-    Future {
-      blocking {
-        val createdPod = kubeRestClient.pods
-          .inNamespace(namespace)
-          .withName(name)
-          .waitUntilReady(config.timeouts.run.length, config.timeouts.run.unit)
-        toContainer(createdPod)
-      }
-    }.map { container =>
+    waitForPod(namespace, createdPod, start.start, config.timeouts.run)
+      .map { readyPod =>
         transid.finished(this, start, logLevel = InfoLevel)
-        container
+        toContainer(readyPod)
       }
       .recoverWith {
         case e =>
@@ -278,6 +279,29 @@ class KubernetesClient(
     implicit val kubernetes = this
     new KubernetesContainer(id, addr, workerIP, nativeContainerId, portFwd)
   }
+  // check for ready status every 1 second until the timeout (minus the time 
elapsed since start, i.e. the time spent in the pod create call) has passed
+  private def waitForPod(namespace: String,
+                         pod: Pod,
+                         start: Instant,
+                         timeout: FiniteDuration,
+                         deadlineOpt: Option[Deadline] = None): Future[Pod] = {
+    val readyPod = kubeRestClient
+      .pods()
+      .inNamespace(namespace)
+      .withName(pod.getMetadata.getName)
+    val deadline = deadlineOpt.getOrElse((timeout - 
(System.currentTimeMillis() - start.toEpochMilli).millis).fromNow)
+    if (!readyPod.isReady) {
+      if (deadline.isOverdue()) {
+        Future.failed(KubernetesPodReadyTimeoutException(timeout))
+      } else {
+        after(1.seconds, scheduler) {
+          waitForPod(namespace, pod, start, timeout, Some(deadline))
+        }
+      }
+    } else {
+      Future.successful(readyPod.get())
+    }
+  }
 }
 
 object KubernetesClient {

Reply via email to