This is an automated email from the ASF dual-hosted git repository. saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 8e67ad10cedf217cdec43ec3c22bad445ae28910 Author: Saad Ur Rahman <[email protected]> AuthorDate: Wed Jul 20 23:25:18 2022 -0400 [KubernetesShim] removing unneeded methods that were relocated to Stateful Set factory. --- .../heron/scheduler/kubernetes/KubernetesShim.java | 720 ----------------- .../scheduler/kubernetes/KubernetesShimTest.java | 866 --------------------- 2 files changed, 1586 deletions(-) diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java index c122ff5cda3..de1f4764fb6 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java @@ -22,8 +22,6 @@ package org.apache.heron.scheduler.kubernetes; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -48,7 +46,6 @@ import org.apache.heron.spi.common.Config; import org.apache.heron.spi.packing.PackingPlan; import org.apache.heron.spi.packing.Resource; -import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.custom.V1Patch; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; @@ -56,28 +53,18 @@ import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.apis.AppsV1Api; import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1ConfigMap; -import io.kubernetes.client.openapi.models.V1Container; -import io.kubernetes.client.openapi.models.V1ContainerPort; import io.kubernetes.client.openapi.models.V1EnvVar; import io.kubernetes.client.openapi.models.V1EnvVarSource; -import io.kubernetes.client.openapi.models.V1LabelSelector; import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim; -import io.kubernetes.client.openapi.models.V1PodSpec; import io.kubernetes.client.openapi.models.V1PodTemplate; import io.kubernetes.client.openapi.models.V1PodTemplateSpec; -import io.kubernetes.client.openapi.models.V1ResourceRequirements; -import io.kubernetes.client.openapi.models.V1SecretKeySelector; -import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1ServiceSpec; import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1StatefulSetSpec; import io.kubernetes.client.openapi.models.V1Status; import io.kubernetes.client.openapi.models.V1Toleration; -import io.kubernetes.client.openapi.models.V1Volume; -import io.kubernetes.client.openapi.models.V1VolumeMount; import io.kubernetes.client.util.PatchUtils; import io.kubernetes.client.util.Yaml; import okhttp3.Response; @@ -383,60 +370,6 @@ public class KubernetesShim extends KubernetesController { + "] in namespace [" + getNamespace() + "] is deleted."); } - /** - * Generates the command to start Heron within the <code>container</code>. - * @param containerId Passed down to <>SchedulerUtils</> to generate executor command. - * @param numOfInstances Used to configure the debugging ports. - * @param isExecutor Flag used to generate the correct <code>shard_id</code>. - * @return The complete command to start Heron in a <code>container</code>. - */ - protected List<String> getExecutorCommand(String containerId, int numOfInstances, - boolean isExecutor) { - final Config configuration = getConfiguration(); - final Config runtimeConfiguration = getRuntimeConfiguration(); - final Map<ExecutorPort, String> ports = - KubernetesConstants.EXECUTOR_PORTS.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> e.getValue().toString())); - - if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration)) - && numOfInstances != 0) { - List<String> remoteDebuggingPorts = new LinkedList<>(); - IntStream.range(0, numOfInstances).forEach(i -> { - int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i; - remoteDebuggingPorts.add(String.valueOf(port)); - }); - ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, - String.join(",", remoteDebuggingPorts)); - } - - final String[] executorCommand = - SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration, - containerId, ports); - return Arrays.asList( - "sh", - "-c", - KubernetesUtils.getConfCommand(configuration) - + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration) - + " && " + setShardIdEnvironmentVariableCommand(isExecutor) - + " && " + String.join(" ", executorCommand) - ); - } - - /** - * Configures the <code>shard_id</code> for the Heron container based on whether it is an <code>executor</code> - * or <code>manager</code>. <code>executor</code> IDs are [1 - n) and the <code>manager</code> IDs start at 0. - * @param isExecutor Switch flag to generate correct command. - * @return The command required to put the Heron instance in <code>executor</code> or <code>manager</code> mode. - */ - @VisibleForTesting - protected static String setShardIdEnvironmentVariableCommand(boolean isExecutor) { - final String pattern = String.format("%%s=%s && echo shardId=${%%s}", - isExecutor ? "$((${POD_NAME##*-} + 1))" : "${POD_NAME##*-}"); - return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID); - } - /** * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>. * @return A fully configured <code>Service</code> to be used by a <code>topology</code>. @@ -463,92 +396,6 @@ public class KubernetesShim extends KubernetesController { return service; } - /** - * Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in. - * @param containerResource Passed down to configure the <code>executor</code> resource limits. - * @param numberOfInstances Used to configure the execution command and ports for the <code>executor</code>. - * @param isExecutor Flag used to configure components specific to <code>executor</code> and <code>manager</code>. - * @return A fully configured <code>V1StatefulSet</code> for the topology's <code>executors</code>. - */ - private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances, - boolean isExecutor) { - final String topologyName = getTopologyName(); - final Config runtimeConfiguration = getRuntimeConfiguration(); - - final List<V1Volume> volumes = new LinkedList<>(); - final List<V1VolumeMount> volumeMounts = new LinkedList<>(); - - // Collect Persistent Volume Claim configurations from the CLI. - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configsPVC = - KubernetesContext.getVolumeClaimTemplates(getConfiguration(), isExecutor); - - // Collect all Volume configurations from the CLI and generate Volumes and Volume Mounts. - createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, volumeMounts); - createVolumeAndMountsHostPathCLI( - KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), volumes, volumeMounts); - createVolumeAndMountsEmptyDirCLI( - KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), volumes, volumeMounts); - createVolumeAndMountsNFSCLI( - KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), volumes, volumeMounts); - - final V1StatefulSet statefulSet = new V1StatefulSet(); - - // Setup StatefulSet's metadata. - final V1ObjectMeta objectMeta = new V1ObjectMeta() - .name(getStatefulSetName(isExecutor)) - .labels(getPodLabels(topologyName)); - statefulSet.setMetadata(objectMeta); - - // Create the StatefulSet Spec. - // Reduce replica count by one for Executors and set to one for Manager. - final int replicasCount = - isExecutor ? Runtime.numContainers(runtimeConfiguration).intValue() - 1 : 1; - final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec() - .serviceName(topologyName) - .replicas(replicasCount); - - // Parallel pod management tells the StatefulSet controller to launch or terminate - // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely - // terminated prior to launching or terminating another Pod. - statefulSetSpec.setPodManagementPolicy("Parallel"); - - // Add selector match labels "app=heron" and "topology=topology-name" - // so we know which pods to manage. - final V1LabelSelector selector = new V1LabelSelector() - .matchLabels(getPodMatchLabels(topologyName)); - statefulSetSpec.setSelector(selector); - - // Create a Pod Template. - final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor); - - // Set up Pod Metadata. - final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName)); - Map<String, String> annotations = new HashMap<>(); - annotations.putAll(getPodAnnotations()); - annotations.putAll(getPrometheusAnnotations()); - templateMetaData.setAnnotations(annotations); - podTemplateSpec.setMetadata(templateMetaData); - - configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, isExecutor, - volumes, volumeMounts); - - statefulSetSpec.setTemplate(podTemplateSpec); - - statefulSet.setSpec(statefulSetSpec); - - statefulSetSpec.setVolumeClaimTemplates(createPersistentVolumeClaims(configsPVC)); - - return statefulSet; - } - - /** - * Extracts general Pod <code>Annotation</code>s from configurations. - * @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod. - */ - private Map<String, String> getPodAnnotations() { - return KubernetesContext.getPodAnnotations(getConfiguration()); - } - /** * Extracts <code>Service Annotations</code> for configurations. * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod. @@ -557,19 +404,6 @@ public class KubernetesShim extends KubernetesController { return KubernetesContext.getServiceAnnotations(getConfiguration()); } - /** - * Generates <code>Label</code>s to indicate Prometheus scraping and the exposed port. - * @return Key-value pairs of Prometheus <code>Annotation</code>s to be added to the Pod. - */ - private Map<String, String> getPrometheusAnnotations() { - final Map<String, String> annotations = new HashMap<>(); - annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true"); - annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT, - KubernetesConstants.PROMETHEUS_PORT); - - return annotations; - } - /** * Generates the <code>heron</code> and <code>topology</code> name <code>Match Label</code>s. * @param topologyName Name of the <code>topology</code>. @@ -582,20 +416,6 @@ public class KubernetesShim extends KubernetesController { return labels; } - /** - * Extracts <code>Label</code>s from configurations, generates the <code>heron</code> and - * <code>topology</code> name <code>Label</code>s. - * @param topologyName Name of the <code>topology</code>. - * @return Key-value pairs of <code>Label</code>s to be added to the Pod. - */ - private Map<String, String> getPodLabels(String topologyName) { - final Map<String, String> labels = new HashMap<>(); - labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); - labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); - labels.putAll(KubernetesContext.getPodLabels(getConfiguration())); - return labels; - } - /** * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations. * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod. @@ -604,401 +424,6 @@ public class KubernetesShim extends KubernetesController { return KubernetesContext.getServiceLabels(getConfiguration()); } - /** - * Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container - * will be configured to allow it to function but other supplied containers are loaded verbatim. - * @param podTemplateSpec The <code>Pod Template Spec</code> section to update. - * @param resource Passed down to configure the resource limits. - * @param numberOfInstances Passed down to configure the ports. - * @param isExecutor Flag used to configure components specific to <code>Executor</code> and <code>Manager</code>. - * @param volumes <code>Volumes</code> generated from configurations options. - * @param volumeMounts <code>Volume Mounts</code> generated from configurations options. - */ - private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec, Resource resource, - int numberOfInstances, boolean isExecutor, List<V1Volume> volumes, - List<V1VolumeMount> volumeMounts) { - if (podTemplateSpec.getSpec() == null) { - podTemplateSpec.setSpec(new V1PodSpec()); - } - final V1PodSpec podSpec = podTemplateSpec.getSpec(); - - // Set the termination period to 0 so pods can be deleted quickly - podSpec.setTerminationGracePeriodSeconds(0L); - - // Set the pod tolerations so pods are rescheduled when nodes go down - // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions - configureTolerations(podSpec); - - // Get <Heron> container and ignore all others. - final String containerName = - isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME; - V1Container heronContainer = null; - List<V1Container> containers = podSpec.getContainers(); - if (containers != null) { - for (V1Container container : containers) { - final String name = container.getName(); - if (name != null && name.equals(containerName)) { - if (heronContainer != null) { - throw new TopologySubmissionException( - String.format("Multiple configurations found for '%s' container", containerName)); - } - heronContainer = container; - } - } - } else { - containers = new LinkedList<>(); - } - - if (heronContainer == null) { - heronContainer = new V1Container().name(containerName); - containers.add(heronContainer); - } - - if (!volumes.isEmpty() || !volumeMounts.isEmpty()) { - configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes, - volumeMounts); - } - - configureHeronContainer(resource, numberOfInstances, heronContainer, isExecutor); - - podSpec.setContainers(containers); - - mountSecretsAsVolumes(podSpec); - } - - /** - * Adds <code>tolerations</code> to the <code>Pod Spec</code> with Heron's values taking precedence. - * @param spec <code>Pod Spec</code> to be configured. - */ - @VisibleForTesting - protected void configureTolerations(final V1PodSpec spec) { - KubernetesUtils.V1ControllerUtils<V1Toleration> utils = - new KubernetesUtils.V1ControllerUtils<>(); - spec.setTolerations( - utils.mergeListsDedupe(getTolerations(), spec.getTolerations(), - Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations") - ); - } - - /** - * Generates a list of <code>tolerations</code> which Heron requires. - * @return A list of configured <code>tolerations</code>. - */ - @VisibleForTesting - protected static List<V1Toleration> getTolerations() { - final List<V1Toleration> tolerations = new ArrayList<>(); - KubernetesConstants.TOLERATIONS.forEach(t -> { - final V1Toleration toleration = - new V1Toleration() - .key(t) - .operator("Exists") - .effect("NoExecute") - .tolerationSeconds(10L); - tolerations.add(toleration); - }); - - return tolerations; - } - - /** - * Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod. - * @param podSpec <code>Pod Spec</code> to add secrets to. - */ - private void mountSecretsAsVolumes(V1PodSpec podSpec) { - final Config config = getConfiguration(); - final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config); - for (Map.Entry<String, String> secret : secrets.entrySet()) { - final V1VolumeMount mount = new V1VolumeMount() - .name(secret.getKey()) - .mountPath(secret.getValue()); - final V1Volume secretVolume = new V1Volume() - .name(secret.getKey()) - .secret(new V1SecretVolumeSourceBuilder() - .withSecretName(secret.getKey()) - .build()); - podSpec.addVolumesItem(secretVolume); - for (V1Container container : podSpec.getContainers()) { - container.addVolumeMountsItem(mount); - } - } - } - - /** - * Configures the <code>Heron</code> container with values for parameters Heron requires for functioning. - * @param resource Resource limits. - * @param numberOfInstances Required number of <code>executor</code> containers which is used to configure ports. - * @param container The <code>executor</code> container to be configured. - * @param isExecutor Flag indicating whether to set a <code>executor</code> or <code>manager</code> command. - */ - private void configureHeronContainer(Resource resource, int numberOfInstances, - final V1Container container, boolean isExecutor) { - final Config configuration = getConfiguration(); - - // Set up the container images. - container.setImage(KubernetesContext.getExecutorDockerImage(configuration)); - - // Set up the container command. - final List<String> command = - getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances, isExecutor); - container.setCommand(command); - - if (KubernetesContext.hasImagePullPolicy(configuration)) { - container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration)); - } - - // Configure environment variables. - configureContainerEnvVars(container); - - // Set secret keys. - setSecretKeyRefs(container); - - // Set container resources - configureContainerResources(container, configuration, resource, isExecutor); - - // Set container ports. - final boolean debuggingEnabled = - TopologyUtils.getTopologyRemoteDebuggingEnabled( - Runtime.topology(getRuntimeConfiguration())); - configureContainerPorts(debuggingEnabled, numberOfInstances, container); - - // setup volume mounts - mountVolumeIfPresent(container); - } - - /** - * Configures the resources in the <code>container</code> with values in the <code>config</code> taking precedence. - * @param container The <code>container</code> to be configured. - * @param configuration The <code>Config</code> object to check if a resource request needs to be set. - * @param resource User defined resources limits from input. - * @param isExecutor Flag to indicate configuration for an <code>executor</code> or <code>manager</code>. - */ - @VisibleForTesting - protected void configureContainerResources(final V1Container container, - final Config configuration, final Resource resource, - boolean isExecutor) { - if (container.getResources() == null) { - container.setResources(new V1ResourceRequirements()); - } - final V1ResourceRequirements resourceRequirements = container.getResources(); - - // Collect Limits and Requests from CLI. - final Map<String, Quantity> limitsCLI = createResourcesRequirement( - KubernetesContext.getResourceLimits(configuration, isExecutor)); - final Map<String, Quantity> requestsCLI = createResourcesRequirement( - KubernetesContext.getResourceRequests(configuration, isExecutor)); - - if (resourceRequirements.getLimits() == null) { - resourceRequirements.setLimits(new HashMap<>()); - } - - // Set Limits and Resources from CLI <if> available, <else> use Configs. Deduplicate on name - // with precedence [1] CLI, [2] Config. - final Map<String, Quantity> limits = resourceRequirements.getLimits(); - final Quantity limitCPU = limitsCLI.getOrDefault(KubernetesConstants.CPU, - Quantity.fromString(Double.toString(KubernetesUtils.roundDecimal(resource.getCpu(), 3)))); - final Quantity limitMEMORY = limitsCLI.getOrDefault(KubernetesConstants.MEMORY, - Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam()))); - - limits.put(KubernetesConstants.MEMORY, limitMEMORY); - limits.put(KubernetesConstants.CPU, limitCPU); - - // Set the Kubernetes container resource request. - // Order: [1] CLI, [2] EQUAL_TO_LIMIT, [3] NOT_SET - KubernetesContext.KubernetesResourceRequestMode requestMode = - KubernetesContext.getKubernetesRequestMode(configuration); - if (!requestsCLI.isEmpty()) { - if (resourceRequirements.getRequests() == null) { - resourceRequirements.setRequests(new HashMap<>()); - } - final Map<String, Quantity> requests = resourceRequirements.getRequests(); - - if (requestsCLI.containsKey(KubernetesConstants.MEMORY)) { - requests.put(KubernetesConstants.MEMORY, requestsCLI.get(KubernetesConstants.MEMORY)); - } - if (requestsCLI.containsKey(KubernetesConstants.CPU)) { - requests.put(KubernetesConstants.CPU, requestsCLI.get(KubernetesConstants.CPU)); - } - } else if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) { - LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit"); - resourceRequirements.setRequests(limits); - } else { - LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET"); - } - container.setResources(resourceRequirements); - } - - /** - * Creates <code>Resource Requirements</code> from a Map of <code>Config</code> items for <code>CPU</code> - * and <code>Memory</code>. - * @param configs <code>Configs</code> to be parsed for configuration. - * @return Configured <code>Resource Requirements</code>. An <code>empty</code> map will be returned - * if there are no <code>configs</code>. - */ - @VisibleForTesting - protected Map<String, Quantity> createResourcesRequirement(Map<String, String> configs) { - final Map<String, Quantity> requirements = new HashMap<>(); - - if (configs == null || configs.isEmpty()) { - return requirements; - } - - final String memoryLimit = configs.get(KubernetesConstants.MEMORY); - if (memoryLimit != null && !memoryLimit.isEmpty()) { - requirements.put(KubernetesConstants.MEMORY, Quantity.fromString(memoryLimit)); - } - final String cpuLimit = configs.get(KubernetesConstants.CPU); - if (cpuLimit != null && !cpuLimit.isEmpty()) { - requirements.put(KubernetesConstants.CPU, Quantity.fromString(cpuLimit)); - } - - return requirements; - } - - /** - * Configures the environment variables in the <code>container</code> with those Heron requires. - * Heron's values take precedence. - * @param container The <code>container</code> to be configured. - */ - @VisibleForTesting - protected void configureContainerEnvVars(final V1Container container) { - // Deduplicate on var name with Heron defaults take precedence. - KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>(); - container.setEnv( - utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(), - Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables") - ); - } - - /** - * Generates a list of <code>Environment Variables</code> required by Heron to function. - * @return A list of configured <code>Environment Variables</code> required by Heron to function. - */ - @VisibleForTesting - protected static List<V1EnvVar> getExecutorEnvVars() { - final V1EnvVar envVarHost = new V1EnvVar(); - envVarHost.name(KubernetesConstants.ENV_HOST) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath(KubernetesConstants.POD_IP))); - - final V1EnvVar envVarPodName = new V1EnvVar(); - envVarPodName.name(KubernetesConstants.ENV_POD_NAME) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath(KubernetesConstants.POD_NAME))); - - return Arrays.asList(envVarHost, envVarPodName); - } - - /** - * Configures the ports in the <code>container</code> with those Heron requires. Heron's values take precedence. - * @param remoteDebugEnabled Flag used to indicate if debugging ports need to be added. - * @param numberOfInstances The number of debugging ports to be opened. - * @param container <code>container</code> to be configured. - */ - @VisibleForTesting - protected void configureContainerPorts(boolean remoteDebugEnabled, int numberOfInstances, - final V1Container container) { - List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts()); - - if (remoteDebugEnabled) { - ports.addAll(getDebuggingPorts(numberOfInstances)); - } - - // Set container ports. Deduplicate using port number with Heron defaults taking precedence. - KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils = - new KubernetesUtils.V1ControllerUtils<>(); - container.setPorts( - utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(), - Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports") - ); - } - - /** - * Generates a list of <code>ports</code> required by Heron to function. - * @return A list of configured <code>ports</code> required by Heron to function. - */ - @VisibleForTesting - protected static List<V1ContainerPort> getExecutorPorts() { - List<V1ContainerPort> ports = new LinkedList<>(); - KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> { - final V1ContainerPort port = new V1ContainerPort() - .name(p.getName()) - .containerPort(v); - ports.add(port); - }); - return ports; - } - - /** - * Generate the debugging ports required by Heron. - * @param numberOfInstances The number of debugging ports to generate. - * @return A list of configured debugging <code>ports</code>. - */ - @VisibleForTesting - protected static List<V1ContainerPort> getDebuggingPorts(int numberOfInstances) { - List<V1ContainerPort> ports = new LinkedList<>(); - IntStream.range(0, numberOfInstances).forEach(i -> { - final String portName = - KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i; - final V1ContainerPort port = new V1ContainerPort() - .name(portName) - .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i); - ports.add(port); - }); - return ports; - } - - /** - * Adds volume mounts to the <code>container</code> that Heron requires. Heron's values taking precedence. - * @param container <code>container</code> to be configured. - */ - @VisibleForTesting - protected void mountVolumeIfPresent(final V1Container container) { - final Config config = getConfiguration(); - if (KubernetesContext.hasContainerVolume(config)) { - final V1VolumeMount mount = - new V1VolumeMount() - .name(KubernetesContext.getContainerVolumeName(config)) - .mountPath(KubernetesContext.getContainerVolumeMountPath(config)); - - // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence. - KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils = - new KubernetesUtils.V1ControllerUtils<>(); - container.setVolumeMounts( - utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(), - Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts") - ); - } - } - - /** - * Adds <code>Secret Key</code> references to a <code>container</code>. - * @param container <code>container</code> to be configured. - */ - private void setSecretKeyRefs(V1Container container) { - final Config config = getConfiguration(); - final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config); - for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) { - final String[] keyRefParts = secret.getValue().split(":"); - if (keyRefParts.length != 2) { - LOG.log(Level.SEVERE, - "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">"); - throw new TopologyRuntimeManagementException( - "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">"); - } - String name = keyRefParts[0]; - String key = keyRefParts[1]; - final V1EnvVar envVar = new V1EnvVar() - .name(secret.getKey()) - .valueFrom(new V1EnvVarSource() - .secretKeyRef(new V1SecretKeySelector() - .key(key) - .name(name))); - container.addEnvItem(envVar); - } - } - /** * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>. * The loaded text is then parsed into a usable <code>Pod Template</code>. @@ -1106,151 +531,6 @@ public class KubernetesShim extends KubernetesController { } } - /** - * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code> - * to <code>key-value</code> pairs of configuration options and values. - * @param mapOfOpts <code>Volume</code> to configuration <code>key-value</code> mappings. - * @return Fully populated list of only dynamically backed <code>Persistent Volume Claims</code>. - */ - @VisibleForTesting - protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims( - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts) { - - List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>(); - - // Iterate over all the PVC Volumes. - for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> pvc - : mapOfOpts.entrySet()) { - - // Only create claims for `OnDemand` volumes. - final String claimName = pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName); - if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) { - continue; - } - - listOfPVCs.add(Volumes.get() - .createPersistentVolumeClaim(pvc.getKey(), - getPersistentVolumeClaimLabels(getTopologyName()), pvc.getValue())); - } - return listOfPVCs; - } - - /** - * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Persistent Volume Claims</code>s - * to be placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI. - * @param mapConfig Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs. - * @param volumes A list of <code>Volume</code> to append to. - * @param volumeMounts A list of <code>Volume Mounts</code> to append to. - */ - @VisibleForTesting - protected void createVolumeAndMountsPersistentVolumeClaimCLI( - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig, - final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) { - for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs - : mapConfig.entrySet()) { - final String volumeName = configs.getKey(); - - // Do not create Volumes for `OnDemand`. - final String claimName = configs.getValue() - .get(KubernetesConstants.VolumeConfigKeys.claimName); - if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) { - volumes.add(Volumes.get().createPersistentVolumeClaim(claimName, volumeName)); - } - volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue())); - } - } - - /** - * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>emptyDir</code>s to be - * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI. - * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs. - * @param volumes A list of <code>Volume</code> to append to. - * @param volumeMounts A list of <code>Volume Mounts</code> to append to. - */ - @VisibleForTesting - protected void createVolumeAndMountsEmptyDirCLI( - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts, - final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) { - for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs - : mapOfOpts.entrySet()) { - final String volumeName = configs.getKey(); - final V1Volume volume = Volumes.get() - .createVolume(Volumes.VolumeType.EmptyDir, volumeName, configs.getValue()); - volumes.add(volume); - volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue())); - } - } - - /** - * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Host Path</code>s to be - * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI. - * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs. - * @param volumes A list of <code>Volume</code> to append to. - * @param volumeMounts A list of <code>Volume Mounts</code> to append to. - */ - @VisibleForTesting - protected void createVolumeAndMountsHostPathCLI( - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts, - final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) { - for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs - : mapOfOpts.entrySet()) { - final String volumeName = configs.getKey(); - final V1Volume volume = Volumes.get() - .createVolume(Volumes.VolumeType.HostPath, volumeName, configs.getValue()); - volumes.add(volume); - volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue())); - } - } - - /** - * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>NFS</code>s to be - * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI. - * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs. - * @param volumes A list of <code>Volume</code> to append to. - * @param volumeMounts A list of <code>Volume Mounts</code> to append to. - */ - @VisibleForTesting - protected void createVolumeAndMountsNFSCLI( - final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts, - final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) { - for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs - : mapOfOpts.entrySet()) { - final String volumeName = configs.getKey(); - final V1Volume volume = Volumes.get() - .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, configs.getValue()); - volumes.add(volume); - volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue())); - } - } - - /** - * Configures the Pod Spec and Heron container with <code>Volumes</code> and <code>Volume Mounts</code>. - * @param podSpec All generated <code>V1Volume</code> will be placed in the <code>Pod Spec</code>. - * @param executor All generated <code>V1VolumeMount</code> will be placed in the <code>Container</code>. - * @param volumes <code>Volumes</code> to be inserted in the Pod Spec. - * @param volumeMounts <code>Volumes Mounts</code> to be inserted in the Heron container. - */ - @VisibleForTesting - protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec podSpec, - final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) { - - // Deduplicate on Names with Persistent Volume Claims taking precedence. - - KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes = - new KubernetesUtils.V1ControllerUtils<>(); - podSpec.setVolumes( - utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(), - Comparator.comparing(V1Volume::getName), - "Pod with Volumes")); - - KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts = - new KubernetesUtils.V1ControllerUtils<>(); - executor.setVolumeMounts( - utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(), - Comparator.comparing(V1VolumeMount::getName), - "Heron container with Volume Mounts")); - } - /** * Removes all Persistent Volume Claims associated with a specific topology, if they exist. * It looks for the following: diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java index d713d823466..ec1be4e87a9 100644 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java +++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java @@ -19,14 +19,8 @@ package org.apache.heron.scheduler.kubernetes; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Rule; @@ -36,37 +30,16 @@ import org.junit.runner.RunWith; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.heron.common.basics.ByteAmount; import org.apache.heron.common.basics.Pair; import org.apache.heron.scheduler.TopologySubmissionException; import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple; import org.apache.heron.spi.common.Config; import org.apache.heron.spi.common.Key; -import org.apache.heron.spi.packing.Resource; -import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.openapi.models.V1ConfigMap; import io.kubernetes.client.openapi.models.V1ConfigMapBuilder; -import io.kubernetes.client.openapi.models.V1Container; -import io.kubernetes.client.openapi.models.V1ContainerBuilder; -import io.kubernetes.client.openapi.models.V1ContainerPort; -import io.kubernetes.client.openapi.models.V1EnvVar; -import io.kubernetes.client.openapi.models.V1EnvVarSource; -import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; -import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim; -import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder; -import io.kubernetes.client.openapi.models.V1PodSpec; -import io.kubernetes.client.openapi.models.V1PodSpecBuilder; import io.kubernetes.client.openapi.models.V1PodTemplateSpec; -import io.kubernetes.client.openapi.models.V1ResourceRequirements; -import io.kubernetes.client.openapi.models.V1Toleration; -import io.kubernetes.client.openapi.models.V1Volume; -import io.kubernetes.client.openapi.models.V1VolumeBuilder; -import io.kubernetes.client.openapi.models.V1VolumeMount; -import io.kubernetes.client.openapi.models.V1VolumeMountBuilder; -import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -443,843 +416,4 @@ public class KubernetesShimTest { KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME); kubernetesShim.getPodTemplateLocation(true); } - - @Test - public void testConfigureContainerPorts() { - final String portNamekept = "random-port-to-be-kept"; - final int portNumberkept = 1111; - final int numInstances = 3; - final List<V1ContainerPort> expectedPortsBase = - Collections.unmodifiableList(KubernetesShim.getExecutorPorts()); - final List<V1ContainerPort> debugPorts = - Collections.unmodifiableList(KubernetesShim.getDebuggingPorts(numInstances)); - final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList( - Arrays.asList( - new V1ContainerPort() - .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT), - new V1ContainerPort() - .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT), - new V1ContainerPort().name(portNamekept).containerPort(portNumberkept) - ) - ); - - // Null ports. This is the default case. - final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build(); - v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithNullPorts); - Assert.assertTrue("Server and/or shell PORTS for container with null ports list", - CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase)); - - // Empty ports. - final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder() - .withPorts(new LinkedList<>()) - .build(); - v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithEmptyPorts); - Assert.assertTrue("Server and/or shell PORTS for container with empty ports list", - CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase)); - - // Port overriding. - final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase); - final V1Container inputContainerWithPorts = new V1ContainerBuilder() - .withPorts(inputPorts) - .build(); - final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase); - expectedPortsOverriding - .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)); - - v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithPorts); - Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.", - CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding)); - - // Port overriding with debug ports. - final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts); - inputPortsWithDebug.addAll(inputPortsBase); - final V1Container inputContainerWithDebug = new V1ContainerBuilder() - .withPorts(inputPortsWithDebug) - .build(); - final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase); - expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)); - expectedPortsDebug.addAll(debugPorts); - - v1ControllerWithPodTemplate.configureContainerPorts( - true, numInstances, inputContainerWithDebug); - Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.", - CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug)); - } - - @Test - public void testConfigureContainerEnvVars() { - final List<V1EnvVar> heronEnvVars = - Collections.unmodifiableList(KubernetesShim.getExecutorEnvVars()); - final V1EnvVar additionEnvVar = new V1EnvVar() - .name("env-variable-to-be-kept") - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath("env-variable-was-kept"))); - final List<V1EnvVar> inputEnvVars = Arrays.asList( - new V1EnvVar() - .name(KubernetesConstants.ENV_HOST) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath("env-host-to-be-replaced"))), - new V1EnvVar() - .name(KubernetesConstants.ENV_POD_NAME) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath("pod-name-to-be-replaced"))), - additionEnvVar - ); - - // Null env vars. This is the default case. - V1Container containerWithNullEnvVars = new V1ContainerBuilder().build(); - v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars); - Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match", - CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars)); - - // Empty env vars. - V1Container containerWithEmptyEnvVars = new V1ContainerBuilder() - .withEnv(new LinkedList<>()) - .build(); - v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars); - Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match", - CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars)); - - // Env Var overriding. - final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars); - expectedOverriding.add(additionEnvVar); - V1Container containerWithEnvVars = new V1ContainerBuilder() - .withEnv(inputEnvVars) - .build(); - v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars); - Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden", - CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding)); - } - - @Test - public void testConfigureContainerResources() { - final boolean isExecutor = true; - - final Resource resourceDefault = new Resource( - 9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000)); - final Resource resourceCustom = new Resource( - 4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000)); - - final Quantity defaultRAM = Quantity.fromString( - KubernetesUtils.Megabytes(resourceDefault.getRam())); - final Quantity defaultCPU = Quantity.fromString( - Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3))); - final Quantity customRAM = Quantity.fromString( - KubernetesUtils.Megabytes(resourceCustom.getRam())); - final Quantity customCPU = Quantity.fromString( - Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3))); - final Quantity customDisk = Quantity.fromString( - KubernetesUtils.Megabytes(resourceCustom.getDisk())); - - final Config configNoLimit = Config.newBuilder() - .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET") - .build(); - final Config configWithLimit = Config.newBuilder() - .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT") - .build(); - - final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements() - .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM) - .putLimitsItem(KubernetesConstants.CPU, defaultCPU); - - final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements() - .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM) - .putLimitsItem(KubernetesConstants.CPU, defaultCPU) - .putLimitsItem("disk", customDisk); - - final V1ResourceRequirements customRequirements = new V1ResourceRequirements() - .putLimitsItem(KubernetesConstants.MEMORY, customRAM) - .putLimitsItem(KubernetesConstants.CPU, customCPU) - .putLimitsItem("disk", customDisk); - - // Default. Null resources. - V1Container containerNull = new V1ContainerBuilder().build(); - v1ControllerWithPodTemplate.configureContainerResources( - containerNull, configNoLimit, resourceDefault, isExecutor); - Assert.assertTrue("Default LIMITS should be set in container with null LIMITS", - containerNull.getResources().getLimits().entrySet() - .containsAll(expectDefaultRequirements.getLimits().entrySet())); - - // Empty resources. - V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build(); - v1ControllerWithPodTemplate.configureContainerResources( - containerEmpty, configNoLimit, resourceDefault, isExecutor); - Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS", - containerNull.getResources().getLimits().entrySet() - .containsAll(expectDefaultRequirements.getLimits().entrySet())); - - // Custom resources. - V1Container containerCustom = new V1ContainerBuilder() - .withResources(customRequirements) - .build(); - v1ControllerWithPodTemplate.configureContainerResources( - containerCustom, configNoLimit, resourceDefault, isExecutor); - Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS", - containerCustom.getResources().getLimits().entrySet() - .containsAll(expectCustomRequirements.getLimits().entrySet())); - - // Custom resources with request. - V1Container containerRequests = new V1ContainerBuilder() - .withResources(customRequirements) - .build(); - v1ControllerWithPodTemplate.configureContainerResources( - containerRequests, configWithLimit, resourceDefault, isExecutor); - Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST", - containerRequests.getResources().getLimits().entrySet() - .containsAll(expectCustomRequirements.getLimits().entrySet())); - Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST", - containerRequests.getResources().getRequests().entrySet() - .containsAll(expectCustomRequirements.getLimits().entrySet())); - } - - @Test - public void testConfigureContainerResourcesCLI() { - final boolean isExecutor = true; - final String customLimitMEMStr = "120Gi"; - final String customLimitCPUStr = "5"; - final String customRequestMEMStr = "100Mi"; - final String customRequestCPUStr = "4"; - - final Resource resources = new Resource( - 6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400)); - - final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr); - final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr); - final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr); - final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr); - - final Config config = Config.newBuilder() - .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX - + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customLimitCPUStr) - .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX - + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr) - .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX - + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customRequestCPUStr) - .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX - + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), - customRequestMEMStr) - .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT") - .build(); - - final V1Container expected = new V1ContainerBuilder() - .withNewResources() - .addToLimits(KubernetesConstants.CPU, customLimitCPU) - .addToLimits(KubernetesConstants.MEMORY, customLimitMEM) - .addToRequests(KubernetesConstants.CPU, customRequestCPU) - .addToRequests(KubernetesConstants.MEMORY, customRequestMEM) - .endResources() - .build(); - - final V1Container actual = new V1Container(); - v1ControllerWithPodTemplate.configureContainerResources(actual, config, resources, isExecutor); - Assert.assertEquals("Container Resources are set from CLI.", expected, actual); - } - - @Test - public void testMountVolumeIfPresent() { - final String pathDefault = "config-host-volume-path"; - final String pathNameDefault = "config-host-volume-name"; - final Config configWithVolumes = Config.newBuilder() - .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault) - .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault) - .build(); - final KubernetesShim controllerWithMounts = new KubernetesShim(configWithVolumes, RUNTIME); - final V1VolumeMount volumeDefault = new V1VolumeMountBuilder() - .withName(pathNameDefault) - .withMountPath(pathDefault) - .build(); - final V1VolumeMount volumeCustom = new V1VolumeMountBuilder() - .withName("custom-volume-mount") - .withMountPath("should-be-kept") - .build(); - - final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault); - final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault); - final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList( - new V1VolumeMountBuilder() - .withName(pathNameDefault) - .withMountPath("should-be-replaced") - .build(), - volumeCustom - ); - - // No Volume Mounts set. - KubernetesShim controllerDoNotSetMounts = - new KubernetesShim(Config.newBuilder().build(), RUNTIME); - V1Container containerNoSetMounts = new V1Container(); - controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts); - Assert.assertNull(containerNoSetMounts.getVolumeMounts()); - - // Default. Null Volume Mounts. - V1Container containerNull = new V1ContainerBuilder().build(); - controllerWithMounts.mountVolumeIfPresent(containerNull); - Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS", - CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts())); - - // Empty Volume Mounts. - V1Container containerEmpty = new V1ContainerBuilder() - .withVolumeMounts(new LinkedList<>()) - .build(); - controllerWithMounts.mountVolumeIfPresent(containerEmpty); - Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS", - CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts())); - - // Custom Volume Mounts. - V1Container containerCustom = new V1ContainerBuilder() - .withVolumeMounts(volumeMountsCustomList) - .build(); - controllerWithMounts.mountVolumeIfPresent(containerCustom); - Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS", - CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts())); - } - - @Test - public void testConfigureTolerations() { - final V1Toleration keptToleration = new V1Toleration() - .key("kept toleration") - .operator("Some Operator") - .effect("Some Effect") - .tolerationSeconds(5L); - final List<V1Toleration> expectedTolerationBase = - Collections.unmodifiableList(KubernetesShim.getTolerations()); - final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList( - Arrays.asList( - new V1Toleration() - .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"), - new V1Toleration() - .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"), - keptToleration - ) - ); - - // Null Tolerations. This is the default case. - final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build(); - v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations); - Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults", - CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(), - expectedTolerationBase)); - - // Empty Tolerations. - final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder() - .withTolerations(new LinkedList<>()) - .build(); - v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations); - Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults", - CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(), - expectedTolerationBase)); - - // Toleration overriding. - final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder() - .withTolerations(inputTolerationsBase) - .build(); - final List<V1Toleration> expectedTolerationsOverriding = - new LinkedList<>(expectedTolerationBase); - expectedTolerationsOverriding.add(keptToleration); - - v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations); - Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults", - CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(), - expectedTolerationsOverriding)); - } - - @Test - public void testCreatePersistentVolumeClaims() { - final String topologyName = "topology-name"; - final String volumeNameOne = "volume-name-one"; - final String volumeNameTwo = "volume-name-two"; - final String volumeNameStatic = "volume-name-static"; - final String claimNameOne = "OnDemand"; - final String claimNameTwo = "claim-name-two"; - final String claimNameStatic = "OnDEmaND"; - final String storageClassName = "storage-class-name"; - final String sizeLimit = "555Gi"; - final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany"; - final String accessModes = "ReadOnlyMany"; - final String volumeMode = "VolumeMode"; - final String path = "/path/to/mount/"; - final String subPath = "/sub/path/to/mount/"; - final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts = - ImmutableMap.of( - volumeNameOne, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.claimName, claimNameOne); - put(VolumeConfigKeys.storageClassName, storageClassName); - put(VolumeConfigKeys.sizeLimit, sizeLimit); - put(VolumeConfigKeys.accessModes, accessModesList); - put(VolumeConfigKeys.volumeMode, volumeMode); - put(VolumeConfigKeys.path, path); - } - }, - volumeNameTwo, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.claimName, claimNameTwo); - put(VolumeConfigKeys.storageClassName, storageClassName); - put(VolumeConfigKeys.sizeLimit, sizeLimit); - put(VolumeConfigKeys.accessModes, accessModes); - put(VolumeConfigKeys.volumeMode, volumeMode); - put(VolumeConfigKeys.path, path); - put(VolumeConfigKeys.subPath, subPath); - } - }, - volumeNameStatic, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.claimName, claimNameStatic); - put(VolumeConfigKeys.sizeLimit, sizeLimit); - put(VolumeConfigKeys.accessModes, accessModes); - put(VolumeConfigKeys.volumeMode, volumeMode); - put(VolumeConfigKeys.path, path); - put(VolumeConfigKeys.subPath, subPath); - } - } - ); - - final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder() - .withNewMetadata() - .withName(volumeNameOne) - .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName)) - .endMetadata() - .withNewSpec() - .withStorageClassName(storageClassName) - .withAccessModes(Arrays.asList(accessModesList.split(","))) - .withVolumeMode(volumeMode) - .withNewResources() - .addToRequests("storage", new Quantity(sizeLimit)) - .endResources() - .endSpec() - .build(); - - final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder() - .withNewMetadata() - .withName(volumeNameStatic) - .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName)) - .endMetadata() - .withNewSpec() - .withStorageClassName("") - .withAccessModes(Collections.singletonList(accessModes)) - .withVolumeMode(volumeMode) - .withNewResources() - .addToRequests("storage", new Quantity(sizeLimit)) - .endResources() - .endSpec() - .build(); - - final List<V1PersistentVolumeClaim> expectedClaims = - new LinkedList<>(Arrays.asList(claimOne, claimStatic)); - - final List<V1PersistentVolumeClaim> actualClaims = - v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts); - - Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size()); - Assert.assertTrue(expectedClaims.containsAll(actualClaims)); - } - - @Test - public void testCreatePersistentVolumeClaimVolumesAndMounts() { - final String volumeNameOne = "VolumeNameONE"; - final String volumeNameTwo = "VolumeNameTWO"; - final String claimNameOne = "claim-name-one"; - final String claimNameTwo = "OnDemand"; - final String mountPathOne = "/mount/path/ONE"; - final String mountPathTwo = "/mount/path/TWO"; - final String mountSubPathTwo = "/mount/sub/path/TWO"; - Map<String, Map<VolumeConfigKeys, String>> mapOfOpts = - ImmutableMap.of( - volumeNameOne, ImmutableMap.of( - VolumeConfigKeys.claimName, claimNameOne, - VolumeConfigKeys.path, mountPathOne), - volumeNameTwo, ImmutableMap.of( - VolumeConfigKeys.claimName, claimNameTwo, - VolumeConfigKeys.path, mountPathTwo, - VolumeConfigKeys.subPath, mountSubPathTwo) - ); - final V1Volume volumeOne = new V1VolumeBuilder() - .withName(volumeNameOne) - .withNewPersistentVolumeClaim() - .withClaimName(claimNameOne) - .endPersistentVolumeClaim() - .build(); - final V1Volume volumeTwo = new V1VolumeBuilder() - .withName(volumeNameTwo) - .withNewPersistentVolumeClaim() - .withClaimName(claimNameTwo) - .endPersistentVolumeClaim() - .build(); - final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder() - .withName(volumeNameOne) - .withMountPath(mountPathOne) - .build(); - final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder() - .withName(volumeNameTwo) - .withMountPath(mountPathTwo) - .withSubPath(mountSubPathTwo) - .build(); - - // Test case container. - // Input: Map of Volume configurations. - // Output: The expected lists of Volumes and Volume Mounts. - final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>, - Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>(); - - // Default case: No PVC provided. - testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(), - new Pair<>(new LinkedList<>(), new LinkedList<>()))); - - // PVC Provided. - final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull = - new Pair<>( - new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)), - new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo))); - testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts, - new Pair<>(expectedFull.first, expectedFull.second))); - - // Testing loop. - for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>, - Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) { - List<V1Volume> actualVolume = new LinkedList<>(); - List<V1VolumeMount> actualVolumeMount = new LinkedList<>(); - v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input, - actualVolume, actualVolumeMount); - - Assert.assertTrue(testCase.description, - (testCase.expected.first).containsAll(actualVolume)); - Assert.assertTrue(testCase.description + " Mounts", - (testCase.expected.second).containsAll(actualVolumeMount)); - } - } - - @Test - public void testConfigurePodWithVolumesAndMountsFromCLI() { - final String volumeNameClashing = "clashing-volume"; - final String volumeMountNameClashing = "original-volume-mount"; - V1Volume baseVolume = new V1VolumeBuilder() - .withName(volumeNameClashing) - .withNewPersistentVolumeClaim() - .withClaimName("Original Base Claim Name") - .endPersistentVolumeClaim() - .build(); - V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder() - .withName(volumeMountNameClashing) - .withMountPath("/original/mount/path") - .build(); - V1Volume clashingVolume = new V1VolumeBuilder() - .withName(volumeNameClashing) - .withNewPersistentVolumeClaim() - .withClaimName("Clashing Claim Replaced") - .endPersistentVolumeClaim() - .build(); - V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder() - .withName(volumeMountNameClashing) - .withMountPath("/clashing/mount/path") - .build(); - V1Volume secondaryVolume = new V1VolumeBuilder() - .withName("secondary-volume") - .withNewPersistentVolumeClaim() - .withClaimName("Original Secondary Claim Name") - .endPersistentVolumeClaim() - .build(); - V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder() - .withName("secondary-volume-mount") - .withMountPath("/secondary/mount/path") - .build(); - - // Test case container. - // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List of Volumes - // [3] List of Volume Mounts. - // Output: The expected <V1PodSpec> and <V1Container>. - final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>(); - - // No Persistent Volume Claim. - final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build(); - final V1Container executorEmptyCase = - new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build(); - final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build(); - final V1Container expectedEmptyExecutor = - new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build(); - - testCases.add(new TestTuple<>("Empty", - new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), new LinkedList<>()}, - new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor))); - - // Non-clashing Persistent Volume Claim. - final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder() - .withVolumes(baseVolume) - .build(); - final V1Container executorNoClashCase = new V1ContainerBuilder() - .withVolumeMounts(baseVolumeMount) - .build(); - final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder() - .addToVolumes(baseVolume) - .addToVolumes(secondaryVolume) - .build(); - final V1Container expectedNoClashExecutor = new V1ContainerBuilder() - .addToVolumeMounts(baseVolumeMount) - .addToVolumeMounts(secondaryVolumeMount) - .build(); - - testCases.add(new TestTuple<>("No Clash", - new Object[]{podSpecNoClashCase, executorNoClashCase, - Collections.singletonList(secondaryVolume), - Collections.singletonList(secondaryVolumeMount)}, - new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor))); - - // Clashing Persistent Volume Claim. - final V1PodSpec podSpecClashCase = new V1PodSpecBuilder() - .withVolumes(baseVolume) - .build(); - final V1Container executorClashCase = new V1ContainerBuilder() - .withVolumeMounts(baseVolumeMount) - .build(); - final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder() - .addToVolumes(clashingVolume) - .addToVolumes(secondaryVolume) - .build(); - final V1Container expectedClashExecutor = new V1ContainerBuilder() - .addToVolumeMounts(clashingVolumeMount) - .addToVolumeMounts(secondaryVolumeMount) - .build(); - - testCases.add(new TestTuple<>("Clashing", - new Object[]{podSpecClashCase, executorClashCase, - Arrays.asList(clashingVolume, secondaryVolume), - Arrays.asList(clashingVolumeMount, secondaryVolumeMount)}, - new Pair<>(expectedClashPodSpec, expectedClashExecutor))); - - // Testing loop. - for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) { - v1ControllerWithPodTemplate - .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0], - (V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2], - (List<V1VolumeMount>) testCase.input[3]); - - Assert.assertEquals("Pod Specs match " + testCase.description, - testCase.input[0], testCase.expected.first); - Assert.assertEquals("Executors match " + testCase.description, - testCase.input[1], testCase.expected.second); - } - } - - @Test - public void testSetShardIdEnvironmentVariableCommand() { - - List<TestTuple<Boolean, String>> testCases = new LinkedList<>(); - - testCases.add(new TestTuple<>("Executor command is set correctly", - true, "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo shardId=${SHARD_ID}")); - testCases.add(new TestTuple<>("Manager command is set correctly", - false, "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}")); - - for (TestTuple<Boolean, String> testCase : testCases) { - Assert.assertEquals(testCase.description, testCase.expected, - v1ControllerWithPodTemplate.setShardIdEnvironmentVariableCommand(testCase.input)); - } - } - - @Test - public void testCreateResourcesRequirement() { - final String managerCpuLimit = "3000m"; - final String managerMemLimit = "256Gi"; - final Quantity memory = Quantity.fromString(managerMemLimit); - final Quantity cpu = Quantity.fromString(managerCpuLimit); - final List<TestTuple<Map<String, String>, Map<String, Quantity>>> testCases = - new LinkedList<>(); - - // No input. - Map<String, String> inputEmpty = new HashMap<>(); - testCases.add(new TestTuple<>("Empty input.", inputEmpty, new HashMap<>())); - - // Only memory. - Map<String, String> inputMemory = new HashMap<String, String>() { - { - put(KubernetesConstants.MEMORY, managerMemLimit); - } - }; - Map<String, Quantity> expectedMemory = new HashMap<String, Quantity>() { - { - put(KubernetesConstants.MEMORY, memory); - } - }; - testCases.add(new TestTuple<>("Only memory input.", inputMemory, expectedMemory)); - - // Only CPU. - Map<String, String> inputCPU = new HashMap<String, String>() { - { - put(KubernetesConstants.CPU, managerCpuLimit); - } - }; - Map<String, Quantity> expectedCPU = new HashMap<String, Quantity>() { - { - put(KubernetesConstants.CPU, cpu); - } - }; - testCases.add(new TestTuple<>("Only CPU input.", inputCPU, expectedCPU)); - - // CPU and memory. - Map<String, String> inputMemoryCPU = new HashMap<String, String>() { - { - put(KubernetesConstants.MEMORY, managerMemLimit); - put(KubernetesConstants.CPU, managerCpuLimit); - } - }; - Map<String, Quantity> expectedMemoryCPU = new HashMap<String, Quantity>() { - { - put(KubernetesConstants.MEMORY, memory); - put(KubernetesConstants.CPU, cpu); - } - }; - testCases.add(new TestTuple<>("Memory and CPU input.", inputMemoryCPU, expectedMemoryCPU)); - - // Invalid. - Map<String, String> inputInvalid = new HashMap<String, String>() { - { - put("invalid input", "will not be ignored"); - put(KubernetesConstants.CPU, managerCpuLimit); - } - }; - Map<String, Quantity> expectedInvalid = new HashMap<String, Quantity>() { - { - put(KubernetesConstants.CPU, cpu); - } - }; - testCases.add(new TestTuple<>("Invalid input.", inputInvalid, expectedInvalid)); - - // Test loop. - for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) { - Map<String, Quantity> actual = - v1ControllerPodTemplate.createResourcesRequirement(testCase.input); - Assert.assertEquals(testCase.description, testCase.expected, actual); - } - } - - @Test - public void testCreateVolumeAndMountsEmptyDirCLI() { - final String volumeName = "volume-name-empty-dir"; - final String medium = "Memory"; - final String sizeLimit = "1Gi"; - final String path = "/path/to/mount"; - final String subPath = "/sub/path/to/mount"; - - // Empty Dir. - final Map<String, Map<VolumeConfigKeys, String>> config = - ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.sizeLimit, sizeLimit); - put(VolumeConfigKeys.medium, "Memory"); - put(VolumeConfigKeys.path, path); - put(VolumeConfigKeys.subPath, subPath); - } - }); - final List<V1Volume> expectedVolumes = Collections.singletonList( - new V1VolumeBuilder() - .withName(volumeName) - .withNewEmptyDir() - .withMedium(medium) - .withNewSizeLimit(sizeLimit) - .endEmptyDir() - .build() - ); - final List<V1VolumeMount> expectedMounts = Collections.singletonList( - new V1VolumeMountBuilder() - .withName(volumeName) - .withMountPath(path) - .withSubPath(subPath) - .build() - ); - - List<V1Volume> actualVolumes = new LinkedList<>(); - List<V1VolumeMount> actualMounts = new LinkedList<>(); - v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts); - Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes); - Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts); - } - - @Test - public void testCreateVolumeAndMountsHostPathCLI() { - final String volumeName = "volume-name-host-path"; - final String type = "DirectoryOrCreate"; - final String pathOnHost = "path.on.host"; - final String path = "/path/to/mount"; - final String subPath = "/sub/path/to/mount"; - - // Host Path. - final Map<String, Map<VolumeConfigKeys, String>> config = - ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.type, type); - put(VolumeConfigKeys.pathOnHost, pathOnHost); - put(VolumeConfigKeys.path, path); - put(VolumeConfigKeys.subPath, subPath); - } - }); - final List<V1Volume> expectedVolumes = Collections.singletonList( - new V1VolumeBuilder() - .withName(volumeName) - .withNewHostPath() - .withNewType(type) - .withNewPath(pathOnHost) - .endHostPath() - .build() - ); - final List<V1VolumeMount> expectedMounts = Collections.singletonList( - new V1VolumeMountBuilder() - .withName(volumeName) - .withMountPath(path) - .withSubPath(subPath) - .build() - ); - - List<V1Volume> actualVolumes = new LinkedList<>(); - List<V1VolumeMount> actualMounts = new LinkedList<>(); - v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts); - Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes); - Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts); - } - - @Test - public void testCreateVolumeAndMountsNFSCLI() { - final String volumeName = "volume-name-nfs"; - final String server = "nfs.server.address"; - final String pathOnNFS = "path.on.host"; - final String readOnly = "true"; - final String path = "/path/to/mount"; - final String subPath = "/sub/path/to/mount"; - - // NFS. - final Map<String, Map<VolumeConfigKeys, String>> config = - ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { - { - put(VolumeConfigKeys.server, server); - put(VolumeConfigKeys.readOnly, readOnly); - put(VolumeConfigKeys.pathOnNFS, pathOnNFS); - put(VolumeConfigKeys.path, path); - put(VolumeConfigKeys.subPath, subPath); - } - }); - final List<V1Volume> expectedVolumes = Collections.singletonList( - new V1VolumeBuilder() - .withName(volumeName) - .withNewNfs() - .withServer(server) - .withPath(pathOnNFS) - .withReadOnly(Boolean.parseBoolean(readOnly)) - .endNfs() - .build() - ); - final List<V1VolumeMount> expectedMounts = Collections.singletonList( - new V1VolumeMountBuilder() - .withName(volumeName) - .withMountPath(path) - .withSubPath(subPath) - .withReadOnly(true) - .build() - ); - - List<V1Volume> actualVolumes = new LinkedList<>(); - List<V1VolumeMount> actualMounts = new LinkedList<>(); - v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts); - Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes); - Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts); - } }
