This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new fb6c779 kcf - enable use of fieldref to inject action pod details to
pod env (#4859)
fb6c779 is described below
commit fb6c779da576e9e4d32a6d7501a4d2031ab1dbf3
Author: tysonnorris <[email protected]>
AuthorDate: Mon Mar 16 14:35:08 2020 -0700
kcf - enable use of fieldref to inject action pod details to pod env (#4859)
* update env var to load dynamic value
* make fieldref env vars configurable
* cleanup
* cleanup
Co-authored-by: Duy Nguyen <[email protected]>
---
core/invoker/src/main/resources/application.conf | 8 +++
.../kubernetes/KubernetesClient.scala | 5 +-
.../containerpool/kubernetes/WhiskPodBuilder.scala | 27 +++++----
.../kubernetes/test/WhiskPodBuilderTests.scala | 65 ++++++++++++++--------
4 files changed, 70 insertions(+), 35 deletions(-)
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
index 6c6d5aa..63d0824 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -98,6 +98,14 @@ whisk {
# max-millicpus = 4000
#}
+ # Action pods can be injected with pod data using field refs to the pod
spec (aka The Downward API):
+ #
https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables
+ #field-ref-environment: {
+ # "POD_NAMESPACE":"metadata.namespace",
+ # "POD_NAME":"metadata.name",
+ # "POD_UID": "metadata.uid"
+ #}
+
#enable PodDisruptionBudget creation for pods? (will include same labels
as pods, and specify minAvailable=1 to prevent termination of action pods
during maintenance)
pdb-enabled = false
}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 322ade9..ab42f5c 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -92,7 +92,8 @@ case class KubernetesClientConfig(timeouts:
KubernetesClientTimeoutConfig,
actionNamespace: Option[String],
podTemplate: Option[ConfigMapValue],
cpuScaling:
Option[KubernetesCpuScalingConfig],
- pdbEnabled: Boolean)
+ pdbEnabled: Boolean,
+ fieldRefEnvironment: Option[Map[String,
String]])
/**
* Serves as an interface to the Kubernetes API by proxying its REST API
and/or invoking the kubectl CLI.
@@ -118,7 +119,7 @@ class KubernetesClient(
new DefaultKubernetesClient(configBuilder.build())
}
- private val podBuilder = new WhiskPodBuilder(kubeRestClient,
config.userPodNodeAffinity, config.podTemplate)
+ private val podBuilder = new WhiskPodBuilder(kubeRestClient, config)
def run(name: String,
image: String,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index 3b606ba..a289820 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -25,6 +25,7 @@ import
io.fabric8.kubernetes.api.model.policy.{PodDisruptionBudget, PodDisruptio
import io.fabric8.kubernetes.api.model.{
ContainerBuilder,
EnvVarBuilder,
+ EnvVarSourceBuilder,
IntOrString,
LabelSelectorBuilder,
Pod,
@@ -32,19 +33,17 @@ import io.fabric8.kubernetes.api.model.{
Quantity
}
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
-import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.ByteSize
import scala.collection.JavaConverters._
-class WhiskPodBuilder(client: NamespacedKubernetesClient,
- userPodNodeAffinity: KubernetesInvokerNodeAffinity,
- podTemplate: Option[ConfigMapValue] = None) {
- private val template = podTemplate.map(_.value.getBytes(UTF_8))
+class WhiskPodBuilder(client: NamespacedKubernetesClient, config:
KubernetesClientConfig) {
+ private val template = config.podTemplate.map(_.value.getBytes(UTF_8))
private val actionContainerName = "user-action"
private val actionContainerPredicate: Predicate[ContainerBuilder] = (cb) =>
cb.getName == actionContainerName
- def affinityEnabled: Boolean = userPodNodeAffinity.enabled
+ def affinityEnabled: Boolean = config.userPodNodeAffinity.enabled
def buildPodSpec(
name: String,
@@ -55,7 +54,15 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
config: KubernetesClientConfig)(implicit transid: TransactionId): (Pod,
Option[PodDisruptionBudget]) = {
val envVars = environment.map {
case (key, value) => new
EnvVarBuilder().withName(key).withValue(value).build()
- }.toSeq
+ }.toSeq ++ config.fieldRefEnvironment
+ .map(_.map({
+ case (key, value) =>
+ new EnvVarBuilder()
+ .withName(key)
+ .withValueFrom(new
EnvVarSourceBuilder().withNewFieldRef().withFieldPath(value).endFieldRef().build())
+ .build()
+ }).toSeq)
+ .getOrElse(Seq.empty)
val baseBuilder = template match {
case Some(bytes) =>
@@ -73,7 +80,7 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
val specBuilder = pb1.editOrNewSpec().withRestartPolicy("Always")
- if (userPodNodeAffinity.enabled) {
+ if (config.userPodNodeAffinity.enabled) {
val affinity = specBuilder
.editOrNewAffinity()
.editOrNewNodeAffinity()
@@ -81,9 +88,9 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
affinity
.addNewNodeSelectorTerm()
.addNewMatchExpression()
- .withKey(userPodNodeAffinity.key)
+ .withKey(config.userPodNodeAffinity.key)
.withOperator("In")
- .withValues(userPodNodeAffinity.value)
+ .withValues(config.userPodNodeAffinity.value)
.endMatchExpression()
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 12ed183..4e344c9 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -18,7 +18,14 @@
package org.apache.openwhisk.core.containerpool.kubernetes.test
import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudgetBuilder
-import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, Pod}
+import io.fabric8.kubernetes.api.model.{
+ EnvVar,
+ EnvVarSource,
+ IntOrString,
+ LabelSelectorBuilder,
+ ObjectFieldSelector,
+ Pod
+}
import io.fabric8.kubernetes.client.utils.Serialization
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
import org.apache.openwhisk.core.containerpool.kubernetes.{
@@ -46,12 +53,23 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
behavior of "WhiskPodBuilder"
+ def config(configMap: Option[ConfigMapValue] = None, affinity:
Option[KubernetesInvokerNodeAffinity] = None) =
+ KubernetesClientConfig(
+ KubernetesClientTimeoutConfig(1.seconds, 2.seconds),
+ affinity.getOrElse(KubernetesInvokerNodeAffinity(false, "k", "v")),
+ false,
+ None,
+ configMap,
+ Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+ false,
+ Some(Map("POD_UID" -> "metadata.uid")))
+
it should "build a new pod" in {
- val builder = new WhiskPodBuilder(kubeClient, affinity)
- assertPodSettings(builder)
+ val c = config()
+ val builder = new WhiskPodBuilder(kubeClient, c)
+ assertPodSettings(builder, c)
}
it should "build set cpu scaled based on memory, if enabled in
configuration" in {
- val builder = new WhiskPodBuilder(kubeClient, affinity)
val config = KubernetesClientConfig(
KubernetesClientTimeoutConfig(1.second, 1.second),
KubernetesInvokerNodeAffinity(false, "k", "v"),
@@ -59,7 +77,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
None,
None,
Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
- false)
+ false,
+ None)
+ val builder = new WhiskPodBuilder(kubeClient, config)
val (pod, _) = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" ->
"bar"), Map("fooL" -> "barV"), config)
withClue(Serialization.asYaml(pod)) {
@@ -89,7 +109,8 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
None,
None,
None,
- false)
+ false,
+ None)
val (pod4, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" ->
"bar"), Map("fooL" -> "barV"), config2)
withClue(Serialization.asYaml(pod4)) {
val c = getActionContainer(pod4)
@@ -121,8 +142,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
| image : "busybox"
|""".stripMargin
- val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled =
false), Some(ConfigMapValue(template)))
- val pod = assertPodSettings(builder)
+ val c = config(Some(ConfigMapValue(template)))
+ val builder = new WhiskPodBuilder(kubeClient, c)
+ val pod = assertPodSettings(builder, c)
val ac = getActionContainer(pod)
ac.getSecurityContext.getCapabilities.getDrop.asScala should
contain("TEST_CAP")
@@ -136,8 +158,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
}
it should "build a pod disruption budget for the pod, if enabled" in {
- val builder = new WhiskPodBuilder(kubeClient, affinity)
- assertPodSettings(builder, true)
+ val c = config()
+ val builder = new WhiskPodBuilder(kubeClient, c)
+ assertPodSettings(builder, c)
}
it should "extend existing pod template with affinity" in {
@@ -155,28 +178,24 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
| values:
| - "test"""".stripMargin
- val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled =
true), Some(ConfigMapValue(template)))
- val pod = assertPodSettings(builder)
+ val c = config(Some(ConfigMapValue(template)), Some(affinity.copy(enabled
= true)))
+ val builder =
+ new WhiskPodBuilder(kubeClient, c)
+ val pod = assertPodSettings(builder, c)
val terms =
pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution.getNodeSelectorTerms.asScala
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey ==
"nodelabel")) shouldBe true
}
- private def assertPodSettings(builder: WhiskPodBuilder, pdbEnabled: Boolean
= false): Pod = {
- val config = KubernetesClientConfig(
- KubernetesClientTimeoutConfig(1.second, 1.second),
- KubernetesInvokerNodeAffinity(false, "k", "v"),
- true,
- None,
- None,
- Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
- pdbEnabled)
+ private def assertPodSettings(builder: WhiskPodBuilder, config:
KubernetesClientConfig): Pod = {
val labels = Map("fooL" -> "barV")
val (pod, pdb) = builder.buildPodSpec(name, testImage, memLimit, Map("foo"
-> "bar"), labels, config)
withClue(Serialization.asYaml(pod)) {
val c = getActionContainer(pod)
- c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
+ c.getEnv.asScala.shouldBe(Seq(
+ new EnvVar("foo", "bar", null),
+ new EnvVar("POD_UID", null, new EnvVarSource(null, new
ObjectFieldSelector(null, "metadata.uid"), null, null))))
c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe
Some("10Mi")
c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe
Some("900m")
@@ -195,7 +214,7 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers
with KubeClientSupport
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey ==
affinity.key)) shouldBe true
}
}
- if (pdbEnabled) {
+ if (config.pdbEnabled) {
println("matching pdb...")
pdb shouldBe Some(
new PodDisruptionBudgetBuilder().withNewMetadata