This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 2f5177d kubernetes: fix exception handling when api-server connection
fails (#4918)
2f5177d is described below
commit 2f5177d079f440cae3db0a5ecb186d1e641e32b3
Author: tysonnorris <[email protected]>
AuthorDate: Thu Jun 11 13:31:49 2020 -0700
kubernetes: fix exception handling when api-server connection fails (#4918)
---
.../kubernetes/KubernetesClient.scala | 73 ++++++++------
.../kubernetes/KubernetesContainer.scala | 3 +
.../kubernetes/test/Fabric8ClientTests.scala | 105 +++++++++++++++++++++
.../kubernetes/test/KubeClientSupport.scala | 18 +++-
4 files changed, 165 insertions(+), 34 deletions(-)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 3dea020..1f47ad1 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -81,6 +81,11 @@ case class KubernetesPodReadyTimeoutException(timeout:
FiniteDuration)
extends Exception(s"Pod readiness timed out after ${timeout.toSeconds}s")
/**
+ * Exception to indicate a pod could not be created at the apiserver.
+ *
+ * `e` is the failure raised by the apiserver call. It is also passed as this exception's
+ * cause, so logged stack traces show where the underlying call failed instead of only
+ * its `toString` in the message.
+ *
+ * NOTE(review): in `KubernetesClient.run` the `Try` that produces this exception also wraps
+ * the `pdb` creation (presumably a PodDisruptionBudget) issued after the pod is created.
+ * The recovery in `KubernetesContainer` does not call `rm` for this exception. If the `pdb`
+ * call fails, the pod may already exist and be left behind. Confirm, and clean up in that case.
+ */
+case class KubernetesPodApiException(e: Throwable)
+ extends Exception(s"Pod was not created at apiserver: ${e}", e)
+
+/**
* Configuration for node affinity for the pods that execute user action
containers
* The key,value pair should match the <key,value> pair with which the invoker
worker nodes
* are labeled in the Kubernetes cluster. The default pair is
<openwhisk-role,invoker>,
@@ -110,14 +115,15 @@ case class KubernetesClientConfig(timeouts:
KubernetesClientTimeoutConfig,
* You only need one instance (and you shouldn't get more).
*/
class KubernetesClient(
- config: KubernetesClientConfig =
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
- executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
+ config: KubernetesClientConfig =
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes),
+ testClient: Option[DefaultKubernetesClient] = None)(executionContext:
ExecutionContext)(implicit log: Logging,
+
as: ActorSystem)
extends KubernetesApi
with ProcessRunner {
implicit protected val ec = executionContext
implicit protected val am = ActorMaterializer()
implicit protected val scheduler = as.scheduler
- implicit protected val kubeRestClient = {
+ implicit protected val kubeRestClient = testClient.getOrElse {
val configBuilder = new ConfigBuilder()
.withConnectionTimeout(config.timeouts.logs.toMillis.toInt)
.withRequestTimeout(config.timeouts.logs.toMillis.toInt)
@@ -145,7 +151,7 @@ class KubernetesClient(
logLevel = akka.event.Logging.InfoLevel)
//create the pod; catch any failure to end the transaction timer
- val createdPod = try {
+ Try {
val created = kubeRestClient.pods.inNamespace(namespace).create(pod)
pdb.map(
p =>
@@ -154,36 +160,41 @@ class KubernetesClient(
.withName(name)
.create(p))
created
- } catch {
- case e: Throwable =>
+ } match {
+ case Failure(e) =>
+ //call to api-server failed
transid.failed(this, start, s"Failed create pod for '$name':
${e.getClass} - ${e.getMessage}", ErrorLevel)
- throw e
- }
- //wait for the pod to become ready; catch any failure to end the
transaction timer
- waitForPod(namespace, createdPod, start.start, config.timeouts.run)
- .map { readyPod =>
- transid.finished(this, start, logLevel = InfoLevel)
- toContainer(readyPod)
- }
- .recoverWith {
- case e =>
- transid.failed(this, start, s"Failed create pod for '$name':
${e.getClass} - ${e.getMessage}", ErrorLevel)
- //log pod events to diagnose pod readiness failures
- val podEvents = kubeRestClient.events
- .inNamespace(namespace)
- .withField("involvedObject.name", name)
- .list()
- .getItems
- .asScala
- if (podEvents.isEmpty) {
- log.info(this, s"No pod events for failed pod '$name'")
- } else {
- podEvents.foreach { podEvent =>
- log.info(this, s"Pod event for failed pod '$name'
${podEvent.getLastTimestamp}: ${podEvent.getMessage}")
- }
+ Future.failed(KubernetesPodApiException(e))
+ case Success(createdPod) => {
+ //call to api-server succeeded; wait for the pod to become ready;
catch any failure to end the transaction timer
+ waitForPod(namespace, createdPod, start.start, config.timeouts.run)
+ .map { readyPod =>
+ transid.finished(this, start, logLevel = InfoLevel)
+ toContainer(readyPod)
+ }
+ .recoverWith {
+ case e =>
+ transid.failed(this, start, s"Failed create pod for '$name':
${e.getClass} - ${e.getMessage}", ErrorLevel)
+ //log pod events to diagnose pod readiness failures
+ val podEvents = kubeRestClient.events
+ .inNamespace(namespace)
+ .withField("involvedObject.name", name)
+ .list()
+ .getItems
+ .asScala
+ if (podEvents.isEmpty) {
+ log.info(this, s"No pod events for failed pod '$name'")
+ } else {
+ podEvents.foreach { podEvent =>
+ log.info(
+ this,
+ s"Pod event for failed pod '$name'
${podEvent.getLastTimestamp}: ${podEvent.getMessage}")
+ }
+ }
+ Future.failed(e)
}
- Future.failed(new Exception(s"Failed to create pod '$name'"))
}
+ }
}
def rm(container: KubernetesContainer)(implicit transid: TransactionId):
Future[Unit] = {
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index be73bd4..8c90e29 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -70,6 +70,9 @@ object KubernetesContainer {
for {
container <- kubernetes.run(podName, image, memory, environment,
labels).recoverWith {
+ case _: KubernetesPodApiException =>
+ //apiserver call failed - this will expose a different error to users
+
Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
case _ =>
kubernetes
.rm(podName)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
new file mode 100644
index 0000000..19522c9
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/Fabric8ClientTests.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import java.net.HttpURLConnection
+
+import common.{StreamLogging, WskActorSystem}
+import io.fabric8.kubernetes.api.model.{EventBuilder, PodBuilder}
+import
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClientForMockServer
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import okhttp3.TlsVersion.TLS_1_0
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.core.containerpool.kubernetes._
+import org.apache.openwhisk.core.entity.size._
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+/**
+ * Tests for the failure paths of `KubernetesClient.run`, using an injected fabric8 client
+ * (`testClient`). The client points either at an address with no apiserver or at the mock
+ * `server` / `kubeClient` provided by `KubeClientSupport`.
+ */
+@RunWith(classOf[JUnitRunner])
+class Fabric8ClientTests
+ extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with ScalaFutures
+ with KubeClientSupport
+ with StreamLogging {
+ implicit val tid: TransactionId = TransactionId.testing
+ behavior of "Fabric8Client"
+ //short timeout passed as the first KubernetesClientTimeoutConfig field (presumably the
+ //pod "run"/readiness timeout - confirm); the patience config below is sized from it
+ val runTimeout = 2.seconds
+ //client config shared by the tests; only configMap and node affinity can be overridden,
+ //all other fields are fixed test values
+ def config(configMap: Option[ConfigMapValue] = None, affinity:
Option[KubernetesInvokerNodeAffinity] = None) =
+ KubernetesClientConfig(
+ KubernetesClientTimeoutConfig(runTimeout, 2.seconds),
+ affinity.getOrElse(KubernetesInvokerNodeAffinity(false, "k", "v")),
+ false,
+ None,
+ configMap,
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+ false,
+ Some(Map("POD_UID" -> "metadata.uid")),
+ None)
+
+ //pod creation itself fails, so run must surface KubernetesPodApiException
+ //(not a readiness failure)
+ it should "fail activation on cold start when apiserver fails" in {
+
+ //use an invalid client to simulate broken api server
+ //NOTE(review): if something is listening on localhost:11111 the premise of this test may not hold
+ def defaultClient = {
+ val config = new ConfigBuilder()
+ .withMasterUrl("http://localhost:11111") //test assumes that port
11111 will fail in some way
+ .withTrustCerts(true)
+ .withTlsVersions(TLS_1_0)
+ .withNamespace("test")
+ .build
+ new DefaultKubernetesClient(createHttpClientForMockServer(config),
config)
+ }
+ val restClient = new KubernetesClient(config(), testClient =
Some(defaultClient))(executionContext)
+ restClient.run("fail", "fail", 256.MB).failed.futureValue shouldBe
a[KubernetesPodApiException]
+ }
+ //pod creation succeeds at the mock apiserver, but the pod never becomes ready, so run
+ //must fail with KubernetesPodReadyTimeoutException
+ it should "fail activation on cold start when pod ready times out" in {
+ val podName = "failWait"
+ //create call: respond 201 with a pod carrying only metadata (no status)
+ server
+ .expect()
+ .post()
+ .withPath("/api/v1/namespaces/test/pods")
+ .andReturn(
+ HttpURLConnection.HTTP_CREATED,
+ new
PodBuilder().withNewMetadata().withName(podName).endMetadata().build())
+ .once();
+ //readiness polling: return the same status-less pod (up to 3 times), so it presumably never reports ready
+ server
+ .expect()
+ .get()
+ .withPath("/api/v1/namespaces/test/pods/failWait")
+ .andReturn(HttpURLConnection.HTTP_OK, new
PodBuilder().withNewMetadata().withName(podName).endMetadata().build())
+ .times(3)
+ //serves the pod-event lookup (involvedObject.name field selector) that run performs on readiness failure
+ server
+ .expect()
+ .get()
+
.withPath("/api/v1/namespaces/test/events?fieldSelector=involvedObject.name%3DfailWait")
+ .andReturn(
+ HttpURLConnection.HTTP_OK,
+ new
EventBuilder().withNewMetadata().withName(podName).endMetadata().build())
+ .once
+
+ //wait longer than runTimeout so the readiness timeout can fire before futureValue gives up
+ implicit val patienceConfig = PatienceConfig(timeout = runTimeout +
1.seconds, interval = 0.5.seconds)
+
+ val restClient = new KubernetesClient(config(), testClient =
Some(kubeClient))(executionContext)
+ restClient.run(podName, "anyimage", 256.MB).failed.futureValue shouldBe
a[KubernetesPodReadyTimeoutException]
+ }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
index 0a64cd5..b45360d 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
@@ -19,7 +19,9 @@ package
org.apache.openwhisk.core.containerpool.kubernetes.test
import common.StreamLogging
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer
+import
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClientForMockServer
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import okhttp3.TlsVersion.TLS_1_0
import org.scalatest.{BeforeAndAfterAll, Suite, TestSuite}
import scala.concurrent.duration._
@@ -29,11 +31,21 @@ trait KubeClientSupport extends TestSuite with
BeforeAndAfterAll with StreamLogg
protected def useMockServer = true
+ val server = new KubernetesMockServer(false)
+
protected lazy val (kubeClient, closeable) = {
if (useMockServer) {
- val server = new KubernetesMockServer(false)
server.init()
- (server.createClient(), () => server.destroy())
+ def defaultClient = {
+ val config = new ConfigBuilder()
+ .withMasterUrl(server.url("/"))
+ .withTrustCerts(true)
+ .withTlsVersions(TLS_1_0)
+ .withNamespace("test")
+ .build
+ new DefaultKubernetesClient(createHttpClientForMockServer(config),
config)
+ }
+ (defaultClient, () => server.destroy())
} else {
val client = new DefaultKubernetesClient(
new ConfigBuilder()
@@ -53,7 +65,7 @@ trait KubeClientSupport extends TestSuite with
BeforeAndAfterAll with StreamLogg
super.beforeAll()
}
- override protected def afterAll(): Unit = {
+ override def afterAll(): Unit = {
super.afterAll()
closeable.apply()
}