This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new fb6c779  kcf - enable use of fieldref to inject action pod details to pod env (#4859)
fb6c779 is described below

commit fb6c779da576e9e4d32a6d7501a4d2031ab1dbf3
Author: tysonnorris <[email protected]>
AuthorDate: Mon Mar 16 14:35:08 2020 -0700

    kcf - enable use of fieldref to inject action pod details to pod env (#4859)
    
    * update env var to load dynamic value
    
    * make fieldref env vars configurable
    
    * cleanup
    
    * cleanup
    
    Co-authored-by: Duy Nguyen <[email protected]>
---
 core/invoker/src/main/resources/application.conf   |  8 +++
 .../kubernetes/KubernetesClient.scala              |  5 +-
 .../containerpool/kubernetes/WhiskPodBuilder.scala | 27 +++++----
 .../kubernetes/test/WhiskPodBuilderTests.scala     | 65 ++++++++++++++--------
 4 files changed, 70 insertions(+), 35 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 6c6d5aa..63d0824 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -98,6 +98,14 @@ whisk {
     #  max-millicpus = 4000
     #}
 
+    # Action pods can be injected with pod data using field refs to the pod spec (aka The Downward API):
+    # https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables
+    #field-ref-environment: {
+    #  "POD_NAMESPACE":"metadata.namespace",
+    #  "POD_NAME":"metadata.name",
+    #  "POD_UID": "metadata.uid"
+    #}
+
     #enable PodDisruptionBudget creation for pods? (will include same labels as pods, and specify minAvailable=1 to prevent termination of action pods during maintenance)
     pdb-enabled = false
   }
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 322ade9..ab42f5c 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -92,7 +92,8 @@ case class KubernetesClientConfig(timeouts: 
KubernetesClientTimeoutConfig,
                                   actionNamespace: Option[String],
                                   podTemplate: Option[ConfigMapValue],
                                   cpuScaling: 
Option[KubernetesCpuScalingConfig],
-                                  pdbEnabled: Boolean)
+                                  pdbEnabled: Boolean,
+                                  fieldRefEnvironment: Option[Map[String, 
String]])
 
 /**
  * Serves as an interface to the Kubernetes API by proxying its REST API 
and/or invoking the kubectl CLI.
@@ -118,7 +119,7 @@ class KubernetesClient(
     new DefaultKubernetesClient(configBuilder.build())
   }
 
-  private val podBuilder = new WhiskPodBuilder(kubeRestClient, 
config.userPodNodeAffinity, config.podTemplate)
+  private val podBuilder = new WhiskPodBuilder(kubeRestClient, config)
 
   def run(name: String,
           image: String,
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
index 3b606ba..a289820 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -25,6 +25,7 @@ import 
io.fabric8.kubernetes.api.model.policy.{PodDisruptionBudget, PodDisruptio
 import io.fabric8.kubernetes.api.model.{
   ContainerBuilder,
   EnvVarBuilder,
+  EnvVarSourceBuilder,
   IntOrString,
   LabelSelectorBuilder,
   Pod,
@@ -32,19 +33,17 @@ import io.fabric8.kubernetes.api.model.{
   Quantity
 }
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient
-import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.entity.ByteSize
 
 import scala.collection.JavaConverters._
 
-class WhiskPodBuilder(client: NamespacedKubernetesClient,
-                      userPodNodeAffinity: KubernetesInvokerNodeAffinity,
-                      podTemplate: Option[ConfigMapValue] = None) {
-  private val template = podTemplate.map(_.value.getBytes(UTF_8))
+class WhiskPodBuilder(client: NamespacedKubernetesClient, config: 
KubernetesClientConfig) {
+  private val template = config.podTemplate.map(_.value.getBytes(UTF_8))
   private val actionContainerName = "user-action"
   private val actionContainerPredicate: Predicate[ContainerBuilder] = (cb) => 
cb.getName == actionContainerName
 
-  def affinityEnabled: Boolean = userPodNodeAffinity.enabled
+  def affinityEnabled: Boolean = config.userPodNodeAffinity.enabled
 
   def buildPodSpec(
     name: String,
@@ -55,7 +54,15 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
     config: KubernetesClientConfig)(implicit transid: TransactionId): (Pod, 
Option[PodDisruptionBudget]) = {
     val envVars = environment.map {
       case (key, value) => new 
EnvVarBuilder().withName(key).withValue(value).build()
-    }.toSeq
+    }.toSeq ++ config.fieldRefEnvironment
+      .map(_.map({
+        case (key, value) =>
+          new EnvVarBuilder()
+            .withName(key)
+            .withValueFrom(new 
EnvVarSourceBuilder().withNewFieldRef().withFieldPath(value).endFieldRef().build())
+            .build()
+      }).toSeq)
+      .getOrElse(Seq.empty)
 
     val baseBuilder = template match {
       case Some(bytes) =>
@@ -73,7 +80,7 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
 
     val specBuilder = pb1.editOrNewSpec().withRestartPolicy("Always")
 
-    if (userPodNodeAffinity.enabled) {
+    if (config.userPodNodeAffinity.enabled) {
       val affinity = specBuilder
         .editOrNewAffinity()
         .editOrNewNodeAffinity()
@@ -81,9 +88,9 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
       affinity
         .addNewNodeSelectorTerm()
         .addNewMatchExpression()
-        .withKey(userPodNodeAffinity.key)
+        .withKey(config.userPodNodeAffinity.key)
         .withOperator("In")
-        .withValues(userPodNodeAffinity.value)
+        .withValues(config.userPodNodeAffinity.value)
         .endMatchExpression()
         .endNodeSelectorTerm()
         .endRequiredDuringSchedulingIgnoredDuringExecution()
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
index 12ed183..4e344c9 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -18,7 +18,14 @@
 package org.apache.openwhisk.core.containerpool.kubernetes.test
 
 import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudgetBuilder
-import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, Pod}
+import io.fabric8.kubernetes.api.model.{
+  EnvVar,
+  EnvVarSource,
+  IntOrString,
+  LabelSelectorBuilder,
+  ObjectFieldSelector,
+  Pod
+}
 import io.fabric8.kubernetes.client.utils.Serialization
 import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
 import org.apache.openwhisk.core.containerpool.kubernetes.{
@@ -46,12 +53,23 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
 
   behavior of "WhiskPodBuilder"
 
+  def config(configMap: Option[ConfigMapValue] = None, affinity: 
Option[KubernetesInvokerNodeAffinity] = None) =
+    KubernetesClientConfig(
+      KubernetesClientTimeoutConfig(1.seconds, 2.seconds),
+      affinity.getOrElse(KubernetesInvokerNodeAffinity(false, "k", "v")),
+      false,
+      None,
+      configMap,
+      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
+      false,
+      Some(Map("POD_UID" -> "metadata.uid")))
+
   it should "build a new pod" in {
-    val builder = new WhiskPodBuilder(kubeClient, affinity)
-    assertPodSettings(builder)
+    val c = config()
+    val builder = new WhiskPodBuilder(kubeClient, c)
+    assertPodSettings(builder, c)
   }
   it should "build set cpu scaled based on memory, if enabled in 
configuration" in {
-    val builder = new WhiskPodBuilder(kubeClient, affinity)
     val config = KubernetesClientConfig(
       KubernetesClientTimeoutConfig(1.second, 1.second),
       KubernetesInvokerNodeAffinity(false, "k", "v"),
@@ -59,7 +77,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
       None,
       None,
       Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
-      false)
+      false,
+      None)
+    val builder = new WhiskPodBuilder(kubeClient, config)
 
     val (pod, _) = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> 
"bar"), Map("fooL" -> "barV"), config)
     withClue(Serialization.asYaml(pod)) {
@@ -89,7 +109,8 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
       None,
       None,
       None,
-      false)
+      false,
+      None)
     val (pod4, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> 
"bar"), Map("fooL" -> "barV"), config2)
     withClue(Serialization.asYaml(pod4)) {
       val c = getActionContainer(pod4)
@@ -121,8 +142,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
        |      image : "busybox"
        |""".stripMargin
 
-    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = 
false), Some(ConfigMapValue(template)))
-    val pod = assertPodSettings(builder)
+    val c = config(Some(ConfigMapValue(template)))
+    val builder = new WhiskPodBuilder(kubeClient, c)
+    val pod = assertPodSettings(builder, c)
 
     val ac = getActionContainer(pod)
     ac.getSecurityContext.getCapabilities.getDrop.asScala should 
contain("TEST_CAP")
@@ -136,8 +158,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
   }
 
   it should "build a pod disruption budget for the pod, if enabled" in {
-    val builder = new WhiskPodBuilder(kubeClient, affinity)
-    assertPodSettings(builder, true)
+    val c = config()
+    val builder = new WhiskPodBuilder(kubeClient, c)
+    assertPodSettings(builder, c)
   }
 
   it should "extend existing pod template with affinity" in {
@@ -155,28 +178,24 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
        |            values:
        |            - "test"""".stripMargin
 
-    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = 
true), Some(ConfigMapValue(template)))
-    val pod = assertPodSettings(builder)
+    val c = config(Some(ConfigMapValue(template)), Some(affinity.copy(enabled 
= true)))
+    val builder =
+      new WhiskPodBuilder(kubeClient, c)
+    val pod = assertPodSettings(builder, c)
 
     val terms =
       
pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution.getNodeSelectorTerms.asScala
     terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == 
"nodelabel")) shouldBe true
   }
 
-  private def assertPodSettings(builder: WhiskPodBuilder, pdbEnabled: Boolean 
= false): Pod = {
-    val config = KubernetesClientConfig(
-      KubernetesClientTimeoutConfig(1.second, 1.second),
-      KubernetesInvokerNodeAffinity(false, "k", "v"),
-      true,
-      None,
-      None,
-      Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
-      pdbEnabled)
+  private def assertPodSettings(builder: WhiskPodBuilder, config: 
KubernetesClientConfig): Pod = {
     val labels = Map("fooL" -> "barV")
     val (pod, pdb) = builder.buildPodSpec(name, testImage, memLimit, Map("foo" 
-> "bar"), labels, config)
     withClue(Serialization.asYaml(pod)) {
       val c = getActionContainer(pod)
-      c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
+      c.getEnv.asScala.shouldBe(Seq(
+        new EnvVar("foo", "bar", null),
+        new EnvVar("POD_UID", null, new EnvVarSource(null, new 
ObjectFieldSelector(null, "metadata.uid"), null, null))))
 
       c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe 
Some("10Mi")
       c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe 
Some("900m")
@@ -195,7 +214,7 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers 
with KubeClientSupport
         terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == 
affinity.key)) shouldBe true
       }
     }
-    if (pdbEnabled) {
+    if (config.pdbEnabled) {
       println("matching pdb...")
       pdb shouldBe Some(
         new PodDisruptionBudgetBuilder().withNewMetadata

Reply via email to