This is an automated email from the ASF dual-hosted git repository.

joshfischer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 837c4f2  [HERON-3707] ConfigMap Pod Template Support (#3710)
837c4f2 is described below

commit 837c4f2e15a0759abfa7061728d6f5d1ba98151a
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Tue Nov 2 09:40:39 2021 -0400

    [HERON-3707] ConfigMap Pod Template Support (#3710)
    
    * [Kubernetes] setup basic mount info for Pod ConfigMap.
    
    * [Kubernetes] updated function signature to handle Pod Template ConfigMap name.
    
    * [kubernetes] extracting Pod Template ConfigMap name from <Config>.
    
    * [kubernetes] checking for Pod Template ConfigMap and appropriately adding to Stateful Set.
    
    * [kubernetes] Java Style lint fix.
    
    * [Tests] Kubernetes Controller tests for Pod Template ConfigMap.
    
    * [Tests] Kubernetes Constants and Context tests for Pod Template ConfigMap.
    
    * [Tests] Kubernetes V1Controller test suite stubbed.
    
    * [Tests] Java style lint fixed.
    
    * [Tests] Kubernetes V1Controller Pod Template ConfigMap volume mount.
    
    * [Kubernetes] cleaned up to begin work on <loadPodFromTemplate>.
    
    * [Kubernetes] created <loadPodFromTemplate>.
    
    * [Tests] Begun mock test setup of <loadPodFromTemplate>.
    
    * [Kubernetes] style check/linting fix.
    
    * [Kubernetes] refactoring <V1Controller> and <KubernetesController>.
    
    * [Kubernetes] added description to failed to locate exception.
    
    * [Tests] <loadPodFromTemplate> Pod Template checks.
    
    * [Kubernetes] check for no ConfigMaps set.
    
    * [Tests] working on mocking null list of V1ConfigMapList.
    
    * [Kubernetes] refactoring <loadPodFromTemplate>
    
    Adding checks for null pointers. Default-constructed V1 objects tend to leave uninitialised fields set to null. Extracted <getConfigMaps> into a method to support mocking.
    
    * [Tests] Stubbed <getConfigMaps> and testing <loadPodFromTemplate>.
    
    * [Kubernetes] <loadPodFromTemplate> adjusted to get <V1PodTemplateSpec> from <V1PodTemplate>.
    
    * [Kubernetes] check for empty Pod Template in ConfigMap.
    
    * [Tests] Valid Pod Template test.
    
    * [Tests] Invalid Pod Template.
    
    * [Tests] refactored test data to their respective tests.
    
    * [Kubernetes] refactored <loadPodFromTemplate> for readability.
    
    * [Kubernetes] params for <getConfigMaps> tweaked.
    
    Judging from <release-11.0.0/kubernetes/src/main/java/io/kubernetes/client/openapi/apis/CoreV1Api.java>, "optional" means the field can be set to <null>.
    
    * [Kubernetes] <getPodTemplateLocation> extracting ConfigMap and Pod Template names.
    
    * [Tests] <getPodTemplateLocation> for correct and incorrect parsing.
    
    * [Kubernetes] <getPodTemplateLocation> catching empty names.
    
    * [Tests] <getPodTemplateLocation> separated tests for ConfigMap and Pod Template names.
    
    * [Kubernetes] updated <loadPodFromTemplate> to use ConfigMap and Pod Template names.
    
    * [Tests] updated tests for <loadPodFromTemplate> to use ConfigMap and Pod Template names.
    
    * [Kubernetes] added INFO logging to <loadPodFromTemplate> for the deployed Pod Template.
    
    * [Kubernetes] Bug fixes in error messages for <loadPodFromTemplate>.
    
    * [Kubernetes] bug fix and test for missing delimiter in <getPodTemplateLocation>.
    
    * [Kubernetes] <getConfigMaps> namespace access updated.
    
    * [Kubernetes] <configureRBAC> basic logic.
    
    TODO: get API key for K8s.
    
    * [Tests] cleaned up <V1Controller> tests.
    
    * [Kubernetes] <configureRBAC> more detailed error log.
    
    * [Kubernetes] <configureRBAC> role configurations.
    
    * [Kubernetes] refactored <loadPodFromTemplate>.
    
    * [Tests] switched to <ConfigMapBuilder> in <V1ControllerTest>.
    
    * [Kubernetes] switched to <V1RoleBuilder> in <configureRBAC>.
    
    * [Kubernetes] made <loadPodFromTemplate> protected.
    
    Removed illegal reflection access to avoid support issues with newer testing frameworks.
    
    * [Kubernetes] removed <configureRBAC>.
    
    RBAC must be configured using Role/ClusterRole and RoleBinding/ClusterRoleBinding to the Heron <heron-apiserver> ServiceAccount.
    
    * [Kubernetes] <getPodTemplateLocation> error message passed up.
    
    * [Kubernetes] refactored <getPodSpec> to <finalizePodSpec>.
    
    Heron should have the final say on the Pod Spec. This is as much a point of security as an operational one.
    
    * [Kubernetes] Added boot flag to disable Pod Templates.
    
    * [Tests] testing to validate boot flag for disabled Pod Templates.
    
    * [Kubernetes] Wiring in boot flag to disable Pod Templates in <loadPodFromTemplate>.
    
    * [Tests] disabled Pod Templates output validation.
    
    * [Kubernetes] Added class scoped variable <isPodTemplateDisabled>.
    
    * [Kubernetes] <getContainer> modified to utilise supplied executor container.
    
    * [Kubernetes] <getContainer> <V1EnvVar>s.
    
    Environment variables merged with Heron defaults taking precedence.
    
    * [Kubernetes] <getContainer> Limits.
    
    Resource Limits merged with Heron defaults taking precedence.
    
    * [Kubernetes] disabled Pod Templates will return error when attempting to submit.
    
    * [Kubernetes] <API Server> configs.
    
    Updated RBAC API version and added a commented flag command to disable Pod Templates.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Refactored <getContainer> to <configureExecutorContainer>. Permitting additional containers for side-car purposes.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Switched to <TreeSet> with custom comparator for <V1EnvVar> name. <V1EnvVar>'s comparator performs a complete element-wise comparison.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Merged executor container ports with ports provided in Pod Template. Heron defaults take precedence.
    
    * [Kubernetes] <mountVolumeIfPresent>
    
    Merge volume mounts with those provided in Pod Template. Heron defaults take precedence.
    
    * [Kubernetes] <V1Controller>
    
    general cleanup of new code and comments.
    
    * [Kubernetes] <mountVolumeIfPresent>
    
    Error check for malformed Pod Template.
    
    * [Kubernetes] <configureContainerPorts>
    
    Refactored <getContainerPorts> and moved port merge with error handling to it.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Removed a redundant <limit> put into the HashMap.
    
    * [Kubernetes] <addVolumesIfPresent>
    
    Merging Pod Template volume configs with Heron defaults. Heron values take precedence.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Allow user values for CPU and MEMORY limits to override those provided by Heron.
    
    * [Scheduler-Core] <LaunchRunner> handling <submit> errors better.
    
    Some Schedulers, such as K8s, throw exceptions instead of returning false when <submit> fails. This leaves the Topology Manager with dangling references. An additional RPC call to the Scheduler is required to completely clear the state.
    
    * [Kubernetes] <V1Controller>
    
    General cleanup in tests and class.
    
    * [Kubernetes] code review changes.
    
    Code review from @nwangtw.
    <KubernetesContext.getPodTemplateConfigMapDisabled> switched to <equalsIgnoreCase>.
    
    * [Scheduler-Core] code review changes.
    
    Code review from @nwangtw.
    <LaunchRunner> error message assembly improved.
    <LaunchRunner> added <FINE> level logging for failure to clear failed topology launch from Scheduler.
    
    * [Tests] <configureContainerPorts>.
    
    * [Kubernetes] <API Server> configs.
    
    Code review from @nwangtw, @nicknezis.
    Updated command to disable Pod Templates to <false> by default.
    
    * [Kubernetes] <configureContainerEnvVars>
    
    Logic for merging environment variables extracted to a method for testing.
    
    * [Tests] <configureContainerEnvVars>.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Wired in <configureContainerEnvVars> and removed old code.
    
    * Update for Helm chart
    
    * Updated version to match the other k8s ClusterRoleBindings
    
    * [Kubernetes] <configureContainerResources>
    
    Logic to configure container's resources extracted to method to facilitate testing.
    
    * [Kubernetes] <configureExecutorContainer>
    
    Removed old logic and wired <configureContainerResources> into <configureExecutorContainer>.
    
    * [Tests] <testConfigureContainerPorts>.
    
    Added a test for debugging ports.
    
    * [Kubernetes] <addVolumesIfPresent>.
    
    Exposed for testing.
    
    * [Tests] <addVolumesIfPresent>.
    
    Testing on a <hostPath> volume but will generalise across others.
    
    * [Kubernetes] <mountVolumeIfPresent>.
    
    Exposed for testing.
    
    * [Tests] <mountVolumeIfPresent>.
    
    Tested by setting a Volume Mount in the Config and then a custom Volume Mount in the container.
    
    * [Tests] <addVolumesIfPresent>.
    
    Cleaned up tests.
    
    * [Tests] <mountVolumeIfPresent>.
    
    Testing for when no Volume Mounts should be set.
    
    * [Tests] <addVolumesIfPresent>.
    
    Testing for when no Volumes should be set.
    
    * Attempt to fix Travis CI build
    
    * [Tests] <configureContainerEnvVars> <configureContainerPorts>.
    
    Extracted logic to generate executor environment variables, ports, and debugging ports. This is to resolve production-testing code inconsistencies which may arise.
    
    * [Tests] <V1ControllerTests>
    
    General cleanup and simplification of test suite.
    
    * Travis fix take 3
    
    * Travis CI fix
    
    * [Kubernetes] <V1ControllerUtils>
    
    Added nested utility class to improve code maintainability.
    
    <mergeListsDedupe> will merge two input lists by keeping all values in one and deduplicating the second list.
    
    * [Tests] <mergeListsDedupe>.
    
    Full battery of tests: null lists, merged lists, and thrown errors.
    
    * [Kubernetes] <V1Controller>.
    
    Switched to using <mergeListsDedupe> to improve code maintainability.
    Affects:
    <addVolumesIfPresent>
    <configureContainerEnvVars>
    <configureContainerPorts>
    <mountVolumeIfPresent>
    
    * [Kubernetes] <V1Controller> cleaned up unneeded returns when using setter methods.
    
    * [Kubernetes] <V1Controller>.
    
    Merging Pod Specification Tolerations and deduplicating on the <V1Toleration::key>.
    
    * [Tests] <configureTolerations>.
    
    Tests for null, empty, and merged Toleration lists.
    
    * [Kubernetes] <configurePodSpec>.
    
    Wired in <configureTolerations>
    
    * [Tests] cleaning up code.
    
    * [Kubernetes] <configurePodSpec>.
    
    Added check for multiple executor container specs in Pod Template. Will throw error if detected.
    
    * [Tests] <V1Controller> general cleanup.
    
    * [Kubernetes] Constants
    
    Updated tolerations to remove deprecated taints.
    
    * [Kubernetes] <V1Controller>
    
    <getConfigMap> retrieving a single named ConfigMap in a specific namespace.
    <loadPodFromTemplate> logic updated to handle a single ConfigMap.
    
    * [Tests] <V1Controller>
    
    Fixed and cleaned up tests after switching to <readNamespacedConfigMap>.
    
    * [Kubernetes] <V1Controller>
    
    Error message cleanup.
    
    * [Tests] <V1Controller>
    
    Test description cleanup.
    
    * [Kubernetes] <KubernetesUtils>
    
    Javadoc cleanup.
    
    * [Tests] <KubernetesUtils>
    
    Test description cleanup.
    
    * [Tests] <V1Controller>
    
    <configureContainerResources> Heron values take precedence for limits.
    
    * [Kubernetes] <V1Controller>
    
    <configureContainerResources> Heron values take precedence for limits.
    
    * Add support for reading configmap
    
    * Removed deprecated k8s tolerations
    
    Co-authored-by: Nicholas Nezis <[email protected]>
---
 .travis.yml                                        |   9 +-
 deploy/kubernetes/general/apiserver.yaml           |   9 +-
 deploy/kubernetes/general/tools.yaml               |   6 +-
 deploy/kubernetes/gke/gcs-apiserver.yaml           |   8 +-
 deploy/kubernetes/helm/templates/tools.yaml        |  14 +-
 deploy/kubernetes/helm/values.yaml.template        |   3 +
 deploy/kubernetes/minikube/apiserver.yaml          |   3 +-
 .../org/apache/heron/scheduler/LaunchRunner.java   |  47 +-
 .../scheduler/kubernetes/KubernetesConstants.java  |   5 +-
 .../scheduler/kubernetes/KubernetesContext.java    |  15 +
 .../scheduler/kubernetes/KubernetesUtils.java      |  40 +-
 .../heron/scheduler/kubernetes/V1Controller.java   | 398 ++++++++---
 heron/schedulers/tests/java/BUILD                  |   7 +-
 .../kubernetes/KubernetesContextTest.java          |  62 ++
 .../scheduler/kubernetes/KubernetesUtilsTest.java  | 119 ++++
 .../scheduler/kubernetes/V1ControllerTest.java     | 740 +++++++++++++++++++++
 16 files changed, 1352 insertions(+), 133 deletions(-)
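
The "Heron defaults take precedence" merge described in the commit message can be illustrated with a small standalone sketch. It assumes nothing beyond the JDK: the class MergeSketch, the type Named, and the method mergeKeepPrimary are hypothetical names made up for this note and are not part of the Heron code base. The sketch mirrors the idea of inserting the primary list into a TreeSet first, so that a later element with the same key is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MergeSketch {
  // Hypothetical stand-in for a keyed item such as an environment variable.
  static final class Named {
    final String name;
    final String value;

    Named(String name, String value) {
      this.name = name;
      this.value = value;
    }

    @Override
    public String toString() {
      return name + "=" + value;
    }
  }

  // Keeps every element of primary; an element of secondary is kept only if
  // the comparator finds no equal key already in the set.
  static <T> List<T> mergeKeepPrimary(List<T> primary, List<T> secondary,
                                      Comparator<T> byKey) {
    if (primary == null || primary.isEmpty()) {
      return secondary;
    }
    if (secondary == null || secondary.isEmpty()) {
      return primary;
    }
    Set<T> merged = new TreeSet<>(byKey);
    merged.addAll(primary);    // inserted first, so these win on key clashes
    merged.addAll(secondary);  // add() leaves the set unchanged for existing keys
    return new ArrayList<>(merged);
  }

  public static void main(String[] args) {
    List<Named> heronDefaults = Arrays.asList(new Named("HOST", "heron"));
    List<Named> fromTemplate =
        Arrays.asList(new Named("HOST", "user"), new Named("EXTRA", "1"));
    Comparator<Named> byName = Comparator.comparing((Named n) -> n.name);

    // Prints [EXTRA=1, HOST=heron]
    System.out.println(mergeKeepPrimary(heronDefaults, fromTemplate, byName));
  }
}
```

Note that the result comes back in comparator order rather than insertion order, which is a side effect of using a TreeSet for the de-duplication.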

diff --git a/.travis.yml b/.travis.yml
index e608fa8..532a335 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,6 +18,9 @@ addons:
       - libcppunit-dev
       - pkg-config
       - python3-dev
+      - python3-pip
+      - python3-setuptools
+      - python3-wheel
       - python3-venv
       - wget
       - zip
@@ -34,10 +37,6 @@ before_install:
   - chmod +x bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh
   - ./bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh --user
 
-install:
-  - sudo apt-get install python3-pip python3-setuptools
-  - pip3 install travis-wait-improved
-
 script:
   - which gcc
   - gcc --version
@@ -47,4 +46,4 @@ script:
   - python -V
   - which python3
   - python3 -V
-  - travis-wait-improved --timeout=180m scripts/travis/ci.sh
+  - scripts/travis/ci.sh
diff --git a/deploy/kubernetes/general/apiserver.yaml b/deploy/kubernetes/general/apiserver.yaml
index aa5b93e..33c9533 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -27,7 +27,7 @@ metadata:
   namespace: default
 
 ---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
 metadata:
   name: heron-apiserver
@@ -67,11 +67,7 @@ spec:
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/notReady"
-          operator: "Equal"
-          effect: "NoExecute"
-          tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/unreachable"
+        - key: "node.kubernetes.io/unreachable"
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
@@ -95,6 +91,7 @@ spec:
               -D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/heron
               -D heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
+              -D heron.kubernetes.pod.template.configmap.disabled=false
 
 ---
 apiVersion: v1
diff --git a/deploy/kubernetes/general/tools.yaml b/deploy/kubernetes/general/tools.yaml
index b650e04..d60790a 100644
--- a/deploy/kubernetes/general/tools.yaml
+++ b/deploy/kubernetes/general/tools.yaml
@@ -38,11 +38,7 @@ spec:
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/notReady"
-          operator: "Equal"
-          effect: "NoExecute"
-          tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/unreachable"
+        - key: "node.kubernetes.io/unreachable"
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
diff --git a/deploy/kubernetes/gke/gcs-apiserver.yaml b/deploy/kubernetes/gke/gcs-apiserver.yaml
index 2a84be1..3c98674 100644
--- a/deploy/kubernetes/gke/gcs-apiserver.yaml
+++ b/deploy/kubernetes/gke/gcs-apiserver.yaml
@@ -27,7 +27,7 @@ metadata:
   namespace: default
 
 ---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
 metadata:
   name: heron-apiserver
@@ -67,11 +67,7 @@ spec:
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/notReady"
-          operator: "Equal"
-          effect: "NoExecute"
-          tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/unreachable"
+        - key: "node.kubernetes.io/unreachable"
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
diff --git a/deploy/kubernetes/helm/templates/tools.yaml b/deploy/kubernetes/helm/templates/tools.yaml
index 0a6fe15..123f0e2 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -56,11 +56,7 @@ spec:
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/notReady"
-          operator: "Equal"
-          effect: "NoExecute"
-          tolerationSeconds: 10
-        - key: "node.alpha.kubernetes.io/unreachable"
+        - key: "node.kubernetes.io/unreachable"
           operator: "Equal"
           effect: "NoExecute"
           tolerationSeconds: 10
@@ -162,6 +158,7 @@ spec:
               -D heron.class.repacking.algorithm=org.apache.heron.packing.binpacking.FirstFitDecreasingPacking
               {{- end }}
               -D heron.kubernetes.resource.request.mode={{ .Values.topologyResourceRequestMode }}
+              -D heron.kubernetes.pod.template.configmap.disabled={{ .Values.disablePodTemplates }}
           envFrom:
             - configMapRef:
                 name: {{ .Release.Name }}-tools-config
@@ -265,6 +262,13 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - configmaps
+  verbs:
+  - get
+  - list
 
 ---
 apiVersion: v1
diff --git a/deploy/kubernetes/helm/values.yaml.template b/deploy/kubernetes/helm/values.yaml.template
index cf0154d..61daf04 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -58,6 +58,9 @@ uploader:
 # Packing algorithms
 packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
 
+# Support for ConfigMap mounted PodTemplates
+disablePodTemplates: false
+
 # Number of replicas for storage bookies, memory and storage requirements
 bookieReplicas: 3
 bookieCpuMin: 100m
diff --git a/deploy/kubernetes/minikube/apiserver.yaml b/deploy/kubernetes/minikube/apiserver.yaml
index 2cbf363..8c08cc9 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -28,7 +28,7 @@ metadata:
   namespace: default
 
 ---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
 metadata:
   name: heron-apiserver
@@ -82,6 +82,7 @@ spec:
               -D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
               -D heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
               -D heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
+              -D heron.kubernetes.pod.template.configmap.disabled=false
 
 ---
 apiVersion: v1
diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
index e937443..1b78667 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
@@ -19,11 +19,15 @@
 
 package org.apache.heron.scheduler;
 
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.proto.scheduler.Scheduler;
 import org.apache.heron.proto.system.ExecutionEnvironment;
 import org.apache.heron.proto.system.PackingPlans;
+import org.apache.heron.scheduler.client.ISchedulerClient;
+import org.apache.heron.scheduler.client.SchedulerClientFactory;
 import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse;
 import org.apache.heron.scheduler.utils.LauncherUtils;
 import org.apache.heron.scheduler.utils.Runtime;
@@ -169,13 +173,48 @@ public class LaunchRunner {
           "Failed to set execution state for topology '%s'", topologyName));
     }
 
-    // launch the topology, clear the state if it fails
-    if (!launcher.launch(packedPlan)) {
+    // Launch the topology, clear the state if it fails. Some schedulers throw exceptions instead of
+    // returning false. In some cases the scheduler needs to have the topology deleted.
+    try {
+      if (!launcher.launch(packedPlan)) {
+        throw new TopologySubmissionException(null);
+      }
+    } catch (TopologySubmissionException e) {
+      // Compile error message to throw.
+      final StringBuilder errorMessage = new StringBuilder(
+          String.format("Failed to launch topology '%s'", topologyName));
+      if (e.getMessage() != null) {
+        errorMessage.append("\n").append(e.getMessage());
+      }
+
+      try {
+        // Clear state from the Scheduler via RPC.
+        Scheduler.KillTopologyRequest killTopologyRequest = Scheduler.KillTopologyRequest
+            .newBuilder()
+            .setTopologyName(topologyName).build();
+
+        ISchedulerClient schedulerClient = new SchedulerClientFactory(config, runtime)
+            .getSchedulerClient();
+        if (!schedulerClient.killTopology(killTopologyRequest)) {
+          final String logMessage =
+              String.format("Failed to remove topology '%s' from scheduler after failed submit. "
+                  + "Please re-try the kill command.", topologyName);
+          errorMessage.append("\n").append(logMessage);
+          LOG.log(Level.SEVERE, logMessage);
+        }
+      // SUPPRESS CHECKSTYLE IllegalCatch
+      } catch (Exception ignored){
+        // The above call to clear the Scheduler may fail. This situation can be ignored.
+        LOG.log(Level.FINE,
+            String.format("Failure clearing failed topology `%s` from Scheduler during `submit`",
+                topologyName));
+      }
+
+      // Clear state from the State Manager.
       statemgr.deleteExecutionState(topologyName);
       statemgr.deletePackingPlan(topologyName);
       statemgr.deleteTopology(topologyName);
-      throw new LauncherException(String.format(
-          "Failed to launch topology '%s'", topologyName));
+      throw new LauncherException(errorMessage.toString());
     }
   }
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index be45918..5e7b19a 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -35,6 +35,8 @@ public final class KubernetesConstants {
   public static final String MEMORY = "memory";
   public static final String CPU = "cpu";
 
+  public static final String EXECUTOR_NAME = "executor";
+
   // container env constants
   public static final String ENV_HOST = "HOST";
   public static final String POD_IP = "status.podIP";
@@ -102,8 +104,7 @@ public final class KubernetesConstants {
   static final List<String> TOLERATIONS = Collections.unmodifiableList(
       Arrays.asList(
           "node.kubernetes.io/not-ready",
-          "node.alpha.kubernetes.io/notReady",
-          "node.alpha.kubernetes.io/unreachable"
+          "node.kubernetes.io/unreachable"
       )
   );
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 71a660a..6d29b72 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -82,6 +82,12 @@ public final class KubernetesContext extends Context {
   public static final String KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
       "heron.kubernetes.volume.awsElasticBlockStore.fsType";
 
+  // pod template configmap
+  public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+      "heron.kubernetes.pod.template.configmap.name";
+  public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED =
+      "heron.kubernetes.pod.template.configmap.disabled";
+
   // container mount volume mount keys
   public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
       "heron.kubernetes.container.volumeMount.name";
@@ -172,6 +178,15 @@ public final class KubernetesContext extends Context {
     return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
   }
 
+  public static String getPodTemplateConfigMapName(Config config) {
+    return config.getStringValue(KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME);
+  }
+
+  public static boolean getPodTemplateConfigMapDisabled(Config config) {
+    final String disabled = config.getStringValue(KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED);
+    return "true".equalsIgnoreCase(disabled);
+  }
+
   public static Map<String, String> getPodLabels(Config config) {
     return getConfigItemsByPrefix(config, KUBERNETES_POD_LABEL_PREFIX);
   }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index 5f963fd..a75e00c 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -20,17 +20,22 @@
 package org.apache.heron.scheduler.kubernetes;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.SysUtils;
+import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.utils.Runtime;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
 import io.kubernetes.client.openapi.ApiException;
-
 import okhttp3.Response;
 
 final class KubernetesUtils {
@@ -84,4 +89,37 @@ final class KubernetesUtils {
   static String Megabytes(ByteAmount amount) {
     return String.format("%sMi", Long.toString(amount.asMegabytes()));
   }
+
+  static class V1ControllerUtils<T> {
+    private static final Logger LOG = Logger.getLogger(V1Controller.class.getName());
+
+    /**
+     * Merge two lists by keeping all values in the <code>primaryList</code> and de-duplicating values in
+     * <code>secondaryList</code> using the <code>comparator</code>.
+     * @param primaryList All the values in this will be retained.
+     * @param secondaryList The values in this list will be deduplicated against <code>primaryList</code>.
+     * @param comparator Used to compare keys in the <code>TreeSet</code> to find their insertion position.
+     * @param description Description of the list merge operation which is used for error messages.
+     * @return A de-duplicated list of all the values in both input lists using the <code>comparator</code>.
+     */
+    protected List<T> mergeListsDedupe(List<T> primaryList, List<T> secondaryList,
+                                       Comparator<T> comparator, String description) {
+      if (primaryList == null || primaryList.isEmpty()) {
+        return secondaryList;
+      }
+      if (secondaryList == null || secondaryList.isEmpty()) {
+        return primaryList;
+      }
+      try {
+        Set<T> treeSet = new TreeSet<>(comparator);
+        treeSet.addAll(primaryList);
+        treeSet.addAll(secondaryList);
+        return new LinkedList<>(treeSet);
+      } catch (NullPointerException e) {
+        final String message = String.format("Failed to merge lists for %s", description);
+        LOG.log(Level.FINE, message);
+        throw new TopologySubmissionException(message);
+      }
+    }
+  }
 }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 747ed3c..761d615 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -34,7 +35,10 @@ import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.utils.Runtime;
@@ -51,6 +55,7 @@ import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.AppsV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -59,6 +64,7 @@ import io.kubernetes.client.openapi.models.V1LabelSelector;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1SecretKeySelector;
@@ -71,7 +77,7 @@ import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.util.PatchUtils;
-
+import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
@@ -83,11 +89,18 @@ public class V1Controller extends KubernetesController {
 
   private static final String ENV_SHARD_ID = "SHARD_ID";
 
+  private final boolean isPodTemplateDisabled;
+
   private final AppsV1Api appsClient;
   private final CoreV1Api coreClient;
 
   V1Controller(Config configuration, Config runtimeConfiguration) {
     super(configuration, runtimeConfiguration);
+
+    isPodTemplateDisabled = KubernetesContext.getPodTemplateConfigMapDisabled(configuration);
+    LOG.log(Level.WARNING, String.format("Custom Pod Templates are %s",
+        isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+
     try {
       final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
       Configuration.setDefaultApiClient(apiClient);
@@ -338,17 +351,17 @@ public class V1Controller extends KubernetesController {
 
     final V1Service service = new V1Service();
 
-    // setup service metadata
-    final V1ObjectMeta objectMeta = new V1ObjectMeta();
-    objectMeta.name(topologyName);
-    objectMeta.annotations(getServiceAnnotations());
-    objectMeta.setLabels(getServiceLabels());
+    // Setup service metadata.
+    final V1ObjectMeta objectMeta = new V1ObjectMeta()
+        .name(topologyName)
+        .annotations(getServiceAnnotations())
+        .labels(getServiceLabels());
     service.setMetadata(objectMeta);
 
-    // create the headless service
-    final V1ServiceSpec serviceSpec = new V1ServiceSpec();
-    serviceSpec.clusterIP("None");
-    serviceSpec.setSelector(getPodMatchLabels(topologyName));
+    // Create the headless service.
+    final V1ServiceSpec serviceSpec = new V1ServiceSpec()
+        .clusterIP("None")
+        .selector(getPodMatchLabels(topologyName));
 
     service.setSpec(serviceSpec);
 
@@ -361,44 +374,44 @@ public class V1Controller extends KubernetesController {
 
     final V1StatefulSet statefulSet = new V1StatefulSet();
 
-    // setup stateful set metadata
-    final V1ObjectMeta objectMeta = new V1ObjectMeta();
-    objectMeta.name(topologyName);
-    statefulSet.metadata(objectMeta);
+    // Setup StatefulSet's metadata.
+    final V1ObjectMeta objectMeta = new V1ObjectMeta()
+        .name(topologyName);
+    statefulSet.setMetadata(objectMeta);
 
-    // create the stateful set spec
-    final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec();
-    statefulSetSpec.serviceName(topologyName);
-    statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue());
+    // Create the stateful set spec.
+    final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec()
+        .serviceName(topologyName)
+        .replicas(Runtime.numContainers(runtimeConfiguration).intValue());
 
     // Parallel pod management tells the StatefulSet controller to launch or terminate
     // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely
     // terminated prior to launching or terminating another Pod.
     statefulSetSpec.setPodManagementPolicy("Parallel");
 
-    // add selector match labels "app=heron" and "topology=topology-name"
-    // so the we know which pods to manage
-    final V1LabelSelector selector = new V1LabelSelector();
-    selector.matchLabels(getPodMatchLabels(topologyName));
-    statefulSetSpec.selector(selector);
+    // Add selector match labels "app=heron" and "topology=topology-name"
+    // so we know which pods to manage.
+    final V1LabelSelector selector = new V1LabelSelector()
+        .matchLabels(getPodMatchLabels(topologyName));
+    statefulSetSpec.setSelector(selector);
 
     // create a pod template
-    final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec();
+    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate();
 
     // set up pod meta
     final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
     Map<String, String> annotations = new HashMap<>();
     annotations.putAll(getPodAnnotations());
     annotations.putAll(getPrometheusAnnotations());
-    templateMetaData.annotations(annotations);
+    templateMetaData.setAnnotations(annotations);
     podTemplateSpec.setMetadata(templateMetaData);
 
     final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances);
-    podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances));
+    configurePodSpec(podTemplateSpec, command, containerResource, numberOfInstances);
 
     statefulSetSpec.setTemplate(podTemplateSpec);
 
-    statefulSet.spec(statefulSetSpec);
+    statefulSet.setSpec(statefulSetSpec);
 
     return statefulSet;
   }
@@ -441,28 +454,67 @@ public class V1Controller extends KubernetesController {
     return KubernetesContext.getServiceLabels(getConfiguration());
   }
 
-  private V1PodSpec getPodSpec(List<String> executorCommand, Resource resource,
-      int numberOfInstances) {
-    final V1PodSpec podSpec = new V1PodSpec();
+  private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec,
+                                List<String> executorCommand,
+                                Resource resource,
+                                int numberOfInstances) {
+    if (podTemplateSpec.getSpec() == null) {
+      podTemplateSpec.setSpec(new V1PodSpec());
+    }
+    final V1PodSpec podSpec = podTemplateSpec.getSpec();
 
     // set the termination period to 0 so pods can be deleted quickly
     podSpec.setTerminationGracePeriodSeconds(0L);
 
     // set the pod tolerations so pods are rescheduled when nodes go down
     // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
-    podSpec.setTolerations(getTolerations());
+    configureTolerations(podSpec);
+
+    // Get <executor> container and discard all others.
+    final String executorName = KubernetesConstants.EXECUTOR_NAME;
+    V1Container executorContainer = null;
+    List<V1Container> containers = podSpec.getContainers();
+    if (containers != null) {
+      for (V1Container container : containers) {
+        final String name = container.getName();
+        if (name != null && name.equals(executorName)) {
+          if (executorContainer != null) {
+            throw new TopologySubmissionException(
+                String.format("Multiple configurations found for %s container", executorName));
+          }
+          executorContainer = container;
+        }
+      }
+    } else {
+      containers = new LinkedList<>();
+    }
 
-    podSpec.containers(Collections.singletonList(
-        getContainer(executorCommand, resource, numberOfInstances)));
+    if (executorContainer == null) {
+      executorContainer = new V1Container().name(executorName);
+      containers.add(executorContainer);
+    }
+
+    configureExecutorContainer(executorCommand, resource, numberOfInstances, executorContainer);
+
+    podSpec.setContainers(containers);
 
     addVolumesIfPresent(podSpec);
 
     mountSecretsAsVolumes(podSpec);
+  }
 
-    return podSpec;
+  @VisibleForTesting
+  protected void configureTolerations(final V1PodSpec spec) {
+    KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
+        new KubernetesUtils.V1ControllerUtils<>();
+    spec.setTolerations(
+        utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
+            Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
+    );
   }
 
-  private List<V1Toleration> getTolerations() {
+  @VisibleForTesting
+  protected static List<V1Toleration> getTolerations() {
     final List<V1Toleration> tolerations = new ArrayList<>();
     KubernetesConstants.TOLERATIONS.forEach(t -> {
       final V1Toleration toleration =
@@ -477,13 +529,20 @@ public class V1Controller extends KubernetesController {
     return tolerations;
   }
 
-  private void addVolumesIfPresent(V1PodSpec spec) {
+  @VisibleForTesting
+  protected void addVolumesIfPresent(final V1PodSpec spec) {
     final Config config = getConfiguration();
     if (KubernetesContext.hasVolume(config)) {
-      final V1Volume volume = Volumes.get().create(config);
-      if (volume != null) {
-        LOG.fine("Adding volume: " + volume.toString());
-        spec.volumes(Collections.singletonList(volume));
+      final V1Volume volumeFromConfig = Volumes.get().create(config);
+      if (volumeFromConfig != null) {
+        // Merge volumes. Deduplicate using volume's name with Heron defaults taking precedence.
+        KubernetesUtils.V1ControllerUtils<V1Volume> utils =
+            new KubernetesUtils.V1ControllerUtils<>();
+        spec.setVolumes(
+            utils.mergeListsDedupe(Collections.singletonList(volumeFromConfig), spec.getVolumes(),
+                Comparator.comparing(V1Volume::getName), "Pod Template Volumes")
+        );
+        LOG.fine("Adding volume: " + volumeFromConfig);
       }
     }
   }
@@ -507,52 +566,62 @@ public class V1Controller extends KubernetesController {
     }
   }
 
-  private V1Container getContainer(List<String> executorCommand, Resource resource,
-      int numberOfInstances) {
+  private void configureExecutorContainer(List<String> executorCommand, Resource resource,
+                                          int numberOfInstances, final V1Container container) {
     final Config configuration = getConfiguration();
-    final V1Container container = new V1Container().name("executor");
 
-    // set up the container images
+    // Set up the container images.
     container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
 
-    // set up the container command
+    // Set up the container command.
     container.setCommand(executorCommand);
 
     if (KubernetesContext.hasImagePullPolicy(configuration)) {
       container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration));
     }
 
-    // setup the environment variables for the container
-    final V1EnvVar envVarHost = new V1EnvVar();
-    envVarHost.name(KubernetesConstants.ENV_HOST)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_IP)));
-
-    final V1EnvVar envVarPodName = new V1EnvVar();
-    envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_NAME)));
-    container.addEnvItem(envVarHost);
-    container.addEnvItem(envVarPodName);
+    // Configure environment variables.
+    configureContainerEnvVars(container);
 
+    // Set secret keys.
     setSecretKeyRefs(container);
 
-    // set container resources
-    final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
-    // Set the Kubernetes container resource limit
-    final Map<String, Quantity> limits = new HashMap<>();
+    // Set container resources.
+    configureContainerResources(container, configuration, resource);
+
+    // Set container ports.
+    final boolean debuggingEnabled =
+        TopologyUtils.getTopologyRemoteDebuggingEnabled(
+            Runtime.topology(getRuntimeConfiguration()));
+    configureContainerPorts(debuggingEnabled, numberOfInstances, container);
+
+    // Set up volume mounts.
+    mountVolumeIfPresent(container);
+  }
+
+  @VisibleForTesting
+  protected void configureContainerResources(final V1Container container,
+                                             final Config configuration, final Resource resource) {
+    if (container.getResources() == null) {
+      container.setResources(new V1ResourceRequirements());
+    }
+    final V1ResourceRequirements resourceRequirements = container.getResources();
+
+    // Configure resource limits. Deduplicate on limit name with user values taking precedence.
+    if (resourceRequirements.getLimits() == null) {
+      resourceRequirements.setLimits(new HashMap<>());
+    }
+    final Map<String, Quantity> limits = resourceRequirements.getLimits();
     limits.put(KubernetesConstants.MEMORY,
-            Quantity.fromString(KubernetesUtils.Megabytes(
-                    resource.getRam())));
+        Quantity.fromString(KubernetesUtils.Megabytes(
+            resource.getRam())));
     limits.put(KubernetesConstants.CPU,
-            Quantity.fromString(Double.toString(roundDecimal(
-                    resource.getCpu(), 3))));
-    resourceRequirements.setLimits(limits);
+        Quantity.fromString(Double.toString(roundDecimal(
+            resource.getCpu(), 3))));
+
+    // Set the Kubernetes container resource request.
     KubernetesContext.KubernetesResourceRequestMode requestMode =
-            KubernetesContext.getKubernetesRequestMode(configuration);
-    // Set the Kubernetes container resource request
+        KubernetesContext.getKubernetesRequestMode(configuration);
     if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
       LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
       resourceRequirements.setRequests(limits);
@@ -560,51 +629,95 @@ public class V1Controller extends KubernetesController {
       LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET");
     }
     container.setResources(resourceRequirements);
+  }
 
-    // set container ports
-    final boolean debuggingEnabled =
-        TopologyUtils.getTopologyRemoteDebuggingEnabled(
-            Runtime.topology(getRuntimeConfiguration()));
-    container.setPorts(getContainerPorts(debuggingEnabled, numberOfInstances));
+  @VisibleForTesting
+  protected void configureContainerEnvVars(final V1Container container) {
+    // Deduplicate on var name with Heron defaults taking precedence.
+    KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
+    container.setEnv(
+        utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
+          Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
+    );
+  }
 
-    // setup volume mounts
-    mountVolumeIfPresent(container);
+  @VisibleForTesting
+  protected static List<V1EnvVar> getExecutorEnvVars() {
+    final V1EnvVar envVarHost = new V1EnvVar();
+    envVarHost.name(KubernetesConstants.ENV_HOST)
+        .valueFrom(new V1EnvVarSource()
+            .fieldRef(new V1ObjectFieldSelector()
+                .fieldPath(KubernetesConstants.POD_IP)));
 
-    return container;
+    final V1EnvVar envVarPodName = new V1EnvVar();
+    envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
+        .valueFrom(new V1EnvVarSource()
+            .fieldRef(new V1ObjectFieldSelector()
+                .fieldPath(KubernetesConstants.POD_NAME)));
+
+    return Arrays.asList(envVarHost, envVarPodName);
   }
 
-  private List<V1ContainerPort> getContainerPorts(boolean remoteDebugEnabled,
-      int numberOfInstances) {
-    List<V1ContainerPort> ports = new ArrayList<>();
-    KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
-      final V1ContainerPort port = new V1ContainerPort();
-      port.setName(p.getName());
-      port.setContainerPort(v);
-      ports.add(port);
-    });
+  @VisibleForTesting
+  protected void configureContainerPorts(boolean remoteDebugEnabled, int numberOfInstances,
+                                         final V1Container container) {
+    List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts());
 
     if (remoteDebugEnabled) {
-      IntStream.range(0, numberOfInstances).forEach(i -> {
-        final String portName =
-            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
-        final V1ContainerPort port = new V1ContainerPort();
-        port.setName(portName);
-        port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
-        ports.add(port);
-      });
+      ports.addAll(getDebuggingPorts(numberOfInstances));
     }
 
+    // Set container ports. Deduplicate using port number with Heron defaults taking precedence.
+    KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
+        new KubernetesUtils.V1ControllerUtils<>();
+    container.setPorts(
+        utils.mergeListsDedupe(ports, container.getPorts(),
+            Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
+    );
+  }
+
+  @VisibleForTesting
+  protected static List<V1ContainerPort> getExecutorPorts() {
+    List<V1ContainerPort> ports = new LinkedList<>();
+    KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
+      final V1ContainerPort port = new V1ContainerPort()
+          .name(p.getName())
+          .containerPort(v);
+      ports.add(port);
+    });
+    return ports;
+  }
+
+  @VisibleForTesting
+  protected static List<V1ContainerPort> getDebuggingPorts(int numberOfInstances) {
+    List<V1ContainerPort> ports = new LinkedList<>();
+    IntStream.range(0, numberOfInstances).forEach(i -> {
+      final String portName =
+          KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
+      final V1ContainerPort port = new V1ContainerPort()
+          .name(portName)
+          .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
+      ports.add(port);
+    });
     return ports;
   }
 
-  private void mountVolumeIfPresent(V1Container container) {
+  @VisibleForTesting
+  protected void mountVolumeIfPresent(final V1Container container) {
     final Config config = getConfiguration();
     if (KubernetesContext.hasContainerVolume(config)) {
       final V1VolumeMount mount =
           new V1VolumeMount()
               .name(KubernetesContext.getContainerVolumeName(config))
               .mountPath(KubernetesContext.getContainerVolumeMountPath(config));
-      container.volumeMounts(Collections.singletonList(mount));
+
+      // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
+      KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
+          new KubernetesUtils.V1ControllerUtils<>();
+      container.setVolumeMounts(
+          utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
+              Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
+      );
     }
   }
 
@@ -635,4 +748,95 @@ public class V1Controller extends KubernetesController {
     double scale = Math.pow(10, places);
     return Math.round(value * scale) / scale;
   }
+
+  @VisibleForTesting
+  protected V1PodTemplateSpec loadPodFromTemplate() {
+    final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation();
+
+    // Default Pod Template.
+    if (podTemplateConfigMapName == null) {
+      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
+      return new V1PodTemplateSpec();
+    }
+
+    if (isPodTemplateDisabled) {
+      throw new TopologySubmissionException("Custom Pod Templates are disabled");
+    }
+
+    final String configMapName = podTemplateConfigMapName.first;
+    final String podTemplateName = podTemplateConfigMapName.second;
+
+    // Attempt to locate ConfigMap with provided Pod Template name.
+    try {
+      V1ConfigMap configMap = getConfigMap(configMapName);
+      if (configMap == null) {
+        throw new ApiException(
+            String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
+      }
+
+      final Map<String, String> configMapData = configMap.getData();
+      if (configMapData != null && configMapData.containsKey(podTemplateName)) {
+        // NullPointerException when Pod Template is empty.
+        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
+            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
+        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
+            configMapName, podTemplateName));
+        return podTemplate;
+      }
+
+      // Failure to locate Pod Template with provided name.
+      throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName));
+    } catch (ApiException e) {
+      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
+      throw new TopologySubmissionException(e.getMessage());
+    } catch (IOException | ClassCastException | NullPointerException e) {
+      final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName);
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+  }
+
+  @VisibleForTesting
+  protected Pair<String, String> getPodTemplateLocation() {
+    final String podTemplateConfigMapName = KubernetesContext
+        .getPodTemplateConfigMapName(getConfiguration());
+
+    if (podTemplateConfigMapName == null) {
+      return null;
+    }
+
+    try {
+      final int splitPoint = podTemplateConfigMapName.indexOf(".");
+      final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
+      final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
+
+      if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
+        throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
+      }
+
+      return new Pair<>(configMapName, podTemplateName);
+    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+      final String message = "Invalid ConfigMap and/or Pod Template name";
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+  }
+
+  @VisibleForTesting
+  protected V1ConfigMap getConfigMap(String configMapName) {
+    try {
+      return coreClient.readNamespacedConfigMap(
+          configMapName,
+          getNamespace(),
+          null,
+          null,
+          null);
+    } catch (ApiException e) {
+      final String message = "Error retrieving ConfigMaps";
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
+    }
+  }
 }
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 9bbe49b..70ded82 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -192,7 +192,9 @@ java_tests(
 java_library(
     name = "kubernetes-tests",
     srcs = glob(["**/kubernetes/*.java"]),
-    deps = scheduler_deps_files + kubernetes_deps_files,
+    deps = scheduler_deps_files + kubernetes_deps_files + [
+        "@maven//:org_apache_commons_commons_collections4",
+    ],
 )
 
 java_tests(
@@ -202,6 +204,9 @@ java_tests(
         "org.apache.heron.scheduler.kubernetes.KubernetesControllerTest",
         "org.apache.heron.scheduler.kubernetes.KubernetesLauncherTest",
         "org.apache.heron.scheduler.kubernetes.VolumesTests",
+        "org.apache.heron.scheduler.kubernetes.KubernetesContextTest",
+        "org.apache.heron.scheduler.kubernetes.V1ControllerTest",
+        "org.apache.heron.scheduler.kubernetes.KubernetesUtilsTest",
     ],
     runtime_deps = [":kubernetes-tests"],
 )
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
new file mode 100644
index 0000000..9a762d6
--- /dev/null
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.heron.spi.common.Config;
+
+public class KubernetesContextTest {
+
+  public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+      "heron.kubernetes.pod.template.configmap.name";
+  private static final String POD_TEMPLATE_CONFIGMAP_NAME = "pod-template-configmap-name";
+  private final Config config = Config.newBuilder().build();
+  private final Config configWithPodTemplateConfigMap = Config.newBuilder()
+      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+          POD_TEMPLATE_CONFIGMAP_NAME)
+      .build();
+
+  @Test
+  public void testPodTemplateConfigMapName() {
+    Assert.assertEquals(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+        KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME);
+    Assert.assertEquals(
+        KubernetesContext.getPodTemplateConfigMapName(configWithPodTemplateConfigMap),
+        POD_TEMPLATE_CONFIGMAP_NAME);
+    Assert.assertNull(KubernetesContext.getPodTemplateConfigMapName(config));
+  }
+
+  @Test
+  public void testPodTemplateConfigMapDisabled() {
+    Assert.assertFalse(KubernetesContext.getPodTemplateConfigMapDisabled(config));
+    Assert.assertFalse(KubernetesContext
+        .getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMap));
+
+    final Config configWithPodTemplateConfigMapOff = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+            POD_TEMPLATE_CONFIGMAP_NAME)
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED, "TRUE")
+        .build();
+    Assert.assertTrue(KubernetesContext
+        .getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMapOff));
+  }
+}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
new file mode 100644
index 0000000..ddc1f69
--- /dev/null
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.heron.scheduler.TopologySubmissionException;
+
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KubernetesUtilsTest {
+
+  @Test
+  public void testMergeListsDedupe() {
+    final String description = "Pod Template Environment Variables";
+    final List<V1EnvVar> heronEnvVars =
+        Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+    final V1EnvVar additionEnvVar = new V1EnvVar()
+        .name("env-variable-to-be-kept")
+        .valueFrom(new V1EnvVarSource()
+            .fieldRef(new V1ObjectFieldSelector()
+                .fieldPath("env-variable-was-kept")));
+    final List<V1EnvVar> expectedEnvVars = Collections.unmodifiableList(
+        new LinkedList<V1EnvVar>(V1Controller.getExecutorEnvVars()) {{
+          add(additionEnvVar);
+        }});
+    final List<V1EnvVar> inputEnvVars = Arrays.asList(
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_HOST)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("env-host-to-be-replaced"))),
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_POD_NAME)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("pod-name-to-be-replaced"))),
+        additionEnvVar
+    );
+
+    KubernetesUtils.V1ControllerUtils<V1EnvVar> v1ControllerUtils =
+        new KubernetesUtils.V1ControllerUtils<>();
+
+    // Both input lists are null.
+    Assert.assertNull("Both input lists are <null>",
+         v1ControllerUtils.mergeListsDedupe(null, null,
+             Comparator.comparing(V1EnvVar::getName), description));
+
+    // <primaryList> is <null>.
+    Assert.assertEquals("<primaryList> is null and <secondaryList> should be returned",
+        inputEnvVars,
+        v1ControllerUtils.mergeListsDedupe(null, inputEnvVars,
+            Comparator.comparing(V1EnvVar::getName), description));
+
+    // <primaryList> is empty.
+    Assert.assertEquals("<primaryList> is empty and <secondaryList> should be returned",
+        inputEnvVars,
+        v1ControllerUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
+            Comparator.comparing(V1EnvVar::getName), description));
+
+    // <secondaryList> is <null>.
+    Assert.assertEquals("<secondaryList> is null and <primaryList> should be returned",
+        heronEnvVars,
+        v1ControllerUtils.mergeListsDedupe(heronEnvVars, null,
+            Comparator.comparing(V1EnvVar::getName), description));
+
+    // <secondaryList> is empty.
+    Assert.assertEquals("<secondaryList> is empty and <primaryList> should be returned",
+        heronEnvVars,
+        v1ControllerUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
+            Comparator.comparing(V1EnvVar::getName), description));
+
+    // Merge both lists.
+    Assert.assertTrue("<primaryList> and <secondaryList> merged and deduplicated",
+        expectedEnvVars.containsAll(
+            v1ControllerUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
+                Comparator.comparing(V1EnvVar::getName), description)));
+
+    // Expect thrown error.
+    String errorMessage = "";
+    try {
+      v1ControllerUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
+          Comparator.comparing(V1EnvVar::getName), description);
+    } catch (TopologySubmissionException e) {
+      errorMessage = e.getMessage();
+    }
+    Assert.assertTrue("Expecting error to be thrown for null deduplication key",
+        errorMessage.contains(description));
+  }
+}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
new file mode 100644
index 0000000..c777383
--- /dev/null
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerBuilder;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class V1ControllerTest {
+
+  private static final String TOPOLOGY_NAME = "topology-name";
+  private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+  private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+  private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+      String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+  private static final String POD_TEMPLATE_VALID =
+      "apiVersion: apps/v1\n"
+          + "kind: PodTemplate\n"
+          + "metadata:\n"
+          + "  name: heron-tracker\n"
+          + "  namespace: default\n"
+          + "template:\n"
+          + "  metadata:\n"
+          + "    labels:\n"
+          + "      app: heron-tracker\n"
+          + "  spec:\n"
+          + "    containers:\n"
+          + "      - name: heron-tracker\n"
+          + "        image: apache/heron:latest\n"
+          + "        ports:\n"
+          + "          - containerPort: 8888\n"
+          + "            name: api-port\n"
+          + "        resources:\n"
+          + "          requests:\n"
+          + "            cpu: \"100m\"\n"
+          + "            memory: \"200M\"\n"
+          + "          limits:\n"
+          + "            cpu: \"400m\"\n"
+          + "            memory: \"512M\"";
+
+  private final Config config = Config.newBuilder().build();
+  private final Config configWithPodTemplate = Config.newBuilder()
+      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME, CONFIGMAP_POD_TEMPLATE_NAME)
+      .build();
+  private final Config runtime = Config.newBuilder()
+      .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+      .build();
+  private final Config configDisabledPodTemplate = Config.newBuilder()
+      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME, CONFIGMAP_POD_TEMPLATE_NAME)
+      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED, "true")
+      .build();
+
+  @Spy
+  private final V1Controller v1ControllerWithPodTemplate =
+      new V1Controller(configWithPodTemplate, runtime);
+
+  @Spy
+  private final V1Controller v1ControllerPodTemplate =
+      new V1Controller(configDisabledPodTemplate, runtime);
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testLoadPodFromTemplateDefault() {
+    final V1Controller v1ControllerNoPodTemplate = new V1Controller(config, runtime);
+    final V1PodTemplateSpec podSpec = v1ControllerNoPodTemplate.loadPodFromTemplate();
+
+    Assert.assertEquals(podSpec, new V1PodTemplateSpec());
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNullConfigMap() {
+    final String expected = "unable to locate";
+    String message = "";
+
+    doReturn(null)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNoConfigMap() {
+    final String expected = "Failed to locate Pod Template";
+    String message = "";
+
+    doReturn(new V1ConfigMap())
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNoTargetConfigMap() {
+    final String expected = "Failed to locate Pod Template";
+    String message = "";
+    V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData("Dummy Key", "Dummy Value")
+        .build();
+
+    doReturn(configMapNoTargetData)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testLoadPodFromTemplateBadTargetConfigMap() {
+    final String expected = "Error parsing";
+    String message = "";
+
+    // ConfigMap with target ConfigMap and an invalid Pod Template.
+    V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, "Dummy Value")
+        .build();
+
+    doReturn(configMapInvalidPod)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue("Invalid Pod Template parsing should fail", message.contains(expected));
+
+    // ConfigMap with target ConfigMaps and an empty Pod Template.
+    V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, "")
+        .build();
+
+    doReturn(configMapEmptyPod)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue("Empty Pod Template parsing should fail", message.contains(expected));
+  }
+
+  @Test
+  public void testLoadPodFromTemplateValidConfigMap() {
+    final String expected =
+        "        containers: [class V1Container {\n"
+        + "            args: null\n"
+        + "            command: null\n"
+        + "            env: null\n"
+        + "            envFrom: null\n"
+        + "            image: apache/heron:latest\n"
+        + "            imagePullPolicy: null\n"
+        + "            lifecycle: null\n"
+        + "            livenessProbe: null\n"
+        + "            name: heron-tracker\n"
+        + "            ports: [class V1ContainerPort {\n"
+        + "                containerPort: 8888\n"
+        + "                hostIP: null\n"
+        + "                hostPort: null\n"
+        + "                name: api-port\n"
+        + "                protocol: null\n"
+        + "            }]\n"
+        + "            readinessProbe: null\n"
+        + "            resources: class V1ResourceRequirements {\n"
+        + "                limits: {cpu=Quantity{number=0.400, format=DECIMAL_SI}, "
+        + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
+        + "                requests: {cpu=Quantity{number=0.100, format=DECIMAL_SI}, "
+        + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
+        + "            }\n"
+        + "            securityContext: null\n"
+        + "            startupProbe: null\n"
+        + "            stdin: null\n"
+        + "            stdinOnce: null\n"
+        + "            terminationMessagePath: null\n"
+        + "            terminationMessagePolicy: null\n"
+        + "            tty: null\n"
+        + "            volumeDevices: null\n"
+        + "            volumeMounts: null\n"
+        + "            workingDir: null\n"
+        + "        }]";
+
+
+    // ConfigMap with valid Pod Template.
+    V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+        .build();
+    doReturn(configMapValidPod)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    V1PodTemplateSpec podTemplateSpec = v1ControllerWithPodTemplate.loadPodFromTemplate();
+
+    Assert.assertTrue(podTemplateSpec.toString().contains(expected));
+  }
+
+  @Test
+  public void testLoadPodFromTemplateInvalidConfigMap() {
+    // ConfigMap with an invalid Pod Template.
+    final String invalidPodTemplate =
+        "apiVersion: apps/v1\n"
+            + "kind: InvalidTemplate\n"
+            + "metadata:\n"
+            + "  name: heron-tracker\n"
+            + "  namespace: default\n"
+            + "template:\n"
+            + "  metadata:\n"
+            + "    labels:\n"
+            + "      app: heron-tracker\n"
+            + "  spec:\n";
+    V1ConfigMap configMap = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
+        .build();
+    final String expected = "Error parsing";
+    String message = "";
+
+    doReturn(configMap)
+        .when(v1ControllerWithPodTemplate)
+        .getConfigMap(anyString());
+    try {
+      v1ControllerWithPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testDisablePodTemplates() {
+    // ConfigMap with valid Pod Template.
+    V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+        .build();
+    final String expected = "Pod Templates are disabled";
+    String message = "";
+    doReturn(configMapValidPod)
+        .when(v1ControllerPodTemplate)
+        .getConfigMap(anyString());
+
+    try {
+      v1ControllerPodTemplate.loadPodFromTemplate();
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testGetPodTemplateLocationPassing() {
+    final Config testConfig = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME, CONFIGMAP_POD_TEMPLATE_NAME)
+        .build();
+    final V1Controller v1Controller = new V1Controller(testConfig, runtime);
+    final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+    Pair<String, String> actual;
+
+    // Correct parsing
+    actual = v1Controller.getPodTemplateLocation();
+    Assert.assertEquals(actual, expected);
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoConfigMap() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+        ".POD-TEMPLATE-NAME").build();
+    V1Controller v1Controller = new V1Controller(testConfig, runtime);
+    v1Controller.getPodTemplateLocation();
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoPodTemplate() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+        "CONFIGMAP-NAME.").build();
+    V1Controller v1Controller = new V1Controller(testConfig, runtime);
+    v1Controller.getPodTemplateLocation();
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoDelimiter() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+        "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
+    V1Controller v1Controller = new V1Controller(testConfig, runtime);
+    v1Controller.getPodTemplateLocation();
+  }
+
+  @Test
+  public void testConfigureContainerPorts() {
+    final String portNamekept = "random-port-to-be-kept";
+    final int portNumberkept = 1111;
+    final int numInstances = 3;
+    final List<V1ContainerPort> expectedPortsBase =
+        Collections.unmodifiableList(V1Controller.getExecutorPorts());
+    final List<V1ContainerPort> debugPorts =
+        Collections.unmodifiableList(V1Controller.getDebuggingPorts(numInstances));
+    final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
+        Arrays.asList(
+            new V1ContainerPort()
+                .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
+            new V1ContainerPort()
+                .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
+            new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
+        )
+    );
+
+    // Null ports. This is the default case.
+    final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
+    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithNullPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
+        CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
+
+    // Empty ports.
+    final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
+        .withPorts(new LinkedList<>())
+        .build();
+    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
+        CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
+
+    // Port overriding.
+    final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
+    final V1Container inputContainerWithPorts = new V1ContainerBuilder()
+        .withPorts(inputPorts)
+        .build();
+    final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase);
+    expectedPortsOverriding
+        .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+
+    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
+        CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
+
+    // Port overriding with debug ports.
+    final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts);
+    inputPortsWithDebug.addAll(inputPortsBase);
+    final V1Container inputContainerWithDebug = new V1ContainerBuilder()
+        .withPorts(inputPortsWithDebug)
+        .build();
+    final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase);
+    expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+    expectedPortsDebug.addAll(debugPorts);
+
+    v1ControllerWithPodTemplate.configureContainerPorts(
+        true, numInstances, inputContainerWithDebug);
+    Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
+        CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
+  }
+
+  @Test
+  public void testConfigureContainerEnvVars() {
+    final List<V1EnvVar> heronEnvVars =
+        Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+    final V1EnvVar additionEnvVar = new V1EnvVar()
+        .name("env-variable-to-be-kept")
+        .valueFrom(new V1EnvVarSource()
+            .fieldRef(new V1ObjectFieldSelector()
+                .fieldPath("env-variable-was-kept")));
+    final List<V1EnvVar> inputEnvVars = Arrays.asList(
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_HOST)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("env-host-to-be-replaced"))),
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_POD_NAME)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("pod-name-to-be-replaced"))),
+        additionEnvVar
+    );
+
+    // Null env vars. This is the default case.
+    V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
+    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
+        CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
+
+    // Empty env vars.
+    V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
+        .withEnv(new LinkedList<>())
+        .build();
+    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
+        CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
+
+    // Env Var overriding.
+    final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
+    expectedOverriding.add(additionEnvVar);
+    V1Container containerWithEnvVars = new V1ContainerBuilder()
+        .withEnv(inputEnvVars)
+        .build();
+    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
+        CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
+  }
+
+  @Test
+  public void testConfigureContainerResources() {
+    final Resource resourceDefault = new Resource(
+        9, ByteAmount.fromGigabytes(19), ByteAmount.fromGigabytes(99));
+    final Resource resourceCustom = new Resource(
+        4, ByteAmount.fromGigabytes(34), ByteAmount.fromGigabytes(400));
+
+    final Quantity defaultRAM = Quantity.fromString(
+        KubernetesUtils.Megabytes(resourceDefault.getRam()));
+    final Quantity defaultCPU = Quantity.fromString(
+        Double.toString(V1Controller.roundDecimal(resourceDefault.getCpu(), 3)));
+    final Quantity customRAM = Quantity.fromString(
+        KubernetesUtils.Megabytes(resourceCustom.getRam()));
+    final Quantity customCPU = Quantity.fromString(
+        Double.toString(V1Controller.roundDecimal(resourceCustom.getCpu(), 3)));
+    final Quantity customDisk = Quantity.fromString(
+        Double.toString(V1Controller.roundDecimal(resourceCustom.getDisk().getValue(), 3)));
+
+    final Config configNoLimit = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
+        .build();
+    final Config configWithLimit = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
+        .build();
+
+    final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+        .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
+
+    final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+        .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
+        .putLimitsItem("disk", customDisk);
+
+    final V1ResourceRequirements customRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
+        .putLimitsItem(KubernetesConstants.CPU, customCPU)
+        .putLimitsItem("disk", customDisk);
+
+    // Default. Null resources.
+    V1Container containerNull = new V1ContainerBuilder().build();
+    v1ControllerWithPodTemplate.configureContainerResources(
+        containerNull, configNoLimit, resourceDefault);
+    Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
+        containerNull.getResources().getLimits().entrySet()
+            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+    // Empty resources.
+    V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
+    v1ControllerWithPodTemplate.configureContainerResources(
+        containerEmpty, configNoLimit, resourceDefault);
+    Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
+        containerEmpty.getResources().getLimits().entrySet()
+            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+    // Custom resources.
+    V1Container containerCustom = new V1ContainerBuilder()
+        .withResources(customRequirements)
+        .build();
+    v1ControllerWithPodTemplate.configureContainerResources(
+        containerCustom, configNoLimit, resourceDefault);
+    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
+        containerCustom.getResources().getLimits().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+
+    // Custom resources with request.
+    V1Container containerRequests = new V1ContainerBuilder()
+        .withResources(customRequirements)
+        .build();
+    v1ControllerWithPodTemplate.configureContainerResources(
+        containerRequests, configWithLimit, resourceDefault);
+    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
+        containerRequests.getResources().getLimits().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+    Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST",
+        containerRequests.getResources().getRequests().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+  }
+
+  @Test
+  public void testAddVolumesIfPresent() {
+    final String pathDefault = "config-host-volume-path";
+    final String pathNameDefault = "config-host-volume-name";
+    final Config configWithVolumes = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_VOLUME_NAME, pathNameDefault)
+        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, Volumes.HOST_PATH)
+        .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, pathDefault)
+        .build();
+    final V1Controller controllerWithVol = new V1Controller(configWithVolumes, runtime);
+
+    final V1Volume volumeDefault = new V1VolumeBuilder()
+        .withName(pathNameDefault)
+        .withNewHostPath()
+          .withNewPath(pathDefault)
+        .endHostPath()
+        .build();
+    final V1Volume volumeToBeKept = new V1VolumeBuilder()
+        .withName("volume-to-be-kept-name")
+        .withNewHostPath()
+          .withNewPath("volume-to-be-kept-path")
+        .endHostPath()
+        .build();
+
+    final List<V1Volume> customVolumeList = Arrays.asList(
+        new V1VolumeBuilder()
+            .withName(pathNameDefault)
+            .withNewHostPath()
+              .withNewPath("this-path-must-be-replaced")
+            .endHostPath()
+            .build(),
+        volumeToBeKept
+    );
+    final List<V1Volume> expectedDefault = Collections.singletonList(volumeDefault);
+    final List<V1Volume> expectedCustom = Arrays.asList(volumeDefault, volumeToBeKept);
+
+    // No Volumes set.
+    V1Controller controllerDoNotSetVolumes = new V1Controller(Config.newBuilder().build(), runtime);
+    V1PodSpec podSpecNoSetVolumes = new V1PodSpec();
+    controllerDoNotSetVolumes.addVolumesIfPresent(podSpecNoSetVolumes);
+    Assert.assertNull(podSpecNoSetVolumes.getVolumes());
+
+    // Default. Null Volumes.
+    V1PodSpec podSpecNull = new V1PodSpecBuilder().build();
+    controllerWithVol.addVolumesIfPresent(podSpecNull);
+    Assert.assertTrue("Default VOLUMES should be set in container with null VOLUMES",
+        CollectionUtils.containsAll(expectedDefault, podSpecNull.getVolumes()));
+
+    // Empty Volumes list
+    V1PodSpec podSpecEmpty = new V1PodSpecBuilder()
+        .withVolumes(new LinkedList<>())
+        .build();
+    controllerWithVol.addVolumesIfPresent(podSpecEmpty);
+    Assert.assertTrue("Default VOLUMES should be set in container with empty VOLUMES",
+        CollectionUtils.containsAll(expectedDefault, podSpecEmpty.getVolumes()));
+
+    // Custom Volumes list
+    V1PodSpec podSpecCustom = new V1PodSpecBuilder()
+        .withVolumes(customVolumeList)
+        .build();
+    controllerWithVol.addVolumesIfPresent(podSpecCustom);
+    Assert.assertTrue("Default VOLUMES should be set in container with custom VOLUMES",
+        CollectionUtils.containsAll(expectedCustom, podSpecCustom.getVolumes()));
+  }
+
+  @Test
+  public void testMountVolumeIfPresent() {
+    final String pathDefault = "config-host-volume-path";
+    final String pathNameDefault = "config-host-volume-name";
+    final Config configWithVolumes = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
+        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
+        .build();
+    final V1Controller controllerWithMounts = new V1Controller(configWithVolumes, runtime);
+    final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
+        .withName(pathNameDefault)
+        .withMountPath(pathDefault)
+        .build();
+    final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
+        .withName("custom-volume-mount")
+        .withMountPath("should-be-kept")
+        .build();
+
+    final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault);
+    final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault);
+    final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
+        new V1VolumeMountBuilder()
+            .withName(pathNameDefault)
+            .withMountPath("should-be-replaced")
+            .build(),
+        volumeCustom
+    );
+
+    // No Volume Mounts set.
+    V1Controller controllerDoNotSetMounts = new V1Controller(Config.newBuilder().build(), runtime);
+    V1Container containerNoSetMounts = new V1Container();
+    controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
+    Assert.assertNull(containerNoSetMounts.getVolumeMounts());
+
+    // Default. Null Volume Mounts.
+    V1Container containerNull = new V1ContainerBuilder().build();
+    controllerWithMounts.mountVolumeIfPresent(containerNull);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
+
+    // Empty Volume Mounts.
+    V1Container containerEmpty = new V1ContainerBuilder()
+        .withVolumeMounts(new LinkedList<>())
+        .build();
+    controllerWithMounts.mountVolumeIfPresent(containerEmpty);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
+
+    // Custom Volume Mounts.
+    V1Container containerCustom = new V1ContainerBuilder()
+        .withVolumeMounts(volumeMountsCustomList)
+        .build();
+    controllerWithMounts.mountVolumeIfPresent(containerCustom);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
+  }
+
+  @Test
+  public void testConfigureTolerations() {
+    final V1Toleration keptToleration = new V1Toleration()
+        .key("kept toleration")
+        .operator("Some Operator")
+        .effect("Some Effect")
+        .tolerationSeconds(5L);
+    final List<V1Toleration> expectedTolerationBase =
+        Collections.unmodifiableList(V1Controller.getTolerations());
+    final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
+        Arrays.asList(
+            new V1Toleration()
+                .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
+            new V1Toleration()
+                .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
+            keptToleration
+        )
+    );
+
+    // Null Tolerations. This is the default case.
+    final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
+    v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
+    Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
+        CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
+            expectedTolerationBase));
+
+    // Empty Tolerations.
+    final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
+        .withTolerations(new LinkedList<>())
+        .build();
+    v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
+    Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
+        CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
+            expectedTolerationBase));
+
+    // Toleration overriding.
+    final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
+        .withTolerations(inputTolerationsBase)
+        .build();
+    final List<V1Toleration> expectedTolerationsOverriding =
+        new LinkedList<>(expectedTolerationBase);
+    expectedTolerationsOverriding.add(keptToleration);
+
+    v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
+    Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
+        CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
+            expectedTolerationsOverriding));
+  }
+}
