This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new bded8f5  Added support for adding Kubernetes annotations to the 
topology pod and service (#3699)
bded8f5 is described below

commit bded8f5f48199a0d38f5808604fd6e387a7a2d22
Author: Nicholas Nezis <[email protected]>
AuthorDate: Thu Jul 8 09:00:41 2021 -0400

    Added support for adding Kubernetes annotations to the topology pod and 
service (#3699)
---
 .../scheduler/kubernetes/KubernetesContext.java    | 44 +++++++++++++++++++++-
 .../heron/scheduler/kubernetes/V1Controller.java   | 30 +++++++++++----
 2 files changed, 66 insertions(+), 8 deletions(-)

diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index f938cd0..f6359b8 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -19,6 +19,11 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
@@ -44,7 +49,7 @@ public final class KubernetesContext extends Context {
      * provided in the Resource Limit. This mode effectively guarantees the
      * cpu and memory will be reserved.
      */
-    EQUAL_TO_LIMIT;
+    EQUAL_TO_LIMIT
   }
   /**
    * This config item is used to determine how to configure the K8s Resource 
Request.
@@ -83,6 +88,11 @@ public final class KubernetesContext extends Context {
   public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
       "heron.kubernetes.container.volumeMount.path";
 
+  public static final String HERON_KUBERNETES_POD_ANNOTATION =
+      "heron.kubernetes.pod.annotation.";
+  public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+      "heron.kubernetes.service.annotation.";
+
   private KubernetesContext() {
   }
 
@@ -152,6 +162,38 @@ public final class KubernetesContext extends Context {
     return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
   }
 
+  static Set<String> getConfigKeys(Config config, String keyPrefix) {
+    Set<String> annotations = new HashSet<>();
+    for (String s : config.getKeySet()) {
+      if (s.startsWith(keyPrefix)) {
+        annotations.add(s);
+      }
+    }
+    return annotations;
+  }
+
+  public static Map<String, String> getPodAnnotations(Config config) {
+    final Map<String, String> annotations = new HashMap<>();
+    final Set<String> keys = getConfigKeys(config, 
HERON_KUBERNETES_POD_ANNOTATION);
+    for (String s : keys) {
+      String value = config.getStringValue(s);
+      
annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_POD_ANNOTATION,
+              ""), value);
+    }
+    return annotations;
+  }
+
+  public static Map<String, String> getServiceAnnotations(Config config) {
+    final Map<String, String> annotations = new HashMap<>();
+    final Set<String> keys = getConfigKeys(config, 
HERON_KUBERNETES_SERVICE_ANNOTATION);
+    for (String s : keys) {
+      String value = config.getStringValue(s);
+      
annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_SERVICE_ANNOTATION,
+              ""), value);
+    }
+    return annotations;
+  }
+
   public static boolean hasContainerVolume(Config config) {
     final String name = getContainerVolumeName(config);
     final String path = getContainerVolumeMountPath(config);
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2056372..59399b6 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -104,7 +104,7 @@ public class V1Controller extends KubernetesController {
 
     final Resource containerResource = getContainerResource(packingPlan);
 
-    final V1Service topologyService = createTopologyyService();
+    final V1Service topologyService = createTopologyService();
     try {
       final V1Service response =
           coreClient.createNamespacedService(getNamespace(), topologyService, 
null,
@@ -163,7 +163,7 @@ public class V1Controller extends KubernetesController {
     final int newContainerCount = currentContainerCount + 
containersToAdd.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException ae) {
       throw new TopologyRuntimeManagementException(
           ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
@@ -185,14 +185,14 @@ public class V1Controller extends KubernetesController {
     final int newContainerCount = currentContainerCount - 
containersToRemove.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException e) {
       throw new TopologyRuntimeManagementException(
           e.getMessage() + "\ndetails\n" + e.getResponseBody());
     }
   }
 
-  private void patchStatefulsetReplicas(int replicas) throws ApiException {
+  private void patchStatefulSetReplicas(int replicas) throws ApiException {
     final String body =
             String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
                     replicas);
@@ -317,7 +317,7 @@ public class V1Controller extends KubernetesController {
     return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", 
ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  private V1Service createTopologyyService() {
+  private V1Service createTopologyService() {
     final String topologyName = getTopologyName();
     final Config runtimeConfiguration = getRuntimeConfiguration();
 
@@ -326,6 +326,7 @@ public class V1Controller extends KubernetesController {
     // setup service metadata
     final V1ObjectMeta objectMeta = new V1ObjectMeta();
     objectMeta.name(topologyName);
+    objectMeta.annotations(getServiceAnnotations());
     service.setMetadata(objectMeta);
 
     // create the headless service
@@ -370,7 +371,10 @@ public class V1Controller extends KubernetesController {
 
     // set up pod meta
     final V1ObjectMeta templateMetaData = new 
V1ObjectMeta().labels(getLabels(topologyName));
-    templateMetaData.annotations(getPrometheusAnnotations());
+    Map<String, String> annotations = new HashMap<>();
+    annotations.putAll(getPodAnnotations());
+    annotations.putAll(getPrometheusAnnotations());
+    templateMetaData.annotations(annotations);
     podTemplateSpec.setMetadata(templateMetaData);
 
     final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
@@ -383,6 +387,18 @@ public class V1Controller extends KubernetesController {
     return statefulSet;
   }
 
+  private Map<String, String> getPodAnnotations() {
+    Config config = getConfiguration();
+    final Map<String, String> annotations = 
KubernetesContext.getPodAnnotations(config);
+    return annotations;
+  }
+
+  private Map<String, String> getServiceAnnotations() {
+    Config config = getConfiguration();
+    final Map<String, String> annotations = 
KubernetesContext.getServiceAnnotations(config);
+    return annotations;
+  }
+
   private Map<String, String> getPrometheusAnnotations() {
     final Map<String, String> annotations = new HashMap<>();
     annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
@@ -529,7 +545,7 @@ public class V1Controller extends KubernetesController {
     if (remoteDebugEnabled) {
       IntStream.range(0, numberOfInstances).forEach(i -> {
         final String portName =
-            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + 
String.valueOf(i);
+            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
         final V1ContainerPort port = new V1ContainerPort();
         port.setName(portName);
         port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + 
i);

Reply via email to