This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f5177d  kubernetes: fix exception handling when api-server connection 
fails (#4918)
2f5177d is described below

commit 2f5177d079f440cae3db0a5ecb186d1e641e32b3
Author: tysonnorris <[email protected]>
AuthorDate: Thu Jun 11 13:31:49 2020 -0700

    kubernetes: fix exception handling when api-server connection fails (#4918)
---
 .../kubernetes/KubernetesClient.scala              |  73 ++++++++------
 .../kubernetes/KubernetesContainer.scala           |   3 +
 .../kubernetes/test/Fabric8ClientTests.scala       | 105 +++++++++++++++++++++
 .../kubernetes/test/KubeClientSupport.scala        |  18 +++-
 4 files changed, 165 insertions(+), 34 deletions(-)

diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 3dea020..1f47ad1 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -81,6 +81,11 @@ case class KubernetesPodReadyTimeoutException(timeout: 
FiniteDuration)
     extends Exception(s"Pod readiness timed out after ${timeout.toSeconds}s")
 
 /**
+ * Exception to indicate a pod could not be created at the apiserver.
+ */
+case class KubernetesPodApiException(e: Throwable) extends Exception(s"Pod was 
not created at apiserver: ${e}")
+
+/**
  * Configuration for node affinity for the pods that execute user action 
containers
  * The key,value pair should match the <key,value> pair with which the invoker 
worker nodes
  * are labeled in the Kubernetes cluster.  The default pair is 
<openwhisk-role,invoker>,
@@ -110,14 +115,15 @@ case class KubernetesClientConfig(timeouts: 
KubernetesClientTimeoutConfig,
  * You only need one instance (and you shouldn't get more).
  */
 class KubernetesClient(
-  config: KubernetesClientConfig = 
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
-  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+  config: KubernetesClientConfig = 
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes),
+  testClient: Option[DefaultKubernetesClient] = None)(executionContext: 
ExecutionContext)(implicit log: Logging,
+                                                                               
           as: ActorSystem)
     extends KubernetesApi
     with ProcessRunner {
   implicit protected val ec = executionContext
   implicit protected val am = ActorMaterializer()
   implicit protected val scheduler = as.scheduler
-  implicit protected val kubeRestClient = {
+  implicit protected val kubeRestClient = testClient.getOrElse {
     val configBuilder = new ConfigBuilder()
       .withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
       .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
@@ -145,7 +151,7 @@ class KubernetesClient(
       logLevel = akka.event.Logging.InfoLevel)
 
     //create the pod; catch any failure to end the transaction timer
-    val createdPod = try {
+    Try {
       val created = kubeRestClient.pods.inNamespace(namespace).create(pod)
       pdb.map(
         p =>
@@ -154,36 +160,41 @@ class KubernetesClient(
             .withName(name)
             .create(p))
       created
-    } catch {
-      case e: Throwable =>
+    } match {
+      case Failure(e) =>
+        //call to api-server failed
         transid.failed(this, start, s"Failed create pod for '$name': 
${e.getClass} - ${e.getMessage}", ErrorLevel)
-        throw e
-    }
-    //wait for the pod to become ready; catch any failure to end the 
transaction timer
-    waitForPod(namespace, createdPod, start.start, config.timeouts.run)
-      .map { readyPod =>
-        transid.finished(this, start, logLevel = InfoLevel)
-        toContainer(readyPod)
-      }
-      .recoverWith {
-        case e =>
-          transid.failed(this, start, s"Failed create pod for '$name': 
${e.getClass} - ${e.getMessage}", ErrorLevel)
-          //log pod events to diagnose pod readiness failures
-          val podEvents = kubeRestClient.events
-            .inNamespace(namespace)
-            .withField("involvedObject.name", name)
-            .list()
-            .getItems
-            .asScala
-          if (podEvents.isEmpty) {
-            log.info(this, s"No pod events for failed pod '$name'")
-          } else {
-            podEvents.foreach { podEvent =>
-              log.info(this, s"Pod event for failed pod '$name' 
${podEvent.getLastTimestamp}: ${podEvent.getMessage}")
-            }
+        Future.failed(KubernetesPodApiException(e))
+      case Success(createdPod) => {
+        //call to api-server succeeded; wait for the pod to become ready; 
catch any failure to end the transaction timer
+        waitForPod(namespace, createdPod, start.start, config.timeouts.run)
+          .map { readyPod =>
+            transid.finished(this, start, logLevel = InfoLevel)
+            toContainer(readyPod)
+          }
+          .recoverWith {
+            case e =>
+              transid.failed(this, start, s"Failed create pod for '$name': 
${e.getClass} - ${e.getMessage}", ErrorLevel)
+              //log pod events to diagnose pod readiness failures
+              val podEvents = kubeRestClient.events
+                .inNamespace(namespace)
+                .withField("involvedObject.name", name)
+                .list()
+                .getItems
+                .asScala
+              if (podEvents.isEmpty) {
+                log.info(this, s"No pod events for failed pod '$name'")
+              } else {
+                podEvents.foreach { podEvent =>
+                  log.info(
+                    this,
+                    s"Pod event for failed pod '$name' 
${podEvent.getLastTimestamp}: ${podEvent.getMessage}")
+                }
+              }
+              Future.failed(e)
           }
-          Future.failed(new Exception(s"Failed to create pod '$name'"))
       }
+    }
   }
 
   def rm(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit] = {
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index be73bd4..8c90e29 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -70,6 +70,9 @@ object KubernetesContainer {
 
     for {
       container <- kubernetes.run(podName, image, memory, environment, 
labels).recoverWith {
+        case _: KubernetesPodApiException =>
+          //apiserver call failed - this will expose a different error to users
+          
Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
         case _ =>
           kubernetes
             .rm(podName)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
new file mode 100644
index 0000000..19522c9
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import java.net.HttpURLConnection
+
+import common.{StreamLogging, WskActorSystem}
+import io.fabric8.kubernetes.api.model.{EventBuilder, PodBuilder}
+import 
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClientForMockServer
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import okhttp3.TlsVersion.TLS_1_0
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.core.containerpool.kubernetes._
+import org.apache.openwhisk.core.entity.size._
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class Fabric8ClientTests
+    extends FlatSpec
+    with Matchers
+    with WskActorSystem
+    with ScalaFutures
+    with KubeClientSupport
+    with StreamLogging {
+  implicit val tid: TransactionId = TransactionId.testing
+  behavior of "Fabric8Client"
+  val runTimeout = 2.seconds
+  def config(configMap: Option[ConfigMapValue] = None, affinity: 
Option[KubernetesInvokerNodeAffinity] = None) =
+    KubernetesClientConfig(
+      KubernetesClientTimeoutConfig(runTimeout, 2.seconds),
+      affinity.getOrElse(KubernetesInvokerNodeAffinity(false, "k", "v")),
+      false,
+      None,
+      configMap,
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+      false,
+      Some(Map("POD_UID" -> "metadata.uid")),
+      None)
+
+  it should "fail activation on cold start when apiserver fails" in {
+
+    //use an invalid client to simulate broken api server
+    def defaultClient = {
+      val config = new ConfigBuilder()
+        .withMasterUrl("http://localhost:11111") //test assumes that port 
11111 will fail in some way
+        .withTrustCerts(true)
+        .withTlsVersions(TLS_1_0)
+        .withNamespace("test")
+        .build
+      new DefaultKubernetesClient(createHttpClientForMockServer(config), 
config)
+    }
+    val restClient = new KubernetesClient(config(), testClient = 
Some(defaultClient))(executionContext)
+    restClient.run("fail", "fail", 256.MB).failed.futureValue shouldBe 
a[KubernetesPodApiException]
+  }
+  it should "fail activation on cold start when pod ready times out" in {
+    val podName = "failWait"
+    server
+      .expect()
+      .post()
+      .withPath("/api/v1/namespaces/test/pods")
+      .andReturn(
+        HttpURLConnection.HTTP_CREATED,
+        new 
PodBuilder().withNewMetadata().withName(podName).endMetadata().build())
+      .once();
+    server
+      .expect()
+      .get()
+      .withPath("/api/v1/namespaces/test/pods/failWait")
+      .andReturn(HttpURLConnection.HTTP_OK, new 
PodBuilder().withNewMetadata().withName(podName).endMetadata().build())
+      .times(3)
+    server
+      .expect()
+      .get()
+      
.withPath("/api/v1/namespaces/test/events?fieldSelector=involvedObject.name%3DfailWait")
+      .andReturn(
+        HttpURLConnection.HTTP_OK,
+        new 
EventBuilder().withNewMetadata().withName(podName).endMetadata().build())
+      .once
+
+    implicit val patienceConfig = PatienceConfig(timeout = runTimeout + 
1.seconds, interval = 0.5.seconds)
+
+    val restClient = new KubernetesClient(config(), testClient = 
Some(kubeClient))(executionContext)
+    restClient.run(podName, "anyimage", 256.MB).failed.futureValue shouldBe 
a[KubernetesPodReadyTimeoutException]
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
index 0a64cd5..b45360d 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
@@ -19,7 +19,9 @@ package 
org.apache.openwhisk.core.containerpool.kubernetes.test
 
 import common.StreamLogging
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer
+import 
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClientForMockServer
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import okhttp3.TlsVersion.TLS_1_0
 import org.scalatest.{BeforeAndAfterAll, Suite, TestSuite}
 
 import scala.concurrent.duration._
@@ -29,11 +31,21 @@ trait KubeClientSupport extends TestSuite with 
BeforeAndAfterAll with StreamLogg
 
   protected def useMockServer = true
 
+  val server = new KubernetesMockServer(false)
+
   protected lazy val (kubeClient, closeable) = {
     if (useMockServer) {
-      val server = new KubernetesMockServer(false)
       server.init()
-      (server.createClient(), () => server.destroy())
+      def defaultClient = {
+        val config = new ConfigBuilder()
+          .withMasterUrl(server.url("/"))
+          .withTrustCerts(true)
+          .withTlsVersions(TLS_1_0)
+          .withNamespace("test")
+          .build
+        new DefaultKubernetesClient(createHttpClientForMockServer(config), 
config)
+      }
+      (defaultClient, () => server.destroy())
     } else {
       val client = new DefaultKubernetesClient(
         new ConfigBuilder()
@@ -53,7 +65,7 @@ trait KubeClientSupport extends TestSuite with 
BeforeAndAfterAll with StreamLogg
     super.beforeAll()
   }
 
-  override protected def afterAll(): Unit = {
+  override def afterAll(): Unit = {
     super.afterAll()
     closeable.apply()
   }

Reply via email to