This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new fd30626 [Heron-3723] Add Kubernetes support for Empty Dir, Host Path,
and NFS via CLI (#3747)
fd30626 is described below
commit fd30626d70e3cc3284dcc527f9d0883f42ff1157
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Sun Dec 19 01:03:32 2021 -0500
[Heron-3723] Add Kubernetes support for Empty Dir, Host Path, and NFS via
CLI (#3747)
Co-authored-by: Nicholas Nezis <[email protected]>
---
deploy/kubernetes/general/apiserver.yaml | 2 +-
deploy/kubernetes/helm/templates/tools.yaml | 2 +-
deploy/kubernetes/helm/values.yaml.template | 4 +-
deploy/kubernetes/minikube/apiserver.yaml | 2 +-
.../scheduler/kubernetes/KubernetesConstants.java | 27 +-
.../scheduler/kubernetes/KubernetesContext.java | 255 ++++++-
.../heron/scheduler/kubernetes/V1Controller.java | 294 ++++++--
.../kubernetes/KubernetesContextTest.java | 837 +++++++++++++++++++--
.../scheduler/kubernetes/V1ControllerTest.java | 348 +++++++--
.../docs/schedulers-k8s-execution-environment.md | 272 ++++++-
10 files changed, 1778 insertions(+), 265 deletions(-)
diff --git a/deploy/kubernetes/general/apiserver.yaml
b/deploy/kubernetes/general/apiserver.yaml
index 2e6290c..cd89d98 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -92,7 +92,7 @@ spec:
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
-D heron.kubernetes.pod.template.disabled=false
- -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
+ -D heron.kubernetes.volume.from.cli.disabled=false
---
apiVersion: v1
diff --git a/deploy/kubernetes/helm/templates/tools.yaml
b/deploy/kubernetes/helm/templates/tools.yaml
index f418779d6..61ccd25 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -159,7 +159,7 @@ spec:
{{- end }}
-D heron.kubernetes.resource.request.mode={{
.Values.topologyResourceRequestMode }}
-D heron.kubernetes.pod.template.disabled={{
.Values.disablePodTemplates }}
- -D heron.kubernetes.persistent.volume.claims.cli.disabled={{
.Values.disablePersistentVolumeMountsCLI }}
+ -D heron.kubernetes.volume.from.cli.disabled={{
.Values.disableVolumesFromCLI }}
envFrom:
- configMapRef:
name: {{ .Release.Name }}-tools-config
diff --git a/deploy/kubernetes/helm/values.yaml.template
b/deploy/kubernetes/helm/values.yaml.template
index 870db57..8814bdf 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -77,8 +77,8 @@ packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
# Support for ConfigMap mounted PodTemplates
disablePodTemplates: false
-# Support for Dynamic Persistent Volume Mounts from CLI input
-disablePersistentVolumeMountsCLI: false
+# Support for Volume specification from CLI input
+disableVolumesFromCLI: false
# Number of replicas for storage bookies, memory and storage requirements
bookieReplicas: 3
diff --git a/deploy/kubernetes/minikube/apiserver.yaml
b/deploy/kubernetes/minikube/apiserver.yaml
index ce20c90..e11cb8b 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -83,7 +83,7 @@ spec:
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
-D heron.kubernetes.pod.template.disabled=false
- -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
+ -D heron.kubernetes.volume.from.cli.disabled=false
---
apiVersion: v1
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index c89a0c6..e231fda 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -22,8 +22,10 @@ package org.apache.heron.scheduler.kubernetes;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Pattern;
import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
@@ -115,13 +117,34 @@ public final class KubernetesConstants {
)
);
- enum VolumeClaimTemplateConfigKeys {
+ protected enum VolumeConfigKeys {
claimName,
storageClassName,
sizeLimit,
accessModes,
volumeMode,
- path, // Added to container.
+ medium,
+ type,
+ readOnly,
+ server,
+ pathOnHost,
+ pathOnNFS,
+ path, // Added to container, nfsVolume, hostPath.
subPath, // Added to container.
}
+
+ protected static final Set<String> VALID_VOLUME_HOSTPATH_TYPES =
Collections.unmodifiableSet(
+ new HashSet<String>() {
+ {
+ add("");
+ add("DirectoryOrCreate");
+ add("Directory");
+ add("FileOrCreate");
+ add("File");
+ add("Socket");
+ add("CharDevice");
+ add("BlockDevice");
+ }
+ }
+ );
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 83aa0b2..242e4f1 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -27,6 +27,8 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
@@ -114,11 +116,20 @@ public final class KubernetesContext extends Context {
"heron.kubernetes.pod.secretKeyRef.";
// Persistent Volume Claims
- public static final String KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED =
- "heron.kubernetes.persistent.volume.claims.cli.disabled";
+ public static final String KUBERNETES_VOLUME_FROM_CLI_DISABLED =
+ "heron.kubernetes.volume.from.cli.disabled";
// heron.kubernetes.[executor |
manager].volumes.persistentVolumeClaim.VOLUME_NAME.OPTION=VALUE
public static final String KUBERNETES_VOLUME_CLAIM_PREFIX =
"heron.kubernetes.%s.volumes.persistentVolumeClaim.";
+ // heron.kubernetes.[executor |
manager].volumes.emptyDir.VOLUME_NAME.OPTION=VALUE
+ public static final String KUBERNETES_VOLUME_EMPTYDIR_PREFIX =
+ "heron.kubernetes.%s.volumes.emptyDir.";
+ // heron.kubernetes.[executor |
manager].volumes.hostPath.VOLUME_NAME.OPTION=VALUE
+ public static final String KUBERNETES_VOLUME_HOSTPATH_PREFIX =
+ "heron.kubernetes.%s.volumes.hostPath.";
+ // heron.kubernetes.[executor | manager].volumes.nfs.VOLUME_NAME.OPTION=VALUE
+ public static final String KUBERNETES_VOLUME_NFS_PREFIX =
+ "heron.kubernetes.%s.volumes.nfs.";
// heron.kubernetes.[executor | manager].limits.OPTION=VALUE
public static final String KUBERNETES_RESOURCE_LIMITS_PREFIX =
"heron.kubernetes.%s.limits.";
@@ -242,8 +253,8 @@ public final class KubernetesContext extends Context {
return getConfigItemsByPrefix(config, key);
}
- public static boolean getPersistentVolumeClaimDisabled(Config config) {
- final String disabled =
config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED);
+ public static boolean getVolumesFromCLIDisabled(Config config) {
+ final String disabled =
config.getStringValue(KUBERNETES_VOLUME_FROM_CLI_DISABLED);
return "true".equalsIgnoreCase(disabled);
}
@@ -251,15 +262,17 @@ public final class KubernetesContext extends Context {
* Collects parameters form the <code>CLI</code> and generates a mapping
between <code>Volumes</code>
* and their configuration <code>key-value</code> pairs.
* @param config Contains the configuration options collected from the
<code>CLI</code>.
- * @param isExecutor Flag used to collect CLI commands for the
<code>executor</code> and <code>manager</code>.
+ * @param prefix Configuration key to lookup for options.
+ * @param isExecutor Flag used to switch CLI commands for the
<code>Executor</code> and <code>Manager</code>.
* @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
* Will return an empty list if there are no Volume Claim Templates to be
generated.
*/
- public static Map<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
- getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ @VisibleForTesting
+ protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys,
String>>
+ getVolumeConfigs(final Config config, final String prefix, final boolean
isExecutor) {
final Logger LOG = Logger.getLogger(V1Controller.class.getName());
- final String prefixKey = String.format(KUBERNETES_VOLUME_CLAIM_PREFIX,
+ final String prefixKey = String.format(prefix,
isExecutor ? KubernetesConstants.EXECUTOR_NAME :
KubernetesConstants.MANAGER_NAME);
final Set<String> completeConfigParam = getConfigKeys(config, prefixKey);
final int prefixLength = prefixKey.length();
@@ -267,19 +280,17 @@ public final class KubernetesContext extends Context {
final int optionIdx = 1;
final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> volumes
- = new HashMap<>();
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes = new HashMap<>();
try {
for (String param : completeConfigParam) {
final String[] tokens = param.substring(prefixLength).split("\\.");
final String volumeName = tokens[volumeNameIdx];
- final KubernetesConstants.VolumeClaimTemplateConfigKeys key =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.valueOf(tokens[optionIdx]);
+ final KubernetesConstants.VolumeConfigKeys key =
+ KubernetesConstants.VolumeConfigKeys.valueOf(tokens[optionIdx]);
final String value = config.getStringValue(param);
- Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> volume =
- volumes.get(volumeName);
+ Map<KubernetesConstants.VolumeConfigKeys, String> volume =
volumes.get(volumeName);
if (volume == null) {
// Validate new Volume Names.
if (!matcher.reset(volumeName).matches()) {
@@ -291,31 +302,209 @@ public final class KubernetesContext extends Context {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ // Check to see if functionality is disabled.
+ if (KubernetesContext.getVolumesFromCLIDisabled(config) &&
!volumes.isEmpty()) {
+ final String message = "Configuring Volumes from the CLI is disabled.";
+ LOG.log(Level.WARNING, message);
+ throw new TopologySubmissionException(message);
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(final Config config, final boolean isExecutor) {
+ final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+
+ // Claim name is required.
+ if
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
{
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Persistent Volume"
+ + " Claims require a `claimName`.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+ final String value = volumeConfig.getValue();
+
+ switch (key) {
+ case claimName:
+ // Claim names which are not OnDemand should be lowercase RFC-1123.
+ if (!matcher.reset(value).matches()
+ &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `claimName` does"
+ + " not match lowercase RFC-1123 pattern", volume.getKey()));
+ }
+ break;
+ case storageClassName:
+ if (!matcher.reset(value).matches()) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `storageClassName`"
+ + " does not match lowercase RFC-1123 pattern",
volume.getKey()));
+ }
+ break;
+ case sizeLimit: case accessModes: case volumeMode: case readOnly:
case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Persistent"
+ + " Volume Claim type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>Empty Directory</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Empty Directory volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeEmptyDir(final Config config, final boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String medium =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.medium);
+
+ if (medium != null && !medium.isEmpty() && !"Memory".equals(medium)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Empty Directory"
+ + " 'medium' must be 'Memory' or empty.", volume.getKey()));
+ }
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case sizeLimit: case medium: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Empty"
+ + " Directory type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>Host Path</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Host Path volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeHostPath(final Config config, final boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String type =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.type);
+ if (type != null &&
!KubernetesConstants.VALID_VOLUME_HOSTPATH_TYPES.contains(type)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path"
+ + " 'type' of '%s' is invalid.", volume.getKey(), type));
+ }
+ final String hostOnPath =
+
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnHost);
+ if (hostOnPath == null || hostOnPath.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path requires a"
+ + " path on the host.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case type: case pathOnHost: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Host Path"
+ + " option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>NFS</code> volumes.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no NFS volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeNFS(final Config config, final boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String server =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.server);
+ if (server == null || server.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`:
`NFS` volumes require a"
+ + " `server` to be specified", volume.getKey()));
+ }
+ final String hostOnNFS =
+
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnNFS);
+ if (hostOnNFS == null || hostOnNFS.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: NFS
requires a path on"
+ + " the NFS server.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case server: case pathOnNFS: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid NFS option"
+ + " for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
return volumes;
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 96793b6..3bde340 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -103,8 +103,10 @@ public class V1Controller extends KubernetesController {
super(configuration, runtimeConfiguration);
isPodTemplateDisabled =
KubernetesContext.getPodTemplateDisabled(configuration);
- LOG.log(Level.WARNING, String.format("Custom Pod Templates are %s",
+ LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+ LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
+ KubernetesContext.getVolumesFromCLIDisabled(configuration) ?
"DISABLED" : "ENABLED"));
try {
final ApiClient apiClient =
io.kubernetes.client.util.Config.defaultClient();
@@ -415,17 +417,21 @@ public class V1Controller extends KubernetesController {
final String topologyName = getTopologyName();
final Config runtimeConfiguration = getRuntimeConfiguration();
- // Get and then create Persistent Volume Claims from the CLI.
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> configsPVC =
+ final List<V1Volume> volumes = new LinkedList<>();
+ final List<V1VolumeMount> volumeMounts = new LinkedList<>();
+
+ // Collect Persistent Volume Claim configurations from the CLI.
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configsPVC =
KubernetesContext.getVolumeClaimTemplates(getConfiguration(),
isExecutor);
- if (KubernetesContext.getPersistentVolumeClaimDisabled(getConfiguration())
- && !configsPVC.isEmpty()) {
- final String message =
- String.format("'%s': Configuring Persistent Volume Claim from CLI is
disabled",
- topologyName);
- LOG.log(Level.WARNING, message);
- throw new TopologySubmissionException(message);
- }
+
+ // Collect all Volume configurations from the CLI and generate Volumes and
Volume Mounts.
+ createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes,
volumeMounts);
+ createVolumeAndMountsHostPathCLI(
+ KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor),
volumes, volumeMounts);
+ createVolumeAndMountsEmptyDirCLI(
+ KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor),
volumes, volumeMounts);
+ createVolumeAndMountsNFSCLI(
+ KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor),
volumes, volumeMounts);
final V1StatefulSet statefulSet = new V1StatefulSet();
@@ -465,7 +471,8 @@ public class V1Controller extends KubernetesController {
templateMetaData.setAnnotations(annotations);
podTemplateSpec.setMetadata(templateMetaData);
- configurePodSpec(podTemplateSpec, containerResource, numberOfInstances,
isExecutor, configsPVC);
+ configurePodSpec(podTemplateSpec, containerResource, numberOfInstances,
isExecutor,
+ volumes, volumeMounts);
statefulSetSpec.setTemplate(podTemplateSpec);
@@ -519,11 +526,12 @@ public class V1Controller extends KubernetesController {
* @param resource Passed down to configure the resource limits.
* @param numberOfInstances Passed down to configure the ports.
* @param isExecutor Flag used to configure components specific to
<code>Executor</code> and <code>Manager</code>.
- * @param configPVC <code>Persistent Volume Claim</code> configurations
options.
+ * @param volumes <code>Volumes</code> generated from configuration options.
+ * @param volumeMounts <code>Volume Mounts</code> generated from
configuration options.
*/
private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec,
Resource resource,
- int numberOfInstances, boolean isExecutor,
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> configPVC) {
+ int numberOfInstances, boolean isExecutor, List<V1Volume> volumes,
+ List<V1VolumeMount> volumeMounts) {
if (podTemplateSpec.getSpec() == null) {
podTemplateSpec.setSpec(new V1PodSpec());
}
@@ -561,8 +569,9 @@ public class V1Controller extends KubernetesController {
containers.add(heronContainer);
}
- if (!configPVC.isEmpty()) {
- configurePodWithPersistentVolumeClaimVolumesAndMounts(podSpec,
heronContainer, configPVC);
+ if (!volumes.isEmpty() || !volumeMounts.isEmpty()) {
+ configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes,
+ volumeMounts);
}
configureHeronContainer(resource, numberOfInstances, heronContainer,
isExecutor);
@@ -1043,17 +1052,16 @@ public class V1Controller extends KubernetesController {
*/
@VisibleForTesting
protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapOfOpts) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
mapOfOpts) {
List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
// Iterate over all the PVC Volumes.
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> pvc
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
pvc
: mapOfOpts.entrySet()) {
// Only create claims for `OnDemand` volumes.
- final String claimName = pvc.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ final String claimName =
pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
continue;
}
@@ -1064,11 +1072,12 @@ public class V1Controller extends KubernetesController {
.withLabels(getPersistentVolumeClaimLabels(getTopologyName()))
.endMetadata()
.withNewSpec()
+ .withStorageClassName("")
.endSpec()
.build();
// Populate PVC options.
- for (Map.Entry<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String> option
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> option
: pvc.getValue().entrySet()) {
String optionValue = option.getValue();
switch(option.getKey()) {
@@ -1087,12 +1096,8 @@ public class V1Controller extends KubernetesController {
claim.getSpec().setVolumeMode(optionValue);
break;
// Valid ignored options not used in a PVC.
- case path: case subPath: case claimName:
- break;
default:
- throw new TopologySubmissionException(
- String.format("Invalid Persistent Volume Claim type option for
'%s'",
- option.getKey()));
+ break;
}
}
listOfPVCs.add(claim);
@@ -1101,80 +1106,212 @@ public class V1Controller extends KubernetesController
{
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(final String volumeName,
+ final Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
mapConfig,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
.withName(volumeName)
- .withMountPath(path);
- if (subPath != null && !subPath.isEmpty()) {
- volumeMount.withSubPath(subPath);
+ .withNewEmptyDir()
+ .endEmptyDir()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case medium:
+ volume.getEmptyDir().medium(config.getValue());
+ break;
+ case sizeLimit:
+ volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+ break;
+ default:
+ break;
+ }
}
- mountList.add(volumeMount.build());
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
}
- return new Pair<>(volumeList, mountList);
}
/**
- * Makes a call to generate <code>Volumes</code> and <code>Volume
Mounts</code> and then inserts them.
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewHostPath()
+ .endHostPath()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case type:
+ volume.getHostPath().setType(config.getValue());
+ break;
+ case pathOnHost:
+ volume.getHostPath().setPath(config.getValue());
+ break;
+ default:
+ break;
+ }
+ }
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>NFS</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsNFSCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewNfs()
+ .endNfs()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case server:
+ volume.getNfs().setServer(config.getValue());
+ break;
+ case pathOnNFS:
+ volume.getNfs().setPath(config.getValue());
+ break;
+ case readOnly:
+
volume.getNfs().setReadOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Configures the Pod Spec and Heron container with <code>Volumes</code> and
<code>Volume Mounts</code>.
* @param podSpec All generated <code>V1Volume</code> will be placed in the
<code>Pod Spec</code>.
* @param executor All generated <code>V1VolumeMount</code> will be placed
in the <code>Container</code>.
- * @param configPVC <code>Persistent Volume Claim</code> configurations
options.
+ * @param volumes <code>Volumes</code> to be inserted in the Pod Spec.
+ * @param volumeMounts <code>Volume Mounts</code> to be inserted in the
Heron container.
*/
@VisibleForTesting
- protected void configurePodWithPersistentVolumeClaimVolumesAndMounts(final
V1PodSpec podSpec,
- final V1Container executor,
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> configPVC) {
- Pair<List<V1Volume>, List<V1VolumeMount>> volumesAndMounts =
- createPersistentVolumeClaimVolumesAndMounts(configPVC);
+ protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec
podSpec,
+ final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount>
volumeMounts) {
// Deduplicate on Names with the supplied Volumes and Volume Mounts taking precedence.
- KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
- new KubernetesUtils.V1ControllerUtils<>();
- executor.setVolumeMounts(
- utilsMounts.mergeListsDedupe(volumesAndMounts.second,
executor.getVolumeMounts(),
- Comparator.comparing(V1VolumeMount::getName),
- "Executor and Persistent Volume Claim Volume Mounts"));
-
KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
new KubernetesUtils.V1ControllerUtils<>();
podSpec.setVolumes(
- utilsVolumes.mergeListsDedupe(volumesAndMounts.first,
podSpec.getVolumes(),
+ utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
Comparator.comparing(V1Volume::getName),
- "Pod and Persistent Volume Claim Volumes"));
+ "Pod with Volumes"));
+
+ KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
+ new KubernetesUtils.V1ControllerUtils<>();
+ executor.setVolumeMounts(
+ utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
+ Comparator.comparing(V1VolumeMount::getName),
+ "Heron container with Volume Mounts"));
}
/**
@@ -1186,12 +1323,11 @@ public class V1Controller extends KubernetesController {
* onDemand: <code>true</code>
*/
private void removePersistentVolumeClaims() {
- final String topologyName = getTopologyName();
+ final String name = getTopologyName();
final StringBuilder selectorLabel = new StringBuilder();
// Generate selector label.
- for (Map.Entry<String, String> label
- : getPersistentVolumeClaimLabels(topologyName).entrySet()) {
+ for (Map.Entry<String, String> label :
getPersistentVolumeClaimLabels(name).entrySet()) {
if (selectorLabel.length() != 0) {
selectorLabel.append(",");
}
@@ -1218,11 +1354,11 @@ public class V1Controller extends KubernetesController {
LOG.log(Level.INFO,
String.format("Removing automatically generated Persistent Volume
Claims for `%s`:%n%s",
- topologyName, status.getMessage()));
+ name, status.getMessage()));
} catch (ApiException e) {
final String message = String.format("Failed to connect to K8s cluster
to delete Persistent "
+ "Volume Claims for topology `%s`. A manual clean-up is
required.%n%s",
- topologyName, e.getMessage());
+ name, e.getMessage());
LOG.log(Level.WARNING, message);
throw new TopologyRuntimeManagementException(message);
}
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
index 988456f..da1b21d 100644
---
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -20,10 +20,13 @@
package org.apache.heron.scheduler.kubernetes;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+
import org.junit.Assert;
import org.junit.Test;
@@ -32,7 +35,7 @@ import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
import org.apache.heron.spi.common.Config;
-import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
public class KubernetesContextTest {
@@ -82,41 +85,43 @@ public class KubernetesContextTest {
}
@Test
- public void testPersistentVolumeClaimDisabled() {
-
Assert.assertFalse(KubernetesContext.getPersistentVolumeClaimDisabled(config));
+ public void testVolumesFromCLIDisabled() {
+ Assert.assertFalse(KubernetesContext.getVolumesFromCLIDisabled(config));
Assert.assertFalse(KubernetesContext
- .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMap));
+ .getVolumesFromCLIDisabled(configWithPodTemplateConfigMap));
final Config configWithPodTemplateConfigMapOff = Config.newBuilder()
.put(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
POD_TEMPLATE_CONFIGMAP_NAME)
-
.put(KubernetesContext.KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED, "TRUE")
+ .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "TRUE")
.build();
Assert.assertTrue(KubernetesContext
- .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMapOff));
+ .getVolumesFromCLIDisabled(configWithPodTemplateConfigMapOff));
}
- @Test
- public void testGetVolumeClaimTemplates() {
+ /**
+ * Generate <code>Volume</code> Configs for testing.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: [0] expectedKeys, [1] expectedOptionsKeys, [2]
expectedOptionsValues.
+ * @param prefix Configuration prefix key to use in lookup.
+ */
+ private void createVolumeConfigs(List<TestTuple<Pair<Config, Boolean>,
Object[]>> testCases,
+ String prefix) {
+ final String keyPattern = prefix + "%%s.%%s";
+ final String keyExecutor = String.format(keyPattern,
KubernetesConstants.EXECUTOR_NAME);
+ final String keyManager = String.format(keyPattern,
KubernetesConstants.MANAGER_NAME);
+
final String volumeNameOne = "volume-name-one";
final String volumeNameTwo = "volume-name-two";
final String claimName = "OnDeMaNd";
- final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+ "%%s.%%s";
- final String keyExecutor = String.format(keyPattern,
KubernetesConstants.EXECUTOR_NAME);
- final String keyManager = String.format(keyPattern,
KubernetesConstants.MANAGER_NAME);
- final String storageClassField =
VolumeClaimTemplateConfigKeys.storageClassName.name();
- final String pathField = VolumeClaimTemplateConfigKeys.path.name();
- final String claimNameField =
VolumeClaimTemplateConfigKeys.claimName.name();
+ final String storageClassField = VolumeConfigKeys.storageClassName.name();
+ final String pathField = VolumeConfigKeys.path.name();
+ final String claimNameField = VolumeConfigKeys.claimName.name();
final String expectedStorageClass = "expected-storage-class";
final String expectedPath = "/path/for/volume/expected";
-
- // Test case container.
- // Input: Config to extract options from, Boolean to indicate
Manager/Executor.
- // Output: [0] expectedKeys, [1] expectedOptionsKeys, [2]
expectedOptionsValues.
- final List<TestTuple<Pair<Config, Boolean>, Object[]>> testCases = new
LinkedList<>();
-
// Create test cases for Executor/Manager on even/odd indices respectively.
for (int idx = 0; idx < 2; ++idx) {
@@ -149,28 +154,54 @@ public class KubernetesContextTest {
.build();
final List<String> expectedKeys = Arrays.asList(volumeNameOne,
volumeNameTwo);
- final List<VolumeClaimTemplateConfigKeys> expectedOptionsKeys =
- Arrays.asList(VolumeClaimTemplateConfigKeys.path,
- VolumeClaimTemplateConfigKeys.storageClassName,
- VolumeClaimTemplateConfigKeys.claimName);
+ final List<VolumeConfigKeys> expectedOptionsKeys =
+ Arrays.asList(VolumeConfigKeys.path,
+ VolumeConfigKeys.storageClassName,
+ VolumeConfigKeys.claimName);
final List<String> expectedOptionsValues =
Arrays.asList(expectedPath, expectedStorageClass, claimName);
testCases.add(new TestTuple<>(description,
new Pair<>(configPVC, isExecutor),
new Object[]{expectedKeys, expectedOptionsKeys,
expectedOptionsValues}));
+
+ final Config configPVCDisabled = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "true")
+ .put(pathKeyOne, expectedPath)
+ .put(pathKeyTwo, expectedPath)
+ .put(claimNameKeyOne, claimName)
+ .put(claimNameKeyTwo, claimName)
+ .put(storageClassKeyOne, expectedStorageClass)
+ .put(storageClassKeyTwo, expectedStorageClass)
+ .build();
+
+ testCases.add(new TestTuple<>(description + " Disabled should not error",
+ new Pair<>(configPVCDisabled, !isExecutor),
+ new Object[]{new LinkedList<String>(), new
LinkedList<VolumeConfigKeys>(),
+ new LinkedList<String>()}));
}
+ }
+
+ @Test
+ public void testGetVolumeConfigs() {
+ final String prefix = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX;
+
+ // Test case container.
+ // Input: [0] Config, [1] Boolean to indicate Manager/Executor.
+ // Output: [0] expectedKeys, [1] expectedOptionsKeys, [2]
expectedOptionsValues.
+ final List<TestTuple<Pair<Config, Boolean>, Object[]>> testCases = new
LinkedList<>();
+ createVolumeConfigs(testCases, prefix);
// Test loop.
for (TestTuple<Pair<Config, Boolean>, Object[]> testCase : testCases) {
- final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfPVC =
- KubernetesContext.getVolumeClaimTemplates(testCase.input.first,
testCase.input.second);
+ final Map<String, Map<VolumeConfigKeys, String>> mapOfPVC =
+ KubernetesContext.getVolumeConfigs(testCase.input.first, prefix,
testCase.input.second);
Assert.assertTrue(testCase.description + ": Contains all provided
Volumes",
mapOfPVC.keySet().containsAll((List<String>) testCase.expected[0]));
- for (Map<VolumeClaimTemplateConfigKeys, String> items :
mapOfPVC.values()) {
+ for (Map<VolumeConfigKeys, String> items : mapOfPVC.values()) {
Assert.assertTrue(testCase.description + ": Contains all provided
option keys",
- items.keySet().containsAll((List<VolumeClaimTemplateConfigKeys>)
testCase.expected[1]));
+ items.keySet().containsAll((List<VolumeConfigKeys>)
testCase.expected[1]));
Assert.assertTrue(testCase.description + ": Contains all provided
option values",
items.values().containsAll((List<String>) testCase.expected[2]));
}
@@ -179,36 +210,42 @@ public class KubernetesContextTest {
// Empty PVC.
final Boolean[] emptyPVCTestCases = new Boolean[] {true, false};
for (boolean testCase : emptyPVCTestCases) {
- final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> emptyPVC =
+ final Map<String, Map<VolumeConfigKeys, String>> emptyPVC =
KubernetesContext.getVolumeClaimTemplates(Config.newBuilder().build(),
testCase);
Assert.assertTrue("Empty PVC is returned when no options provided",
emptyPVC.isEmpty());
}
}
@Test
- public void testGetPersistentVolumeClaimsErrors() {
+ public void testGetVolumeConfigsErrors() {
+ final String prefix = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX;
final String volumeNameValid = "volume-name-valid";
final String volumeNameInvalid = "volume-Name-Invalid";
+ final String passingValue = "should-pass";
final String failureValue = "Should-Fail";
- final String generalFailureMessage = "Invalid Persistent Volume";
+ final String generalFailureMessage = "Invalid Volume configuration";
final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+ "%%s.%%s", KubernetesConstants.EXECUTOR_NAME);
final List<TestTuple<Config, String>> testCases = new LinkedList<>();
// Invalid option key test.
final Config configInvalidOption = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "server"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"),
passingValue)
.put(String.format(keyPattern, volumeNameValid, "NonExistentKey"),
failureValue)
.build();
testCases.add(new TestTuple<>("Invalid option key should trigger
exception",
configInvalidOption, generalFailureMessage));
- // Just the prefix.
- final Config configJustPrefix = Config.newBuilder()
- .put(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, failureValue)
- .build();
- testCases.add(new TestTuple<>("Only a key prefix should trigger exception",
- configJustPrefix, generalFailureMessage));
-
// Invalid Volume Name.
final Config configInvalidVolumeName = Config.newBuilder()
.put(String.format(keyPattern, volumeNameInvalid, "path"),
failureValue)
@@ -216,31 +253,729 @@ public class KubernetesContextTest {
testCases.add(new TestTuple<>("Invalid Volume Name should trigger
exception",
configInvalidVolumeName, "lowercase RFC-1123"));
+ // Required Path.
+ final Config configRequiredPath = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "server"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>("Missing path should trigger exception",
+ configRequiredPath, "All Volumes require a 'path'."));
+
+ // Disabled.
+ final Config configDisabled = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "true")
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "server"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>("Disabled functionality should trigger
exception",
+ configDisabled, "Configuring Volumes from the CLI is disabled."));
+
+ // Testing loop.
+ for (TestTuple<Config, String> testCase : testCases) {
+ String message = "";
+ try {
+ KubernetesContext.getVolumeConfigs(testCase.input, prefix, true);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description,
message.contains(testCase.expected));
+ }
+ }
+
+ /**
+ * Create test cases for <code>Volume Claim Templates</code>.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: <code>Map<String, Map<VolumeConfigKeys,
String>></code>
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeClaimTemplates(
+ List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>>> testCases,
+ boolean isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+ + "%%s.%%s", processName);
+
+ // With Storage Class Name.
+ final Map<String, Map<VolumeConfigKeys, String>>
expectedWithStorageClassName =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.claimName, passingValue);
+ put(VolumeConfigKeys.storageClassName, passingValue);
+ put(VolumeConfigKeys.sizeLimit, passingValue);
+ put(VolumeConfigKeys.accessModes, passingValue);
+ put(VolumeConfigKeys.volumeMode, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithStorageClass = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": PVC with Storage Class
name",
+ new Pair<>(configWithStorageClass, isExecutor),
expectedWithStorageClassName));
+
+ // Without Storage Class Name.
+ final Map<String, Map<VolumeConfigKeys, String>>
expectedWithoutStorageClassName =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.claimName, passingValue);
+ put(VolumeConfigKeys.sizeLimit, passingValue);
+ put(VolumeConfigKeys.accessModes, passingValue);
+ put(VolumeConfigKeys.volumeMode, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithoutStorageClass = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": PVC with Storage Class
name",
+ new Pair<>(configWithoutStorageClass, isExecutor),
expectedWithoutStorageClassName));
+
+ // Ignored.
+ final Config configIgnored = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": PVC with ignored keys",
+ new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+ }
+
+ @Test
+ public void testGetVolumeClaimTemplates() {
+ final List<TestTuple<Pair<Config, Boolean>, Map<String,
Map<VolumeConfigKeys, String>>>>
+ testCases = new LinkedList<>();
+ createVolumeClaimTemplates(testCases, true);
+ createVolumeClaimTemplates(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>> testCase
+ : testCases) {
+ Map<String, Map<VolumeConfigKeys, String>> actual =
+ KubernetesContext.getVolumeClaimTemplates(testCase.input.first,
testCase.input.second);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+ }
+
+ /**
+ * Create test cases for <code>Volume Claim Templates</code> errors.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: Error message
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeClaimTemplatesErrors(
+ List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean
isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String failureValue = "Should-Fail";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+ + "%%s.%%s", processName);
+
+ // Required Claim Name.
+ final Config configRequiredClaimName = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Missing Claim Name should
trigger exception",
+ new Pair<>(configRequiredClaimName, isExecutor), "require a
`claimName`"));
+
// Invalid Claim Name.
final Config configInvalidClaimName = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
.put(String.format(keyPattern, volumeNameValid, "claimName"),
failureValue)
.build();
- testCases.add(new TestTuple<>("Invalid Claim Name should trigger
exception",
- configInvalidClaimName, "Option `claimName`"));
+ testCases.add(new TestTuple<>(processName + ": Invalid Claim Name should
trigger exception",
+ new Pair<>(configInvalidClaimName, isExecutor),
+ String.format("Volume `%s`: `claimName`", volumeNameValid)));
// Invalid Storage Class Name.
final Config configInvalidStorageClassName = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
.put(String.format(keyPattern, volumeNameValid, "storageClassName"),
failureValue)
.build();
- testCases.add(new TestTuple<>("Invalid Storage Class Name should trigger
exception",
- configInvalidStorageClassName, "Option `storageClassName`"));
+ testCases.add(new TestTuple<>(processName
+ + ": Invalid Storage Class Name should trigger exception",
+ new Pair<>(configInvalidStorageClassName, isExecutor),
+ String.format("Volume `%s`: `storageClassName`", volumeNameValid)));
+
+ // Invalid option.
+ final Config configInvalidOption = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "claimName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "storageClassName"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "volumeMode"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "server"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid option should
trigger exception",
+ new Pair<>(configInvalidOption, isExecutor),
+ String.format("Volume `%s`: Invalid Persistent", volumeNameValid)));
+ }
+
+ @Test
+ public void testGetVolumeClaimTemplatesErrors() {
+ final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new
LinkedList<>();
+ createVolumeClaimTemplatesErrors(testCases, true);
+ createVolumeClaimTemplatesErrors(testCases, false);
// Testing loop.
- final Boolean[] executorFlags = new Boolean[] {true, false};
- for (TestTuple<Config, String> testCase : testCases) {
- // Test for both Executor and Manager.
- for (boolean isExecutor : executorFlags) {
- try {
- KubernetesContext.getVolumeClaimTemplates(testCase.input,
isExecutor);
- } catch (TopologySubmissionException e) {
- Assert.assertTrue(testCase.description,
e.getMessage().contains(testCase.expected));
- }
+ for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+ String message = "";
+ try {
+ KubernetesContext.getVolumeClaimTemplates(testCase.input.first,
testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description,
message.contains(testCase.expected));
+ }
+ }
+
+ /**
+ * Create test cases for <code>Empty Directory</code>.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: <code>Map<String, Map<VolumeConfigKeys,
String>></code>
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeEmptyDir(
+ List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>>> testCases,
+ boolean isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX
+ + "%%s.%%s", processName);
+
+ // With Medium.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedWithMedium =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.sizeLimit, passingValue);
+ put(VolumeConfigKeys.medium, "Memory");
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithMedium = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"), "Memory")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `emptyDir` with `medium`",
+ new Pair<>(configWithMedium, isExecutor), expectedWithMedium));
+
+ // With empty Medium.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedEmptyMedium =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.sizeLimit, passingValue);
+ put(VolumeConfigKeys.medium, "");
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configEmptyMedium = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"), "")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `emptyDir` with empty
`medium`",
+ new Pair<>(configEmptyMedium, isExecutor), expectedEmptyMedium));
+
+ // Without Medium.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedNoMedium =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.sizeLimit, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configNoMedium = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `emptyDir` without
`medium`",
+ new Pair<>(configNoMedium, isExecutor), expectedNoMedium));
+
+ // Ignored.
+ final Config configIgnored = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"), "")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `emptyDir` ignored",
+ new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+ }
+
+ @Test
+ public void testGetVolumeEmptyDir() {
+ final List<TestTuple<Pair<Config, Boolean>, Map<String,
Map<VolumeConfigKeys, String>>>>
+ testCases = new LinkedList<>();
+ createVolumeEmptyDir(testCases, true);
+ createVolumeEmptyDir(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>> testCase
+ : testCases) {
+ Map<String, Map<VolumeConfigKeys, String>> actual =
+ KubernetesContext.getVolumeEmptyDir(testCase.input.first,
testCase.input.second);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+ }
+
+ /**
+ * Create test cases for <code>Empty Directory</code> errors.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: Error message
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeEmptyDirError(
+ List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean
isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String failureValue = "Should-Fail";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX
+ + "%%s.%%s", processName);
+
+ // Medium is invalid.
+ final Config configInvalidMedium = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"),
failureValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid 'medium' should
trigger exception",
+ new Pair<>(configInvalidMedium, isExecutor), "must be 'Memory' or
empty."));
+
+ // Invalid option.
+ final Config configInvalidOption = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "sizeLimit"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "medium"), "Memory")
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid option should
trigger exception",
+ new Pair<>(configInvalidOption, isExecutor), "Directory type option"));
+ }
+
+ @Test
+ public void testGetVolumeEmptyDirErrors() {
+ final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new
LinkedList<>();
+ createVolumeEmptyDirError(testCases, true);
+ createVolumeEmptyDirError(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+ String message = "";
+ try {
+ KubernetesContext.getVolumeEmptyDir(testCase.input.first,
testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description,
message.contains(testCase.expected));
+ }
+ }
+
+ /**
+ * Create test cases for <code>Host Path</code>.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: <code>Map<String, Map<VolumeConfigKeys,
String>></code>
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeHostPath(
+ List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>>> testCases,
+ boolean isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX
+ + "%%s.%%s", processName);
+
+ // With type.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedWithType =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.type, "DirectoryOrCreate");
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.pathOnHost, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithType = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"),
"DirectoryOrCreate")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": 'hostPath' with 'type'",
+ new Pair<>(configWithType, isExecutor), expectedWithType));
+
+ // Without type.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedWithoutType =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.pathOnHost, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithoutType = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": 'hostPath' without 'type'",
+ new Pair<>(configWithoutType, isExecutor), expectedWithoutType));
+
+ // Ignored.
+ final Config configIgnored = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": 'hostPath' ignored",
+ new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+ }
+
+ @Test
+ public void testGetVolumeHostPath() {
+ final List<TestTuple<Pair<Config, Boolean>, Map<String,
Map<VolumeConfigKeys, String>>>>
+ testCases = new LinkedList<>();
+ createVolumeHostPath(testCases, true);
+ createVolumeHostPath(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>> testCase
+ : testCases) {
+ Map<String, Map<VolumeConfigKeys, String>> actual =
+ KubernetesContext.getVolumeHostPath(testCase.input.first,
testCase.input.second);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+ }
+
+ /**
+ * Create test cases for <code>Host Path</code> errors.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: Error message
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeHostPathError(
+ List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean
isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String failureValue = "Should-Fail";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX
+ + "%%s.%%s", processName);
+
+ // Type is invalid.
+ final Config configInvalidMedium = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"), failureValue)
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid 'type' should
trigger exception",
+ new Pair<>(configInvalidMedium, isExecutor), "Host Path 'type' of"));
+
+ // Path on Host is missing.
+ final Config configNoHostOnPath = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": No 'hostOnPath' should
trigger exception",
+ new Pair<>(configNoHostOnPath, isExecutor), "requires a path on the
host"));
+
+ // Path on Host is empty.
+ final Config configEmptyHostOnPath = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), "")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Empty 'hostOnPath' should
trigger exception",
+ new Pair<>(configEmptyHostOnPath, isExecutor), "requires a path on the
host"));
+
+ // Invalid option.
+ final Config configInvalidOption = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnHost"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid option should
trigger exception",
+ new Pair<>(configInvalidOption, isExecutor), "Invalid Host Path option
for"));
+ }
+
+ @Test
+ public void testGetVolumeHostPathErrors() {
+ final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new
LinkedList<>();
+ createVolumeHostPathError(testCases, true);
+ createVolumeHostPathError(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+ String message = "";
+ try {
+ KubernetesContext.getVolumeHostPath(testCase.input.first,
testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description,
message.contains(testCase.expected));
+ }
+ }
+
+ /**
+ * Create test cases for <code>NFS</code>.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: <code>Map<String, Map<VolumeConfigKeys,
String>></code>
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeNFS(
+ List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>>> testCases,
+ boolean isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX
+ + "%%s.%%s", processName);
+
+ // With readOnly.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedWithReadOnly =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.server, "nfs-server.default.local");
+ put(VolumeConfigKeys.readOnly, "true");
+ put(VolumeConfigKeys.pathOnNFS, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithReadOnly = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "true")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `NFS` with `readOnly`",
+ new Pair<>(configWithReadOnly, isExecutor), expectedWithReadOnly));
+
+ // Without readOnly.
+ final Map<String, Map<VolumeConfigKeys, String>> expectedWithoutReadOnly =
+ ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys,
String>() {
+ {
+ put(VolumeConfigKeys.server, "nfs-server.default.local");
+ put(VolumeConfigKeys.pathOnNFS, passingValue);
+ put(VolumeConfigKeys.path, passingValue);
+ put(VolumeConfigKeys.subPath, passingValue);
+ put(VolumeConfigKeys.readOnly, passingValue);
+ }
+ });
+ final Config configWithoutReadOnly = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `NFS` without `readOnly`",
+ new Pair<>(configWithoutReadOnly, isExecutor),
expectedWithoutReadOnly));
+
+ // Ignored.
+ final Config configIgnored = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "true")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": `NFS` ignored",
+ new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+ }
+
+ @Test
+ public void testGetVolumeNFS() {
+ final List<TestTuple<Pair<Config, Boolean>, Map<String,
Map<VolumeConfigKeys, String>>>>
+ testCases = new LinkedList<>();
+ createVolumeNFS(testCases, true);
+ createVolumeNFS(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys,
String>>> testCase
+ : testCases) {
+ Map<String, Map<VolumeConfigKeys, String>> actual =
+ KubernetesContext.getVolumeNFS(testCase.input.first,
testCase.input.second);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+ }
+ /**
+ * Create test cases for <code>NFS</code> errors.
+ * @param testCases Test case container.
+ * Input: [0] Config, [1] Boolean to indicate
Manager/Executor.
+ * Output: Error message
+ * @param isExecutor Boolean to indicate Manager/Executor test case
generation.
+ */
+ private void createVolumeNFSError(
+ List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean
isExecutor) {
+ final String volumeNameValid = "volume-name-valid";
+ final String passingValue = "should-pass";
+ final String failureValue = "Should-Fail";
+ final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+ : KubernetesConstants.MANAGER_NAME;
+ final String keyPattern =
String.format(KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX
+ + "%%s.%%s", processName);
+
+ // Server is missing.
+ final Config configNoServer = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": No `server` should trigger
exception",
+ new Pair<>(configNoServer, isExecutor), "`NFS` volumes require a"));
+
+ // Server is invalid.
+ final Config configInvalidServer = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"), "")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid `server` should
trigger exception",
+ new Pair<>(configInvalidServer, isExecutor), "`NFS` volumes require
a"));
+
+ // Path on NFS missing.
+ final Config configNoNFSPath = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": No path on NFS should
trigger exception",
+ new Pair<>(configNoNFSPath, isExecutor), "NFS requires a path on"));
+
+ // Path on NFS is empty.
+ final Config configEmptyNFSPath = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), "")
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": No path on NFS should
trigger exception",
+ new Pair<>(configEmptyNFSPath, isExecutor), "NFS requires a path on"));
+
+ // Invalid option.
+ final Config configInvalidOption = Config.newBuilder()
+ .put(String.format(keyPattern, volumeNameValid, "server"),
"nfs-server.default.local")
+ .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+ .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "subPath"),
passingValue)
+ .put(String.format(keyPattern, volumeNameValid, "accessModes"),
passingValue)
+ .build();
+ testCases.add(new TestTuple<>(processName + ": Invalid option should
trigger exception",
+ new Pair<>(configInvalidOption, isExecutor), "Invalid NFS option"));
+ }
+
+ @Test
+ public void testGetVolumeNFSErrors() {
+ final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new
LinkedList<>();
+ createVolumeNFSError(testCases, true);
+ createVolumeNFSError(testCases, false);
+
+ // Testing loop.
+ for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+ String message = "";
+ try {
+ KubernetesContext.getVolumeNFS(testCase.input.first,
testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
}
+ Assert.assertTrue(testCase.description,
message.contains(testCase.expected));
}
}
}
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
index e79760d..6436520 100644
---
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -66,8 +66,7 @@ import io.kubernetes.client.openapi.models.V1VolumeBuilder;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
-import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
-import static org.mockito.Matchers.anyMap;
+import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -866,37 +865,37 @@ public class V1ControllerTest {
final String volumeMode = "VolumeMode";
final String path = "/path/to/mount/";
final String subPath = "/sub/path/to/mount/";
- final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapPVCOpts =
+ final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
ImmutableMap.of(
- volumeNameOne, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
{
- put(VolumeClaimTemplateConfigKeys.claimName, claimNameOne);
- put(VolumeClaimTemplateConfigKeys.storageClassName,
storageClassName);
- put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
- put(VolumeClaimTemplateConfigKeys.accessModes,
accessModesList);
- put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
- put(VolumeClaimTemplateConfigKeys.path, path);
+ put(VolumeConfigKeys.claimName, claimNameOne);
+ put(VolumeConfigKeys.storageClassName, storageClassName);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModesList);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
}
},
- volumeNameTwo, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
{
- put(VolumeClaimTemplateConfigKeys.claimName, claimNameTwo);
- put(VolumeClaimTemplateConfigKeys.storageClassName,
storageClassName);
- put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
- put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
- put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
- put(VolumeClaimTemplateConfigKeys.path, path);
- put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+ put(VolumeConfigKeys.claimName, claimNameTwo);
+ put(VolumeConfigKeys.storageClassName, storageClassName);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModes);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
}
},
- volumeNameStatic, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
{
- put(VolumeClaimTemplateConfigKeys.claimName, claimNameStatic);
- put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
- put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
- put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
- put(VolumeClaimTemplateConfigKeys.path, path);
- put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+ put(VolumeConfigKeys.claimName, claimNameStatic);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModes);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
}
}
);
@@ -922,6 +921,7 @@ public class V1ControllerTest {
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
.endMetadata()
.withNewSpec()
+ .withStorageClassName("")
.withAccessModes(Collections.singletonList(accessModes))
.withVolumeMode(volumeMode)
.withNewResources()
@@ -948,15 +948,15 @@ public class V1ControllerTest {
final String mountPathOne = "/mount/path/ONE";
final String mountPathTwo = "/mount/path/TWO";
final String mountSubPathTwo = "/mount/sub/path/TWO";
- Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfOpts =
+ Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
ImmutableMap.of(
volumeNameOne, ImmutableMap.of(
- VolumeClaimTemplateConfigKeys.claimName, claimNameOne,
- VolumeClaimTemplateConfigKeys.path, mountPathOne),
+ VolumeConfigKeys.claimName, claimNameOne,
+ VolumeConfigKeys.path, mountPathOne),
volumeNameTwo, ImmutableMap.of(
- VolumeClaimTemplateConfigKeys.claimName, claimNameTwo,
- VolumeClaimTemplateConfigKeys.path, mountPathTwo,
- VolumeClaimTemplateConfigKeys.subPath, mountSubPathTwo)
+ VolumeConfigKeys.claimName, claimNameTwo,
+ VolumeConfigKeys.path, mountPathTwo,
+ VolumeConfigKeys.subPath, mountSubPathTwo)
);
final V1Volume volumeOne = new V1VolumeBuilder()
.withName(volumeNameOne)
@@ -981,13 +981,13 @@ public class V1ControllerTest {
.build();
// Test case container.
- final List<TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+ // Input: Map of Volume configurations.
+ // Output: The expected lists of Volumes and Volume Mounts.
+ final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new
LinkedList<>();
// Default case: No PVC provided.
- final Pair<List<V1Volume>, List<V1VolumeMount>> actualEmpty =
-
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(new
HashMap<>());
- testCases.add(new TestTuple<>("Generated an empty list of Volumes",
actualEmpty,
+ testCases.add(new TestTuple<>("Generated an empty list of Volumes", new
HashMap<>(),
new Pair<>(new LinkedList<>(), new LinkedList<>())));
// PVC Provided.
@@ -995,23 +995,26 @@ public class V1ControllerTest {
new Pair<>(
new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
- final Pair<List<V1Volume>, List<V1VolumeMount>> actualFull =
-
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(mapOfOpts);
- testCases.add(new TestTuple<>("Generated a list of Volumes", actualFull,
+ testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
new Pair<>(expectedFull.first, expectedFull.second)));
// Testing loop.
- for (TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+ for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
+ List<V1Volume> actualVolume = new LinkedList<>();
+ List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
+
v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+ actualVolume, actualVolumeMount);
+
Assert.assertTrue(testCase.description,
- (testCase.expected.first).containsAll(testCase.input.first));
+ (testCase.expected.first).containsAll(actualVolume));
Assert.assertTrue(testCase.description + " Mounts",
- (testCase.expected.second).containsAll(testCase.input.second));
+ (testCase.expected.second).containsAll(actualVolumeMount));
}
}
@Test
- public void testConfigurePodWithPersistentVolumeClaims() {
+ public void testConfigurePodWithVolumesAndMountsFromCLI() {
final String volumeNameClashing = "clashing-volume";
final String volumeMountNameClashing = "original-volume-mount";
V1Volume baseVolume = new V1VolumeBuilder()
@@ -1046,8 +1049,8 @@ public class V1ControllerTest {
.build();
// Test case container.
- // Input: Pod Spec to modify, Executor to modify, Volumes and Mounts to
return from
- // <createPersistentVolumeClaimVolumesAndMounts>.
+ // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List
of Volumes
+ // [3] List of Volume Mounts.
// Output: The expected <V1PodSpec> and <V1Container>.
final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases =
new LinkedList<>();
@@ -1058,11 +1061,9 @@ public class V1ControllerTest {
final V1PodSpec expectedEmptyPodSpec = new
V1PodSpecBuilder().withVolumes(baseVolume).build();
final V1Container expectedEmptyExecutor =
new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
- Pair<List<V1Volume>, List<V1VolumeMount>> emptyVolumeAndMount =
- new Pair<>(new LinkedList<>(), new LinkedList<>());
testCases.add(new TestTuple<>("Empty",
- new Object[]{podSpecEmptyCase, executorEmptyCase, emptyVolumeAndMount},
+ new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(),
new LinkedList<>()},
new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
// Non-clashing Persistent Volume Claim.
@@ -1081,12 +1082,10 @@ public class V1ControllerTest {
.addToVolumeMounts(secondaryVolumeMount)
.build();
- Pair<List<V1Volume>, List<V1VolumeMount>> noClashVolumeAndMount = new
Pair<>(
- new LinkedList<>(Collections.singletonList(secondaryVolume)),
- new LinkedList<>(Collections.singletonList(secondaryVolumeMount)));
-
testCases.add(new TestTuple<>("No Clash",
- new Object[]{podSpecNoClashCase, executorNoClashCase,
noClashVolumeAndMount},
+ new Object[]{podSpecNoClashCase, executorNoClashCase,
+ Collections.singletonList(secondaryVolume),
+ Collections.singletonList(secondaryVolumeMount)},
new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
// Clashing Persistent Volume Claim.
@@ -1105,24 +1104,18 @@ public class V1ControllerTest {
.addToVolumeMounts(secondaryVolumeMount)
.build();
- Pair<List<V1Volume>, List<V1VolumeMount>> clashVolumeAndMount = new Pair<>(
- new LinkedList<>(Arrays.asList(clashingVolume, secondaryVolume)),
- new LinkedList<>(Arrays.asList(clashingVolumeMount,
secondaryVolumeMount)));
-
testCases.add(new TestTuple<>("Clashing",
- new Object[]{podSpecClashCase, executorClashCase, clashVolumeAndMount},
+ new Object[]{podSpecClashCase, executorClashCase,
+ Arrays.asList(clashingVolume, secondaryVolume),
+ Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
// Testing loop.
for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase :
testCases) {
- doReturn(testCase.input[2])
- .when(v1ControllerWithPodTemplate)
- .createPersistentVolumeClaimVolumesAndMounts(anyMap());
-
- // <configPVC> parameter is used in mock above, so we can set it to
<null> as it is not used.
v1ControllerWithPodTemplate
- .configurePodWithPersistentVolumeClaimVolumesAndMounts((V1PodSpec)
testCase.input[0],
- (V1Container) testCase.input[1], null);
+ .configurePodWithVolumesAndMountsFromCLI((V1PodSpec)
testCase.input[0],
+ (V1Container) testCase.input[1], (List<V1Volume>)
testCase.input[2],
+ (List<V1VolumeMount>) testCase.input[3]);
Assert.assertEquals("Pod Specs match " + testCase.description,
testCase.input[0], testCase.expected.first);
@@ -1222,4 +1215,231 @@ public class V1ControllerTest {
Assert.assertEquals(testCase.description, testCase.expected, actual);
}
}
+
+ @Test
+ public void testCreateVolumeMountsCLI() {
+ final String volumeNamePVC = "volume-name-pvc";
+ final String volumeNameHostPath = "volume-name-host-path";
+ final String volumeNameEmptyDir = "volume-name-empty-dir";
+ final String volumeNameNFS = "volume-name-nfs";
+ final String value = "inserted-value";
+
+ // Test case container.
+ // Input: [0] volume name, [1] volume options
+ // Output: The expected <V1VolumeMount>.
+ final List<TestTuple<Pair<String, Map<VolumeConfigKeys, String>>,
V1VolumeMount>> testCases =
+ new LinkedList<>();
+
+ // PVC.
+ final Map<VolumeConfigKeys, String> configPVC =
ImmutableMap.<VolumeConfigKeys, String>builder()
+ .put(VolumeConfigKeys.claimName, value)
+ .put(VolumeConfigKeys.storageClassName, value)
+ .put(VolumeConfigKeys.sizeLimit, value)
+ .put(VolumeConfigKeys.accessModes, value)
+ .put(VolumeConfigKeys.volumeMode, value)
+ .put(VolumeConfigKeys.path, value)
+ .put(VolumeConfigKeys.subPath, value)
+ .put(VolumeConfigKeys.readOnly, "true")
+ .build();
+ final V1VolumeMount volumeMountPVC = new V1VolumeMountBuilder()
+ .withName(volumeNamePVC)
+ .withMountPath(value)
+ .withSubPath(value)
+ .withReadOnly(true)
+ .build();
+ testCases.add(new TestTuple<>("PVC volume mount",
+ new Pair<>(volumeNamePVC, configPVC), volumeMountPVC));
+
+ // Host Path.
+ final Map<VolumeConfigKeys, String> configHostPath =
+ ImmutableMap.<VolumeConfigKeys, String>builder()
+ .put(VolumeConfigKeys.type, "DirectoryOrCreate")
+ .put(VolumeConfigKeys.pathOnHost, value)
+ .put(VolumeConfigKeys.path, value)
+ .put(VolumeConfigKeys.subPath, value)
+ .put(VolumeConfigKeys.readOnly, "true")
+ .build();
+ final V1VolumeMount volumeMountHostPath = new V1VolumeMountBuilder()
+ .withName(volumeNameHostPath)
+ .withMountPath(value)
+ .withSubPath(value)
+ .withReadOnly(true)
+ .build();
+ testCases.add(new TestTuple<>("Host Path volume mount",
+ new Pair<>(volumeNameHostPath, configHostPath), volumeMountHostPath));
+
+ // Empty Dir.
+ final Map<VolumeConfigKeys, String> configEmptyDir =
+ ImmutableMap.<VolumeConfigKeys, String>builder()
+ .put(VolumeConfigKeys.sizeLimit, value)
+ .put(VolumeConfigKeys.medium, "Memory")
+ .put(VolumeConfigKeys.path, value)
+ .put(VolumeConfigKeys.subPath, value)
+ .put(VolumeConfigKeys.readOnly, "true")
+ .build();
+ final V1VolumeMount volumeMountEmptyDir = new V1VolumeMountBuilder()
+ .withName(volumeNameEmptyDir)
+ .withMountPath(value)
+ .withSubPath(value)
+ .withReadOnly(true)
+ .build();
+ testCases.add(new TestTuple<>("Empty Dir volume mount",
+ new Pair<>(volumeNameEmptyDir, configEmptyDir), volumeMountEmptyDir));
+
+ // NFS.
+ final Map<VolumeConfigKeys, String> configNFS =
ImmutableMap.<VolumeConfigKeys, String>builder()
+ .put(VolumeConfigKeys.server, "nfs.server.address")
+ .put(VolumeConfigKeys.readOnly, "true")
+ .put(VolumeConfigKeys.pathOnNFS, value)
+ .put(VolumeConfigKeys.path, value)
+ .put(VolumeConfigKeys.subPath, value)
+ .build();
+ final V1VolumeMount volumeMountNFS = new V1VolumeMountBuilder()
+ .withName(volumeNameNFS)
+ .withMountPath(value)
+ .withSubPath(value)
+ .withReadOnly(true)
+ .build();
+ testCases.add(new TestTuple<>("NFS volume mount",
+ new Pair<>(volumeNameNFS, configNFS), volumeMountNFS));
+
+ // Test loop.
+ for (TestTuple<Pair<String, Map<VolumeConfigKeys, String>>, V1VolumeMount>
testCase
+ : testCases) {
+ V1VolumeMount actual = v1ControllerPodTemplate.createVolumeMountsCLI(
+ testCase.input.first, testCase.input.second);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsEmptyDirCLI() {
+ final String volumeName = "volume-name-empty-dir";
+ final String medium = "Memory";
+ final String sizeLimit = "1Gi";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // Empty Dir.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.medium, "Memory");
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewEmptyDir()
+ .withMedium(medium)
+ .withNewSizeLimit(sizeLimit)
+ .endEmptyDir()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config,
actualVolumes, actualMounts);
+ Assert.assertEquals("Empty Dir Volume populated", expectedVolumes,
actualVolumes);
+ Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts,
actualMounts);
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsHostPathCLI() {
+ final String volumeName = "volume-name-host-path";
+ final String type = "DirectoryOrCreate";
+ final String pathOnHost = "path.on.host";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // Host Path.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.type, type);
+ put(VolumeConfigKeys.pathOnHost, pathOnHost);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewHostPath()
+ .withNewType(type)
+ .withNewPath(pathOnHost)
+ .endHostPath()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config,
actualVolumes, actualMounts);
+ Assert.assertEquals("Host Path Volume populated", expectedVolumes,
actualVolumes);
+ Assert.assertEquals("Host Path Volume Mount populated", expectedMounts,
actualMounts);
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsNFSCLI() {
+ final String volumeName = "volume-name-nfs";
+ final String server = "nfs.server.address";
+ final String pathOnNFS = "path.on.host";
+ final String readOnly = "true";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // NFS.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.server, server);
+ put(VolumeConfigKeys.readOnly, readOnly);
+ put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewNfs()
+ .withServer(server)
+ .withNewPath(pathOnNFS)
+ .withNewReadOnly(readOnly)
+ .endNfs()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .withReadOnly(true)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes,
actualMounts);
+ Assert.assertEquals("NFS Volume populated", expectedVolumes,
actualVolumes);
+ Assert.assertEquals("NFS Volume Mount populated", expectedMounts,
actualMounts);
+ }
}
diff --git a/website2/docs/schedulers-k8s-execution-environment.md
b/website2/docs/schedulers-k8s-execution-environment.md
index cbc1956..010bd73 100644
--- a/website2/docs/schedulers-k8s-execution-environment.md
+++ b/website2/docs/schedulers-k8s-execution-environment.md
@@ -1,7 +1,8 @@
---
id: schedulers-k8s-execution-environment
title: Kubernetes Execution Environment Customization
-sidebar_label: Kubernetes Execution Environment Customization
+hide_title: true
+sidebar_label: Kubernetes Environment Customization
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -20,34 +21,12 @@ sidebar_label: Kubernetes Execution Environment
Customization
under the License.
-->
-# Customizing the Heron Execution Environment
+# Customizing the Heron Execution Environment in Kubernetes
This document demonstrates how you can customize various aspects of the Heron
execution environment when using the Kubernetes Scheduler.
<br>
-***Table of contents:***
-- [Customizing the Heron Execution
Environment](#customizing-the-heron-execution-environment)
- - [Customizing a Topology's Execution Environment Using Pod
Templates](#customizing-a-topologys-execution-environment-using-pod-templates)
- - [Preparation](#preparation)
- - [Pod Templates](#pod-templates)
- - [Configuration Maps](#configuration-maps)
- - [Submitting](#submitting)
- - [Heron Configured Items in Pod
Templates](#heron-configured-items-in-pod-templates)
- - [Executor and Manager Containers](#executor-and-manager-containers)
- - [Pod](#pod)
- - [Adding Persistent Volumes via the Command Line
Interface](#adding-persistent-volumes-via-the-command-line-interface)
- - [Usage](#usage)
- - [Example](#example)
- - [Submitting](#submitting-1)
- - [Required and Optional Configuration
Items](#required-and-optional-configuration-items)
- - [Configuration Items Created and Entries
Made](#configuration-items-created-and-entries-made)
- - [Setting Limits and Requests via the Command Line
Interface](#setting-limits-and-requests-via-the-command-line-interface)
- - [Usage](#usage-1)
- - [Example](#example-1)
-
-<br>
-
---
<br>
@@ -243,7 +222,7 @@ The following items will be set in the Pod Template's
`spec` by Heron.
---
<br>
-## Adding Persistent Volumes via the Command Line Interface
+## Adding Persistent Volumes via the Command-line Interface
<br>
@@ -269,7 +248,7 @@ metadata:
> ***System Administrators:***
>
-> * You may wish to disable the ability to configure Persistent Volume Claims
specified via the CLI. To achieve this, you must pass the define option `-D
heron.kubernetes.persistent.volume.claims.cli.disabled=true`to the Heron API
Server on the command line when launching. This command has been added to the
Kubernetes configuration files to deploy the Heron API Server and is set to
`false` by default.
+> * You may wish to disable the ability to configure Persistent Volume Claims
specified via the CLI. To achieve this, you must pass the define option `-D
heron.kubernetes.volume.from.cli.disabled=true` to the Heron API Server on the
command line when launching. This command has been added to the Kubernetes
configuration files to deploy the Heron API Server and is set to `false` by
default.
> * If you have a custom `Role`/`ClusterRole` for the Heron API Server you
> will need to ensure the `ServiceAccount` attached to the API server has the
> correct permissions to access the `Persistent Volume Claim`s:
>
>```yaml
@@ -304,12 +283,13 @@ The currently supported CLI `options` are:
* `volumeMode`
* `path`
* `subPath`
+* `readOnly`
***Note:*** A `claimName` of `OnDemand` will create unique Volumes for each
`Heron container` as well as deploy a Persistent Volume Claim for each Volume.
Any other claim name will result in a shared Volume being created between all
Pods in the topology.
***Note:*** The `accessModes` must be a comma-separated list of values
*without* any white space. Valid values can be found in the [Kubernetes
documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
-***Note:*** If a `storageClassName` is specified and there are no matching
Persistent Volumes then [dynamic
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
must be enabled. Kubernetes will attempt to locate a Persistent Volume that
matches the `storageClassName` before it attempts to use dynamic provisioning.
If a `storageClassName` is not specified there must be [Persistent
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
[...]
+***Note:*** If a `storageClassName` is specified and there are no matching
Persistent Volumes then [dynamic
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
must be enabled. Kubernetes will attempt to locate a Persistent Volume that
matches the `storageClassName` before it attempts to use dynamic provisioning.
If a `storageClassName` is not specified there must be [Persistent
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
[...]
<br>
@@ -402,7 +382,7 @@ spec:
resources:
requests:
storage: 555Gi
- storageClassName: standard
+ storageClassName: ""
volumeMode: volume-mode-of-choice
```
@@ -458,6 +438,7 @@ The following table outlines CLI options which are either
***required*** ( '
| `accessModes` | ✅ | ✅ | ❌
| `sizeLimit` | ❔ | ❔ | ❌
| `volumeMode` | ❔ | ❔ | ❌
+| `readOnly` | ❔ | ❔ | ❔
<br>
@@ -473,10 +454,10 @@ A `Volume` and a `Volume Mount` will be created for each
`volume name` which you
| Name | Description | Policy |
|---|---|---|
-| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `Heron
containers` `volumeMounts`.
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `Heron
container`'s `volumeMounts`.
| `claimName` | A Claim name for the Persistent Volume. | If `OnDemand` is
provided as the parameter then a unique Volume and Persistent Volume Claim will
be created. Any other name will result in a shared Volume between all Pods in
the topology with only a Volume and Volume Mount being added.
-| `path` | The `mountPath` of the `Volume`. | Entries made in the `Heron
containers` `volumeMounts`.
-| `subPath` | The `subPath` of the `Volume`. | Entries made in the `Heron
containers` `volumeMounts`.
+| `path` | The `mountPath` of the `Volume`. | Entries made in the `Heron
container`'s `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries made in the `Heron
container`'s `volumeMounts`.
| `storageClassName` | The identifier name used to reference the dynamic
`StorageClass`. | Entries made in the `Persistent Volume Claim` and Pod Spec's
`Volume`.
| `accessModes` | A comma-separated list of [access
modes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
| Entries made in the `Persistent Volume Claim`.
| `sizeLimit` | A resource request for storage space
[units](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory).
| Entries made in the `Persistent Volume Claim`.
@@ -489,6 +470,235 @@ A `Volume` and a `Volume Mount` will be created for each
`volume name` which you
<br>
+## Adding Empty Directory, Host Path, and Network File System Volumes via the
Command-line Interface
+
+<br>
+
+> This section demonstrates how you can specify configurations for `Empty
Dir`, `Host Path`, and `NFS` volumes via the Command Line Interface during the
submit process.
+
+<br/>
+
+It is possible to allocate and configure Volumes with Pod Templates but the
CLI commands extend this to being able to specify Volumes at submission time.
+
+<br>
+
+> ***System Administrators:***
+>
+> * You may wish to disable the ability to specify Volume configurations
via the CLI. To achieve this, you must pass the define option `-D
heron.kubernetes.volume.from.cli.disabled=true` to the Heron API Server on the
command line when launching. This command has been added to the Kubernetes
configuration files to deploy the Heron API Server and is set to `false` by
default.
+> * ⚠ ***WARNING*** ⚠ `Host Path` volumes have inherent
[security
concerns](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath). `Host
Path`s can breach the containment provided by containerization and should be
exclusively used with volume mounts set to `read-only`, with usage limited to
testing and development environments.
+
+<br>
+
+### Usage
+
+To configure a Volume on the CLI you must use the `--config-property` option
in combination with the following prefixes:
+
+ * [Empty
Directory](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir):
`heron.kubernetes.[executor | manager].volumes.emptyDir.`
+ * [Host Path](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath):
`heron.kubernetes.[executor | manager].volumes.hostPath.`
+ * [Network File
System](https://kubernetes.io/docs/concepts/storage/volumes/#nfs):
`heron.kubernetes.[executor | manager].volumes.nfs.`
+
+ Heron will not validate your Volume configurations, so please validate them
to ensure they are well-formed. All Volume names must comply with the
[*lowercase
RFC-1123*](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/)
standard.
+
+The command patterns are as follows:
+
+ * Empty Directory: `heron.kubernetes.[executor |
manager].volumes.emptyDir.[VOLUME NAME].[OPTION]=[VALUE]`
+ * Host Path: `heron.kubernetes.[executor | manager].volumes.hostPath.[VOLUME
NAME].[OPTION]=[VALUE]`
+ * Network File System: `heron.kubernetes.[executor |
manager].volumes.nfs.[VOLUME NAME].[OPTION]=[VALUE]`
+
+The currently supported CLI `options` are:
+
+* `medium`
+* `type`
+* `server`
+* `sizeLimit`
+* `pathOnHost`
+* `pathOnNFS`
+* `path`
+* `subPath`
+* `readOnly`
+
+<br>
+
+#### Example
+
+A series of example commands to add Volumes to a `Manager`, and the `YAML`
entries they make in their respective configurations, are as follows.
+
+***Empty Directory:***
+
+```bash
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.medium="Memory"
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.sizeLimit="50Mi"
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.path="empty/dir/path"
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.subPath="empty/dir/sub/path"
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- emptyDir:
+ medium: Memory
+ sizeLimit: 50Mi
+ name: manager-empty-dir
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: empty/dir/path
+ name: manager-empty-dir
+ readOnly: true
+ subPath: empty/dir/sub/path
+```
+
+<br>
+
+***Host Path:***
+
+```bash
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.type="File"
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.pathOnHost="/dev/null"
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.path="host/path/path"
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.subPath="host/path/sub/path"
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- hostPath:
+ path: /dev/null
+ type: File
+ name: manager-host-path
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: host/path/path
+ name: manager-host-path
+ readOnly: true
+ subPath: host/path/sub/path
+```
+
+<br>
+
+***NFS:***
+
+```bash
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.server="nfs-server.address"
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.pathOnNFS="/dev/null"
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.path="nfs/path"
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.subPath="nfs/sub/path"
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- name: manager-nfs
+ nfs:
+ path: /dev/null
+ readOnly: true
+ server: nfs-server.address
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: nfs/path
+ name: manager-nfs
+ readOnly: true
+ subPath: nfs/sub/path
+```
+
+<br>
+
+### Submitting
+
+A series of example commands to submit a topology using the example CLI
commands above:
+
+```bash
+heron submit kubernetes \
+
--service-url=http://localhost:8001/api/v1/namespaces/default/services/heron-apiserver:9000/proxy
\
+ ~/.heron/examples/heron-api-examples.jar \
+ org.apache.heron.examples.api.AckingTopology acking \
+\
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.medium="Memory" \
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.sizeLimit="50Mi" \
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.path="empty/dir/path"
\
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.subPath="empty/dir/sub/path"
\
+--config-property
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.readOnly="true" \
+\
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.type="File" \
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.pathOnHost="/dev/null"
\
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.path="host/path/path"
\
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.subPath="host/path/sub/path"
\
+--config-property
heron.kubernetes.manager.volumes.hostPath.manager-host-path.readOnly="true" \
+\
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.server="nfs-server.address" \
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true" \
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.pathOnNFS="/dev/null" \
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.path="nfs/path" \
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.subPath="nfs/sub/path" \
+--config-property
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+```
+
+### Required and Optional Configuration Items
+
+The following table outlines CLI options which are either ***required*** (
✅ ), ***optional*** ( ❔ ), or ***not available*** ( ❌ )
depending on the type of `Volume`.
+
+| Option | emptyDir | hostPath | NFS
+|---|---|---|---|
+| `VOLUME NAME` | ✅ | ✅ | ✅
+| `path` | ✅ | ✅ | ✅
+| `subPath` | ❔ | ❔ | ❔
+| `readOnly` | ❔ | ❔ | ❔
+| `medium` | ❔ | ❌ | ❌
+| `sizeLimit` | ❔ | ❌ | ❌
+| `pathOnHost` | ❌ | ✅ | ❌
+| `type` | ❌ | ❔ | ❌
+| `pathOnNFS` | ❌ | ❌ | ✅
+| `server` | ❌ | ❌ | ✅
+
+<br>
+
+***Note:*** The `VOLUME NAME` will be extracted from the CLI command.
+
+<br>
+
+### Configuration Items Created and Entries Made
+
+The configuration items and entries in the tables below will be made in their
respective areas.
+
+A `Volume` and a `Volume Mount` will be created for each `volume name` which
you specify.
+
+| Name | Description | Policy |
+|---|---|---|
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries are made in the Pod
Spec's `Volumes`, and the `Heron container`'s `volumeMounts`.
+| `path` | The `mountPath` of the `Volume`. | Entries are made in the `Heron
container`'s `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries are made in the `Heron
container`'s `volumeMounts`.
+| `readOnly` | A boolean value which defaults to `false` and indicates whether
the volume is mounted as read-only. | Entries are made in the `Heron
container`'s `volumeMounts`. When used with an `NFS` volume an entry is also made
in the associated `Volume`.
+| `medium` | The type of storage medium that will back the `Empty Dir` and
defaults to "", please read more
[here](https://kubernetes.io/docs/concepts/storage/volumes#emptydir). | An
entry is made in the `Empty Dir`'s `Volume`.
+| `sizeLimit` | Total
[amount](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
of local storage required for this `Empty Dir` Volume. | An entry is made in the
`Empty Dir`'s `Volume`.
+| `pathOnHost` | The directory path to be mounted from the host. | A `path` entry
is made in the `Host Path`'s `Volume`.
+| `type` | The type of the `Host Path` volume and defaults to "", please read
more [here](https://kubernetes.io/docs/concepts/storage/volumes#hostpath). | An
entry is made in the `Host Path`'s `Volume`.
+| `pathOnNFS` | The directory path to be mounted from the NFS server. | A `path`
entry is made in the `NFS`'s `Volume`.
+| `server` | The hostname or IP address of the NFS server. | An entry is made
in the `NFS`'s `Volume`.
+
+<br>
+
+---
+
+<br>
+
## Setting Limits and Requests via the Command Line Interface
> This section demonstrates how you can configure a topology's `Executor`
> and/or `Manager` (hereinafter referred to as `Heron containers`) resource
> `Requests` and `Limits` through CLI commands.