This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new fd30626  [Heron-3723] Add Kubernetes support for Empty Dir, Host Path, 
and NFS via CLI (#3747)
fd30626 is described below

commit fd30626d70e3cc3284dcc527f9d0883f42ff1157
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Sun Dec 19 01:03:32 2021 -0500

    [Heron-3723] Add Kubernetes support for Empty Dir, Host Path, and NFS via 
CLI (#3747)
    
    Co-authored-by: Nicholas Nezis <[email protected]>
---
 deploy/kubernetes/general/apiserver.yaml           |   2 +-
 deploy/kubernetes/helm/templates/tools.yaml        |   2 +-
 deploy/kubernetes/helm/values.yaml.template        |   4 +-
 deploy/kubernetes/minikube/apiserver.yaml          |   2 +-
 .../scheduler/kubernetes/KubernetesConstants.java  |  27 +-
 .../scheduler/kubernetes/KubernetesContext.java    | 255 ++++++-
 .../heron/scheduler/kubernetes/V1Controller.java   | 294 ++++++--
 .../kubernetes/KubernetesContextTest.java          | 837 +++++++++++++++++++--
 .../scheduler/kubernetes/V1ControllerTest.java     | 348 +++++++--
 .../docs/schedulers-k8s-execution-environment.md   | 272 ++++++-
 10 files changed, 1778 insertions(+), 265 deletions(-)

diff --git a/deploy/kubernetes/general/apiserver.yaml 
b/deploy/kubernetes/general/apiserver.yaml
index 2e6290c..cd89d98 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -92,7 +92,7 @@ spec:
               -D 
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D 
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
               -D heron.kubernetes.pod.template.disabled=false
-              -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
+              -D heron.kubernetes.volume.from.cli.disabled=false
 
 ---
 apiVersion: v1
diff --git a/deploy/kubernetes/helm/templates/tools.yaml 
b/deploy/kubernetes/helm/templates/tools.yaml
index f418779d6..61ccd25 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -159,7 +159,7 @@ spec:
               {{- end }}
               -D heron.kubernetes.resource.request.mode={{ 
.Values.topologyResourceRequestMode }}
               -D heron.kubernetes.pod.template.disabled={{ 
.Values.disablePodTemplates }}
-              -D heron.kubernetes.persistent.volume.claims.cli.disabled={{ 
.Values.disablePersistentVolumeMountsCLI }}
+              -D heron.kubernetes.volume.from.cli.disabled={{ 
.Values.disableVolumesFromCLI }}
           envFrom:
             - configMapRef:
                 name: {{ .Release.Name }}-tools-config
diff --git a/deploy/kubernetes/helm/values.yaml.template 
b/deploy/kubernetes/helm/values.yaml.template
index 870db57..8814bdf 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -77,8 +77,8 @@ packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
 # Support for ConfigMap mounted PodTemplates
 disablePodTemplates: false
 
-# Support for Dynamic Persistent Volume Mounts from CLI input
-disablePersistentVolumeMountsCLI: false
+# Support for Volume specification from CLI input
+disableVolumesFromCLI: false
 
 # Number of replicas for storage bookies, memory and storage requirements
 bookieReplicas: 3
diff --git a/deploy/kubernetes/minikube/apiserver.yaml 
b/deploy/kubernetes/minikube/apiserver.yaml
index ce20c90..e11cb8b 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -83,7 +83,7 @@ spec:
               -D 
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D 
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
               -D heron.kubernetes.pod.template.disabled=false
-              -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
+              -D heron.kubernetes.volume.from.cli.disabled=false
 
 ---
 apiVersion: v1
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index c89a0c6..e231fda 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -22,8 +22,10 @@ package org.apache.heron.scheduler.kubernetes;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
@@ -115,13 +117,34 @@ public final class KubernetesConstants {
       )
   );
 
-  enum VolumeClaimTemplateConfigKeys {
+  protected enum VolumeConfigKeys {
     claimName,
     storageClassName,
     sizeLimit,
     accessModes,
     volumeMode,
-    path,               // Added to container.
+    medium,
+    type,
+    readOnly,
+    server,
+    pathOnHost,
+    pathOnNFS,
+    path,               // Added to container, nfsVolume, hostPath.
     subPath,            // Added to container.
   }
+
+  protected static final Set<String> VALID_VOLUME_HOSTPATH_TYPES = 
Collections.unmodifiableSet(
+      new HashSet<String>() {
+        {
+          add("");
+          add("DirectoryOrCreate");
+          add("Directory");
+          add("FileOrCreate");
+          add("File");
+          add("Socket");
+          add("CharDevice");
+          add("BlockDevice");
+        }
+      }
+  );
 }
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 83aa0b2..242e4f1 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -27,6 +27,8 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
@@ -114,11 +116,20 @@ public final class KubernetesContext extends Context {
       "heron.kubernetes.pod.secretKeyRef.";
 
   // Persistent Volume Claims
-  public static final String KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED =
-      "heron.kubernetes.persistent.volume.claims.cli.disabled";
+  public static final String KUBERNETES_VOLUME_FROM_CLI_DISABLED =
+      "heron.kubernetes.volume.from.cli.disabled";
   // heron.kubernetes.[executor | 
manager].volumes.persistentVolumeClaim.VOLUME_NAME.OPTION=VALUE
   public static final String KUBERNETES_VOLUME_CLAIM_PREFIX =
       "heron.kubernetes.%s.volumes.persistentVolumeClaim.";
+  // heron.kubernetes.[executor | 
manager].volumes.emptyDir.VOLUME_NAME.OPTION=VALUE
+  public static final String KUBERNETES_VOLUME_EMPTYDIR_PREFIX =
+      "heron.kubernetes.%s.volumes.emptyDir.";
+  // heron.kubernetes.[executor | 
manager].volumes.hostPath.VOLUME_NAME.OPTION=VALUE
+  public static final String KUBERNETES_VOLUME_HOSTPATH_PREFIX =
+      "heron.kubernetes.%s.volumes.hostPath.";
+  // heron.kubernetes.[executor | manager].volumes.nfs.VOLUME_NAME.OPTION=VALUE
+  public static final String KUBERNETES_VOLUME_NFS_PREFIX =
+      "heron.kubernetes.%s.volumes.nfs.";
   // heron.kubernetes.[executor | manager].limits.OPTION=VALUE
   public static final String KUBERNETES_RESOURCE_LIMITS_PREFIX =
       "heron.kubernetes.%s.limits.";
@@ -242,8 +253,8 @@ public final class KubernetesContext extends Context {
     return getConfigItemsByPrefix(config, key);
   }
 
-  public static boolean getPersistentVolumeClaimDisabled(Config config) {
-    final String disabled = 
config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED);
+  public static boolean getVolumesFromCLIDisabled(Config config) {
+    final String disabled = 
config.getStringValue(KUBERNETES_VOLUME_FROM_CLI_DISABLED);
     return "true".equalsIgnoreCase(disabled);
   }
 
@@ -251,15 +262,17 @@ public final class KubernetesContext extends Context {
    * Collects parameters form the <code>CLI</code> and generates a mapping 
between <code>Volumes</code>
    * and their configuration <code>key-value</code> pairs.
    * @param config Contains the configuration options collected from the 
<code>CLI</code>.
-   * @param isExecutor Flag used to collect CLI commands for the 
<code>executor</code> and <code>manager</code>.
+   * @param prefix Configuration key to lookup for options.
+   * @param isExecutor Flag used to switch CLI commands for the 
<code>Executor</code> and <code>Manager</code>.
    * @return A mapping between <code>Volumes</code> and their configuration 
<code>key-value</code> pairs.
    * Will return an empty list if there are no Volume Claim Templates to be 
generated.
    */
-  public static Map<String, 
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
-      getVolumeClaimTemplates(Config config, boolean isExecutor) {
+  @VisibleForTesting
+  protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys, 
String>>
+      getVolumeConfigs(final Config config, final String prefix, final boolean 
isExecutor) {
     final Logger LOG = Logger.getLogger(V1Controller.class.getName());
 
-    final String prefixKey = String.format(KUBERNETES_VOLUME_CLAIM_PREFIX,
+    final String prefixKey = String.format(prefix,
         isExecutor ? KubernetesConstants.EXECUTOR_NAME : 
KubernetesConstants.MANAGER_NAME);
     final Set<String> completeConfigParam = getConfigKeys(config, prefixKey);
     final int prefixLength = prefixKey.length();
@@ -267,19 +280,17 @@ public final class KubernetesContext extends Context {
     final int optionIdx = 1;
     final Matcher matcher = 
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
 
-    final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> volumes
-        = new HashMap<>();
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volumes = new HashMap<>();
 
     try {
       for (String param : completeConfigParam) {
         final String[] tokens = param.substring(prefixLength).split("\\.");
         final String volumeName = tokens[volumeNameIdx];
-        final KubernetesConstants.VolumeClaimTemplateConfigKeys key =
-            
KubernetesConstants.VolumeClaimTemplateConfigKeys.valueOf(tokens[optionIdx]);
+        final KubernetesConstants.VolumeConfigKeys key =
+            KubernetesConstants.VolumeConfigKeys.valueOf(tokens[optionIdx]);
         final String value = config.getStringValue(param);
 
-        Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> volume =
-            volumes.get(volumeName);
+        Map<KubernetesConstants.VolumeConfigKeys, String> volume = 
volumes.get(volumeName);
         if (volume == null) {
           // Validate new Volume Names.
           if (!matcher.reset(volumeName).matches()) {
@@ -291,31 +302,209 @@ public final class KubernetesContext extends Context {
           volumes.put(volumeName, volume);
         }
 
-        /* Validate Claim and Storage Class names.
-          [1] `claimNameNotOnDemand`: checks for a `claimName` which is not 
`OnDemand`.
-          [2] `storageClassName`: Check if it is the provided `option`.
-          Conditions [1] OR [2] are True, then...
-          [3] Check for a valid lowercase RFC-1123 pattern.
-         */
-        boolean claimNameNotOnDemand =
-            
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
-                && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
-        if ((claimNameNotOnDemand // [1]
-            ||
-            
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key)) 
// [2]
-            && !matcher.reset(value).matches()) { // [3]
-          throw new TopologySubmissionException(
-              String.format("Option `%s` value `%s` does not match lowercase 
RFC-1123 pattern",
-                  key, value));
-        }
-
         volume.put(key, value);
       }
     } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
-      final String message = "Invalid Persistent Volume Claim CLI parameter 
provided";
+      final String message = "Invalid Volume configuration option provided on 
CLI";
       LOG.log(Level.CONFIG, message);
       throw new TopologySubmissionException(message);
     }
+
+    // All Volumes must contain a path.
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volume
+        : volumes.entrySet()) {
+      final String path = 
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+      if (path == null || path.isEmpty()) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: All 
Volumes require a"
+            + " 'path'.", volume.getKey()));
+      }
+    }
+
+    // Check to see if functionality is disabled.
+    if (KubernetesContext.getVolumesFromCLIDisabled(config) && 
!volumes.isEmpty()) {
+      final String message = "Configuring Volumes from the CLI is disabled.";
+      LOG.log(Level.WARNING, message);
+      throw new TopologySubmissionException(message);
+    }
+
+    return volumes;
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and validates options for 
<code>PVC</code>s.
+   * @param config Contains the configuration options collected from the 
<code>CLI</code>.
+   * @param isExecutor Flag used to collect CLI commands for the 
<code>Executor</code> and <code>Manager</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration 
<code>key-value</code> pairs.
+   * Will return an empty list if there are no Volume Claim Templates to be 
generated.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+      getVolumeClaimTemplates(final Config config, final boolean isExecutor) {
+    final Matcher matcher = 
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volumes =
+        getVolumeConfigs(config, 
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volume
+        : volumes.entrySet()) {
+
+      // Claim name is required.
+      if 
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
 {
+        throw new TopologySubmissionException(String.format("Volume `%s`: 
Persistent Volume"
+            + " Claims require a `claimName`.", volume.getKey()));
+      }
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+          : volume.getValue().entrySet()) {
+        final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+        final String value = volumeConfig.getValue();
+
+        switch (key) {
+          case claimName:
+            // Claim names which are not OnDemand should be lowercase RFC-1123.
+            if (!matcher.reset(value).matches()
+                && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+              throw new TopologySubmissionException(String.format("Volume 
`%s`: `claimName` does"
+                  + " not match lowercase RFC-1123 pattern", volume.getKey()));
+            }
+            break;
+          case storageClassName:
+            if (!matcher.reset(value).matches()) {
+              throw new TopologySubmissionException(String.format("Volume 
`%s`: `storageClassName`"
+                  + " does not match lowercase RFC-1123 pattern", 
volume.getKey()));
+            }
+            break;
+          case sizeLimit: case accessModes: case volumeMode: case readOnly: 
case path: case subPath:
+            break;
+          default:
+            throw new TopologySubmissionException(String.format("Volume `%s`: 
Invalid Persistent"
+                + " Volume Claim type option for '%s'", volume.getKey(), key));
+        }
+      }
+    }
+
+    return volumes;
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and validates options for 
<code>Empty Directory</code>s.
+   * @param config Contains the configuration options collected from the 
<code>CLI</code>.
+   * @param isExecutor Flag used to collect CLI commands for the 
<code>Executor</code> and <code>Manager</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration 
<code>key-value</code> pairs.
+   * Will return an empty map if there are no Empty Directory volumes to be 
generated.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+      getVolumeEmptyDir(final Config config, final boolean isExecutor) {
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volumes =
+        getVolumeConfigs(config, 
KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX, isExecutor);
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volume
+        : volumes.entrySet()) {
+      final String medium = 
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.medium);
+
+      if (medium != null && !medium.isEmpty() && !"Memory".equals(medium)) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: 
Empty Directory"
+            + " 'medium' must be 'Memory' or empty.", volume.getKey()));
+      }
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+          : volume.getValue().entrySet()) {
+        final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+        switch (key) {
+          case sizeLimit: case medium: case readOnly: case path: case subPath:
+            break;
+          default:
+            throw new TopologySubmissionException(String.format("Volume `%s`: 
Invalid Empty"
+                + " Directory type option for '%s'", volume.getKey(), key));
+        }
+      }
+    }
+
+    return volumes;
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and validates options for 
<code>Host Path</code>s.
+   * @param config Contains the configuration options collected from the 
<code>CLI</code>.
+   * @param isExecutor Flag used to collect CLI commands for the 
<code>Executor</code> and <code>Manager</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration 
<code>key-value</code> pairs.
+   * Will return an empty map if there are no Host Path volumes to be 
generated.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+      getVolumeHostPath(final Config config, final boolean isExecutor) {
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volumes =
+        getVolumeConfigs(config, 
KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX, isExecutor);
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volume
+        : volumes.entrySet()) {
+      final String type = 
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.type);
+      if (type != null && 
!KubernetesConstants.VALID_VOLUME_HOSTPATH_TYPES.contains(type)) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: Host 
Path"
+            + " 'type' of '%s' is invalid.", volume.getKey(), type));
+      }
+      final String hostOnPath =
+          
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnHost);
+      if (hostOnPath == null || hostOnPath.isEmpty()) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: Host 
Path  requires a"
+            + " path on the host.", volume.getKey()));
+      }
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+          : volume.getValue().entrySet()) {
+        final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+        switch (key) {
+          case type: case pathOnHost: case readOnly: case path: case subPath:
+            break;
+          default:
+            throw new TopologySubmissionException(String.format("Volume `%s`: 
Invalid Host Path"
+                + " option for '%s'", volume.getKey(), key));
+        }
+      }
+    }
+
+    return volumes;
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and validates options for 
<code>NFS</code>s.
+   * @param config Contains the configuration options collected from the 
<code>CLI</code>.
+   * @param isExecutor Flag used to collect CLI commands for the 
<code>Executor</code> and <code>Manager</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration 
<code>key-value</code> pairs.
+   * Will return an empty map if there are no NFS volumes to be 
generated.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+      getVolumeNFS(final Config config, final boolean isExecutor) {
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volumes =
+        getVolumeConfigs(config, 
KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX, isExecutor);
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
volume
+        : volumes.entrySet()) {
+      final String server = 
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.server);
+      if (server == null || server.isEmpty()) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: 
`NFS` volumes require a"
+            + " `server` to be specified", volume.getKey()));
+      }
+      final String hostOnNFS =
+          
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnNFS);
+      if (hostOnNFS == null || hostOnNFS.isEmpty()) {
+        throw new TopologySubmissionException(String.format("Volume `%s`: NFS 
requires a path on"
+            + " the NFS server.", volume.getKey()));
+      }
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+          : volume.getValue().entrySet()) {
+        final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+        switch (key) {
+          case server: case pathOnNFS: case readOnly: case path: case subPath:
+            break;
+          default:
+            throw new TopologySubmissionException(String.format("Volume `%s`: 
Invalid NFS option"
+                + " for '%s'", volume.getKey(), key));
+        }
+      }
+    }
+
     return volumes;
   }
 
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 96793b6..3bde340 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -103,8 +103,10 @@ public class V1Controller extends KubernetesController {
     super(configuration, runtimeConfiguration);
 
     isPodTemplateDisabled = 
KubernetesContext.getPodTemplateDisabled(configuration);
-    LOG.log(Level.WARNING, String.format("Custom Pod Templates are %s",
+    LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
         isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+    LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
+        KubernetesContext.getVolumesFromCLIDisabled(configuration) ? 
"DISABLED" : "ENABLED"));
 
     try {
       final ApiClient apiClient = 
io.kubernetes.client.util.Config.defaultClient();
@@ -415,17 +417,21 @@ public class V1Controller extends KubernetesController {
     final String topologyName = getTopologyName();
     final Config runtimeConfiguration = getRuntimeConfiguration();
 
-    // Get and then create Persistent Volume Claims from the CLI.
-    final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> configsPVC =
+    final List<V1Volume> volumes = new LinkedList<>();
+    final List<V1VolumeMount> volumeMounts = new LinkedList<>();
+
+    // Collect Persistent Volume Claim configurations from the CLI.
+    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configsPVC =
         KubernetesContext.getVolumeClaimTemplates(getConfiguration(), 
isExecutor);
-    if (KubernetesContext.getPersistentVolumeClaimDisabled(getConfiguration())
-        && !configsPVC.isEmpty()) {
-      final String message =
-          String.format("'%s': Configuring Persistent Volume Claim from CLI is 
disabled",
-              topologyName);
-      LOG.log(Level.WARNING, message);
-      throw new TopologySubmissionException(message);
-    }
+
+    // Collect all Volume configurations from the CLI and generate Volumes and 
Volume Mounts.
+    createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, 
volumeMounts);
+    createVolumeAndMountsHostPathCLI(
+        KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), 
volumes, volumeMounts);
+    createVolumeAndMountsEmptyDirCLI(
+        KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), 
volumes, volumeMounts);
+    createVolumeAndMountsNFSCLI(
+        KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), 
volumes, volumeMounts);
 
     final V1StatefulSet statefulSet = new V1StatefulSet();
 
@@ -465,7 +471,8 @@ public class V1Controller extends KubernetesController {
     templateMetaData.setAnnotations(annotations);
     podTemplateSpec.setMetadata(templateMetaData);
 
-    configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, 
isExecutor, configsPVC);
+    configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, 
isExecutor,
+        volumes, volumeMounts);
 
     statefulSetSpec.setTemplate(podTemplateSpec);
 
@@ -519,11 +526,12 @@ public class V1Controller extends KubernetesController {
    * @param resource Passed down to configure the resource limits.
    * @param numberOfInstances Passed down to configure the ports.
    * @param isExecutor Flag used to configure components specific to 
<code>Executor</code> and <code>Manager</code>.
-   * @param configPVC <code>Persistent Volume Claim</code> configurations 
options.
+   * @param volumes <code>Volumes</code> generated from configurations options.
+   * @param volumeMounts <code>Volume Mounts</code> generated from 
configurations options.
    */
   private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec, 
Resource resource,
-      int numberOfInstances, boolean isExecutor,
-      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> configPVC) {
+      int numberOfInstances, boolean isExecutor, List<V1Volume> volumes,
+      List<V1VolumeMount> volumeMounts) {
     if (podTemplateSpec.getSpec() == null) {
       podTemplateSpec.setSpec(new V1PodSpec());
     }
@@ -561,8 +569,9 @@ public class V1Controller extends KubernetesController {
       containers.add(heronContainer);
     }
 
-    if (!configPVC.isEmpty()) {
-      configurePodWithPersistentVolumeClaimVolumesAndMounts(podSpec, 
heronContainer, configPVC);
+    if (!volumes.isEmpty() || !volumeMounts.isEmpty()) {
+      configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes,
+          volumeMounts);
     }
 
     configureHeronContainer(resource, numberOfInstances, heronContainer, 
isExecutor);
@@ -1043,17 +1052,16 @@ public class V1Controller extends KubernetesController {
    */
   @VisibleForTesting
   protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
-      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> mapOfOpts) {
+      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts) {
 
     List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
 
     // Iterate over all the PVC Volumes.
-    for (Map.Entry<String, 
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> pvc
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
pvc
         : mapOfOpts.entrySet()) {
 
       // Only create claims for `OnDemand` volumes.
-      final String claimName = pvc.getValue()
-          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+      final String claimName = 
pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName);
       if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
         continue;
       }
@@ -1064,11 +1072,12 @@ public class V1Controller extends KubernetesController {
             .withLabels(getPersistentVolumeClaimLabels(getTopologyName()))
           .endMetadata()
           .withNewSpec()
+            .withStorageClassName("")
           .endSpec()
           .build();
 
       // Populate PVC options.
-      for (Map.Entry<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String> option
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> option
           : pvc.getValue().entrySet()) {
         String optionValue = option.getValue();
         switch(option.getKey()) {
@@ -1087,12 +1096,8 @@ public class V1Controller extends KubernetesController {
             claim.getSpec().setVolumeMode(optionValue);
             break;
           // Valid ignored options not used in a PVC.
-          case path: case subPath: case claimName:
-            break;
           default:
-            throw new TopologySubmissionException(
-                String.format("Invalid Persistent Volume Claim type option for 
'%s'",
-                    option.getKey()));
+            break;
         }
       }
       listOfPVCs.add(claim);
@@ -1101,80 +1106,212 @@ public class V1Controller extends KubernetesController 
{
   }
 
   /**
-   * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be 
placed in the <code>executor container</code>.
-   * @param mapConfig Mapping of <code>Volumes</code> to 
<code>key-value</code> configuration pairs.
-   * @return A pair of configured lists of <code>V1Volume</code> and 
<code>V1VolumeMount</code>.
+   * Generates a <code>Volume Mount</code> to be placed in the 
<code>Executor</code>
+   * and <code>Manager</code> from options on the CLI.
+   * @param volumeName Name of the <code>Volume</code>.
+   * @param configs Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
+   * @return A configured <code>V1VolumeMount</code>.
+   */
+  @VisibleForTesting
+  protected V1VolumeMount createVolumeMountsCLI(final String volumeName,
+      final Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+    final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+        .withName(volumeName)
+        .build();
+    for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : 
configs.entrySet()) {
+      switch (config.getKey()) {
+        case path:
+          volumeMount.mountPath(config.getValue());
+          break;
+        case subPath:
+          volumeMount.subPath(config.getValue());
+          break;
+        case readOnly:
+          volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+          break;
+        default:
+          break;
+      }
+    }
+
+    return volumeMount;
+  }
+
+  /**
+   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>Persistent Volume Claims</code>
+   * to be placed in the <code>Executor</code> and <code>Manager</code> from 
options on the CLI.
+   * @param mapConfig Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
+   * @param volumes A list of <code>Volume</code> to append to.
+   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
    */
   @VisibleForTesting
-  protected Pair<List<V1Volume>, List<V1VolumeMount>> 
createPersistentVolumeClaimVolumesAndMounts(
-      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> mapConfig) {
-    List<V1Volume> volumeList = new LinkedList<>();
-    List<V1VolumeMount> mountList = new LinkedList<>();
-    for (Map.Entry<String, 
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+  protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapConfig,
+      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
         : mapConfig.entrySet()) {
       final String volumeName = configs.getKey();
-      final String path = configs.getValue()
-          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
-      final String subPath = configs.getValue()
-          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
-      if (path == null || path.isEmpty()) {
-        throw new TopologySubmissionException(
-            String.format("A mount path is required and missing from '%s'", 
volumeName));
-      }
 
       // Do not create Volumes for `OnDemand`.
       final String claimName = configs.getValue()
-          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+          .get(KubernetesConstants.VolumeConfigKeys.claimName);
       if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        final V1Volume volume = new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewPersistentVolumeClaim()
-              .withClaimName(claimName)
-            .endPersistentVolumeClaim()
-            .build();
-        volumeList.add(volume);
+        volumes.add(
+            new V1VolumeBuilder()
+                .withName(volumeName)
+                .withNewPersistentVolumeClaim()
+                  .withClaimName(claimName)
+                .endPersistentVolumeClaim()
+                .build()
+        );
       }
+      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+    }
+  }
 
-      final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+  /**
+   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>emptyDir</code>s to be
+   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
+   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
+   * @param volumes A list of <code>Volume</code> to append to.
+   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+   */
+  @VisibleForTesting
+  protected void createVolumeAndMountsEmptyDirCLI(
+      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
+      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
+        : mapOfOpts.entrySet()) {
+      final String volumeName = configs.getKey();
+      final V1Volume volume = new V1VolumeBuilder()
           .withName(volumeName)
-          .withMountPath(path);
-      if (subPath != null && !subPath.isEmpty()) {
-        volumeMount.withSubPath(subPath);
+          .withNewEmptyDir()
+          .endEmptyDir()
+          .build();
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+          : configs.getValue().entrySet()) {
+        switch(config.getKey()) {
+          case medium:
+            volume.getEmptyDir().medium(config.getValue());
+            break;
+          case sizeLimit:
+            volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+            break;
+          default:
+            break;
+        }
       }
-      mountList.add(volumeMount.build());
+      volumes.add(volume);
+      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
     }
-    return new Pair<>(volumeList, mountList);
   }
 
   /**
-   * Makes a call to generate <code>Volumes</code> and <code>Volume 
Mounts</code> and then inserts them.
+   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>Host Path</code>s to be
+   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
+   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
+   * @param volumes A list of <code>Volume</code> to append to.
+   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+   */
+  @VisibleForTesting
+  protected void createVolumeAndMountsHostPathCLI(
+      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
+      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
+        : mapOfOpts.entrySet()) {
+      final String volumeName = configs.getKey();
+      final V1Volume volume = new V1VolumeBuilder()
+          .withName(volumeName)
+          .withNewHostPath()
+          .endHostPath()
+          .build();
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+          : configs.getValue().entrySet()) {
+        switch(config.getKey()) {
+          case type:
+            volume.getHostPath().setType(config.getValue());
+            break;
+          case pathOnHost:
+            volume.getHostPath().setPath(config.getValue());
+            break;
+          default:
+            break;
+        }
+      }
+      volumes.add(volume);
+      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+    }
+  }
+
+  /**
+   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>NFS</code> to be
+   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
+   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
+   * @param volumes A list of <code>Volume</code> to append to.
+   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+   */
+  @VisibleForTesting
+  protected void createVolumeAndMountsNFSCLI(
+      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
+      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
+        : mapOfOpts.entrySet()) {
+      final String volumeName = configs.getKey();
+      final V1Volume volume = new V1VolumeBuilder()
+          .withName(volumeName)
+          .withNewNfs()
+          .endNfs()
+          .build();
+
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+          : configs.getValue().entrySet()) {
+        switch(config.getKey()) {
+          case server:
+            volume.getNfs().setServer(config.getValue());
+            break;
+          case pathOnNFS:
+            volume.getNfs().setPath(config.getValue());
+            break;
+          case readOnly:
+            
volume.getNfs().setReadOnly(Boolean.parseBoolean(config.getValue()));
+            break;
+          default:
+            break;
+        }
+      }
+      volumes.add(volume);
+      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+    }
+  }
+
+  /**
+   * Configures the Pod Spec and Heron container with <code>Volumes</code> and 
<code>Volume Mounts</code>.
    * @param podSpec All generated <code>V1Volume</code> will be placed in the 
<code>Pod Spec</code>.
    * @param executor All generated <code>V1VolumeMount</code> will be placed 
in the <code>Container</code>.
-   * @param configPVC <code>Persistent Volume Claim</code> configurations 
options.
+   * @param volumes <code>Volumes</code> to be inserted in the Pod Spec.
+   * @param volumeMounts <code>Volume Mounts</code> to be inserted in the 
Heron container.
    */
   @VisibleForTesting
-  protected void configurePodWithPersistentVolumeClaimVolumesAndMounts(final 
V1PodSpec podSpec,
-      final V1Container executor,
-      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> configPVC) {
-    Pair<List<V1Volume>, List<V1VolumeMount>> volumesAndMounts =
-        createPersistentVolumeClaimVolumesAndMounts(configPVC);
+  protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec 
podSpec,
+      final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount> 
volumeMounts) {
 
     // Deduplicate on Names with Persistent Volume Claims taking precedence.
 
-    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
-        new KubernetesUtils.V1ControllerUtils<>();
-    executor.setVolumeMounts(
-        utilsMounts.mergeListsDedupe(volumesAndMounts.second, 
executor.getVolumeMounts(),
-            Comparator.comparing(V1VolumeMount::getName),
-            "Executor and Persistent Volume Claim Volume Mounts"));
-
     KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
         new KubernetesUtils.V1ControllerUtils<>();
     podSpec.setVolumes(
-        utilsVolumes.mergeListsDedupe(volumesAndMounts.first, 
podSpec.getVolumes(),
+        utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
             Comparator.comparing(V1Volume::getName),
-            "Pod and Persistent Volume Claim Volumes"));
+            "Pod with Volumes"));
+
+    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
+        new KubernetesUtils.V1ControllerUtils<>();
+    executor.setVolumeMounts(
+        utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
+            Comparator.comparing(V1VolumeMount::getName),
+            "Heron container with Volume Mounts"));
   }
 
   /**
@@ -1186,12 +1323,11 @@ public class V1Controller extends KubernetesController {
    *     onDemand: <code>true</code>
    */
   private void removePersistentVolumeClaims() {
-    final String topologyName = getTopologyName();
+    final String name = getTopologyName();
     final StringBuilder selectorLabel = new StringBuilder();
 
     // Generate selector label.
-    for (Map.Entry<String, String> label
-        : getPersistentVolumeClaimLabels(topologyName).entrySet()) {
+    for (Map.Entry<String, String> label : 
getPersistentVolumeClaimLabels(name).entrySet()) {
       if (selectorLabel.length() != 0) {
         selectorLabel.append(",");
       }
@@ -1218,11 +1354,11 @@ public class V1Controller extends KubernetesController {
 
       LOG.log(Level.INFO,
           String.format("Removing automatically generated Persistent Volume 
Claims for `%s`:%n%s",
-          topologyName, status.getMessage()));
+          name, status.getMessage()));
     } catch (ApiException e) {
       final String message = String.format("Failed to connect to K8s cluster 
to delete Persistent "
               + "Volume Claims for topology `%s`. A manual clean-up is 
required.%n%s",
-          topologyName, e.getMessage());
+          name, e.getMessage());
       LOG.log(Level.WARNING, message);
       throw new TopologyRuntimeManagementException(message);
     }
diff --git 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
index 988456f..da1b21d 100644
--- 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
+++ 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -20,10 +20,13 @@
 package org.apache.heron.scheduler.kubernetes;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,7 +35,7 @@ import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 
-import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
 
 public class KubernetesContextTest {
 
@@ -82,41 +85,43 @@ public class KubernetesContextTest {
   }
 
   @Test
-  public void testPersistentVolumeClaimDisabled() {
-    
Assert.assertFalse(KubernetesContext.getPersistentVolumeClaimDisabled(config));
+  public void testVolumesFromCLIDisabled() {
+    Assert.assertFalse(KubernetesContext.getVolumesFromCLIDisabled(config));
     Assert.assertFalse(KubernetesContext
-        .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMap));
+        .getVolumesFromCLIDisabled(configWithPodTemplateConfigMap));
 
     final Config configWithPodTemplateConfigMapOff = Config.newBuilder()
         .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
             POD_TEMPLATE_CONFIGMAP_NAME)
-        
.put(KubernetesContext.KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED, "TRUE")
+        .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "TRUE")
         .build();
     Assert.assertTrue(KubernetesContext
-        .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMapOff));
+        .getVolumesFromCLIDisabled(configWithPodTemplateConfigMapOff));
   }
 
-  @Test
-  public void testGetVolumeClaimTemplates() {
+  /**
+   * Generate <code>Volume</code> Configs for testing.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: [0] expectedKeys, [1] expectedOptionsKeys, [2] 
expectedOptionsValues.
+   * @param prefix Configuration prefix key to use in lookup.
+   */
+  private void createVolumeConfigs(List<TestTuple<Pair<Config, Boolean>, 
Object[]>> testCases,
+                                   String prefix) {
+    final String keyPattern = prefix + "%%s.%%s";
+    final String keyExecutor = String.format(keyPattern, 
KubernetesConstants.EXECUTOR_NAME);
+    final String keyManager = String.format(keyPattern, 
KubernetesConstants.MANAGER_NAME);
+
     final String volumeNameOne = "volume-name-one";
     final String volumeNameTwo = "volume-name-two";
     final String claimName = "OnDeMaNd";
-    final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX 
+ "%%s.%%s";
-    final String keyExecutor = String.format(keyPattern, 
KubernetesConstants.EXECUTOR_NAME);
-    final String keyManager = String.format(keyPattern, 
KubernetesConstants.MANAGER_NAME);
 
-    final String storageClassField = 
VolumeClaimTemplateConfigKeys.storageClassName.name();
-    final String pathField = VolumeClaimTemplateConfigKeys.path.name();
-    final String claimNameField = 
VolumeClaimTemplateConfigKeys.claimName.name();
+    final String storageClassField = VolumeConfigKeys.storageClassName.name();
+    final String pathField = VolumeConfigKeys.path.name();
+    final String claimNameField = VolumeConfigKeys.claimName.name();
     final String expectedStorageClass = "expected-storage-class";
     final String expectedPath = "/path/for/volume/expected";
 
-
-    // Test case container.
-    // Input: Config to extract options from, Boolean to indicate 
Manager/Executor.
-    // Output: [0] expectedKeys, [1] expectedOptionsKeys, [2] 
expectedOptionsValues.
-    final List<TestTuple<Pair<Config, Boolean>, Object[]>> testCases = new 
LinkedList<>();
-
     // Create test cases for Executor/Manager on even/odd indices respectively.
     for (int idx = 0; idx < 2; ++idx) {
 
@@ -149,28 +154,54 @@ public class KubernetesContextTest {
           .build();
 
       final List<String> expectedKeys = Arrays.asList(volumeNameOne, 
volumeNameTwo);
-      final List<VolumeClaimTemplateConfigKeys> expectedOptionsKeys =
-          Arrays.asList(VolumeClaimTemplateConfigKeys.path,
-              VolumeClaimTemplateConfigKeys.storageClassName,
-              VolumeClaimTemplateConfigKeys.claimName);
+      final List<VolumeConfigKeys> expectedOptionsKeys =
+          Arrays.asList(VolumeConfigKeys.path,
+              VolumeConfigKeys.storageClassName,
+              VolumeConfigKeys.claimName);
       final List<String> expectedOptionsValues =
           Arrays.asList(expectedPath, expectedStorageClass, claimName);
 
       testCases.add(new TestTuple<>(description,
           new Pair<>(configPVC, isExecutor),
           new Object[]{expectedKeys, expectedOptionsKeys, 
expectedOptionsValues}));
+
+      final Config configPVCDisabled = Config.newBuilder()
+          .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "true")
+          .put(pathKeyOne, expectedPath)
+          .put(pathKeyTwo, expectedPath)
+          .put(claimNameKeyOne, claimName)
+          .put(claimNameKeyTwo, claimName)
+          .put(storageClassKeyOne, expectedStorageClass)
+          .put(storageClassKeyTwo, expectedStorageClass)
+          .build();
+
+      testCases.add(new TestTuple<>(description + " Disabled should not error",
+          new Pair<>(configPVCDisabled, !isExecutor),
+          new Object[]{new LinkedList<String>(), new 
LinkedList<VolumeConfigKeys>(),
+              new LinkedList<String>()}));
     }
+  }
+
+  @Test
+  public void testGetVolumeConfigs() {
+    final String prefix = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX;
+
+    // Test case container.
+    // Input: [0] Config, [1] Boolean to indicate Manager/Executor.
+    // Output: [0] expectedKeys, [1] expectedOptionsKeys, [2] 
expectedOptionsValues.
+    final List<TestTuple<Pair<Config, Boolean>, Object[]>> testCases = new 
LinkedList<>();
+    createVolumeConfigs(testCases, prefix);
 
     // Test loop.
     for (TestTuple<Pair<Config, Boolean>, Object[]> testCase : testCases) {
-      final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfPVC =
-          KubernetesContext.getVolumeClaimTemplates(testCase.input.first, 
testCase.input.second);
+      final Map<String, Map<VolumeConfigKeys, String>> mapOfPVC =
+          KubernetesContext.getVolumeConfigs(testCase.input.first, prefix, 
testCase.input.second);
 
       Assert.assertTrue(testCase.description + ": Contains all provided 
Volumes",
           mapOfPVC.keySet().containsAll((List<String>) testCase.expected[0]));
-      for (Map<VolumeClaimTemplateConfigKeys, String> items : 
mapOfPVC.values()) {
+      for (Map<VolumeConfigKeys, String> items : mapOfPVC.values()) {
         Assert.assertTrue(testCase.description + ": Contains all provided 
option keys",
-            items.keySet().containsAll((List<VolumeClaimTemplateConfigKeys>) 
testCase.expected[1]));
+            items.keySet().containsAll((List<VolumeConfigKeys>) 
testCase.expected[1]));
         Assert.assertTrue(testCase.description + ": Contains all provided 
option values",
             items.values().containsAll((List<String>) testCase.expected[2]));
       }
@@ -179,36 +210,42 @@ public class KubernetesContextTest {
     // Empty PVC.
     final Boolean[] emptyPVCTestCases = new Boolean[] {true, false};
     for (boolean testCase : emptyPVCTestCases) {
-      final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> emptyPVC =
+      final Map<String, Map<VolumeConfigKeys, String>> emptyPVC =
           
KubernetesContext.getVolumeClaimTemplates(Config.newBuilder().build(), 
testCase);
       Assert.assertTrue("Empty PVC is returned when no options provided", 
emptyPVC.isEmpty());
     }
   }
 
   @Test
-  public void testGetPersistentVolumeClaimsErrors() {
+  public void testGetVolumeConfigsErrors() {
+    final String prefix = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX;
     final String volumeNameValid = "volume-name-valid";
     final String volumeNameInvalid = "volume-Name-Invalid";
+    final String passingValue = "should-pass";
     final String failureValue = "Should-Fail";
-    final String generalFailureMessage = "Invalid Persistent Volume";
+    final String generalFailureMessage = "Invalid Volume configuration";
     final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
         + "%%s.%%s", KubernetesConstants.EXECUTOR_NAME);
     final List<TestTuple<Config, String>> testCases = new LinkedList<>();
 
     // Invalid option key test.
     final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), 
passingValue)
         .put(String.format(keyPattern, volumeNameValid, "NonExistentKey"), 
failureValue)
         .build();
     testCases.add(new TestTuple<>("Invalid option key should trigger 
exception",
         configInvalidOption, generalFailureMessage));
 
-    // Just the prefix.
-    final Config configJustPrefix = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, failureValue)
-        .build();
-    testCases.add(new TestTuple<>("Only a key prefix should trigger exception",
-        configJustPrefix, generalFailureMessage));
-
     // Invalid Volume Name.
     final Config configInvalidVolumeName = Config.newBuilder()
         .put(String.format(keyPattern, volumeNameInvalid, "path"), 
failureValue)
@@ -216,31 +253,729 @@ public class KubernetesContextTest {
     testCases.add(new TestTuple<>("Invalid Volume Name should trigger 
exception",
         configInvalidVolumeName, "lowercase RFC-1123"));
 
+    // Required Path.
+    final Config configRequiredPath = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>("Missing path should trigger exception",
+        configRequiredPath, "All Volumes require a 'path'."));
+
+    // Disabled.
+    final Config configDisabled = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_VOLUME_FROM_CLI_DISABLED, "true")
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "type"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>("Disabled functionality should trigger 
exception",
+        configDisabled, "Configuring Volumes from the CLI is disabled."));
+
+    // Testing loop.
+    for (TestTuple<Config, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeConfigs(testCase.input, prefix, true);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, 
message.contains(testCase.expected));
+    }
+  }
+
+  /**
+   * Create test cases for <code>Volume Claim Templates</code>.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: {@code Map<String, Map<VolumeConfigKeys, 
String>>}
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeClaimTemplates(
+      List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>>> testCases,
+      boolean isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+        + "%%s.%%s", processName);
+
+    // With Storage Class Name.
+    final Map<String, Map<VolumeConfigKeys, String>> 
expectedWithStorageClassName =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.claimName, passingValue);
+            put(VolumeConfigKeys.storageClassName, passingValue);
+            put(VolumeConfigKeys.sizeLimit, passingValue);
+            put(VolumeConfigKeys.accessModes, passingValue);
+            put(VolumeConfigKeys.volumeMode, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithStorageClass = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": PVC with Storage Class 
name",
+        new Pair<>(configWithStorageClass, isExecutor), 
expectedWithStorageClassName));
+
+    // Without Storage Class Name.
+    final Map<String, Map<VolumeConfigKeys, String>> 
expectedWithoutStorageClassName =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.claimName, passingValue);
+            put(VolumeConfigKeys.sizeLimit, passingValue);
+            put(VolumeConfigKeys.accessModes, passingValue);
+            put(VolumeConfigKeys.volumeMode, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithoutStorageClass = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": PVC with Storage Class 
name",
+        new Pair<>(configWithoutStorageClass, isExecutor), 
expectedWithoutStorageClassName));
+
+    // Ignored.
+    final Config configIgnored = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": PVC with ignored keys",
+        new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+  }
+
+  @Test
+  public void testGetVolumeClaimTemplates() {
+    final List<TestTuple<Pair<Config, Boolean>, Map<String, 
Map<VolumeConfigKeys, String>>>>
+        testCases = new LinkedList<>();
+    createVolumeClaimTemplates(testCases, true);
+    createVolumeClaimTemplates(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>> testCase
+        : testCases) {
+      Map<String, Map<VolumeConfigKeys, String>> actual =
+          KubernetesContext.getVolumeClaimTemplates(testCase.input.first, 
testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
+  }
+
+  /**
+   * Create test cases for <code>Volume Claim Templates</code> errors.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: Error message
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeClaimTemplatesErrors(
+      List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean 
isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String failureValue = "Should-Fail";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+        + "%%s.%%s", processName);
+
+    // Required Claim Name.
+    final Config configRequiredClaimName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Missing Claim Name should 
trigger exception",
+        new Pair<>(configRequiredClaimName, isExecutor), "require a 
`claimName`"));
+
     // Invalid Claim Name.
     final Config configInvalidClaimName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
         .put(String.format(keyPattern, volumeNameValid, "claimName"), 
failureValue)
         .build();
-    testCases.add(new TestTuple<>("Invalid Claim Name should trigger 
exception",
-        configInvalidClaimName, "Option `claimName`"));
+    testCases.add(new TestTuple<>(processName + ": Invalid Claim Name should 
trigger exception",
+        new Pair<>(configInvalidClaimName, isExecutor),
+        String.format("Volume `%s`: `claimName`", volumeNameValid)));
 
     // Invalid Storage Class Name.
     final Config configInvalidStorageClassName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
         .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
failureValue)
         .build();
-    testCases.add(new TestTuple<>("Invalid Storage Class Name should trigger 
exception",
-        configInvalidStorageClassName, "Option `storageClassName`"));
+    testCases.add(new TestTuple<>(processName
+        + ": Invalid Storage Class Name should trigger exception",
+        new Pair<>(configInvalidStorageClassName, isExecutor),
+        String.format("Volume `%s`: `storageClassName`", volumeNameValid)));
+
+    // Invalid option.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "volumeMode"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid option should 
trigger exception",
+        new Pair<>(configInvalidOption, isExecutor),
+        String.format("Volume `%s`: Invalid Persistent", volumeNameValid)));
+  }
+
+  @Test
+  public void testGetVolumeClaimTemplatesErrors() {
+    final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new 
LinkedList<>();
+    createVolumeClaimTemplatesErrors(testCases, true);
+    createVolumeClaimTemplatesErrors(testCases, false);
 
     // Testing loop.
-    final Boolean[] executorFlags = new Boolean[] {true, false};
-    for (TestTuple<Config, String> testCase : testCases) {
-      // Test for both Executor and Manager.
-      for (boolean isExecutor : executorFlags) {
-        try {
-          KubernetesContext.getVolumeClaimTemplates(testCase.input, 
isExecutor);
-        } catch (TopologySubmissionException e) {
-          Assert.assertTrue(testCase.description, 
e.getMessage().contains(testCase.expected));
-        }
+    for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeClaimTemplates(testCase.input.first, 
testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, 
message.contains(testCase.expected));
+    }
+  }
+
+  /**
+   * Create test cases for <code>Empty Directory</code>.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: {@code Map<String, Map<VolumeConfigKeys, 
String>>}
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeEmptyDir(
+      List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>>> testCases,
+      boolean isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX
+        + "%%s.%%s", processName);
+
+    // With Medium.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedWithMedium =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.sizeLimit, passingValue);
+            put(VolumeConfigKeys.medium, "Memory");
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithMedium = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), "Memory")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `emptyDir` with `medium`",
+        new Pair<>(configWithMedium, isExecutor), expectedWithMedium));
+
+    // With empty Medium.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedEmptyMedium =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.sizeLimit, passingValue);
+            put(VolumeConfigKeys.medium, "");
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configEmptyMedium = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), "")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `emptyDir` with empty 
`medium`",
+        new Pair<>(configEmptyMedium, isExecutor), expectedEmptyMedium));
+
+    // Without Medium.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedNoMedium =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.sizeLimit, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configNoMedium = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `emptyDir` without 
`medium`",
+        new Pair<>(configNoMedium, isExecutor), expectedNoMedium));
+
+    // Ignored.
+    final Config configIgnored = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), "")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `emptyDir` ignored",
+        new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+  }
+
+  @Test
+  public void testGetVolumeEmptyDir() {
+    final List<TestTuple<Pair<Config, Boolean>, Map<String, 
Map<VolumeConfigKeys, String>>>>
+        testCases = new LinkedList<>();
+    createVolumeEmptyDir(testCases, true);
+    createVolumeEmptyDir(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>> testCase
+        : testCases) {
+      Map<String, Map<VolumeConfigKeys, String>> actual =
+          KubernetesContext.getVolumeEmptyDir(testCase.input.first, 
testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
+  }
+
+  /**
+   * Create test cases for <code>Empty Directory</code> errors.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: Error message
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeEmptyDirError(
+      List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean 
isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String failureValue = "Should-Fail";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX
+        + "%%s.%%s", processName);
+
+    // Medium is invalid.
+    final Config configInvalidMedium = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), 
failureValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid 'medium' should 
trigger exception",
+        new Pair<>(configInvalidMedium, isExecutor), "must be 'Memory' or 
empty."));
+
+    // Invalid option.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "sizeLimit"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "medium"), "Memory")
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid option should 
trigger exception",
+        new Pair<>(configInvalidOption, isExecutor), "Directory type option"));
+  }
+
+  @Test
+  public void testGetVolumeEmptyDirErrors() {
+    final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new 
LinkedList<>();
+    createVolumeEmptyDirError(testCases, true);
+    createVolumeEmptyDirError(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeEmptyDir(testCase.input.first, 
testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, 
message.contains(testCase.expected));
+    }
+  }
+
+  /**
+   * Create test cases for <code>Host Path</code>.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: {@code Map<String, Map<VolumeConfigKeys, 
String>>}
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeHostPath(
+      List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>>> testCases,
+      boolean isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX
+        + "%%s.%%s", processName);
+
+    // With type.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedWithType =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.type, "DirectoryOrCreate");
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.pathOnHost, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithType = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), 
"DirectoryOrCreate")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": 'hostPath' with 'type'",
+        new Pair<>(configWithType, isExecutor), expectedWithType));
+
+    // Without type.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedWithoutType =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.pathOnHost, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithoutType = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": 'hostPath' without 'type'",
+        new Pair<>(configWithoutType, isExecutor), expectedWithoutType));
+
+    // Ignored.
+    final Config configIgnored = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": 'hostPath' ignored",
+        new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+  }
+
+  @Test
+  public void testGetVolumeHostPath() {
+    final List<TestTuple<Pair<Config, Boolean>, Map<String, 
Map<VolumeConfigKeys, String>>>>
+        testCases = new LinkedList<>();
+    createVolumeHostPath(testCases, true);
+    createVolumeHostPath(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>> testCase
+        : testCases) {
+      Map<String, Map<VolumeConfigKeys, String>> actual =
+          KubernetesContext.getVolumeHostPath(testCase.input.first, 
testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
+  }
+
+  /**
+   * Create test cases for <code>Host Path</code> errors.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: Error message
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeHostPathError(
+      List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean 
isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String failureValue = "Should-Fail";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX
+        + "%%s.%%s", processName);
+
+    // Type is invalid.
+    final Config configInvalidMedium = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), failureValue)
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid 'type' should 
trigger exception",
+        new Pair<>(configInvalidMedium, isExecutor), "Host Path 'type' of"));
+
+    // Path on Host is missing.
+    final Config configNoHostOnPath = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": No 'hostOnPath' should 
trigger exception",
+        new Pair<>(configNoHostOnPath, isExecutor), "requires a path on the 
host"));
+
+    // Path on Host is empty.
+    final Config configEmptyHostOnPath = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), "")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Empty 'hostOnPath' should 
trigger exception",
+        new Pair<>(configEmptyHostOnPath, isExecutor), "requires a path on the 
host"));
+
+    // Invalid option.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "type"), "BlockDevice")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnHost"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid option should 
trigger exception",
+        new Pair<>(configInvalidOption, isExecutor), "Invalid Host Path option 
for"));
+  }
+
+  @Test
+  public void testGetVolumeHostPathErrors() {
+    final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new 
LinkedList<>();
+    createVolumeHostPathError(testCases, true);
+    createVolumeHostPathError(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeHostPath(testCase.input.first, 
testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, 
message.contains(testCase.expected));
+    }
+  }
+
+  /**
+   * Create test cases for <code>NFS</code>.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: {@code Map<String, Map<VolumeConfigKeys, 
String>>}
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeNFS(
+      List<TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>>> testCases,
+      boolean isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX
+        + "%%s.%%s", processName);
+
+    // With readOnly.
+    final Map<String, Map<VolumeConfigKeys, String>> expectedWithReadOnly =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.server, "nfs-server.default.local");
+            put(VolumeConfigKeys.readOnly, "true");
+            put(VolumeConfigKeys.pathOnNFS, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithReadOnly = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "true")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `NFS` with `readOnly`",
+        new Pair<>(configWithReadOnly, isExecutor), expectedWithReadOnly));
+
+    // Without readOnly (NOTE: this case still sets `readOnly` below).
+    final Map<String, Map<VolumeConfigKeys, String>> expectedWithoutReadOnly =
+        ImmutableMap.of(volumeNameValid, new HashMap<VolumeConfigKeys, 
String>() {
+          {
+            put(VolumeConfigKeys.server, "nfs-server.default.local");
+            put(VolumeConfigKeys.pathOnNFS, passingValue);
+            put(VolumeConfigKeys.path, passingValue);
+            put(VolumeConfigKeys.subPath, passingValue);
+            put(VolumeConfigKeys.readOnly, passingValue);
+          }
+        });
+    final Config configWithoutReadOnly = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `NFS` without `readOnly`",
+        new Pair<>(configWithoutReadOnly, isExecutor), 
expectedWithoutReadOnly));
+
+    // Ignored.
+    final Config configIgnored = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "true")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": `NFS` ignored",
+        new Pair<>(configIgnored, !isExecutor), new HashMap<>()));
+  }
+
+  @Test
+  public void testGetVolumeNFS() {
+    final List<TestTuple<Pair<Config, Boolean>, Map<String, 
Map<VolumeConfigKeys, String>>>>
+        testCases = new LinkedList<>();
+    createVolumeNFS(testCases, true);
+    createVolumeNFS(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, Map<String, Map<VolumeConfigKeys, 
String>>> testCase
+        : testCases) {
+      Map<String, Map<VolumeConfigKeys, String>> actual =
+          KubernetesContext.getVolumeNFS(testCase.input.first, 
testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
+  }
+  /**
+   * Create test cases for <code>NFS</code> errors.
+   * @param testCases Test case container.
+   *                  Input: [0] Config, [1] Boolean to indicate 
Manager/Executor.
+   *                  Output: Error message
+   * @param isExecutor Boolean to indicate Manager/Executor test case 
generation.
+   */
+  private void createVolumeNFSError(
+      List<TestTuple<Pair<Config, Boolean>, String>> testCases, boolean 
isExecutor) {
+    final String volumeNameValid = "volume-name-valid";
+    final String passingValue = "should-pass";
+    final String failureValue = "Should-Fail";
+    final String processName = isExecutor ? KubernetesConstants.EXECUTOR_NAME
+        : KubernetesConstants.MANAGER_NAME;
+    final String keyPattern = 
String.format(KubernetesContext.KUBERNETES_VOLUME_NFS_PREFIX
+        + "%%s.%%s", processName);
+
+    // Server is missing.
+    final Config configNoServer = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": No `server` should trigger 
exception",
+        new Pair<>(configNoServer, isExecutor), "`NFS` volumes require a"));
+
+    // Server is invalid.
+    final Config configInvalidServer = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), "")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid `server` should 
trigger exception",
+        new Pair<>(configInvalidServer, isExecutor), "`NFS` volumes require 
a"));
+
+    // Path on NFS missing.
+    final Config configNoNFSPath = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": No path on NFS should 
trigger exception",
+        new Pair<>(configNoNFSPath, isExecutor), "NFS requires a path on"));
+
+    // Path on NFS is empty.
+    final Config configEmptyNFSPath = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), "")
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": No path on NFS should 
trigger exception",
+        new Pair<>(configEmptyNFSPath, isExecutor), "NFS requires a path on"));
+
+    // Invalid option.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "server"), 
"nfs-server.default.local")
+        .put(String.format(keyPattern, volumeNameValid, "readOnly"), "false")
+        .put(String.format(keyPattern, volumeNameValid, "pathOnNFS"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "path"), passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "subPath"), 
passingValue)
+        .put(String.format(keyPattern, volumeNameValid, "accessModes"), 
passingValue)
+        .build();
+    testCases.add(new TestTuple<>(processName + ": Invalid option should 
trigger exception",
+        new Pair<>(configInvalidOption, isExecutor), "Invalid NFS option"));
+  }
+
+  @Test
+  public void testGetVolumeNFSErrors() {
+    final List<TestTuple<Pair<Config, Boolean>, String>> testCases = new 
LinkedList<>();
+    createVolumeNFSError(testCases, true);
+    createVolumeNFSError(testCases, false);
+
+    // Testing loop.
+    for (TestTuple<Pair<Config, Boolean>, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeNFS(testCase.input.first, 
testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
       }
+      Assert.assertTrue(testCase.description, 
message.contains(testCase.expected));
     }
   }
 }
diff --git 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
index e79760d..6436520 100644
--- 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++ 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -66,8 +66,7 @@ import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
-import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
-import static org.mockito.Matchers.anyMap;
+import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 
@@ -866,37 +865,37 @@ public class V1ControllerTest {
     final String volumeMode = "VolumeMode";
     final String path = "/path/to/mount/";
     final String subPath = "/sub/path/to/mount/";
-    final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapPVCOpts =
+    final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
         ImmutableMap.of(
-            volumeNameOne, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+            volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
               {
-                put(VolumeClaimTemplateConfigKeys.claimName, claimNameOne);
-                put(VolumeClaimTemplateConfigKeys.storageClassName, 
storageClassName);
-                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeClaimTemplateConfigKeys.accessModes, 
accessModesList);
-                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
-                put(VolumeClaimTemplateConfigKeys.path, path);
+                put(VolumeConfigKeys.claimName, claimNameOne);
+                put(VolumeConfigKeys.storageClassName, storageClassName);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModesList);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
               }
             },
-            volumeNameTwo, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+            volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
               {
-                put(VolumeClaimTemplateConfigKeys.claimName, claimNameTwo);
-                put(VolumeClaimTemplateConfigKeys.storageClassName, 
storageClassName);
-                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
-                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
-                put(VolumeClaimTemplateConfigKeys.path, path);
-                put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+                put(VolumeConfigKeys.claimName, claimNameTwo);
+                put(VolumeConfigKeys.storageClassName, storageClassName);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModes);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
+                put(VolumeConfigKeys.subPath, subPath);
               }
             },
-            volumeNameStatic, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+            volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
               {
-                put(VolumeClaimTemplateConfigKeys.claimName, claimNameStatic);
-                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
-                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
-                put(VolumeClaimTemplateConfigKeys.path, path);
-                put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+                put(VolumeConfigKeys.claimName, claimNameStatic);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModes);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
+                put(VolumeConfigKeys.subPath, subPath);
               }
             }
         );
@@ -922,6 +921,7 @@ public class V1ControllerTest {
           
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
         .endMetadata()
         .withNewSpec()
+          .withStorageClassName("")
           .withAccessModes(Collections.singletonList(accessModes))
           .withVolumeMode(volumeMode)
           .withNewResources()
@@ -948,15 +948,15 @@ public class V1ControllerTest {
     final String mountPathOne = "/mount/path/ONE";
     final String mountPathTwo = "/mount/path/TWO";
     final String mountSubPathTwo = "/mount/sub/path/TWO";
-    Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfOpts =
+    Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
         ImmutableMap.of(
             volumeNameOne, ImmutableMap.of(
-                VolumeClaimTemplateConfigKeys.claimName, claimNameOne,
-                VolumeClaimTemplateConfigKeys.path, mountPathOne),
+                VolumeConfigKeys.claimName, claimNameOne,
+                VolumeConfigKeys.path, mountPathOne),
             volumeNameTwo, ImmutableMap.of(
-                VolumeClaimTemplateConfigKeys.claimName, claimNameTwo,
-                VolumeClaimTemplateConfigKeys.path, mountPathTwo,
-                VolumeClaimTemplateConfigKeys.subPath, mountSubPathTwo)
+                VolumeConfigKeys.claimName, claimNameTwo,
+                VolumeConfigKeys.path, mountPathTwo,
+                VolumeConfigKeys.subPath, mountSubPathTwo)
         );
     final V1Volume volumeOne = new V1VolumeBuilder()
         .withName(volumeNameOne)
@@ -981,13 +981,13 @@ public class V1ControllerTest {
         .build();
 
     // Test case container.
-    final List<TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+    // Input: Map of Volume configurations.
+    // Output: The expected lists of Volumes and Volume Mounts.
+    final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
         Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new 
LinkedList<>();
 
     // Default case: No PVC provided.
-    final Pair<List<V1Volume>, List<V1VolumeMount>> actualEmpty =
-        
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(new 
HashMap<>());
-    testCases.add(new TestTuple<>("Generated an empty list of Volumes", 
actualEmpty,
+    testCases.add(new TestTuple<>("Generated an empty list of Volumes", new 
HashMap<>(),
         new Pair<>(new LinkedList<>(), new LinkedList<>())));
 
     // PVC Provided.
@@ -995,23 +995,26 @@ public class V1ControllerTest {
         new Pair<>(
             new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
             new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
-    final Pair<List<V1Volume>, List<V1VolumeMount>> actualFull =
-        
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(mapOfOpts);
-    testCases.add(new TestTuple<>("Generated a list of Volumes", actualFull,
+    testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
         new Pair<>(expectedFull.first, expectedFull.second)));
 
     // Testing loop.
-    for (TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+    for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
              Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
+      List<V1Volume> actualVolume = new LinkedList<>();
+      List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
+      
v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+          actualVolume, actualVolumeMount);
+
       Assert.assertTrue(testCase.description,
-          (testCase.expected.first).containsAll(testCase.input.first));
+          (testCase.expected.first).containsAll(actualVolume));
       Assert.assertTrue(testCase.description + " Mounts",
-          (testCase.expected.second).containsAll(testCase.input.second));
+          (testCase.expected.second).containsAll(actualVolumeMount));
     }
   }
 
   @Test
-  public void testConfigurePodWithPersistentVolumeClaims() {
+  public void testConfigurePodWithVolumesAndMountsFromCLI() {
     final String volumeNameClashing = "clashing-volume";
     final String volumeMountNameClashing = "original-volume-mount";
     V1Volume baseVolume = new V1VolumeBuilder()
@@ -1046,8 +1049,8 @@ public class V1ControllerTest {
         .build();
 
     // Test case container.
-    // Input: Pod Spec to modify, Executor to modify, Volumes and Mounts to 
return from
-    // <createPersistentVolumeClaimVolumesAndMounts>.
+    // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List 
of Volumes
+    // [3] List of Volume Mounts.
     // Output: The expected <V1PodSpec> and <V1Container>.
     final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = 
new LinkedList<>();
 
@@ -1058,11 +1061,9 @@ public class V1ControllerTest {
     final V1PodSpec expectedEmptyPodSpec = new 
V1PodSpecBuilder().withVolumes(baseVolume).build();
     final V1Container expectedEmptyExecutor =
         new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
-    Pair<List<V1Volume>, List<V1VolumeMount>> emptyVolumeAndMount =
-        new Pair<>(new LinkedList<>(), new LinkedList<>());
 
     testCases.add(new TestTuple<>("Empty",
-        new Object[]{podSpecEmptyCase, executorEmptyCase, emptyVolumeAndMount},
+        new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), 
new LinkedList<>()},
         new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
 
     // Non-clashing Persistent Volume Claim.
@@ -1081,12 +1082,10 @@ public class V1ControllerTest {
         .addToVolumeMounts(secondaryVolumeMount)
         .build();
 
-    Pair<List<V1Volume>, List<V1VolumeMount>> noClashVolumeAndMount = new 
Pair<>(
-        new LinkedList<>(Collections.singletonList(secondaryVolume)),
-        new LinkedList<>(Collections.singletonList(secondaryVolumeMount)));
-
     testCases.add(new TestTuple<>("No Clash",
-        new Object[]{podSpecNoClashCase, executorNoClashCase, 
noClashVolumeAndMount},
+        new Object[]{podSpecNoClashCase, executorNoClashCase,
+            Collections.singletonList(secondaryVolume),
+            Collections.singletonList(secondaryVolumeMount)},
         new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
 
     // Clashing Persistent Volume Claim.
@@ -1105,24 +1104,18 @@ public class V1ControllerTest {
         .addToVolumeMounts(secondaryVolumeMount)
         .build();
 
-    Pair<List<V1Volume>, List<V1VolumeMount>> clashVolumeAndMount = new Pair<>(
-        new LinkedList<>(Arrays.asList(clashingVolume, secondaryVolume)),
-        new LinkedList<>(Arrays.asList(clashingVolumeMount, 
secondaryVolumeMount)));
-
     testCases.add(new TestTuple<>("Clashing",
-        new Object[]{podSpecClashCase, executorClashCase, clashVolumeAndMount},
+        new Object[]{podSpecClashCase, executorClashCase,
+            Arrays.asList(clashingVolume, secondaryVolume),
+            Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
         new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
 
     // Testing loop.
     for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : 
testCases) {
-      doReturn(testCase.input[2])
-          .when(v1ControllerWithPodTemplate)
-          .createPersistentVolumeClaimVolumesAndMounts(anyMap());
-
-      // <configPVC> parameter is used in mock above, so we can set it to 
<null> as it is not used.
       v1ControllerWithPodTemplate
-          .configurePodWithPersistentVolumeClaimVolumesAndMounts((V1PodSpec) 
testCase.input[0],
-              (V1Container) testCase.input[1], null);
+          .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) 
testCase.input[0],
+              (V1Container) testCase.input[1], (List<V1Volume>) 
testCase.input[2],
+              (List<V1VolumeMount>) testCase.input[3]);
 
       Assert.assertEquals("Pod Specs match " + testCase.description,
           testCase.input[0], testCase.expected.first);
@@ -1222,4 +1215,231 @@ public class V1ControllerTest {
       Assert.assertEquals(testCase.description, testCase.expected, actual);
     }
   }
+
+  @Test
+  public void testCreateVolumeMountsCLI() {
+    final String volumeNamePVC = "volume-name-pvc";
+    final String volumeNameHostPath = "volume-name-host-path";
+    final String volumeNameEmptyDir = "volume-name-empty-dir";
+    final String volumeNameNFS = "volume-name-nfs";
+    final String value = "inserted-value";
+
+    // Test case container.
+    // Input: [0] volume name, [1] volume options
+    // Output: The expected <V1VolumeMount>.
+    final List<TestTuple<Pair<String, Map<VolumeConfigKeys, String>>, 
V1VolumeMount>> testCases =
+        new LinkedList<>();
+
+    // PVC.
+    final Map<VolumeConfigKeys, String> configPVC = 
ImmutableMap.<VolumeConfigKeys, String>builder()
+        .put(VolumeConfigKeys.claimName, value)
+        .put(VolumeConfigKeys.storageClassName, value)
+        .put(VolumeConfigKeys.sizeLimit, value)
+        .put(VolumeConfigKeys.accessModes, value)
+        .put(VolumeConfigKeys.volumeMode, value)
+        .put(VolumeConfigKeys.path, value)
+        .put(VolumeConfigKeys.subPath, value)
+        .put(VolumeConfigKeys.readOnly, "true")
+        .build();
+    final V1VolumeMount volumeMountPVC = new V1VolumeMountBuilder()
+        .withName(volumeNamePVC)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new TestTuple<>("PVC volume mount",
+        new Pair<>(volumeNamePVC, configPVC), volumeMountPVC));
+
+    // Host Path.
+    final Map<VolumeConfigKeys, String> configHostPath =
+        ImmutableMap.<VolumeConfigKeys, String>builder()
+            .put(VolumeConfigKeys.type, "DirectoryOrCreate")
+            .put(VolumeConfigKeys.pathOnHost, value)
+            .put(VolumeConfigKeys.path, value)
+            .put(VolumeConfigKeys.subPath, value)
+            .put(VolumeConfigKeys.readOnly, "true")
+            .build();
+    final V1VolumeMount volumeMountHostPath = new V1VolumeMountBuilder()
+        .withName(volumeNameHostPath)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new TestTuple<>("Host Path volume mount",
+        new Pair<>(volumeNameHostPath, configHostPath), volumeMountHostPath));
+
+    // Empty Dir.
+    final Map<VolumeConfigKeys, String> configEmptyDir =
+        ImmutableMap.<VolumeConfigKeys, String>builder()
+            .put(VolumeConfigKeys.sizeLimit, value)
+            .put(VolumeConfigKeys.medium, "Memory")
+            .put(VolumeConfigKeys.path, value)
+            .put(VolumeConfigKeys.subPath, value)
+            .put(VolumeConfigKeys.readOnly, "true")
+            .build();
+    final V1VolumeMount volumeMountEmptyDir = new V1VolumeMountBuilder()
+        .withName(volumeNameEmptyDir)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new TestTuple<>("Empty Dir volume mount",
+        new Pair<>(volumeNameEmptyDir, configEmptyDir), volumeMountEmptyDir));
+
+    // NFS.
+    final Map<VolumeConfigKeys, String> configNFS = 
ImmutableMap.<VolumeConfigKeys, String>builder()
+        .put(VolumeConfigKeys.server, "nfs.server.address")
+        .put(VolumeConfigKeys.readOnly, "true")
+        .put(VolumeConfigKeys.pathOnNFS, value)
+        .put(VolumeConfigKeys.path, value)
+        .put(VolumeConfigKeys.subPath, value)
+        .build();
+    final V1VolumeMount volumeMountNFS = new V1VolumeMountBuilder()
+        .withName(volumeNameNFS)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new TestTuple<>("NFS volume mount",
+        new Pair<>(volumeNameNFS, configNFS), volumeMountNFS));
+
+    // Test loop.
+    for (TestTuple<Pair<String, Map<VolumeConfigKeys, String>>, V1VolumeMount> 
testCase
+        : testCases) {
+      V1VolumeMount actual = v1ControllerPodTemplate.createVolumeMountsCLI(
+          testCase.input.first, testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
+
+  }
+
+  @Test
+  public void testCreateVolumeAndMountsEmptyDirCLI() {
+    final String volumeName = "volume-name-empty-dir";
+    final String medium = "Memory";
+    final String sizeLimit = "1Gi";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // Empty Dir.
+    final Map<String, Map<VolumeConfigKeys, String>> config =
+        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+          {
+            put(VolumeConfigKeys.sizeLimit, sizeLimit);
+            put(VolumeConfigKeys.medium, "Memory");
+            put(VolumeConfigKeys.path, path);
+            put(VolumeConfigKeys.subPath, subPath);
+          }
+        });
+    final List<V1Volume> expectedVolumes = Collections.singletonList(
+        new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewEmptyDir()
+              .withMedium(medium)
+              .withNewSizeLimit(sizeLimit)
+            .endEmptyDir()
+            .build()
+    );
+    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+        new V1VolumeMountBuilder()
+            .withName(volumeName)
+              .withMountPath(path)
+              .withSubPath(subPath)
+            .build()
+    );
+
+    List<V1Volume> actualVolumes = new LinkedList<>();
+    List<V1VolumeMount> actualMounts = new LinkedList<>();
+    v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, 
actualVolumes, actualMounts);
+    Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, 
actualVolumes);
+    Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, 
actualMounts);
+  }
+
+  @Test
+  public void testCreateVolumeAndMountsHostPathCLI() {
+    final String volumeName = "volume-name-host-path";
+    final String type = "DirectoryOrCreate";
+    final String pathOnHost = "path.on.host";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // Host Path.
+    final Map<String, Map<VolumeConfigKeys, String>> config =
+        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+          {
+            put(VolumeConfigKeys.type, type);
+            put(VolumeConfigKeys.pathOnHost, pathOnHost);
+            put(VolumeConfigKeys.path, path);
+            put(VolumeConfigKeys.subPath, subPath);
+          }
+        });
+    final List<V1Volume> expectedVolumes = Collections.singletonList(
+        new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewHostPath()
+              .withNewType(type)
+              .withNewPath(pathOnHost)
+            .endHostPath()
+            .build()
+    );
+    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+        new V1VolumeMountBuilder()
+            .withName(volumeName)
+              .withMountPath(path)
+              .withSubPath(subPath)
+            .build()
+    );
+
+    List<V1Volume> actualVolumes = new LinkedList<>();
+    List<V1VolumeMount> actualMounts = new LinkedList<>();
+    v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, 
actualVolumes, actualMounts);
+    Assert.assertEquals("Host Path Volume populated", expectedVolumes, 
actualVolumes);
+    Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, 
actualMounts);
+  }
+
+  @Test
+  public void testCreateVolumeAndMountsNFSCLI() {
+    final String volumeName = "volume-name-nfs";
+    final String server = "nfs.server.address";
+    final String pathOnNFS = "path.on.host";
+    final String readOnly = "true";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // NFS.
+    final Map<String, Map<VolumeConfigKeys, String>> config =
+        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+          {
+            put(VolumeConfigKeys.server, server);
+            put(VolumeConfigKeys.readOnly, readOnly);
+            put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
+            put(VolumeConfigKeys.path, path);
+            put(VolumeConfigKeys.subPath, subPath);
+          }
+        });
+    final List<V1Volume> expectedVolumes = Collections.singletonList(
+        new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewNfs()
+              .withServer(server)
+              .withNewPath(pathOnNFS)
+              .withNewReadOnly(readOnly)
+            .endNfs()
+            .build()
+    );
+    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+        new V1VolumeMountBuilder()
+            .withName(volumeName)
+            .withMountPath(path)
+            .withSubPath(subPath)
+            .withReadOnly(true)
+            .build()
+    );
+
+    List<V1Volume> actualVolumes = new LinkedList<>();
+    List<V1VolumeMount> actualMounts = new LinkedList<>();
+    v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, 
actualMounts);
+    Assert.assertEquals("NFS Volume populated", expectedVolumes, 
actualVolumes);
+    Assert.assertEquals("NFS Volume Mount populated", expectedMounts, 
actualMounts);
+  }
 }
diff --git a/website2/docs/schedulers-k8s-execution-environment.md 
b/website2/docs/schedulers-k8s-execution-environment.md
index cbc1956..010bd73 100644
--- a/website2/docs/schedulers-k8s-execution-environment.md
+++ b/website2/docs/schedulers-k8s-execution-environment.md
@@ -1,7 +1,8 @@
 ---
 id: schedulers-k8s-execution-environment
 title: Kubernetes Execution Environment Customization
-sidebar_label:  Kubernetes Execution Environment Customization
+hide_title: true
+sidebar_label: Kubernetes Environment Customization
 ---
 <!--
     Licensed to the Apache Software Foundation (ASF) under one
@@ -20,34 +21,12 @@ sidebar_label:  Kubernetes Execution Environment 
Customization
     under the License.
 -->
 
-# Customizing the Heron Execution Environment
+# Customizing the Heron Execution Environment in Kubernetes
 
 This document demonstrates how you can customize various aspects of the Heron 
execution environment when using the Kubernetes Scheduler.
 
 <br>
 
-***Table of contents:***
-- [Customizing the Heron Execution 
Environment](#customizing-the-heron-execution-environment)
-  - [Customizing a Topology's Execution Environment Using Pod 
Templates](#customizing-a-topologys-execution-environment-using-pod-templates)
-    - [Preparation](#preparation)
-      - [Pod Templates](#pod-templates)
-      - [Configuration Maps](#configuration-maps)
-    - [Submitting](#submitting)
-    - [Heron Configured Items in Pod 
Templates](#heron-configured-items-in-pod-templates)
-      - [Executor and Manager Containers](#executor-and-manager-containers)
-      - [Pod](#pod)
-  - [Adding Persistent Volumes via the Command Line 
Interface](#adding-persistent-volumes-via-the-command-line-interface)
-    - [Usage](#usage)
-      - [Example](#example)
-    - [Submitting](#submitting-1)
-    - [Required and Optional Configuration 
Items](#required-and-optional-configuration-items)
-    - [Configuration Items Created and Entries 
Made](#configuration-items-created-and-entries-made)
-  - [Setting Limits and Requests via the Command Line 
Interface](#setting-limits-and-requests-via-the-command-line-interface)
-    - [Usage](#usage-1)
-      - [Example](#example-1)
-
-<br>
-
 ---
 
 <br>
@@ -243,7 +222,7 @@ The following items will be set in the Pod Template's 
`spec` by Heron.
 ---
 <br>
 
-## Adding Persistent Volumes via the Command Line Interface
+## Adding Persistent Volumes via the Command-line Interface
 
 <br>
 
@@ -269,7 +248,7 @@ metadata:
 
 > ***System Administrators:***
 >
-> * You may wish to disable the ability to configure Persistent Volume Claims 
specified via the CLI. To achieve this, you must pass the define option `-D 
heron.kubernetes.persistent.volume.claims.cli.disabled=true`to the Heron API 
Server on the command line when launching. This command has been added to the 
Kubernetes configuration files to deploy the Heron API Server and is set to 
`false` by default.
+> * You may wish to disable the ability to configure Persistent Volume Claims 
specified via the CLI. To achieve this, you must pass the define option `-D 
heron.kubernetes.volume.from.cli.disabled=true` to the Heron API Server on the 
command line when launching. This command has been added to the Kubernetes 
configuration files to deploy the Heron API Server and is set to `false` by 
default.
 > * If you have a custom `Role`/`ClusterRole` for the Heron API Server you 
 > will need to ensure the `ServiceAccount` attached to the API server has the 
 > correct permissions to access the `Persistent Volume Claim`s:
 >
 >```yaml
@@ -304,12 +283,13 @@ The currently supported CLI `options` are:
 * `volumeMode`
 * `path`
 * `subPath`
+* `readOnly`
 
 ***Note:*** A `claimName` of `OnDemand` will create unique Volumes for each 
`Heron container` as well as deploy a Persistent Volume Claim for each Volume. 
Any other claim name will result in a shared Volume being created between all 
Pods in the topology.
 
 ***Note:*** The `accessModes` must be a comma-separated list of values 
*without* any white space. Valid values can be found in the [Kubernetes 
documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
 
-***Note:*** If a `storageClassName` is specified and there are no matching 
Persistent Volumes then [dynamic 
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
 must be enabled. Kubernetes will attempt to locate a Persistent Volume that 
matches the `storageClassName` before it attempts to use dynamic provisioning. 
If a `storageClassName` is not specified there must be [Persistent 
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
 [...]
+***Note:*** If a `storageClassName` is specified and there are no matching 
Persistent Volumes then [dynamic 
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
 must be enabled. Kubernetes will attempt to locate a Persistent Volume that 
matches the `storageClassName` before it attempts to use dynamic provisioning. 
If a `storageClassName` is not specified there must be [Persistent 
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
 [...]
 
 <br>
 
@@ -402,7 +382,7 @@ spec:
   resources:
     requests:
       storage: 555Gi
-  storageClassName: standard
+  storageClassName: ""
   volumeMode: volume-mode-of-choice
 ```
 
@@ -458,6 +438,7 @@ The following table outlines CLI options which are either 
***required*** ( &#x27
 | `accessModes` | &#x2705; | &#x2705; | &#x274c;
 | `sizeLimit` | &#x2754; | &#x2754; | &#x274c;
 | `volumeMode` | &#x2754; | &#x2754; | &#x274c;
+| `readOnly` | &#x2754; | &#x2754; | &#x2754;
 
 <br>
 
@@ -473,10 +454,10 @@ A `Volume` and a `Volume Mount` will be created for each 
`volume name` which you
 
 | Name | Description | Policy |
 |---|---|---|
-| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the 
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `Heron 
containers` `volumeMounts`.
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the 
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `Heron 
container`'s `volumeMounts`.
 | `claimName` | A Claim name for the Persistent Volume. | If `OnDemand` is 
provided as the parameter then a unique Volume and Persistent Volume Claim will 
be created. Any other name will result in a shared Volume between all Pods in 
the topology with only a Volume and Volume Mount being added.
-| `path` | The `mountPath` of the `Volume`. | Entries made in the `Heron 
containers` `volumeMounts`.
-| `subPath` | The `subPath` of the `Volume`. | Entries made in the `Heron 
containers` `volumeMounts`.
+| `path` | The `mountPath` of the `Volume`. | Entries made in the `Heron 
container`'s `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries made in the `Heron 
container`'s `volumeMounts`.
 | `storageClassName` | The identifier name used to reference the dynamic 
`StorageClass`. | Entries made in the `Persistent Volume Claim` and Pod Spec's 
`Volume`.
 | `accessModes` | A comma-separated list of [access 
modes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
 | Entries made in the `Persistent Volume Claim`.
 | `sizeLimit` | A resource request for storage space 
[units](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory).
 | Entries made in the `Persistent Volume Claim`.
@@ -489,6 +470,235 @@ A `Volume` and a `Volume Mount` will be created for each 
`volume name` which you
 
 <br>
 
+## Adding Empty Directory, Host Path, and Network File System Volumes via the 
Command-line Interface
+
+<br>
+
+> This section demonstrates how you can specify configurations for `Empty 
Dir`, `Host Path`, and `NFS` volumes via the Command Line Interface during the 
submit process.
+
+<br/>
+
+It is possible to allocate and configure Volumes with Pod Templates but the 
CLI commands extend this to being able to specify Volumes at submission time.
+
+<br>
+
+> ***System Administrators:***
+>
+> * You may wish to disable the ability to configure Volume configurations 
specified via the CLI. To achieve this, you must pass the define option `-D 
heron.kubernetes.volume.from.cli.disabled=true` to the Heron API Server on the 
command line when launching. This command has been added to the Kubernetes 
configuration files to deploy the Heron API Server and is set to `false` by 
default.
+> * &#x26a0; ***WARNING*** &#x26a0; `Host Path` volumes have inherent 
[security 
concerns](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath). `Host 
Path`s can breach the containment provided by containerization and should be 
exclusively used with volume mounts set to `read-only`, with usage limited to 
testing and development environments.
+
+<br>
+
+### Usage
+
+To configure a Volume on the CLI you must use the `--config-property` option 
in combination with the following prefixes:
+
+ * [Empty 
Directory](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir): 
`heron.kubernetes.[executor | manager].volumes.emptyDir.`
+ * [Host Path](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath): 
`heron.kubernetes.[executor | manager].volumes.hostPath.`
+ * [Network File 
System](https://kubernetes.io/docs/concepts/storage/volumes/#nfs): 
`heron.kubernetes.[executor | manager].volumes.nfs.`
+
+ Heron will not validate your Volume configurations, so please validate them 
to ensure they are well-formed. All Volume names must comply with the 
[*lowercase 
RFC-1123*](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/)
 standard.
+
+The command patterns are as follows:
+
+ * Empty Directory: `heron.kubernetes.[executor | 
manager].volumes.emptyDir.[VOLUME NAME].[OPTION]=[VALUE]`
+ * Host Path: `heron.kubernetes.[executor | manager].volumes.hostPath.[VOLUME 
NAME].[OPTION]=[VALUE]`
+ * Network File System: `heron.kubernetes.[executor | 
manager].volumes.nfs.[VOLUME NAME].[OPTION]=[VALUE]`
+
+The currently supported CLI `options` are:
+
+* `medium`
+* `type`
+* `server`
+* `sizeLimit`
+* `pathOnHost`
+* `pathOnNFS`
+* `path`
+* `subPath`
+* `readOnly`
+
+<br>
+
+#### Example
+
+A series of example commands to add Volumes to a `Manager`, and the `YAML` 
entries they make in their respective configurations, are as follows.
+
+***Empty Directory:***
+
+```bash
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.medium="Memory"
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.sizeLimit="50Mi"
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.path="empty/dir/path"
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.subPath="empty/dir/sub/path"
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- emptyDir:
+    medium: Memory
+    sizeLimit: 50Mi
+  name: manager-empty-dir
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: empty/dir/path
+  name: manager-empty-dir
+  readOnly: true
+  subPath: empty/dir/sub/path
+```
+
+<br>
+
+***Host Path:***
+
+```bash
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.type="File"
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.pathOnHost="/dev/null"
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.path="host/path/path"
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.subPath="host/path/sub/path"
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- hostPath:
+    path: /dev/null
+    type: File
+  name: manager-host-path
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: host/path/path
+  name: manager-host-path
+  readOnly: true
+  subPath: host/path/sub/path
+```
+
+<br>
+
+***NFS:***
+
+```bash
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.server="nfs-server.address"
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.pathOnNFS="/dev/null"
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.path="nfs/path"
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.subPath="nfs/sub/path"
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+```
+
+Generated `Volume` entry:
+
+```yaml
+volumes:
+- name: manager-nfs
+  nfs:
+    path: /dev/null
+    readOnly: true
+    server: nfs-server.address
+```
+
+Generated `Volume Mount` entry:
+
+```yaml
+volumeMounts:
+- mountPath: nfs/path
+  name: manager-nfs
+  readOnly: true
+  subPath: nfs/sub/path
+```
+
+<br>
+
+### Submitting
+
+A series of example commands to submit a topology using the example CLI 
commands above:
+
+```bash
+heron submit kubernetes \
+  
--service-url=http://localhost:8001/api/v1/namespaces/default/services/heron-apiserver:9000/proxy
 \
+  ~/.heron/examples/heron-api-examples.jar \
+  org.apache.heron.examples.api.AckingTopology acking \
+\
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.medium="Memory" \
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.sizeLimit="50Mi" \
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.path="empty/dir/path"
 \
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.subPath="empty/dir/sub/path"
 \
+--config-property 
heron.kubernetes.manager.volumes.emptyDir.manager-empty-dir.readOnly="true" \
+\
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.type="File" \
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.pathOnHost="/dev/null"
 \
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.path="host/path/path"
 \
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.subPath="host/path/sub/path"
 \
+--config-property 
heron.kubernetes.manager.volumes.hostPath.manager-host-path.readOnly="true" \
+\
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.server="nfs-server.address" \
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true" \
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.pathOnNFS="/dev/null" \
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.path="nfs/path" \
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.subPath="nfs/sub/path" \
+--config-property 
heron.kubernetes.manager.volumes.nfs.manager-nfs.readOnly="true"
+```
+
+### Required and Optional Configuration Items
+
+The following table outlines CLI options which are either ***required*** ( 
&#x2705; ), ***optional*** ( &#x2754; ), or ***not available*** ( &#x274c; ) 
depending on the type of `Volume`.
+
+| Option | emptyDir | hostPath | NFS
+|---|---|---|---|
+| `VOLUME NAME` | &#x2705; | &#x2705; | &#x2705;
+| `path` | &#x2705; | &#x2705; | &#x2705;
+| `subPath` | &#x2754; | &#x2754; | &#x2754;
+| `readOnly` | &#x2754; | &#x2754; | &#x2754;
+| `medium` | &#x2754; | &#x274c; | &#x274c;
+| `sizeLimit` | &#x2754; | &#x274c; | &#x274c;
+| `pathOnHost` | &#x274c; | &#x2705; | &#x274c;
+| `type` | &#x274c; | &#x2754; | &#x274c;
+| `pathOnNFS` | &#x274c; | &#x274c; | &#x2705;
+| `server` | &#x274c; | &#x274c; | &#x2705;
+
+<br>
+
+***Note:*** The `VOLUME NAME` will be extracted from the CLI command.
+
+<br>
+
+### Configuration Items Created and Entries Made
+
+The configuration items and entries in the tables below will be made in their 
respective areas.
+
+A `Volume` and a `Volume Mount` will be created for each `volume name` which 
you specify.
+
+| Name | Description | Policy |
+|---|---|---|
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries are made in the Pod 
Spec's `Volumes`, and the `Heron container`'s `volumeMounts`.
+| `path` | The `mountPath` of the `Volume`. | Entries are made in the `Heron 
container`'s `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries are made in the `Heron 
container`'s `volumeMounts`.
+| `readOnly` | A boolean value which defaults to `false`; when set to `true` 
the volume is mounted read-only. | Entries are made in the `Heron 
container`'s `volumeMounts`. When used with an `NFS` volume an entry is also made 
in the associated `Volume`.
+| `medium` | The type of storage medium that will back the `Empty Dir` and 
defaults to "", please read more 
[here](https://kubernetes.io/docs/concepts/storage/volumes#emptydir). | An 
entry is made in the `Empty Dir`'s `Volume`.
+| `sizeLimit` | Total 
[amount](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
 of local storage required for this `Empty Dir` Volume. | An entry is made in the 
`Empty Dir`'s `Volume`.
+| `pathOnHost` | The directory path on the host to be mounted. | A `path` entry 
is made in the `Host Path`'s `Volume`.
+| `type` | The type of the `Host Path` volume and defaults to "", please read 
more [here](https://kubernetes.io/docs/concepts/storage/volumes#hostpath). | An 
entry is made in the `Host Path`'s `Volume`.
+| `pathOnNFS` | The directory path on the NFS server to be mounted. | A `path` 
entry is made in the `NFS`'s `Volume`.
+| `server` | The hostname or IP address of the NFS server. | An entry is made 
in the `NFS`'s `Volume`.
+
+<br>
+
+---
+
+<br>
+
 ## Setting Limits and Requests via the Command Line Interface
 
 > This section demonstrates how you can configure a topology's `Executor` 
 > and/or `Manager` (hereinafter referred to as `Heron containers`) resource 
 > `Requests` and `Limits` through CLI commands.

Reply via email to