This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a1b981  [Heron-3723] Add support for Persistent Volumes for stateful 
storage (#3725)
5a1b981 is described below

commit 5a1b9814133d3767aeee7e796650cbab031365d5
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Tue Nov 30 00:20:34 2021 -0500

    [Heron-3723] Add support for Persistent Volumes for stateful storage (#3725)
    
    Co-authored-by: Nicholas Nezis <[email protected]>
    Co-authored-by: Josh Fischer <[email protected]>
    Co-authored-by: zhangshaoning 
<[email protected]>
    Co-authored-by: Huijun Wu <[email protected]>
    Co-authored-by: Huijun Wu <[email protected]>
---
 deploy/kubernetes/general/apiserver.yaml           |   1 +
 deploy/kubernetes/helm/templates/tools.yaml        |  11 +
 deploy/kubernetes/helm/values.yaml.template        |   3 +
 deploy/kubernetes/minikube/apiserver.yaml          |   1 +
 .../scheduler/kubernetes/KubernetesConstants.java  |  15 +-
 .../scheduler/kubernetes/KubernetesContext.java    |  85 ++++++
 .../scheduler/kubernetes/KubernetesUtils.java      |  23 ++
 .../heron/scheduler/kubernetes/V1Controller.java   | 239 ++++++++++++++++-
 .../kubernetes/KubernetesContextTest.java          | 137 +++++++++-
 .../scheduler/kubernetes/V1ControllerTest.java     | 289 +++++++++++++++++++++
 .../schedulers-k8s-persistent-volume-claims.md     | 257 ++++++++++++++++++
 website2/website/sidebars.json                     |   1 +
 12 files changed, 1058 insertions(+), 4 deletions(-)

diff --git a/deploy/kubernetes/general/apiserver.yaml 
b/deploy/kubernetes/general/apiserver.yaml
index 33c9533..094715a 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -92,6 +92,7 @@ spec:
               -D 
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D 
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
               -D heron.kubernetes.pod.template.configmap.disabled=false
+              -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
 
 ---
 apiVersion: v1
diff --git a/deploy/kubernetes/helm/templates/tools.yaml 
b/deploy/kubernetes/helm/templates/tools.yaml
index 08b0707..c776e49 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -159,6 +159,7 @@ spec:
               {{- end }}
               -D heron.kubernetes.resource.request.mode={{ 
.Values.topologyResourceRequestMode }}
               -D heron.kubernetes.pod.template.configmap.disabled={{ 
.Values.disablePodTemplates }}
+              -D heron.kubernetes.persistent.volume.claims.cli.disabled={{ 
.Values.disablePersistentVolumeMountsCLI }}
           envFrom:
             - configMapRef:
                 name: {{ .Release.Name }}-tools-config
@@ -321,3 +322,13 @@ rules:
   verbs:
   - get
   - list
+- apiGroups:
+  - ""
+  resources:
+  - persistentvolumeclaims
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - deletecollection
diff --git a/deploy/kubernetes/helm/values.yaml.template 
b/deploy/kubernetes/helm/values.yaml.template
index dfbdd1f..870db57 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -77,6 +77,9 @@ packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
 # Support for ConfigMap mounted PodTemplates
 disablePodTemplates: false
 
+# Support for Dynamic Persistent Volume Mounts from CLI input
+disablePersistentVolumeMountsCLI: false
+
 # Number of replicas for storage bookies, memory and storage requirements
 bookieReplicas: 3
 bookieCpuMin: 100m
diff --git a/deploy/kubernetes/minikube/apiserver.yaml 
b/deploy/kubernetes/minikube/apiserver.yaml
index 8c08cc9..53d879a 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -83,6 +83,7 @@ spec:
               -D 
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D 
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
               -D heron.kubernetes.pod.template.configmap.disabled=false
+              -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
 
 ---
 apiVersion: v1
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index 5e7b19a..023d8bc 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -52,6 +52,7 @@ public final class KubernetesConstants {
   public static final String LABEL_APP = "app";
   public static final String LABEL_APP_VALUE = "heron";
   public static final String LABEL_TOPOLOGY = "topology";
+  public static final String LABEL_ON_DEMAND = "onDemand";
 
   // prometheus annotation keys
   public static final String ANNOTATION_PROMETHEUS_SCRAPE = 
"prometheus.io/scrape";
@@ -88,11 +89,13 @@ public final class KubernetesConstants {
   public static final String JOB_LINK =
       
"/api/v1/namespaces/kube-system/services/kubernetes-dashboard/proxy/#/pod";
 
-
   public static final Pattern VALID_POD_NAME_REGEX =
       
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
           Pattern.CASE_INSENSITIVE);
 
+  // Lowercase RFC-1123 style name: dot-separated labels made of [a-z0-9-] that begin and end
+  // with an alphanumeric character. The label separator is a literal dot (regex `\.`), the
+  // same separator VALID_POD_NAME_REGEX uses, but without the case-insensitive flag.
+  public static final Pattern VALID_LOWERCASE_RFC_1123_REGEX =
+      Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*");
+
   public static final List<String> VALID_IMAGE_PULL_POLICIES = 
Collections.unmodifiableList(
       Arrays.asList(
           "IfNotPresent",
@@ -107,4 +110,14 @@ public final class KubernetesConstants {
           "node.kubernetes.io/unreachable"
       )
   );
+
+  /**
+   * Option names accepted for a Persistent Volume Claim configured from the CLI.
+   * The constant names are resolved from the option text with <code>valueOf</code>, so their
+   * exact spelling and case is part of the CLI contract.
+   */
+  enum VolumeClaimTemplateConfigKeys {
+    claimName,          // Specific claim to mount, or `OnDemand` to generate one.
+    storageClassName,   // Added to the claim spec.
+    sizeLimit,          // Added to the claim spec as the `storage` request.
+    accessModes,        // Comma separated list added to the claim spec.
+    volumeMode,         // Added to the claim spec.
+    path,               // Added to container.
+    subPath,            // Added to container.
+  }
 }
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 6d29b72..40b7618 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -23,7 +23,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
 
+import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
@@ -109,6 +113,13 @@ public final class KubernetesContext extends Context {
   public static final String KUBERNETES_POD_SECRET_KEY_REF_PREFIX =
       "heron.kubernetes.pod.secretKeyRef.";
 
+  // Persistent Volume Claims
+  public static final String KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED =
+      "heron.kubernetes.persistent.volume.claims.cli.disabled";
+  // 
heron.kubernetes.volumes.persistentVolumeClaim.VOLUME_NAME.OPTION=OPTION_VALUE
+  public static final String KUBERNETES_VOLUME_CLAIM_PREFIX =
+      "heron.kubernetes.volumes.persistentVolumeClaim.";
+
   private KubernetesContext() {
   }
 
@@ -211,6 +222,80 @@ public final class KubernetesContext extends Context {
     return getConfigItemsByPrefix(config, 
KUBERNETES_POD_SECRET_KEY_REF_PREFIX);
   }
 
+  /**
+   * Reports whether configuring Persistent Volume Claims from the CLI is switched off.
+   * @param config Configuration to read the flag from.
+   * @return <code>true</code> only when the flag is present and equals <code>true</code>,
+   * ignoring case; <code>false</code> otherwise, including when the flag is absent.
+   */
+  public static boolean getPersistentVolumeClaimDisabled(Config config) {
+    return Boolean.parseBoolean(
+        config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED));
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and generates a mapping between
+   * <code>Volumes</code> and their configuration <code>key-value</code> pairs. Each key is
+   * expected in the form <code>PREFIX + VOLUME_NAME.OPTION</code>, where <code>OPTION</code>
+   * is the name of a <code>VolumeClaimTemplateConfigKeys</code> constant.
+   * @param config Contains the configuration options collected from the <code>CLI</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration
+   * <code>key-value</code> pairs. Will return an empty map if there are no Volume Claim
+   * Templates to be generated.
+   * @throws TopologySubmissionException if a key is missing its volume name or option, if the
+   * option is unknown, or if a volume name, a <code>storageClassName</code>, or a
+   * <code>claimName</code> other than <code>OnDemand</code> is not lowercase RFC-1123.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
+      getVolumeClaimTemplates(Config config) {
+    // Messages are logged under KubernetesContext, the class that owns this method.
+    final Logger LOG = Logger.getLogger(KubernetesContext.class.getName());
+
+    final Set<String> completeConfigParam = getConfigKeys(config, KUBERNETES_VOLUME_CLAIM_PREFIX);
+    final int prefixLength = KUBERNETES_VOLUME_CLAIM_PREFIX.length();
+    final int volumeNameIdx = 0;
+    final int optionIdx = 1;
+    // A single matcher is reused for every name that needs validating.
+    final Matcher matcher = KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+    final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> volumes
+        = new HashMap<>();
+
+    try {
+      for (String param : completeConfigParam) {
+        // What follows the prefix is `VOLUME_NAME.OPTION`. A missing option raises an
+        // IndexOutOfBoundsException and an unknown option an IllegalArgumentException; both
+        // are converted into a submission failure by the catch block below.
+        final String[] tokens = param.substring(prefixLength).split("\\.");
+        final String volumeName = tokens[volumeNameIdx];
+        final KubernetesConstants.VolumeClaimTemplateConfigKeys key =
+            KubernetesConstants.VolumeClaimTemplateConfigKeys.valueOf(tokens[optionIdx]);
+        final String value = config.getStringValue(param);
+
+        Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> volume =
+            volumes.get(volumeName);
+        if (volume == null) {
+          // Validate new Volume Names.
+          if (!matcher.reset(volumeName).matches()) {
+            throw new TopologySubmissionException(
+                String.format("Volume name `%s` does not match lowercase RFC-1123 pattern",
+                    volumeName));
+          }
+          volume = new HashMap<>();
+          volumes.put(volumeName, volume);
+        }
+
+        /* Validate Claim and Storage Class names.
+          [1] `claimNameNotOnDemand`: checks for a `claimName` which is not `OnDemand`.
+          [2] `storageClassName`: Check if it is the provided `option`.
+          Conditions [1] OR [2] are True, then...
+          [3] Check for a valid lowercase RFC-1123 pattern.
+         */
+        boolean claimNameNotOnDemand =
+            KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
+                && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
+        if ((claimNameNotOnDemand // [1]
+            ||
+            KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key)) // [2]
+            && !matcher.reset(value).matches()) { // [3]
+          throw new TopologySubmissionException(
+              String.format("Option `%s` value `%s` does not match lowercase RFC-1123 pattern",
+                  key, value));
+        }
+
+        volume.put(key, value);
+      }
+    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+      final String message = "Invalid Persistent Volume Claim CLI parameter provided";
+      LOG.log(Level.CONFIG, message);
+      throw new TopologySubmissionException(message);
+    }
+    return volumes;
+  }
+
   static Set<String> getConfigKeys(Config config, String keyPrefix) {
     Set<String> annotations = new HashSet<>();
     for (String s : config.getKeySet()) {
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index a75e00c..2363d7a 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -122,4 +122,27 @@ final class KubernetesUtils {
       }
     }
   }
+
+  /**
+   * Generic testing class for test runners in Kubernetes Scheduler. It bundles a description,
+   * an input, and the expected outcome of a single test case into one immutable value.
+   * @param <T1> Test input object type.
+   * @param <T2> Expected test object type.
+   */
+  static class TestTuple<T1, T2> {
+    public final String description;
+    public final T1 input;
+    public final T2 expected;
+
+    /**
+     * Configure the test object.
+     * @param description Description of the test to be run.
+     * @param input Input test case.
+     * @param expected Expected output from the test.
+     */
+    TestTuple(String description, T1 input, T2 expected) {
+      this.description = description;
+      this.expected = expected;
+      this.input = input;
+    }
+  }
 }
diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 305b5d2..e6fc819 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -63,6 +63,8 @@ import io.kubernetes.client.openapi.models.V1EnvVarSource;
 import io.kubernetes.client.openapi.models.V1LabelSelector;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -73,9 +75,12 @@ import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 import io.kubernetes.client.util.PatchUtils;
 import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
@@ -94,6 +99,9 @@ public class V1Controller extends KubernetesController {
   private final AppsV1Api appsClient;
   private final CoreV1Api coreClient;
 
+  private Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>>
+      persistentVolumeClaimConfigs = null;
+
   V1Controller(Config configuration, Config runtimeConfiguration) {
     super(configuration, runtimeConfiguration);
 
@@ -130,6 +138,18 @@ public class V1Controller extends KubernetesController {
       throw new TopologySubmissionException(e.getMessage());
     }
 
+    // Get and then create Persistent Volume Claims from the CLI.
+    persistentVolumeClaimConfigs =
+        KubernetesContext.getVolumeClaimTemplates(getConfiguration());
+    if (KubernetesContext.getPersistentVolumeClaimDisabled(getConfiguration())
+        && !persistentVolumeClaimConfigs.isEmpty()) {
+      final String message =
+          String.format("Configuring Persistent Volume Claim from CLI is 
disabled: '%s'",
+              topologyName);
+      LOG.log(Level.WARNING, message);
+      throw new TopologySubmissionException(message);
+    }
+
     // find the max number of instances in a container so we can open
     // enough ports if remote debugging is enabled.
     int numberOfInstances = 0;
@@ -142,8 +162,9 @@ public class V1Controller extends KubernetesController {
       appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null,
               null, null);
     } catch (ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", 
e);
-      throw new TopologySubmissionException(e.getMessage());
+      final String message = String.format("Error creating topology: %s%n", 
e.getResponseBody());
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
     }
 
     return true;
@@ -151,6 +172,7 @@ public class V1Controller extends KubernetesController {
 
   @Override
   boolean killTopology() {
+    removePersistentVolumeClaims();
     deleteStatefulSet();
     deleteService();
     return true;
@@ -424,6 +446,9 @@ public class V1Controller extends KubernetesController {
 
     statefulSet.setSpec(statefulSetSpec);
 
+    statefulSetSpec.setVolumeClaimTemplates(
+        createPersistentVolumeClaims(persistentVolumeClaimConfigs));
+
     return statefulSet;
   }
 
@@ -505,6 +530,10 @@ public class V1Controller extends KubernetesController {
       containers.add(executorContainer);
     }
 
+    if (!persistentVolumeClaimConfigs.isEmpty()) {
+      configurePodWithPersistentVolumeClaimVolumesAndMounts(podSpec, 
executorContainer);
+    }
+
     configureExecutorContainer(executorCommand, resource, numberOfInstances, 
executorContainer);
 
     podSpec.setContainers(containers);
@@ -852,6 +881,212 @@ public class V1Controller extends KubernetesController {
   }
 
   /**
+   * Generates <code>Persistent Volume Claims Templates</code> from a mapping of
+   * <code>Volumes</code> to <code>key-value</code> pairs of configuration options and values.
+   * A claim is generated only for a volume whose <code>claimName</code> is absent or equal to
+   * <code>OnDemand</code> (ignoring case); a volume naming any other claim is skipped.
+   * @param mapOfOpts <code>Volume</code> to configuration <code>key-value</code> mappings.
+   * @return Fully populated list of only dynamically backed
+   * <code>Persistent Volume Claims</code>, which is empty when no volume qualifies.
+   * @throws TopologySubmissionException if an option key is not handled by the switch below.
+   */
+  @VisibleForTesting
+  protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
+      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> mapOfOpts) {
+
+    List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
+
+    // Iterate over all the PVC Volumes.
+    for (Map.Entry<String, 
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> pvc
+        : mapOfOpts.entrySet()) {
+
+      // Only create claims for `OnDemand` volumes.
+      // A volume without any `claimName` is treated the same way as `OnDemand`.
+      final String claimName = pvc.getValue()
+          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+      if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
+        continue;
+      }
+
+      // The claim is named after the volume and carries the topology's claim labels.
+      // The spec starts out empty and is filled in from the options below.
+      V1PersistentVolumeClaim claim = new V1PersistentVolumeClaimBuilder()
+          .withNewMetadata()
+            .withName(pvc.getKey())
+            .withLabels(getPersistentVolumeClaimLabels(getTopologyName()))
+          .endMetadata()
+          .withNewSpec()
+          .endSpec()
+          .build();
+
+      // Populate PVC options.
+      for (Map.Entry<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String> option
+          : pvc.getValue().entrySet()) {
+        String optionValue = option.getValue();
+        switch(option.getKey()) {
+          case storageClassName:
+            claim.getSpec().setStorageClassName(optionValue);
+            break;
+          case sizeLimit:
+            // The size is recorded as the `storage` resource request of the claim.
+            claim.getSpec().setResources(
+                    new V1ResourceRequirements()
+                        .putRequestsItem("storage", new 
Quantity(optionValue)));
+            break;
+          case accessModes:
+            // Comma separated list; the entries are used exactly as split, without trimming.
+            
claim.getSpec().setAccessModes(Arrays.asList(optionValue.split(",")));
+            break;
+          case volumeMode:
+            claim.getSpec().setVolumeMode(optionValue);
+            break;
+          // Valid ignored options not used in a PVC.
+          case path: case subPath: case claimName:
+            break;
+          default:
+            throw new TopologySubmissionException(
+                String.format("Invalid Persistent Volume Claim type option for 
'%s'",
+                    option.getKey()));
+        }
+      }
+      listOfPVCs.add(claim);
+    }
+    return listOfPVCs;
+  }
+
+  /**
+   * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be placed in the
+   * <code>executor container</code>. A <code>Volume Mount</code> is generated for every entry,
+   * whereas a <code>Volume</code> is generated only for entries whose <code>claimName</code>
+   * is set to something other than <code>OnDemand</code>.
+   * @param mapConfig Mapping of <code>Volumes</code> to <code>key-value</code> configuration
+   * pairs.
+   * @return A pair of configured lists of <code>V1Volume</code> and
+   * <code>V1VolumeMount</code>.
+   * @throws TopologySubmissionException if an entry has no <code>path</code> or an empty one.
+   */
+  @VisibleForTesting
+  protected Pair<List<V1Volume>, List<V1VolumeMount>> 
createPersistentVolumeClaimVolumesAndMounts(
+      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, 
String>> mapConfig) {
+    List<V1Volume> volumeList = new LinkedList<>();
+    List<V1VolumeMount> mountList = new LinkedList<>();
+    for (Map.Entry<String, 
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+        : mapConfig.entrySet()) {
+      final String volumeName = configs.getKey();
+      final String path = configs.getValue()
+          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
+      final String subPath = configs.getValue()
+          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
+
+      // Every volume must say where it is mounted inside the container.
+      if (path == null || path.isEmpty()) {
+        throw new TopologySubmissionException(
+            String.format("A mount path is required and missing from '%s'", 
volumeName));
+      }
+
+      // Do not create Volumes for `OnDemand`.
+      // Entries without a `claimName` do not get a Volume either.
+      final String claimName = configs.getValue()
+          .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+      if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
+        final V1Volume volume = new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewPersistentVolumeClaim()
+              .withClaimName(claimName)
+            .endPersistentVolumeClaim()
+            .build();
+        volumeList.add(volume);
+      }
+
+      // The mount is always created; a sub path is attached only when one was provided.
+      final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+          .withName(volumeName)
+          .withMountPath(path);
+      if (subPath != null && !subPath.isEmpty()) {
+        volumeMount.withSubPath(subPath);
+      }
+      mountList.add(volumeMount.build());
+    }
+    return new Pair<>(volumeList, mountList);
+  }
+
+  /**
+   * Makes a call to generate <code>Volumes</code> and <code>Volume Mounts</code> from the
+   * Persistent Volume Claim configuration held by this controller and then inserts them,
+   * merging each generated list with the one already present on its target.
+   * @param podSpec All generated <code>V1Volume</code> will be placed in the
+   * <code>Pod Spec</code>.
+   * @param executor All generated <code>V1VolumeMount</code> will be placed in the
+   * <code>Container</code>.
+   */
+  @VisibleForTesting
+  protected void configurePodWithPersistentVolumeClaimVolumesAndMounts(final 
V1PodSpec podSpec,
+                                                                       final 
V1Container executor) {
+    Pair<List<V1Volume>, List<V1VolumeMount>> volumesAndMounts =
+        
createPersistentVolumeClaimVolumesAndMounts(persistentVolumeClaimConfigs);
+
+    // Deduplicate on Names with Persistent Volume Claims taking precedence.
+    // NOTE(review): precedence is decided inside `mergeListsDedupe`, which is not shown here;
+    // presumably the first list passed wins on a name clash - confirm against the helper.
+
+    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
+        new KubernetesUtils.V1ControllerUtils<>();
+    executor.setVolumeMounts(
+        utilsMounts.mergeListsDedupe(volumesAndMounts.second, 
executor.getVolumeMounts(),
+            Comparator.comparing(V1VolumeMount::getName),
+            "Executor and Persistent Volume Claim Volume Mounts"));
+
+    KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
+        new KubernetesUtils.V1ControllerUtils<>();
+    podSpec.setVolumes(
+        utilsVolumes.mergeListsDedupe(volumesAndMounts.first, 
podSpec.getVolumes(),
+            Comparator.comparing(V1Volume::getName),
+            "Pod and Persistent Volume Claim Volumes"));
+  }
+
+  /**
+   * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
+   * It looks for the following:
+   * metadata:
+   *   labels:
+   *     topology: <code>topology-name</code>
+   *     onDemand: <code>true</code>
+   * @throws TopologyRuntimeManagementException if the delete request fails with an
+   * <code>ApiException</code>.
+   */
+  private void removePersistentVolumeClaims() {
+    final String topologyName = getTopologyName();
+    final StringBuilder selectorLabel = new StringBuilder();
+
+    // Generate selector label.
+    // The result is a comma separated list of `key=value` pairs built from the claim labels.
+    for (Map.Entry<String, String> label
+        : getPersistentVolumeClaimLabels(topologyName).entrySet()) {
+      if (selectorLabel.length() != 0) {
+        selectorLabel.append(",");
+      }
+      
selectorLabel.append(label.getKey()).append("=").append(label.getValue());
+    }
+
+    // Remove all dynamically backed Persistent Volume Claims.
+    // Only the namespace and the label selector are supplied; every other argument is null.
+    try {
+      V1Status status = 
coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
+          getNamespace(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          selectorLabel.toString(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null);
+
+      LOG.log(Level.INFO,
+          String.format("Removing automatically generated Persistent Volume 
Claims for `%s`:%n%s",
+          topologyName, status.getMessage()));
+    } catch (ApiException e) {
+      // The failure is logged and then surfaced to the caller as a runtime management error.
+      final String message = String.format("Failed to connect to K8s cluster 
to delete Persistent "
+              + "Volume Claims for topology `%s`. A manual clean-up is 
required.%n%s",
+          topologyName, e.getMessage());
+      LOG.log(Level.WARNING, message);
+      throw new TopologyRuntimeManagementException(message);
+    }
+  }
+
+  /**
+   * Generates the <code>Label</code>s which are attached to a Topology's Persistent Volume
+   * Claims: the topology name and the <code>onDemand</code> marker set to <code>true</code>.
+   * @param topologyName Attached to the topology match label.
+   * @return A new mutable map consisting of the <code>label-value</code> pairs to be used in
+   * <code>Label</code>s.
+   */
+  @VisibleForTesting
+  protected static Map<String, String> getPersistentVolumeClaimLabels(String topologyName) {
+    // A plain HashMap avoids the anonymous subclass created by double-brace initialization.
+    final Map<String, String> labels = new HashMap<>();
+    labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+    labels.put(KubernetesConstants.LABEL_ON_DEMAND, "true");
+    return labels;
+  }
+
+  /**
    * Generates the <code>Selector</code> match labels with which resources in 
this topology can be found.
    * @return A label of the form <code>app=heron,topology=topology-name</code>.
    */
diff --git 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
index 9a762d6..95de4a8 100644
--- 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
+++ 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -19,14 +19,24 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 
+import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+
 public class KubernetesContextTest {
 
-  public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+  private static final String TOPOLOGY_NAME = "Topology-Name";
+  private static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
       "heron.kubernetes.pod.template.configmap.name";
   private static final String POD_TEMPLATE_CONFIGMAP_NAME = 
"pod-template-configmap-name";
   private final Config config = Config.newBuilder().build();
@@ -59,4 +69,129 @@ public class KubernetesContextTest {
     Assert.assertTrue(KubernetesContext
         .getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMapOff));
   }
+
+  /**
+   * Checks that the Persistent Volume Claim CLI switch reads as disabled only when it is
+   * explicitly set, and that the value is compared without regard to case.
+   */
+  @Test
+  public void testPersistentVolumeClaimDisabled() {
+    // Configurations that never set the flag must report the feature as enabled.
+    
Assert.assertFalse(KubernetesContext.getPersistentVolumeClaimDisabled(config));
+    Assert.assertFalse(KubernetesContext
+        .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMap));
+
+    // An upper case "TRUE" must still switch the feature off.
+    final Config configWithPodTemplateConfigMapOff = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+            POD_TEMPLATE_CONFIGMAP_NAME)
+        
.put(KubernetesContext.KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED, "TRUE")
+        .build();
+    Assert.assertTrue(KubernetesContext
+        .getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMapOff));
+  }
+
+  /**
+   * Checks that CLI options for two volumes are grouped per volume with all of their option
+   * keys and values, and that a configuration without such options yields an empty mapping.
+   */
+  @Test
+  public void testGetVolumeClaimTemplates() {
+    final String volumeNameOne = "volume-name-one";
+    final String volumeNameTwo = "volume-name-two";
+    // Mixed case on purpose: the `OnDemand` comparison ignores case.
+    final String claimName = "OnDeMaNd";
+    final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX 
+ "%s.%s";
+
+    final String storageClassField = 
VolumeClaimTemplateConfigKeys.storageClassName.name();
+    final String pathField = VolumeClaimTemplateConfigKeys.path.name();
+    final String claimNameField = 
VolumeClaimTemplateConfigKeys.claimName.name();
+    final String expectedStorageClass = "expected-storage-class";
+    final String storageClassKeyOne = String.format(keyPattern, volumeNameOne, 
storageClassField);
+    final String storageClassKeyTwo = String.format(keyPattern, volumeNameTwo, 
storageClassField);
+    final String expectedPath = "/path/for/volume/expected";
+    final String pathKeyOne = String.format(keyPattern, volumeNameOne, 
pathField);
+    final String pathKeyTwo = String.format(keyPattern, volumeNameTwo, 
pathField);
+    final String claimNameKeyOne = String.format(keyPattern, volumeNameOne, 
claimNameField);
+    final String claimNameKeyTwo = String.format(keyPattern, volumeNameTwo, 
claimNameField);
+
+    // Both volumes receive the same three options with the same values.
+    final Config configPVC = Config.newBuilder()
+        .put(pathKeyOne, expectedPath)
+        .put(pathKeyTwo, expectedPath)
+        .put(claimNameKeyOne, claimName)
+        .put(claimNameKeyTwo, claimName)
+        .put(storageClassKeyOne, expectedStorageClass)
+        .put(storageClassKeyTwo, expectedStorageClass)
+        .build();
+
+    final List<String> expectedKeys = Arrays.asList(volumeNameOne, 
volumeNameTwo);
+    final List<VolumeClaimTemplateConfigKeys> expectedOptionsKeys =
+        Arrays.asList(VolumeClaimTemplateConfigKeys.path,
+            VolumeClaimTemplateConfigKeys.storageClassName,
+            VolumeClaimTemplateConfigKeys.claimName);
+    final List<String> expectedOptionsValues =
+        Arrays.asList(expectedPath, expectedStorageClass, claimName);
+
+    // List of provided PVC options.
+    final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfPVC =
+        KubernetesContext.getVolumeClaimTemplates(configPVC);
+
+    Assert.assertTrue("Contains all provided Volumes",
+        mapOfPVC.keySet().containsAll(expectedKeys));
+    for (Map<VolumeClaimTemplateConfigKeys, String> items : mapOfPVC.values()) 
{
+      Assert.assertTrue("Contains all provided option keys",
+          items.keySet().containsAll(expectedOptionsKeys));
+      Assert.assertTrue("Contains all provided option values",
+          items.values().containsAll(expectedOptionsValues));
+    }
+
+    // Empty PVC.
+    final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> emptyPVC =
+        KubernetesContext.getVolumeClaimTemplates(Config.newBuilder().build());
+    Assert.assertTrue("Empty PVC is returned when no options provided", 
emptyPVC.isEmpty());
+  }
+
+  /**
+   * Checks that every malformed Persistent Volume Claim CLI option is rejected with a
+   * <code>TopologySubmissionException</code> whose message contains the expected fragment.
+   * A case which does not raise the exception at all is reported as a failure.
+   */
+  @Test
+  public void testGetPersistentVolumeClaimsErrors() {
+    final String volumeNameValid = "volume-name-valid";
+    final String volumeNameInvalid = "volume-Name-Invalid";
+    final String failureValue = "Should-Fail";
+    final String generalFailureMessage = "Invalid Persistent Volume";
+    final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+        + "%s.%s";
+    final List<TestTuple<Config, String>> testCases = new LinkedList<>();
+
+    // Invalid option key test.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "NonExistentKey"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid option key should trigger exception",
+        configInvalidOption, generalFailureMessage));
+
+    // Just the prefix.
+    final Config configJustPrefix = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Only a key prefix should trigger exception",
+        configJustPrefix, generalFailureMessage));
+
+    // Invalid Volume Name.
+    final Config configInvalidVolumeName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameInvalid, "path"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Volume Name should trigger exception",
+        configInvalidVolumeName, "lowercase RFC-1123"));
+
+    // Invalid Claim Name.
+    final Config configInvalidClaimName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Claim Name should trigger exception",
+        configInvalidClaimName, "Option `claimName`"));
+
+    // Invalid Storage Class Name.
+    final Config configInvalidStorageClassName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Storage Class Name should trigger exception",
+        configInvalidStorageClassName, "Option `storageClassName`"));
+
+    // Testing loop.
+    for (TestTuple<Config, String> testCase : testCases) {
+      try {
+        KubernetesContext.getVolumeClaimTemplates(testCase.input);
+      } catch (TopologySubmissionException e) {
+        Assert.assertTrue(testCase.description, e.getMessage().contains(testCase.expected));
+        continue;
+      }
+      // No exception was raised: without this the case would pass without checking anything.
+      Assert.fail(testCase.description);
+    }
+  }
 }
diff --git 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
index c777383..b543ea9 100644
--- 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++ 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -21,8 +21,12 @@ package org.apache.heron.scheduler.kubernetes;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -36,6 +40,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Key;
 import org.apache.heron.spi.packing.Resource;
@@ -49,6 +54,8 @@ import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
 import io.kubernetes.client.openapi.models.V1EnvVarSource;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -59,6 +66,8 @@ import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
+import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 
@@ -737,4 +746,284 @@ public class V1ControllerTest {
         CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
             expectedTolerationsOverriding));
   }
+
+  @Test
+  public void testCreatePersistentVolumeClaims() {
+    final String topologyName = "topology-name";
+    final String volumeNameOne = "volume-name-one";
+    final String volumeNameTwo = "volume-name-two";
+    final String volumeNameStatic = "volume-name-static";
+    final String claimNameOne = "OnDemand";
+    final String claimNameTwo = "claim-name-two";
+    final String claimNameStatic = "OnDEmaND";
+    final String storageClassName = "storage-class-name";
+    final String sizeLimit = "555Gi";
+    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+    final String accessModes = "ReadOnlyMany";
+    final String volumeMode = "VolumeMode";
+    final String path = "/path/to/mount/";
+    final String subPath = "/sub/path/to/mount/";
+    final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapPVCOpts =
+        ImmutableMap.of(
+            volumeNameOne, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+              {
+                put(VolumeClaimTemplateConfigKeys.claimName, claimNameOne);
+                put(VolumeClaimTemplateConfigKeys.storageClassName, 
storageClassName);
+                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeClaimTemplateConfigKeys.accessModes, 
accessModesList);
+                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+                put(VolumeClaimTemplateConfigKeys.path, path);
+              }
+            },
+            volumeNameTwo, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+              {
+                put(VolumeClaimTemplateConfigKeys.claimName, claimNameTwo);
+                put(VolumeClaimTemplateConfigKeys.storageClassName, 
storageClassName);
+                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
+                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+                put(VolumeClaimTemplateConfigKeys.path, path);
+                put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+              }
+            },
+            volumeNameStatic, new HashMap<VolumeClaimTemplateConfigKeys, 
String>() {
+              {
+                put(VolumeClaimTemplateConfigKeys.claimName, claimNameStatic);
+                put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
+                put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+                put(VolumeClaimTemplateConfigKeys.path, path);
+                put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+              }
+            }
+        );
+
+    final V1PersistentVolumeClaim claimOne = new 
V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameOne)
+          
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim claimStatic = new 
V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameStatic)
+          
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+        .endMetadata()
+        .withNewSpec()
+          .withAccessModes(Collections.singletonList(accessModes))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final List<V1PersistentVolumeClaim> expectedClaims =
+        new LinkedList<>(Arrays.asList(claimOne, claimStatic));
+
+    final List<V1PersistentVolumeClaim> actualClaims =
+        v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
+
+    Assert.assertTrue(expectedClaims.containsAll(actualClaims));
+  }
+
+  @Test
+  public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+    final String volumeNameOne = "VolumeNameONE";
+    final String volumeNameTwo = "VolumeNameTWO";
+    final String claimNameOne = "claim-name-one";
+    final String claimNameTwo = "OnDemand";
+    final String mountPathOne = "/mount/path/ONE";
+    final String mountPathTwo = "/mount/path/TWO";
+    final String mountSubPathTwo = "/mount/sub/path/TWO";
+    Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfOpts =
+        ImmutableMap.of(
+            volumeNameOne, ImmutableMap.of(
+                VolumeClaimTemplateConfigKeys.claimName, claimNameOne,
+                VolumeClaimTemplateConfigKeys.path, mountPathOne),
+            volumeNameTwo, ImmutableMap.of(
+                VolumeClaimTemplateConfigKeys.claimName, claimNameTwo,
+                VolumeClaimTemplateConfigKeys.path, mountPathTwo,
+                VolumeClaimTemplateConfigKeys.subPath, mountSubPathTwo)
+        );
+    final V1Volume volumeOne = new V1VolumeBuilder()
+        .withName(volumeNameOne)
+        .withNewPersistentVolumeClaim()
+          .withClaimName(claimNameOne)
+        .endPersistentVolumeClaim()
+        .build();
+    final V1Volume volumeTwo = new V1VolumeBuilder()
+        .withName(volumeNameTwo)
+        .withNewPersistentVolumeClaim()
+          .withClaimName(claimNameTwo)
+        .endPersistentVolumeClaim()
+        .build();
+    final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder()
+        .withName(volumeNameOne)
+        .withMountPath(mountPathOne)
+        .build();
+    final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder()
+        .withName(volumeNameTwo)
+        .withMountPath(mountPathTwo)
+        .withSubPath(mountSubPathTwo)
+        .build();
+
+    // Test case container.
+    final List<TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+        Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new 
LinkedList<>();
+
+    // Default case: No PVC provided.
+    final Pair<List<V1Volume>, List<V1VolumeMount>> actualEmpty =
+        
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(new 
HashMap<>());
+    testCases.add(new TestTuple<>("Generated an empty list of Volumes", 
actualEmpty,
+        new Pair<>(new LinkedList<>(), new LinkedList<>())));
+
+    // PVC Provided.
+    final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
+        new Pair<>(
+            new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
+            new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
+    final Pair<List<V1Volume>, List<V1VolumeMount>> actualFull =
+        
v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(mapOfOpts);
+    testCases.add(new TestTuple<>("Generated a list of Volumes", actualFull,
+        new Pair<>(expectedFull.first, expectedFull.second)));
+
+    // Testing loop.
+    for (TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+             Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
+      Assert.assertTrue(testCase.description,
+          (testCase.expected.first).containsAll(testCase.input.first));
+      Assert.assertTrue(testCase.description + " Mounts",
+          (testCase.expected.second).containsAll(testCase.input.second));
+    }
+  }
+
+  /**
+   * Verifies <code>configurePodWithPersistentVolumeClaimVolumesAndMounts</code> against a Pod
+   * Spec and executor container that already hold one Volume and one Volume Mount. The Volumes
+   * and Mounts to merge in are supplied by stubbing
+   * <code>createPersistentVolumeClaimVolumesAndMounts</code>. Three cases are covered: nothing
+   * to add, a non-clashing addition, and an addition whose names clash with the existing
+   * entries, where the expected result holds the clashing entries in place of the originals.
+   */
+  @Test
+  public void testConfigurePodWithPersistentVolumeClaims() {
+    final String volumeNameClashing = "clashing-volume";
+    final String volumeMountNameClashing = "original-volume-mount";
+    V1Volume baseVolume = new V1VolumeBuilder()
+        .withName(volumeNameClashing)
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Original Base Claim Name")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
+        .withName(volumeMountNameClashing)
+        .withMountPath("/original/mount/path")
+        .build();
+    V1Volume clashingVolume = new V1VolumeBuilder()
+        .withName(volumeNameClashing)
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Clashing Claim Replaced")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
+        .withName(volumeMountNameClashing)
+        .withMountPath("/clashing/mount/path")
+        .build();
+    V1Volume secondaryVolume = new V1VolumeBuilder()
+        .withName("secondary-volume")
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Original Secondary Claim Name")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
+        .withName("secondary-volume-mount")
+        .withMountPath("/secondary/mount/path")
+        .build();
+
+    // Test case container.
+    // Input: Pod Spec to modify, Executor to modify, Volumes and Mounts to return from
+    // <createPersistentVolumeClaimVolumesAndMounts>.
+    // Output: The expected <V1PodSpec> and <V1Container>.
+    final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>();
+
+    // No Persistent Volume Claim.
+    final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+    final V1Container executorEmptyCase =
+        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+    final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+    final V1Container expectedEmptyExecutor =
+        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+    Pair<List<V1Volume>, List<V1VolumeMount>> emptyVolumeAndMount =
+        new Pair<>(new LinkedList<>(), new LinkedList<>());
+
+    testCases.add(new TestTuple<>("Empty",
+        new Object[]{podSpecEmptyCase, executorEmptyCase, emptyVolumeAndMount},
+        new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
+
+    // Non-clashing Persistent Volume Claim.
+    final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
+        .withVolumes(baseVolume)
+        .build();
+    final V1Container executorNoClashCase = new V1ContainerBuilder()
+        .withVolumeMounts(baseVolumeMount)
+        .build();
+    final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
+        .addToVolumes(baseVolume)
+        .addToVolumes(secondaryVolume)
+        .build();
+    final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
+        .addToVolumeMounts(baseVolumeMount)
+        .addToVolumeMounts(secondaryVolumeMount)
+        .build();
+
+    Pair<List<V1Volume>, List<V1VolumeMount>> noClashVolumeAndMount = new Pair<>(
+        new LinkedList<>(Collections.singletonList(secondaryVolume)),
+        new LinkedList<>(Collections.singletonList(secondaryVolumeMount)));
+
+    testCases.add(new TestTuple<>("No Clash",
+        new Object[]{podSpecNoClashCase, executorNoClashCase, noClashVolumeAndMount},
+        new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
+
+    // Clashing Persistent Volume Claim.
+    final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
+        .withVolumes(baseVolume)
+        .build();
+    final V1Container executorClashCase = new V1ContainerBuilder()
+        .withVolumeMounts(baseVolumeMount)
+        .build();
+    final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
+        .addToVolumes(clashingVolume)
+        .addToVolumes(secondaryVolume)
+        .build();
+    final V1Container expectedClashExecutor = new V1ContainerBuilder()
+        .addToVolumeMounts(clashingVolumeMount)
+        .addToVolumeMounts(secondaryVolumeMount)
+        .build();
+
+    Pair<List<V1Volume>, List<V1VolumeMount>> clashVolumeAndMount = new Pair<>(
+        new LinkedList<>(Arrays.asList(clashingVolume, secondaryVolume)),
+        new LinkedList<>(Arrays.asList(clashingVolumeMount, secondaryVolumeMount)));
+
+    testCases.add(new TestTuple<>("Clashing",
+        new Object[]{podSpecClashCase, executorClashCase, clashVolumeAndMount},
+        new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
+
+    // Testing loop.
+    for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
+      // Stub the Volume and Mount generation so only the merge logic is exercised.
+      doReturn(testCase.input[2])
+          .when(v1ControllerWithPodTemplate)
+          .createPersistentVolumeClaimVolumesAndMounts(anyMap());
+
+      // The Pod Spec and executor are modified in place.
+      v1ControllerWithPodTemplate
+          .configurePodWithPersistentVolumeClaimVolumesAndMounts((V1PodSpec) testCase.input[0],
+              (V1Container) testCase.input[1]);
+
+      // JUnit argument order is (message, expected, actual).
+      Assert.assertEquals("Pod Specs match " + testCase.description,
+          testCase.expected.first, testCase.input[0]);
+      Assert.assertEquals("Executors match " + testCase.description,
+          testCase.expected.second, testCase.input[1]);
+    }
+  }
 }
diff --git a/website2/docs/schedulers-k8s-persistent-volume-claims.md 
b/website2/docs/schedulers-k8s-persistent-volume-claims.md
new file mode 100644
index 0000000..994c22c
--- /dev/null
+++ b/website2/docs/schedulers-k8s-persistent-volume-claims.md
@@ -0,0 +1,257 @@
+---
+id: schedulers-k8s-persistent-volume-claims
+title: Kubernetes Persistent Volume Claims via CLI
+sidebar_label: Kubernetes Persistent Volume Claims (CLI)
+---
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+> This document demonstrates how you can utilize both statically and dynamically 
backed [Persistent Volume 
Claims](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/) in 
the `Executor` containers. You will need to enable Dynamic Provisioning in your 
Kubernetes cluster to proceed to use the dynamic provisioning functionality.
+
+<br/>
+
+It is possible to leverage Persistent Volumes with custom Pod Templates but 
the Volumes you add will be shared between all Pods in the topology.
+
+The CLI commands allow you to configure a Persistent Volume Claim (dynamically 
or statically backed) which will be unique and isolated to each Pod and mounted 
in a single `Executor` when you submit your topology with a Claim name of 
`OnDemand`. Using any Claim name other than `OnDemand` will permit you to 
configure a shared Persistent Volume without a custom Pod Template which will 
be specific to an individual Pod. The CLI commands override any configurations 
you may have present in t [...]
+
+Some use cases include process checkpointing, caching of results for later use 
in the process, intermediate results which could prove useful in analysis 
(ETL/ELT to a data lake or warehouse), as a source of data enrichment, etc.
+
+**Note:** Heron ***will*** remove any dynamically backed Persistent Volume 
Claims it creates when a topology is terminated. Please be aware that Heron 
uses the following `Labels` to locate the claims it has created:
+```yaml
+metadata:
+  labels:
+    topology: <topology-name>
+    onDemand: "true"
+```
+
+<br>
+
+> ***System Administrators:***
+>
+> * You may wish to disable the ability to configure dynamic Persistent Volume 
Claims specified on the CLI. To achieve this, you must pass the define option 
`-D heron.kubernetes.persistent.volume.claims.cli.disabled=true` to the Heron 
API Server on the command line during boot. This command has been added to the 
Kubernetes configuration files to deploy the Heron API Server and is set to 
`false` by default.
+> * If you have a custom `Role`/`ClusterRole` for the Heron API Server you 
will need to ensure the `ServiceAccount` attached to the API server has the 
correct permissions to access the `Persistent Volume Claim`s:
+>
+>```yaml
+>rules:
+>- apiGroups: 
+>  - ""
+>  resources: 
+>  - persistentvolumeclaims
+>  verbs: 
+>  - create
+>  - delete
+>  - get
+>  - list
+>  - deletecollection
+>```
+
+<br>
+
+## Usage
+
+To configure a Persistent Volume Claim you must use the `--config-property` 
option with the `heron.kubernetes.volumes.persistentVolumeClaim.` command 
prefix. Heron will not validate your Persistent Volume Claim configurations, so 
please validate them to ensure they are well-formed. All names must comply with 
the [*lowercase 
RFC-1123*](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/)
 standard.
+
+The command pattern is as follows:
+`heron.kubernetes.volumes.persistentVolumeClaim.[VOLUME NAME].[OPTION]=[VALUE]`
+
+The currently supported CLI `options` are:
+
+* `claimName`
+* `storageClassName`
+* `sizeLimit`
+* `accessModes`
+* `volumeMode`
+* `path`
+* `subPath`
+
+***Note:*** A `claimName` of `OnDemand` will create unique Volumes for each 
`Executor` as well as deploy a Persistent Volume Claim for each Volume. Any 
other Claim name will result in a shared Volume being created between all Pods 
in the topology.
+
+***Note:*** The `accessModes` must be a comma separated list of values 
*without* any white space. Valid values can be found in the [Kubernetes 
documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
+
+***Note:*** If a `storageClassName` is specified and there are no matching 
Persistent Volumes then [dynamic 
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
 must be enabled. Kubernetes will attempt to locate a Persistent Volume that 
matches the `storageClassName` before it attempts to use dynamic provisioning. 
If a `storageClassName` is not specified there must be [Persistent 
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
 [...]
+
+<br>
+
+### Example
+
+An example series of commands and the `YAML` entries they make in their 
respective configurations are as follows.
+
+***Dynamic:***
+
+```bash
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.storageClassName=storage-class-name-of-choice
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+Generated `Persistent Volume Claim`:
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  labels:
+    app: heron
+    onDemand: "true"
+    topology: <topology-name>
+  name: volumenameofchoice-<topology-name>-[Ordinal]
+spec:
+  accessModes:
+  - comma
+  - separated
+  - list
+  resources:
+    requests:
+      storage: 555Gi
+  storageClassName: storage-class-name-of-choice
+  volumeMode: volume-mode-of-choice
+```
+
+Pod Spec entries for `Volume`:
+
+```yaml
+volumes:
+  - name: volumenameofchoice
+    persistentVolumeClaim:
+      claimName: volumenameofchoice-<topology-name>-[Ordinal]
+```
+
+`Executor` container entries for `Volume Mounts`:
+
+```yaml
+volumeMounts:
+  - mountPath: /path/to/mount
+    subPath: /sub/path/to/mount
+    name: volumenameofchoice
+```
+
+<br>
+
+***Static:***
+
+```bash
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+Generated `Persistent Volume Claim`:
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  labels:
+    app: heron
+    onDemand: "true"
+    topology: <topology-name>
+  name: volumenameofchoice-<topology-name>-[Ordinal]
+spec:
+  accessModes:
+  - comma
+  - separated
+  - list
+  resources:
+    requests:
+      storage: 555Gi
+  storageClassName: standard
+  volumeMode: volume-mode-of-choice
+```
+
+Pod Spec entries for `Volume`:
+
+```yaml
+volumes:
+  - name: volumenameofchoice
+    persistentVolumeClaim:
+      claimName: volumenameofchoice-<topology-name>-[Ordinal]
+```
+
+`Executor` container entries for `Volume Mounts`:
+
+```yaml
+volumeMounts:
+  - mountPath: /path/to/mount
+    subPath: /sub/path/to/mount
+    name: volumenameofchoice
+```
+
+<br>
+
+## Submitting
+
+An example of submitting a topology using the *dynamic* example CLI commands 
above:
+
+```bash
+heron submit kubernetes \
+  
--service-url=http://localhost:8001/api/v1/namespaces/default/services/heron-apiserver:9000/proxy
 \
+  ~/.heron/examples/heron-api-examples.jar \
+  org.apache.heron.examples.api.AckingTopology acking \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.storageClassName=storage-class-name-of-choice
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
 \
+--config-property 
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+## Required and Optional Configuration Items
+
+The following table outlines CLI options which are either ***required*** ( 
&#x2705; ), ***optional*** ( &#x2754; ), or ***not available*** ( &#x274c; ) 
depending on whether you are using a dynamically backed, statically backed, or shared `Volume`.
+
+| Option | Dynamic | Static | Shared
+|---|---|---|---|
+| `VOLUME NAME` | &#x2705; | &#x2705; | &#x2705;
+| `claimName` | `OnDemand` | `OnDemand` | A valid name
+| `path` | &#x2705; | &#x2705; | &#x2705;
+| `subPath` | &#x2754; | &#x2754; | &#x2754;
+| `storageClassName` | &#x2705; | &#x274c; | &#x274c;
+| `accessModes` | &#x2705; | &#x2705; | &#x274c;
+| `sizeLimit` | &#x2754; | &#x2754; | &#x274c;
+| `volumeMode` | &#x2754; | &#x2754; | &#x274c;
+
+<br>
+
+***Note:*** The `VOLUME NAME` will be extracted from the CLI command and a 
`claimName` is always required.
+
+<br>
+
+## Configuration Items Created and Entries Made
+
+The configuration items and entries in the tables below will be made in their 
respective areas.
+
+One `Persistent Volume Claim`, a `Volume`, and a `Volume Mount` will be 
created for each `volume name` which you specify. Each will be unique to a Pod 
within the topology.
+
+| Name | Description | Policy |
+|---|---|---|
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the 
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `executor` 
container's `volumeMounts`.
+| `claimName` | A Claim name for the Persistent Volume. | If `OnDemand` is 
provided as the parameter then a unique Volume and Persistent Volume Claim will 
be created. Any other name will result in a shared Volume between all Pods in 
the topology with only a Volume and Volume Mount being added.
+| `path` | The `mountPath` of the `Volume`. | Entries made in the `executor` 
container's `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries made in the `executor` 
container's `volumeMounts`.
+| `storageClassName` | The identifier name used to reference the dynamic 
`StorageClass`. | Entries made in the `Persistent Volume Claim` and Pod Spec's 
`Volume`.
+| `accessModes` | A comma separated list of access modes. | Entries made in 
the `Persistent Volume Claim`.
+| `sizeLimit` | A resource request for storage space. | Entries made in the 
`Persistent Volume Claim`.
+| `volumeMode` | Either `Filesystem` (default) or `Block` (raw block). [Read 
more](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#volume-mode). | 
Entries made in the `Persistent Volume Claim`.
+| Labels | Two labels for `topology` and `onDemand` provisioning are added. | 
These labels are only added to dynamically backed `Persistent Volume Claim`s 
created by Heron to support the removal of any claims created when a topology 
is terminated.
diff --git a/website2/website/sidebars.json b/website2/website/sidebars.json
index bab3043..c803fdf 100755
--- a/website2/website/sidebars.json
+++ b/website2/website/sidebars.json
@@ -54,6 +54,7 @@
       "schedulers-k8s-by-hand",
       "schedulers-k8s-with-helm",
       "schedulers-k8s-pod-templates",
+      "schedulers-k8s-persistent-volume-claims",
       "schedulers-aurora-cluster",
       "schedulers-aurora-local",
       "schedulers-local",

Reply via email to