This is an automated email from the ASF dual-hosted git repository. saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 23ded81598237d687be0b9301b307af96ef0c825 Author: Saad Ur Rahman <[email protected]> AuthorDate: Wed Jul 20 19:57:56 2022 -0400 [Tests] Shim to Stateful Set test migration. --- .../scheduler/kubernetes/StatefulSetTest.java | 966 ++++++++++++++++++++- 1 file changed, 964 insertions(+), 2 deletions(-) diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java index 746823f21ea..10f48845b5f 100644 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java +++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java @@ -19,12 +19,974 @@ package org.apache.heron.scheduler.kubernetes; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.heron.common.basics.ByteAmount; +import org.apache.heron.common.basics.Pair; +import org.apache.heron.spi.common.Config; +import org.apache.heron.spi.common.Key; +import org.apache.heron.spi.packing.Resource; + +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1ContainerBuilder; +import io.kubernetes.client.openapi.models.V1ContainerPort; +import io.kubernetes.client.openapi.models.V1ContainerPortBuilder; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarSource; +import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; +import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim; +import 
io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodSpecBuilder; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpecBuilder; +import io.kubernetes.client.openapi.models.V1ResourceRequirements; +import io.kubernetes.client.openapi.models.V1Toleration; +import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.openapi.models.V1VolumeBuilder; +import io.kubernetes.client.openapi.models.V1VolumeMount; +import io.kubernetes.client.openapi.models.V1VolumeMountBuilder; +import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys; +import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple; + +@RunWith(MockitoJUnitRunner.class) public class StatefulSetTest { + private static final String TOPOLOGY_NAME = "topology-name"; + private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME"; + private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME"; + private static final String CONFIGMAP_POD_TEMPLATE_NAME = + String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME); + private static final String POD_TEMPLATE_LOCATION_EXECUTOR = + String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION, + KubernetesConstants.EXECUTOR_NAME); + private static final String POD_TEMPLATE_LOCATION_MANAGER = + String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION, + KubernetesConstants.MANAGER_NAME); + private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder() + .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME) + .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME) + .build(); + private static final Config RUNTIME = Config.newBuilder() + .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME) + .build(); + private static final StatefulSet STATEFUL_SET = new StatefulSet(); + private 
static final V1PodTemplateSpec POD_TEMPLATE_SPEC = new V1PodTemplateSpecBuilder() + .withNewMetadata() + .withLabels(Collections.singletonMap("app", "heron-tracker")) + .endMetadata() + .withNewSpec() + .withContainers(new V1ContainerBuilder() + .withName("heron-tracker") + .withImage("apache/heron:latest") + .withPorts(new V1ContainerPortBuilder() + .withName("api-port") + .withContainerPort(8888) + .build()) + .withNewResources() + .withRequests(Collections.singletonMap("100m", new Quantity("200M"))) + .withLimits(Collections.singletonMap("400m", new Quantity("512M"))) + .endResources() + .build()) + .endSpec() + .build(); + private static final StatefulSet.Configs CLUSTER_CONFIGS = + new StatefulSet.Configs(CONFIG_WITH_POD_TEMPLATE, RUNTIME, + POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC); + + + @Test + public void testConfigureContainerPorts() { + final String portNamekept = "random-port-to-be-kept"; + final int portNumberkept = 1111; + final int numInstances = 3; + final List<V1ContainerPort> expectedPortsBase = + Collections.unmodifiableList(StatefulSet.getExecutorPorts()); + final List<V1ContainerPort> debugPorts = + Collections.unmodifiableList(StatefulSet.getDebuggingPorts(numInstances)); + final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList( + Arrays.asList( + new V1ContainerPort() + .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT), + new V1ContainerPort() + .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT), + new V1ContainerPort().name(portNamekept).containerPort(portNumberkept) + ) + ); + + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + // Null ports. This is the default case. 
+ final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build(); + STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithNullPorts); + Assert.assertTrue("Server and/or shell PORTS for container with null ports list", + CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase)); + + // Empty ports. + final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder() + .withPorts(new LinkedList<>()) + .build(); + STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithEmptyPorts); + Assert.assertTrue("Server and/or shell PORTS for container with empty ports list", + CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase)); + + // Port overriding. + final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase); + final V1Container inputContainerWithPorts = new V1ContainerBuilder() + .withPorts(inputPorts) + .build(); + final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase); + expectedPortsOverriding + .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)); + + STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithPorts); + Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.", + CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding)); + + // Port overriding with debug ports. 
+ final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts); + inputPortsWithDebug.addAll(inputPortsBase); + final V1Container inputContainerWithDebug = new V1ContainerBuilder() + .withPorts(inputPortsWithDebug) + .build(); + final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase); + expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)); + expectedPortsDebug.addAll(debugPorts); + + STATEFUL_SET.configureContainerPorts(true, numInstances, inputContainerWithDebug); + Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.", + CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug)); + } + + @Test + public void testConfigureContainerEnvVars() { + final List<V1EnvVar> heronEnvVars = + Collections.unmodifiableList(STATEFUL_SET.getExecutorEnvVars()); + final V1EnvVar additionEnvVar = new V1EnvVar() + .name("env-variable-to-be-kept") + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath("env-variable-was-kept"))); + final List<V1EnvVar> inputEnvVars = Arrays.asList( + new V1EnvVar() + .name(KubernetesConstants.ENV_HOST) + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath("env-host-to-be-replaced"))), + new V1EnvVar() + .name(KubernetesConstants.ENV_POD_NAME) + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath("pod-name-to-be-replaced"))), + additionEnvVar + ); + + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + // Null env vars. This is the default case. 
+ V1Container containerWithNullEnvVars = new V1ContainerBuilder().build(); + STATEFUL_SET.configureContainerEnvVars(containerWithNullEnvVars); + Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match", + CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars)); + + // Empty env vars. + V1Container containerWithEmptyEnvVars = new V1ContainerBuilder() + .withEnv(new LinkedList<>()) + .build(); + STATEFUL_SET.configureContainerEnvVars(containerWithEmptyEnvVars); + Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match", + CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars)); + + // Env Var overriding. + final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars); + expectedOverriding.add(additionEnvVar); + V1Container containerWithEnvVars = new V1ContainerBuilder() + .withEnv(inputEnvVars) + .build(); + STATEFUL_SET.configureContainerEnvVars(containerWithEnvVars); + Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden", + CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding)); + } + + @Test + public void testConfigureContainerResources() { + // Load configurations into test class. 
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final boolean isExecutor = true; + + final Resource resourceDefault = new Resource( + 9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000)); + final Resource resourceCustom = new Resource( + 4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000)); + + final Quantity defaultRAM = Quantity.fromString( + KubernetesUtils.Megabytes(resourceDefault.getRam())); + final Quantity defaultCPU = Quantity.fromString( + Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3))); + final Quantity customRAM = Quantity.fromString( + KubernetesUtils.Megabytes(resourceCustom.getRam())); + final Quantity customCPU = Quantity.fromString( + Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3))); + final Quantity customDisk = Quantity.fromString( + KubernetesUtils.Megabytes(resourceCustom.getDisk())); + + final Config configNoLimit = Config.newBuilder() + .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET") + .build(); + final Config configWithLimit = Config.newBuilder() + .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT") + .build(); + + final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements() + .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM) + .putLimitsItem(KubernetesConstants.CPU, defaultCPU); + + final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements() + .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM) + .putLimitsItem(KubernetesConstants.CPU, defaultCPU) + .putLimitsItem("disk", customDisk); + + final V1ResourceRequirements customRequirements = new V1ResourceRequirements() + .putLimitsItem(KubernetesConstants.MEMORY, customRAM) + .putLimitsItem(KubernetesConstants.CPU, customCPU) + .putLimitsItem("disk", customDisk); + + // Default. Null resources. 
+ V1Container containerNull = new V1ContainerBuilder().build(); + STATEFUL_SET.configureContainerResources(containerNull, configNoLimit, resourceDefault, + isExecutor); + Assert.assertTrue("Default LIMITS should be set in container with null LIMITS", + containerNull.getResources().getLimits().entrySet() + .containsAll(expectDefaultRequirements.getLimits().entrySet())); + + // Empty resources. The assertion must inspect containerEmpty, the container configured here. + V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build(); + STATEFUL_SET.configureContainerResources(containerEmpty, configNoLimit, resourceDefault, + isExecutor); + Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS", + containerEmpty.getResources().getLimits().entrySet() + .containsAll(expectDefaultRequirements.getLimits().entrySet())); + + // Custom resources. + V1Container containerCustom = new V1ContainerBuilder() + .withResources(customRequirements) + .build(); + STATEFUL_SET.configureContainerResources(containerCustom, configNoLimit, resourceDefault, + isExecutor); + Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS", + containerCustom.getResources().getLimits().entrySet() + .containsAll(expectCustomRequirements.getLimits().entrySet())); + + // Custom resources with request. 
+ V1Container containerRequests = new V1ContainerBuilder() + .withResources(customRequirements) + .build(); + STATEFUL_SET.configureContainerResources(containerRequests, configWithLimit, resourceDefault, + isExecutor); + Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST", + containerRequests.getResources().getLimits().entrySet() + .containsAll(expectCustomRequirements.getLimits().entrySet())); + Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST", + containerRequests.getResources().getRequests().entrySet() + .containsAll(expectCustomRequirements.getLimits().entrySet())); + } + + @Test + public void testConfigureContainerResourcesCLI() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final boolean isExecutor = true; + final String customLimitMEMStr = "120Gi"; + final String customLimitCPUStr = "5"; + final String customRequestMEMStr = "100Mi"; + final String customRequestCPUStr = "4"; + + final Resource resources = new Resource( + 6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400)); + + final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr); + final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr); + final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr); + final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr); + + final Config config = Config.newBuilder() + .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX + + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customLimitCPUStr) + .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX + + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr) + .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX + + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customRequestCPUStr) + 
.put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX + + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), + customRequestMEMStr) + .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT") + .build(); + + final V1Container expected = new V1ContainerBuilder() + .withNewResources() + .addToLimits(KubernetesConstants.CPU, customLimitCPU) + .addToLimits(KubernetesConstants.MEMORY, customLimitMEM) + .addToRequests(KubernetesConstants.CPU, customRequestCPU) + .addToRequests(KubernetesConstants.MEMORY, customRequestMEM) + .endResources() + .build(); + + final V1Container actual = new V1Container(); + STATEFUL_SET.configureContainerResources(actual, config, resources, isExecutor); + Assert.assertEquals("Container Resources are set from CLI.", expected, actual); + } + + @Test + public void testMountVolumeIfPresent() { + final String pathDefault = "config-host-volume-path"; + final String pathNameDefault = "config-host-volume-name"; + final Config configWithVolumes = Config.newBuilder() + .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault) + .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault) + .build(); + final StatefulSet.Configs configsWithVolumes = new StatefulSet.Configs(configWithVolumes, + RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC); + final StatefulSet.Configs configsWithNoVolumes = new StatefulSet.Configs( + Config.newBuilder().build(), RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC); + final V1VolumeMount volumeDefault = new V1VolumeMountBuilder() + .withName(pathNameDefault) + .withMountPath(pathDefault) + .build(); + final V1VolumeMount volumeCustom = new V1VolumeMountBuilder() + .withName("custom-volume-mount") + .withMountPath("should-be-kept") + .build(); + + final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault); + final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault); + final 
List<V1VolumeMount> volumeMountsCustomList = Arrays.asList( + new V1VolumeMountBuilder() + .withName(pathNameDefault) + .withMountPath("should-be-replaced") + .build(), + volumeCustom + ); + + // No Volume Mounts set. + STATEFUL_SET.setClusterConfigs(configsWithNoVolumes); + V1Container containerNoSetMounts = new V1Container(); + STATEFUL_SET.mountVolumeIfPresent(containerNoSetMounts); + Assert.assertNull(containerNoSetMounts.getVolumeMounts()); + + // Configure factory for volume mounts. + STATEFUL_SET.setClusterConfigs(configsWithVolumes); + + // Default. Null Volume Mounts. + V1Container containerNull = new V1ContainerBuilder().build(); + STATEFUL_SET.mountVolumeIfPresent(containerNull); + Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS", + CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts())); + + // Empty Volume Mounts. + V1Container containerEmpty = new V1ContainerBuilder() + .withVolumeMounts(new LinkedList<>()) + .build(); + STATEFUL_SET.mountVolumeIfPresent(containerEmpty); + Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS", + CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts())); + + // Custom Volume Mounts. + V1Container containerCustom = new V1ContainerBuilder() + .withVolumeMounts(volumeMountsCustomList) + .build(); + STATEFUL_SET.mountVolumeIfPresent(containerCustom); + Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS", + CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts())); + } + + @Test + public void testConfigureTolerations() { + // Load configurations into test class. 
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final V1Toleration keptToleration = new V1Toleration() + .key("kept toleration") + .operator("Some Operator") + .effect("Some Effect") + .tolerationSeconds(5L); + final List<V1Toleration> expectedTolerationBase = + Collections.unmodifiableList(KubernetesShim.getTolerations()); + final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList( + Arrays.asList( + new V1Toleration() + .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"), + new V1Toleration() + .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"), + keptToleration + ) + ); + + // Null Tolerations. This is the default case. + final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build(); + STATEFUL_SET.configureTolerations(podSpecNullTolerations); + Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults", + CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(), + expectedTolerationBase)); + + // Empty Tolerations. + final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder() + .withTolerations(new LinkedList<>()) + .build(); + STATEFUL_SET.configureTolerations(podSpecWithEmptyTolerations); + Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults", + CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(), + expectedTolerationBase)); + + // Toleration overriding. 
+ final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder() + .withTolerations(inputTolerationsBase) + .build(); + final List<V1Toleration> expectedTolerationsOverriding = + new LinkedList<>(expectedTolerationBase); + expectedTolerationsOverriding.add(keptToleration); + + STATEFUL_SET.configureTolerations(podSpecWithTolerations); + Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults", + CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(), + expectedTolerationsOverriding)); + } + + @Test + public void testCreatePersistentVolumeClaims() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String topologyName = "topology-name"; + final String volumeNameOne = "volume-name-one"; + final String volumeNameTwo = "volume-name-two"; + final String volumeNameStatic = "volume-name-static"; + final String claimNameOne = "OnDemand"; + final String claimNameTwo = "claim-name-two"; + final String claimNameStatic = "OnDEmaND"; + final String storageClassName = "storage-class-name"; + final String sizeLimit = "555Gi"; + final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany"; + final String accessModes = "ReadOnlyMany"; + final String volumeMode = "VolumeMode"; + final String path = "/path/to/mount/"; + final String subPath = "/sub/path/to/mount/"; + final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts = + ImmutableMap.of( + volumeNameOne, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.claimName, claimNameOne); + put(VolumeConfigKeys.storageClassName, storageClassName); + put(VolumeConfigKeys.sizeLimit, sizeLimit); + put(VolumeConfigKeys.accessModes, accessModesList); + put(VolumeConfigKeys.volumeMode, volumeMode); + put(VolumeConfigKeys.path, path); + } + }, + volumeNameTwo, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.claimName, claimNameTwo); + put(VolumeConfigKeys.storageClassName, 
storageClassName); + put(VolumeConfigKeys.sizeLimit, sizeLimit); + put(VolumeConfigKeys.accessModes, accessModes); + put(VolumeConfigKeys.volumeMode, volumeMode); + put(VolumeConfigKeys.path, path); + put(VolumeConfigKeys.subPath, subPath); + } + }, + volumeNameStatic, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.claimName, claimNameStatic); + put(VolumeConfigKeys.sizeLimit, sizeLimit); + put(VolumeConfigKeys.accessModes, accessModes); + put(VolumeConfigKeys.volumeMode, volumeMode); + put(VolumeConfigKeys.path, path); + put(VolumeConfigKeys.subPath, subPath); + } + } + ); + + final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName(volumeNameOne) + .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName)) + .endMetadata() + .withNewSpec() + .withStorageClassName(storageClassName) + .withAccessModes(Arrays.asList(accessModesList.split(","))) + .withVolumeMode(volumeMode) + .withNewResources() + .addToRequests("storage", new Quantity(sizeLimit)) + .endResources() + .endSpec() + .build(); + + final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName(volumeNameStatic) + .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName)) + .endMetadata() + .withNewSpec() + .withStorageClassName("") + .withAccessModes(Collections.singletonList(accessModes)) + .withVolumeMode(volumeMode) + .withNewResources() + .addToRequests("storage", new Quantity(sizeLimit)) + .endResources() + .endSpec() + .build(); + + final List<V1PersistentVolumeClaim> expectedClaims = + new LinkedList<>(Arrays.asList(claimOne, claimStatic)); + + final List<V1PersistentVolumeClaim> actualClaims = + STATEFUL_SET.createPersistentVolumeClaims(mapPVCOpts); + + Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size()); + Assert.assertTrue(expectedClaims.containsAll(actualClaims)); + } + + @Test + public void 
testCreatePersistentVolumeClaimVolumesAndMounts() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String volumeNameOne = "VolumeNameONE"; + final String volumeNameTwo = "VolumeNameTWO"; + final String claimNameOne = "claim-name-one"; + final String claimNameTwo = "OnDemand"; + final String mountPathOne = "/mount/path/ONE"; + final String mountPathTwo = "/mount/path/TWO"; + final String mountSubPathTwo = "/mount/sub/path/TWO"; + Map<String, Map<VolumeConfigKeys, String>> mapOfOpts = + ImmutableMap.of( + volumeNameOne, ImmutableMap.of( + VolumeConfigKeys.claimName, claimNameOne, + VolumeConfigKeys.path, mountPathOne), + volumeNameTwo, ImmutableMap.of( + VolumeConfigKeys.claimName, claimNameTwo, + VolumeConfigKeys.path, mountPathTwo, + VolumeConfigKeys.subPath, mountSubPathTwo) + ); + final V1Volume volumeOne = new V1VolumeBuilder() + .withName(volumeNameOne) + .withNewPersistentVolumeClaim() + .withClaimName(claimNameOne) + .endPersistentVolumeClaim() + .build(); + final V1Volume volumeTwo = new V1VolumeBuilder() + .withName(volumeNameTwo) + .withNewPersistentVolumeClaim() + .withClaimName(claimNameTwo) + .endPersistentVolumeClaim() + .build(); + final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder() + .withName(volumeNameOne) + .withMountPath(mountPathOne) + .build(); + final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder() + .withName(volumeNameTwo) + .withMountPath(mountPathTwo) + .withSubPath(mountSubPathTwo) + .build(); + + // Test case container. + // Input: Map of Volume configurations. + // Output: The expected lists of Volumes and Volume Mounts. + final List<KubernetesUtils.TestTuple<Map<String, Map<VolumeConfigKeys, String>>, + Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>(); + + // Default case: No PVC provided. 
+ testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(), + new Pair<>(new LinkedList<>(), new LinkedList<>()))); + + // PVC Provided. + final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull = + new Pair<>( + new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)), + new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo))); + testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts, + new Pair<>(expectedFull.first, expectedFull.second))); + + // Testing loop. + for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>, + Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) { + List<V1Volume> actualVolume = new LinkedList<>(); + List<V1VolumeMount> actualVolumeMount = new LinkedList<>(); + STATEFUL_SET.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input, + actualVolume, actualVolumeMount); + + Assert.assertTrue(testCase.description, + (testCase.expected.first).containsAll(actualVolume)); + Assert.assertTrue(testCase.description + " Mounts", + (testCase.expected.second).containsAll(actualVolumeMount)); + } + } + + @Test + public void testConfigurePodWithVolumesAndMountsFromCLI() { + // Load configurations into test class. 
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String volumeNameClashing = "clashing-volume"; + final String volumeMountNameClashing = "original-volume-mount"; + V1Volume baseVolume = new V1VolumeBuilder() + .withName(volumeNameClashing) + .withNewPersistentVolumeClaim() + .withClaimName("Original Base Claim Name") + .endPersistentVolumeClaim() + .build(); + V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder() + .withName(volumeMountNameClashing) + .withMountPath("/original/mount/path") + .build(); + V1Volume clashingVolume = new V1VolumeBuilder() + .withName(volumeNameClashing) + .withNewPersistentVolumeClaim() + .withClaimName("Clashing Claim Replaced") + .endPersistentVolumeClaim() + .build(); + V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder() + .withName(volumeMountNameClashing) + .withMountPath("/clashing/mount/path") + .build(); + V1Volume secondaryVolume = new V1VolumeBuilder() + .withName("secondary-volume") + .withNewPersistentVolumeClaim() + .withClaimName("Original Secondary Claim Name") + .endPersistentVolumeClaim() + .build(); + V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder() + .withName("secondary-volume-mount") + .withMountPath("/secondary/mount/path") + .build(); + + // Test case container. + // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List of Volumes + // [3] List of Volume Mounts. + // Output: The expected <V1PodSpec> and <V1Container>. + final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>(); + + // No Persistent Volume Claim. 
+ final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build(); + final V1Container executorEmptyCase = + new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build(); + final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build(); + final V1Container expectedEmptyExecutor = + new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build(); + + testCases.add(new TestTuple<>("Empty", + new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), new LinkedList<>()}, + new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor))); + + // Non-clashing Persistent Volume Claim. + final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder() + .withVolumes(baseVolume) + .build(); + final V1Container executorNoClashCase = new V1ContainerBuilder() + .withVolumeMounts(baseVolumeMount) + .build(); + final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder() + .addToVolumes(baseVolume) + .addToVolumes(secondaryVolume) + .build(); + final V1Container expectedNoClashExecutor = new V1ContainerBuilder() + .addToVolumeMounts(baseVolumeMount) + .addToVolumeMounts(secondaryVolumeMount) + .build(); + + testCases.add(new TestTuple<>("No Clash", + new Object[]{podSpecNoClashCase, executorNoClashCase, + Collections.singletonList(secondaryVolume), + Collections.singletonList(secondaryVolumeMount)}, + new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor))); + + // Clashing Persistent Volume Claim. 
+ final V1PodSpec podSpecClashCase = new V1PodSpecBuilder() + .withVolumes(baseVolume) + .build(); + final V1Container executorClashCase = new V1ContainerBuilder() + .withVolumeMounts(baseVolumeMount) + .build(); + final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder() + .addToVolumes(clashingVolume) + .addToVolumes(secondaryVolume) + .build(); + final V1Container expectedClashExecutor = new V1ContainerBuilder() + .addToVolumeMounts(clashingVolumeMount) + .addToVolumeMounts(secondaryVolumeMount) + .build(); + + testCases.add(new TestTuple<>("Clashing", + new Object[]{podSpecClashCase, executorClashCase, + Arrays.asList(clashingVolume, secondaryVolume), + Arrays.asList(clashingVolumeMount, secondaryVolumeMount)}, + new Pair<>(expectedClashPodSpec, expectedClashExecutor))); + + // Testing loop. + for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) { + STATEFUL_SET + .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0], + (V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2], + (List<V1VolumeMount>) testCase.input[3]); + + // JUnit order is (message, expected, actual): the inputs mutated in place are the actual values. + Assert.assertEquals("Pod Specs match " + testCase.description, + testCase.expected.first, testCase.input[0]); + Assert.assertEquals("Executors match " + testCase.description, + testCase.expected.second, testCase.input[1]); + } + } + @Test - public void testEmpty() { - Assert.assertTrue(true); + public void testSetShardIdEnvironmentVariableCommand() { + // Load configurations into test class. 
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + List<TestTuple<Boolean, String>> testCases = new LinkedList<>(); + + testCases.add(new TestTuple<>("Executor command is set correctly", + true, "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo shardId=${SHARD_ID}")); + testCases.add(new TestTuple<>("Manager command is set correctly", + false, "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}")); + + for (TestTuple<Boolean, String> testCase : testCases) { + Assert.assertEquals(testCase.description, testCase.expected, + STATEFUL_SET.setShardIdEnvironmentVariableCommand(testCase.input)); + } + } + + @Test + public void testCreateResourcesRequirement() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String managerCpuLimit = "3000m"; + final String managerMemLimit = "256Gi"; + final Quantity memory = Quantity.fromString(managerMemLimit); + final Quantity cpu = Quantity.fromString(managerCpuLimit); + final List<TestTuple<Map<String, String>, Map<String, Quantity>>> testCases = + new LinkedList<>(); + + // No input. + Map<String, String> inputEmpty = new HashMap<>(); + testCases.add(new TestTuple<>("Empty input.", inputEmpty, new HashMap<>())); + + // Only memory. + Map<String, String> inputMemory = new HashMap<String, String>() { + { + put(KubernetesConstants.MEMORY, managerMemLimit); + } + }; + Map<String, Quantity> expectedMemory = new HashMap<String, Quantity>() { + { + put(KubernetesConstants.MEMORY, memory); + } + }; + testCases.add(new TestTuple<>("Only memory input.", inputMemory, expectedMemory)); + + // Only CPU. + Map<String, String> inputCPU = new HashMap<String, String>() { + { + put(KubernetesConstants.CPU, managerCpuLimit); + } + }; + Map<String, Quantity> expectedCPU = new HashMap<String, Quantity>() { + { + put(KubernetesConstants.CPU, cpu); + } + }; + testCases.add(new TestTuple<>("Only CPU input.", inputCPU, expectedCPU)); + + // CPU and memory. 
+ Map<String, String> inputMemoryCPU = new HashMap<String, String>() { + { + put(KubernetesConstants.MEMORY, managerMemLimit); + put(KubernetesConstants.CPU, managerCpuLimit); + } + }; + Map<String, Quantity> expectedMemoryCPU = new HashMap<String, Quantity>() { + { + put(KubernetesConstants.MEMORY, memory); + put(KubernetesConstants.CPU, cpu); + } + }; + testCases.add(new TestTuple<>("Memory and CPU input.", inputMemoryCPU, expectedMemoryCPU)); + + // Invalid. + Map<String, String> inputInvalid = new HashMap<String, String>() { + { + put("invalid input", "will not be ignored"); + put(KubernetesConstants.CPU, managerCpuLimit); + } + }; + Map<String, Quantity> expectedInvalid = new HashMap<String, Quantity>() { + { + put(KubernetesConstants.CPU, cpu); + } + }; + testCases.add(new TestTuple<>("Invalid input.", inputInvalid, expectedInvalid)); + + // Test loop. + for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) { + Map<String, Quantity> actual = + STATEFUL_SET.createResourcesRequirement(testCase.input); + Assert.assertEquals(testCase.description, testCase.expected, actual); + } + } + + @Test + public void testCreateVolumeAndMountsEmptyDirCLI() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String volumeName = "volume-name-empty-dir"; + final String medium = "Memory"; + final String sizeLimit = "1Gi"; + final String path = "/path/to/mount"; + final String subPath = "/sub/path/to/mount"; + + // Empty Dir. 
+ final Map<String, Map<VolumeConfigKeys, String>> config = + ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.sizeLimit, sizeLimit); + put(VolumeConfigKeys.medium, "Memory"); + put(VolumeConfigKeys.path, path); + put(VolumeConfigKeys.subPath, subPath); + } + }); + final List<V1Volume> expectedVolumes = Collections.singletonList( + new V1VolumeBuilder() + .withName(volumeName) + .withNewEmptyDir() + .withMedium(medium) + .withNewSizeLimit(sizeLimit) + .endEmptyDir() + .build() + ); + final List<V1VolumeMount> expectedMounts = Collections.singletonList( + new V1VolumeMountBuilder() + .withName(volumeName) + .withMountPath(path) + .withSubPath(subPath) + .build() + ); + + List<V1Volume> actualVolumes = new LinkedList<>(); + List<V1VolumeMount> actualMounts = new LinkedList<>(); + STATEFUL_SET.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts); + Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes); + Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts); + } + + @Test + public void testCreateVolumeAndMountsHostPathCLI() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String volumeName = "volume-name-host-path"; + final String type = "DirectoryOrCreate"; + final String pathOnHost = "path.on.host"; + final String path = "/path/to/mount"; + final String subPath = "/sub/path/to/mount"; + + // Host Path. 
+ final Map<String, Map<VolumeConfigKeys, String>> config = + ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.type, type); + put(VolumeConfigKeys.pathOnHost, pathOnHost); + put(VolumeConfigKeys.path, path); + put(VolumeConfigKeys.subPath, subPath); + } + }); + final List<V1Volume> expectedVolumes = Collections.singletonList( + new V1VolumeBuilder() + .withName(volumeName) + .withNewHostPath() + .withNewType(type) + .withNewPath(pathOnHost) + .endHostPath() + .build() + ); + final List<V1VolumeMount> expectedMounts = Collections.singletonList( + new V1VolumeMountBuilder() + .withName(volumeName) + .withMountPath(path) + .withSubPath(subPath) + .build() + ); + + List<V1Volume> actualVolumes = new LinkedList<>(); + List<V1VolumeMount> actualMounts = new LinkedList<>(); + STATEFUL_SET.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts); + Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes); + Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts); + } + + @Test + public void testCreateVolumeAndMountsNFSCLI() { + // Load configurations into test class. + STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS); + + final String volumeName = "volume-name-nfs"; + final String server = "nfs.server.address"; + final String pathOnNFS = "path.on.host"; + final String readOnly = "true"; + final String path = "/path/to/mount"; + final String subPath = "/sub/path/to/mount"; + + // NFS. 
+ final Map<String, Map<VolumeConfigKeys, String>> config = + ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() { + { + put(VolumeConfigKeys.server, server); + put(VolumeConfigKeys.readOnly, readOnly); + put(VolumeConfigKeys.pathOnNFS, pathOnNFS); + put(VolumeConfigKeys.path, path); + put(VolumeConfigKeys.subPath, subPath); + } + }); + final List<V1Volume> expectedVolumes = Collections.singletonList( + new V1VolumeBuilder() + .withName(volumeName) + .withNewNfs() + .withServer(server) + .withPath(pathOnNFS) + .withReadOnly(Boolean.parseBoolean(readOnly)) + .endNfs() + .build() + ); + final List<V1VolumeMount> expectedMounts = Collections.singletonList( + new V1VolumeMountBuilder() + .withName(volumeName) + .withMountPath(path) + .withSubPath(subPath) + .withReadOnly(true) + .build() + ); + + List<V1Volume> actualVolumes = new LinkedList<>(); + List<V1VolumeMount> actualMounts = new LinkedList<>(); + STATEFUL_SET.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts); + Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes); + Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts); } }
