This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/saadurrahman/3846-Refactoring-K8s-Shim-dev by this push:
     new 8a1f15b35b8 [StatefulSet] removed calls to cluster for Pod Templates.
8a1f15b35b8 is described below

commit 8a1f15b35b81ebba9cccd9f44cf9e93d1ecd55f4
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Wed Jul 20 14:20:47 2022 -0400

    [StatefulSet] removed calls to cluster for Pod Templates.
    
    Removed cross-cutting calls to the K8s cluster from within the StatefulSet
    factory. All calls to the K8s cluster should remain in the shim, which
    supplies the manager and executor Pod Template Specs, if found, to the
    factory through the configuration container.
---
 .../heron/scheduler/kubernetes/StatefulSet.java    | 211 ++-------------------
 1 file changed, 19 insertions(+), 192 deletions(-)

diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 3f23b4be321..c8abd842057 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -19,8 +19,6 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,7 +35,6 @@ import java.util.stream.IntStream;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.utils.Runtime;
@@ -47,9 +44,6 @@ import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.packing.Resource;
 
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -59,19 +53,15 @@ import 
io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
 import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1SecretKeySelector;
 import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
-import io.kubernetes.client.openapi.models.V1Service;
-import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.util.Yaml;
 
 final class StatefulSet {
   private final Map<Type, IStatefulSetFactory> statefulsets = new HashMap<>();
@@ -92,21 +82,19 @@ final class StatefulSet {
    * <code>KubernetesController</code> cannot be accessed externally since it 
is an abstract class.
    */
   static final class Configs {
-    private final CoreV1Api coreClient;
-    private final String namespace;
     private final String topologyName;
     private final Config configuration;
     private final Config runtimeConfiguration;
-    private final boolean podTemplateDisabled;
+    private final V1PodTemplateSpec managerPodTemplateSpec;
+    private final V1PodTemplateSpec executorPodTemplateSpec;
 
-    Configs(CoreV1Api coreClient, String namespace, Config configuration,
-            Config runtimeConfiguration) {
-      this.coreClient = coreClient;
-      this.namespace = namespace;
+    Configs(Config configuration, Config runtimeConfiguration,
+            V1PodTemplateSpec managerPodTemplateSpec, V1PodTemplateSpec 
executorPodTemplateSpec) {
       this.topologyName = Runtime.topologyName(runtimeConfiguration);
       this.configuration = configuration;
       this.runtimeConfiguration = runtimeConfiguration;
-      this.podTemplateDisabled = 
KubernetesContext.getPodTemplateDisabled(configuration);
+      this.managerPodTemplateSpec = managerPodTemplateSpec;
+      this.executorPodTemplateSpec = executorPodTemplateSpec;
     }
 
     Config getConfiguration() {
@@ -117,20 +105,16 @@ final class StatefulSet {
       return runtimeConfiguration;
     }
 
-    String getNamespace() {
-      return namespace;
-    }
-
     String getTopologyName() {
       return topologyName;
     }
 
-    boolean isPodTemplateDisabled() {
-      return podTemplateDisabled;
+    V1PodTemplateSpec getManagerPodTemplateSpec() {
+      return managerPodTemplateSpec;
     }
 
-    public CoreV1Api getCoreClient() {
-      return coreClient;
+    V1PodTemplateSpec getExecutorPodTemplateSpec() {
+      return executorPodTemplateSpec;
     }
   }
 
@@ -166,7 +150,7 @@ final class StatefulSet {
     public V1StatefulSet create(Configs configs, Resource containerResources,
                                 int numberOfInstances) {
       clusterConfigs = configs;
-      return null;
+      return createStatefulSet(containerResources, numberOfInstances, true);
     }
   }
 
@@ -176,7 +160,7 @@ final class StatefulSet {
     public V1StatefulSet create(Configs configs, Resource containerResources,
                                 int numberOfInstances) {
       clusterConfigs = configs;
-      return null;
+      return createStatefulSet(containerResources, numberOfInstances, false);
     }
   }
 
@@ -235,32 +219,6 @@ final class StatefulSet {
     return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  /**
-   * Creates a headless <code>Service</code> to facilitate communication 
between Pods in a <code>topology</code>.
-   * @return A fully configured <code>Service</code> to be used by a 
<code>topology</code>.
-   */
-  private V1Service createTopologyService() {
-    final String topologyName = clusterConfigs.getTopologyName();
-
-    final V1Service service = new V1Service();
-
-    // Setup service metadata.
-    final V1ObjectMeta objectMeta = new V1ObjectMeta()
-        .name(topologyName)
-        .annotations(getServiceAnnotations())
-        .labels(getServiceLabels());
-    service.setMetadata(objectMeta);
-
-    // Create the headless service.
-    final V1ServiceSpec serviceSpec = new V1ServiceSpec()
-        .clusterIP("None")
-        .selector(getPodMatchLabels(topologyName));
-
-    service.setSpec(serviceSpec);
-
-    return service;
-  }
-
   /**
    * Creates and configures the <code>StatefulSet</code> which the topology's 
<code>executor</code>s will run in.
    * @param containerResource Passed down to configure the 
<code>executor</code> resource limits.
@@ -320,7 +278,9 @@ final class StatefulSet {
     statefulSetSpec.setSelector(selector);
 
     // Create a Pod Template.
-    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
+    final V1PodTemplateSpec podTemplateSpec =
+        isExecutor ? clusterConfigs.getExecutorPodTemplateSpec()
+        : clusterConfigs.getManagerPodTemplateSpec();
 
     // Set up Pod Metadata.
     final V1ObjectMeta templateMetaData = new 
V1ObjectMeta().labels(getPodLabels(topologyName));
@@ -350,14 +310,6 @@ final class StatefulSet {
     return 
KubernetesContext.getPodAnnotations(clusterConfigs.getConfiguration());
   }
 
-  /**
-   * Extracts <code>Service Annotations</code> for configurations.
-   * @return Key-value pairs of service <code>Annotation</code>s to be added 
to the Pod.
-   */
-  private Map<String, String> getServiceAnnotations() {
-    return 
KubernetesContext.getServiceAnnotations(clusterConfigs.getConfiguration());
-  }
-
   /**
    * Generates <code>Label</code>s to indicate Prometheus scraping and the 
exposed port.
    * @return Key-value pairs of Prometheus <code>Annotation</code>s to be 
added to the Pod.
@@ -397,14 +349,6 @@ final class StatefulSet {
     return labels;
   }
 
-  /**
-   * Extracts <code>Selector Labels</code> for<code>Service</code>s from 
configurations.
-   * @return Key-value pairs of <code>Service Labels</code> to be added to the 
Pod.
-   */
-  private Map<String, String> getServiceLabels() {
-    return 
KubernetesContext.getServiceLabels(clusterConfigs.getConfiguration());
-  }
-
   /**
    * Configures the <code>Pod Spec</code> section of the 
<code>StatefulSet</code>. The <code>Heron</code> container
    * will be configured to allow it to function but other supplied containers 
are loaded verbatim.
@@ -783,10 +727,10 @@ final class StatefulSet {
     for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
       final String[] keyRefParts = secret.getValue().split(":");
       if (keyRefParts.length != 2) {
-        LOG.log(Level.SEVERE,
-                "SecretKeyRef must be in the form name:key. <" + 
secret.getValue() + ">");
-        throw new TopologyRuntimeManagementException(
-                "SecretKeyRef must be in the form name:key. <" + 
secret.getValue() + ">");
+        final String msg =
+            String.format("SecretKeyRef must be in the form name:key. <%s>", 
secret.getValue());
+        LOG.log(Level.SEVERE, msg);
+        throw new TopologyRuntimeManagementException(msg);
       }
       String name = keyRefParts[0];
       String key = keyRefParts[1];
@@ -800,113 +744,6 @@ final class StatefulSet {
     }
   }
 
-  /**
-   * Initiates the process of locating and loading <code>Pod Template</code> 
from a <code>ConfigMap</code>.
-   * The loaded text is then parsed into a usable <code>Pod Template</code>.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> 
for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A <code>Pod Template</code> which is loaded and parsed from a 
<code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
-    final Pair<String, String> podTemplateConfigMapName = 
getPodTemplateLocation(isExecutor);
-
-    // Default Pod Template.
-    if (podTemplateConfigMapName == null) {
-      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
-      return new V1PodTemplateSpec();
-    }
-
-    if (clusterConfigs.isPodTemplateDisabled()) {
-      throw new TopologySubmissionException("Custom Pod Templates are 
disabled");
-    }
-
-    final String configMapName = podTemplateConfigMapName.first;
-    final String podTemplateName = podTemplateConfigMapName.second;
-
-    // Attempt to locate ConfigMap with provided Pod Template name.
-    try {
-      V1ConfigMap configMap = getConfigMap(configMapName);
-      if (configMap == null) {
-        throw new ApiException(
-            String.format("K8s client unable to locate ConfigMap '%s'", 
configMapName));
-      }
-
-      final Map<String, String> configMapData = configMap.getData();
-      if (configMapData != null && configMapData.containsKey(podTemplateName)) 
{
-        // NullPointerException when Pod Template is empty.
-        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
-            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
-        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s 
Pod Template",
-            configMapName, podTemplateName));
-        return podTemplate;
-      }
-
-      // Failure to locate Pod Template with provided name.
-      throw new ApiException(String.format("Failed to locate Pod Template '%s' 
in ConfigMap '%s'",
-          podTemplateName, configMapName));
-    } catch (ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
-      throw new TopologySubmissionException(e.getMessage());
-    } catch (IOException | ClassCastException | NullPointerException e) {
-      final String message = String.format("Error parsing Pod Template '%s' in 
ConfigMap '%s'",
-          podTemplateName, configMapName);
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names 
from the CLI parameter.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> 
for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
-   */
-  @VisibleForTesting
-  protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
-    final String podTemplateConfigMapName = KubernetesContext
-        .getPodTemplateConfigMapName(clusterConfigs.getConfiguration(), 
isExecutor);
-
-    if (podTemplateConfigMapName == null) {
-      return null;
-    }
-
-    try {
-      final int splitPoint = podTemplateConfigMapName.indexOf(".");
-      final String configMapName = podTemplateConfigMapName.substring(0, 
splitPoint);
-      final String podTemplateName = 
podTemplateConfigMapName.substring(splitPoint + 1);
-
-      if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
-        throw new IllegalArgumentException("Empty ConfigMap or Pod Template 
name");
-      }
-
-      return new Pair<>(configMapName, podTemplateName);
-    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
-      final String message = "Invalid ConfigMap and/or Pod Template name";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API 
Server's namespace.
-   * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
-   * @return The retrieved <code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1ConfigMap getConfigMap(String configMapName) {
-    try {
-      return clusterConfigs.getCoreClient().readNamespacedConfigMap(
-          configMapName,
-          clusterConfigs.getNamespace(),
-          null);
-    } catch (ApiException e) {
-      final String message = "Error retrieving ConfigMaps";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(String.format("%s: %s", message, 
e.getMessage()));
-    }
-  }
-
   /**
    * Generates <code>Persistent Volume Claims Templates</code> from a mapping 
of <code>Volumes</code>
    * to <code>key-value</code> pairs of configuration options and values.
@@ -1077,14 +914,4 @@ final class StatefulSet {
     return String.format("%s-%s", clusterConfigs.getTopologyName(),
         isExecutor ? KubernetesConstants.EXECUTOR_NAME : 
KubernetesConstants.MANAGER_NAME);
   }
-
-  /**
-   * Generates the <code>Selector</code> match labels with which resources in 
this topology can be found.
-   * @return A label of the form <code>app=heron,topology=topology-name</code>.
-   */
-  private String createTopologySelectorLabels() {
-    return String.format("%s=%s,%s=%s",
-        KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
-        KubernetesConstants.LABEL_TOPOLOGY, clusterConfigs.getTopologyName());
-  }
 }

Reply via email to