This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8bf79ef Pod template support with KubernetesContainerFactory (#4690)
8bf79ef is described below
commit 8bf79efa0c5fa5f83a1bf60604f8c85abcd32ac2
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Oct 17 10:46:57 2019 +0530
Pod template support with KubernetesContainerFactory (#4690)
* Refactor existing pod builder logic to a separate class and add tests
* Add support for extending template
* Log the pod spec yaml if debug mode is on
* Document pod template
* Integration test for template support
* Implement a config map value reader which can be passed literal string or
file reference
* Skip test run if KUBECONFIG is not defined
---
common/scala/build.gradle | 2 +-
.../apache/openwhisk/common/ConfigMapValue.scala | 46 +++++++
core/invoker/src/main/resources/application.conf | 6 +
.../kubernetes/KubernetesClient.scala | 61 ++--------
.../containerpool/kubernetes/WhiskPodBuilder.scala | 119 ++++++++++++++++++
settings.gradle | 3 +-
tests/build.gradle | 2 +
.../openwhisk/common/ConfigMapValueTests.scala | 70 +++++++++++
.../kubernetes/test/KubeClientSupport.scala | 60 +++++++++
.../kubernetes/test/WhiskPodBuilderTests.scala | 134 +++++++++++++++++++++
.../openwhisk/standalone/StandaloneKCFTests.scala | 50 ++++++--
11 files changed, 491 insertions(+), 62 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 4a6bced..f446072 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -59,7 +59,7 @@ dependencies {
compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3')
compile 'com.github.ben-manes.caffeine:caffeine:2.6.2'
compile 'com.google.code.findbugs:jsr305:3.0.2'
- compile 'io.fabric8:kubernetes-client:4.4.2'
+ compile "io.fabric8:kubernetes-client:${gradle.kube_client.version}"
compile ('io.kamon:kamon-core_2.12:1.1.3') {
exclude group: 'com.lihaoyi'
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
new file mode 100644
index 0000000..4ff7e7f
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.commons.io.FileUtils
+import pureconfig.ConfigReader
+import pureconfig.ConvertHelpers.catchReadError
+
+/**
+ * Wrapper around a configuration value whose text may have been supplied inline or loaded
+ * from a file. Instances are created only through the companion `apply`.
+ *
+ * @param value the resolved text (file content or the literal config string)
+ */
+class ConfigMapValue private (val value: String)
+
+object ConfigMapValue {
+
+  /**
+   * Resolves a raw config string into a [[ConfigMapValue]].
+   *
+   * A string starting with `file:` (for example `file:/etc/config/foo.yaml`) is parsed as a URI,
+   * and the referenced file is read as UTF-8; its content becomes the value. Any other string
+   * is used verbatim as the value.
+   *
+   * @param config literal value or `file:` reference
+   * @return the resolved value holder
+   */
+  def apply(config: String): ConfigMapValue = {
+    val resolved =
+      if (config.startsWith("file:")) FileUtils.readFileToString(new File(new URI(config)), UTF_8)
+      else config
+    new ConfigMapValue(resolved)
+  }
+
+  /** pureconfig reader which builds the value from a string via `apply`, wrapped in `catchReadError`. */
+  implicit val reader: ConfigReader[ConfigMapValue] =
+    ConfigReader.fromString[ConfigMapValue](catchReadError(apply))
+}
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
index 9afba4d..b518756 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -81,6 +81,12 @@ whisk {
# Enables forwarding to remote port via a local random port. This mode is
mostly useful
# for development via Standalone mode
port-forwarding-enabled = false
+
+ # Pod template used as base for Action Pods created. It can be either
+ # 1. Reference to file `file:/path/to/template.yml`
+ # 2. OR yaml formatted multi line string. See multi line config support
https://github.com/lightbend/config/blob/master/HOCON.md#multi-line-strings
+ #
+ #pod-template =
}
# Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 9f24938..87a9a59 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -30,10 +30,11 @@ import akka.stream.stage._
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.utils.Serialization
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
import okhttp3.{Call, Callback, Request, Response}
import okio.BufferedSource
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{ConfigMapValue, Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
@@ -44,7 +45,6 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.annotation.tailrec
-import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -75,7 +75,8 @@ case class KubernetesInvokerNodeAffinity(enabled: Boolean,
key: String, value: S
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
invokerAgent: KubernetesInvokerAgentConfig,
userPodNodeAffinity:
KubernetesInvokerNodeAffinity,
- portForwardingEnabled: Boolean)
+ portForwardingEnabled: Boolean,
+ podTemplate: Option[ConfigMapValue] = None)
/**
* Serves as an interface to the Kubernetes API by proxying its REST API
and/or invoking the kubectl CLI.
@@ -98,62 +99,18 @@ class KubernetesClient(
.withRequestTimeout(config.timeouts.logs.toMillis.toInt)
.build())
+ private val podBuilder = new WhiskPodBuilder(kubeRestClient,
config.userPodNodeAffinity, config.podTemplate)
+
def run(name: String,
image: String,
memory: ByteSize = 256.MB,
environment: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty)(implicit transid:
TransactionId): Future[KubernetesContainer] = {
- val envVars = environment.map {
- case (key, value) => new
EnvVarBuilder().withName(key).withValue(value).build()
- }.toSeq
-
- val podBuilder = new PodBuilder()
- .withNewMetadata()
- .withName(name)
- .addToLabels("name", name)
- .addToLabels(labels.asJava)
- .endMetadata()
- .withNewSpec()
- .withRestartPolicy("Always")
- if (config.userPodNodeAffinity.enabled) {
- val invokerNodeAffinity = new AffinityBuilder()
- .withNewNodeAffinity()
- .withNewRequiredDuringSchedulingIgnoredDuringExecution()
- .addNewNodeSelectorTerm()
- .addNewMatchExpression()
- .withKey(config.userPodNodeAffinity.key)
- .withOperator("In")
- .withValues(config.userPodNodeAffinity.value)
- .endMatchExpression()
- .endNodeSelectorTerm()
- .endRequiredDuringSchedulingIgnoredDuringExecution()
- .endNodeAffinity()
- .build()
- podBuilder.withAffinity(invokerNodeAffinity)
+ val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels)
+ if (transid.meta.extraLogging) {
+ log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
}
- val secContext = new SecurityContextBuilder()
- .withNewCapabilities()
- .addToDrop("NET_RAW", "NET_ADMIN")
- .endCapabilities()
- .build()
- val pod = podBuilder
- .addNewContainer()
- .withNewResources()
- .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
- .endResources()
- .withSecurityContext(secContext)
- .withName("user-action")
- .withImage(image)
- .withEnv(envVars.asJava)
- .addNewPort()
- .withContainerPort(8080)
- .withName("action")
- .endPort()
- .endContainer()
- .endSpec()
- .build()
-
val namespace = kubeRestClient.getNamespace
kubeRestClient.pods.inNamespace(namespace).create(pod)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
new file mode 100644
index 0000000..ca7627a
--- /dev/null
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes
+
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets.UTF_8
+
+import io.fabric8.kubernetes.api.builder.Predicate
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, Pod,
PodBuilder, Quantity}
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.core.entity.ByteSize
+
+import scala.collection.JavaConverters._
+
+/**
+ * Builds the Kubernetes `Pod` object used to launch an action container.
+ *
+ * If a pod template is supplied it is parsed (via `client.load`) on every call to `buildPodSpec`
+ * and used as the starting point; otherwise an empty `PodBuilder` is used. The builder then sets
+ * metadata, restart policy, optional node affinity and the `user-action` container.
+ *
+ * @param client              Kubernetes client; in this class it is only used to parse the template
+ * @param userPodNodeAffinity node affinity settings, applied only when `enabled` is true
+ * @param podTemplate         optional template text describing a Pod
+ */
+class WhiskPodBuilder(client: NamespacedKubernetesClient,
+                      userPodNodeAffinity: KubernetesInvokerNodeAffinity,
+                      podTemplate: Option[ConfigMapValue] = None) {
+  private val template = podTemplate.map(_.value.getBytes(UTF_8))
+  private val actionContainerName = "user-action"
+  private val actionContainerPredicate: Predicate[ContainerBuilder] = (cb) => cb.getName == actionContainerName
+
+  /** Whether node affinity is added to the pods built by this instance. */
+  def affinityEnabled: Boolean = userPodNodeAffinity.enabled
+
+  /**
+   * Creates the pod definition for one action container.
+   *
+   * @param name        pod name; also added as the `name` label
+   * @param image       image of the action container
+   * @param memory      memory limit of the action container, expressed in `Mi`
+   * @param environment environment variables of the action container
+   * @param labels      additional labels for the pod metadata
+   * @param transid     transaction id (not used by this method body)
+   * @return the assembled pod
+   * @throws IllegalArgumentException if a template is configured but its first resource is not a Pod
+   */
+  def buildPodSpec(name: String,
+                   image: String,
+                   memory: ByteSize,
+                   environment: Map[String, String],
+                   labels: Map[String, String])(implicit transid: TransactionId): Pod = {
+    val envVars = environment.map {
+      case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
+    }.toSeq
+
+    val baseBuilder = template match {
+      case Some(bytes) => new PodBuilder(loadPodSpec(bytes))
+      case None        => new PodBuilder()
+    }
+
+    //Metadata present in the template is edited in place, so template labels/annotations are kept
+    val pb1 = baseBuilder
+      .editOrNewMetadata()
+      .withName(name)
+      .addToLabels("name", name)
+      .addToLabels(labels.asJava)
+      .endMetadata()
+
+    val specBuilder = pb1.editOrNewSpec().withRestartPolicy("Always")
+
+    if (userPodNodeAffinity.enabled) {
+      //A new selector term is added next to any terms already present in the template
+      specBuilder
+        .editOrNewAffinity()
+        .editOrNewNodeAffinity()
+        .editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
+        .addNewNodeSelectorTerm()
+        .addNewMatchExpression()
+        .withKey(userPodNodeAffinity.key)
+        .withOperator("In")
+        .withValues(userPodNodeAffinity.value)
+        .endMatchExpression()
+        .endNodeSelectorTerm()
+        .endRequiredDuringSchedulingIgnoredDuringExecution()
+        .endNodeAffinity()
+        .endAffinity()
+    }
+
+    //Reuse the action container from the template if it defines one, else add a new container
+    val containerBuilder = if (specBuilder.hasMatchingContainer(actionContainerPredicate)) {
+      specBuilder.editMatchingContainer(actionContainerPredicate)
+    } else specBuilder.addNewContainer()
+
+    //Resource limits, name, image and env are set explicitly here, replacing any value coming
+    //from the template. The "action" port is added to the container's ports.
+    containerBuilder
+      .withNewResources()
+      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      .endResources()
+      .withName(actionContainerName)
+      .withImage(image)
+      .withEnv(envVars.asJava)
+      .addNewPort()
+      .withContainerPort(8080)
+      .withName("action")
+      .endPort()
+
+    //If any existing context entry is present then "update" it else add new
+    containerBuilder
+      .editOrNewSecurityContext()
+      .editOrNewCapabilities()
+      .addToDrop("NET_RAW", "NET_ADMIN")
+      .endCapabilities()
+      .endSecurityContext()
+
+    containerBuilder
+      .endContainer()
+      .endSpec()
+      .build()
+  }
+
+  /**
+   * Parses the template and returns the first resource it defines, which must be a Pod.
+   * Fails with a message naming the template instead of a bare index/cast error.
+   */
+  private def loadPodSpec(bytes: Array[Byte]): Pod = {
+    val resources = client.load(new ByteArrayInputStream(bytes)).get().asScala
+    resources.headOption match {
+      case Some(pod: Pod) => pod
+      case Some(other) =>
+        throw new IllegalArgumentException(
+          s"Pod template must define a Pod as its first resource but found ${other.getClass.getSimpleName}")
+      case None =>
+        throw new IllegalArgumentException("Pod template does not define any Kubernetes resource")
+    }
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 25764ac..480daf8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -48,4 +48,5 @@ gradle.ext.akka = [version : '2.5.22']
gradle.ext.akka_kafka = [version : '1.0.5']
gradle.ext.akka_http = [version : '10.1.8']
-gradle.ext.curator = [version:'4.0.0']
+gradle.ext.curator = [version : '4.0.0']
+gradle.ext.kube_client = [version: '4.4.2']
diff --git a/tests/build.gradle b/tests/build.gradle
index ac27820..2b62ab5 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -201,6 +201,8 @@ dependencies {
compile 'com.atlassian.oai:swagger-request-validator-core:1.4.5'
compile
"com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
compile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
+ compile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
+ compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}"
compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala
new file mode 100644
index 0000000..6067a28
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+
+/**
+ * Checks that a [[ConfigMapValue]] field is read through pureconfig both from a literal string
+ * and from a `file:` reference.
+ */
+@RunWith(classOf[JUnitRunner])
+class ConfigMapValueTests extends FlatSpec with Matchers {
+  behavior of "ConfigMapValue"
+
+  case class ValueTest(template: ConfigMapValue, count: Int)
+
+  it should "read from string" in {
+    val config = ConfigFactory.parseString("""
+      |whisk {
+      | value-test {
+      | template = "test string"
+      | count = 42
+      | }
+      |}""".stripMargin)
+
+    val valueTest = readValueTest(config)
+    valueTest.template.value shouldBe "test string"
+  }
+
+  it should "read from file reference" in {
+    val file = Files.createTempFile("whisk", null).toFile
+    //Delete the temp file in finally so that a failed assertion does not leave it behind
+    try {
+      FileUtils.write(file, "test string", UTF_8)
+
+      val config = ConfigFactory.parseString(s"""
+        |whisk {
+        | value-test {
+        | template = "${file.toURI}"
+        | count = 42
+        | }
+        |}""".stripMargin)
+
+      val valueTest = readValueTest(config)
+      valueTest.template.value shouldBe "test string"
+    } finally {
+      file.delete()
+    }
+  }
+
+  /** Loads the `whisk.value-test` section of the given config as a `ValueTest`. */
+  private def readValueTest(config: com.typesafe.config.Config) = {
+    loadConfigOrThrow[ValueTest](config.getConfig("whisk.value-test"))
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
new file mode 100644
index 0000000..0a64cd5
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import common.StreamLogging
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import org.scalatest.{BeforeAndAfterAll, Suite, TestSuite}
+
+import scala.concurrent.duration._
+
+/**
+ * Test mixin providing a Kubernetes client, backed either by a `KubernetesMockServer`
+ * (the default) or by a `DefaultKubernetesClient` when `useMockServer` is overridden to false.
+ * The client is created lazily and released in `afterAll`.
+ */
+trait KubeClientSupport extends TestSuite with BeforeAndAfterAll with StreamLogging {
+  self: Suite =>
+
+  /** Override with `false` to use a `DefaultKubernetesClient` instead of the mock server. */
+  protected def useMockServer = true
+
+  /** The client together with the function that releases it. */
+  protected lazy val (kubeClient, closeable) = {
+    if (useMockServer) {
+      val mockServer = new KubernetesMockServer(false)
+      mockServer.init()
+      (mockServer.createClient(), () => mockServer.destroy())
+    } else {
+      val clientConfig = new ConfigBuilder()
+        .withConnectionTimeout(1.minute.toMillis.toInt)
+        .withRequestTimeout(1.minute.toMillis.toInt)
+        .build()
+      val clusterClient = new DefaultKubernetesClient(clientConfig)
+      (clusterClient, () => clusterClient.close())
+    }
+  }
+
+  /** When the mock server is not used, the suite is cancelled unless KUBECONFIG is set. */
+  override def beforeAll(): Unit = {
+    if (!useMockServer) {
+      val kubeconfig = sys.env.get("KUBECONFIG")
+      assume(kubeconfig.isDefined, "KUBECONFIG env must be defined")
+      kubeconfig.foreach(path => println(s"Using kubeconfig from $path"))
+    }
+    super.beforeAll()
+  }
+
+  /** Runs the parent cleanup first and then releases the client. */
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    closeable()
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
new file mode 100644
index 0000000..5c6eee2
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.utils.Serialization
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import
org.apache.openwhisk.core.containerpool.kubernetes.{KubernetesInvokerNodeAffinity,
WhiskPodBuilder}
+import org.apache.openwhisk.core.entity.size._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for `WhiskPodBuilder`: building a pod from scratch, and extending a pod template
+ * with and without node affinity.
+ */
+@RunWith(classOf[JUnitRunner])
+class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport {
+  implicit val tid: TransactionId = TransactionId.testing
+  private val testImage = "nodejs"
+  private val memLimit = 10.MB
+  private val name = "whisk"
+  private val affinity = KubernetesInvokerNodeAffinity(enabled = true, "openwhisk-role", "invoker")
+
+  behavior of "WhiskPodBuilder"
+
+  it should "build a new pod" in {
+    val builder = new WhiskPodBuilder(kubeClient, affinity)
+    assertPodSettings(builder)
+  }
+
+  it should "extend existing pod template" in {
+    val template = """
+      |---
+      |apiVersion: "v1"
+      |kind: "Pod"
+      |metadata:
+      |  annotations:
+      |    my-foo : my-bar
+      |  labels:
+      |    my-fool : my-barv
+      |  name: "testpod"
+      |  namespace: whiskns
+      |spec:
+      |  containers:
+      |    - name: "user-action"
+      |      securityContext:
+      |        capabilities:
+      |          drop:
+      |          - "TEST_CAP"
+      |    - name: "sidecar"
+      |      image : "busybox"
+      |""".stripMargin
+
+    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = false), Some(ConfigMapValue(template)))
+    val pod = assertPodSettings(builder)
+
+    //Capabilities dropped in the template must survive next to the ones added by the builder
+    val ac = getActionContainer(pod)
+    ac.getSecurityContext.getCapabilities.getDrop.asScala should contain("TEST_CAP")
+
+    //Containers other than the action container are carried over from the template
+    val sc = pod.getSpec.getContainers.asScala
+      .find(_.getName == "sidecar")
+      .getOrElse(fail("Pod does not contain the sidecar container defined in the template"))
+    sc.getImage shouldBe "busybox"
+
+    pod.getMetadata.getLabels.asScala.get("my-fool") shouldBe Some("my-barv")
+    pod.getMetadata.getAnnotations.asScala.get("my-foo") shouldBe Some("my-bar")
+    pod.getMetadata.getNamespace shouldBe "whiskns"
+  }
+
+  it should "extend existing pod template with affinity" in {
+    val template = """
+      |apiVersion: "v1"
+      |kind: "Pod"
+      |spec:
+      |  affinity:
+      |    nodeAffinity:
+      |      requiredDuringSchedulingIgnoredDuringExecution:
+      |        nodeSelectorTerms:
+      |        - matchExpressions:
+      |          - key: "nodelabel"
+      |            operator: "In"
+      |            values:
+      |            - "test"""".stripMargin
+
+    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = true), Some(ConfigMapValue(template)))
+    val pod = assertPodSettings(builder)
+
+    //The selector term from the template must still be present after the builder adds its own
+    val terms = pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution
+      .getNodeSelectorTerms.asScala
+    terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == "nodelabel")) shouldBe true
+  }
+
+  /**
+   * Builds a pod with fixed test inputs and checks the settings which the builder always applies.
+   * Returns the pod so that callers can make template specific assertions.
+   */
+  private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
+    val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL" -> "barV"))
+    //The pod yaml is used as clue so that a failing assertion shows the complete spec
+    withClue(Serialization.asYaml(pod)) {
+      val c = getActionContainer(pod)
+      c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
+
+      c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe Some("10Mi")
+      c.getSecurityContext.getCapabilities.getDrop.asScala should contain allOf ("NET_RAW", "NET_ADMIN")
+      c.getPorts.asScala.find(_.getName == "action").map(_.getContainerPort) shouldBe Some(8080)
+      c.getImage shouldBe testImage
+
+      pod.getMetadata.getLabels.asScala.get("name") shouldBe Some(name)
+      pod.getMetadata.getLabels.asScala.get("fooL") shouldBe Some("barV")
+      pod.getMetadata.getName shouldBe name
+      pod.getSpec.getRestartPolicy shouldBe "Always"
+
+      if (builder.affinityEnabled) {
+        val terms = pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution
+          .getNodeSelectorTerms.asScala
+        terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == affinity.key)) shouldBe true
+      }
+    }
+    pod
+  }
+
+  /** Returns the `user-action` container and fails the test with a clear message if it is missing. */
+  private def getActionContainer(pod: Pod) = {
+    pod.getSpec.getContainers.asScala
+      .find(_.getName == "user-action")
+      .getOrElse(fail("Pod does not contain the user-action container"))
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
index acf4106..bb5a0c2 100644
---
a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
@@ -17,13 +17,22 @@
package org.apache.openwhisk.standalone
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
import common.WskProps
+import org.apache.commons.io.FileUtils
+import
org.apache.openwhisk.core.containerpool.kubernetes.test.KubeClientSupport
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import system.basic.WskRestBasicTests
@RunWith(classOf[JUnitRunner])
-class StandaloneKCFTests extends WskRestBasicTests with
StandaloneServerFixture with StandaloneSanityTestSupport {
+class StandaloneKCFTests
+ extends WskRestBasicTests
+ with StandaloneServerFixture
+ with StandaloneSanityTestSupport
+ with KubeClientSupport {
override implicit val wskprops = WskProps().copy(apihost = serverUrl)
//Turn on to debug locally easily
@@ -31,17 +40,42 @@ class StandaloneKCFTests extends WskRestBasicTests with
StandaloneServerFixture
override protected val dumpStartupLogs = false
+ override protected def useMockServer = false
+
override protected def supportedTests = Set("Wsk Action REST should invoke a
blocking action and get only the result")
override protected def extraArgs: Seq[String] = Seq("--dev-mode",
"--dev-kcf")
- override def beforeAll(): Unit = {
- val kubeconfig = sys.env.get("KUBECONFIG")
- require(kubeconfig.isDefined, "KUBECONFIG env must be defined")
- println(s"Using kubeconfig from ${kubeconfig.get}")
+ private val podTemplate = """---
+ |apiVersion: "v1"
+ |kind: "Pod"
+ |metadata:
+ | annotations:
+ | allow-outbound : "true"
+ | labels:
+ | launcher: standalone""".stripMargin
+
+ private val podTemplateFile = Files.createTempFile("whisk", null).toFile
+
+ override val customConfig = {
+ FileUtils.write(podTemplateFile, podTemplate, UTF_8)
+ Some(s"""include classpath("standalone-kcf.conf")
+ |
+ |whisk {
+ | kubernetes {
+ | pod-template = "${podTemplateFile.toURI}"
+ | }
+ |}""".stripMargin)
+ }
+
+ override def afterAll(): Unit = {
+ checkPodState()
+ super.afterAll()
+ podTemplateFile.delete()
+ }
- //Note the context need to specify default namespace
- //kubectl config set-context --current --namespace=default
- super.beforeAll()
+ def checkPodState(): Unit = {
+ val podList = kubeClient.pods().withLabel("launcher").list()
+ podList.getItems.isEmpty shouldBe false
}
}