[FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
  special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
  of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.
- moved the Mesos containerizer config code to the MesosTaskManagerParameters.

This closes #2915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230bf17b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230bf17b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230bf17b

Branch: refs/heads/master
Commit: 230bf17bac3d76959a5cb6aa73ac685757c51cab
Parents: 3b85f42
Author: wrighe3 <[email protected]>
Authored: Thu Dec 1 00:21:28 2016 -0800
Committer: Maximilian Michels <[email protected]>
Committed: Tue Dec 6 00:29:25 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../configuration/GlobalConfiguration.java      |  27 +-
 flink-dist/src/main/assemblies/bin.xml          |   6 +
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh |  51 +++
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    |  60 +++
 .../main/java/org/apache/flink/mesos/Utils.java |  21 +
 .../clusterframework/LaunchableMesosWorker.java | 106 ++++-
 .../MesosApplicationMasterRunner.java           | 456 ++++++-------------
 .../clusterframework/MesosConfigKeys.java       |  25 +-
 .../MesosFlinkResourceManager.java              |  45 +-
 .../MesosTaskManagerParameters.java             | 106 ++++-
 .../MesosTaskManagerRunner.java                 |  73 ++-
 .../flink/mesos/util/MesosArtifactResolver.java |  31 ++
 .../flink/mesos/util/MesosArtifactServer.java   | 146 ++++--
 .../MesosFlinkResourceManagerTest.java          |  19 +-
 .../clusterframework/BootstrapTools.java        |  36 ++
 .../ContainerSpecification.java                 | 206 +++++++++
 .../overlays/AbstractContainerOverlay.java      |  72 +++
 .../overlays/CompositeContainerOverlay.java     |  49 ++
 .../overlays/ContainerOverlay.java              |  37 ++
 .../overlays/FlinkDistributionOverlay.java      | 126 +++++
 .../overlays/HadoopConfOverlay.java             | 147 ++++++
 .../overlays/HadoopUserOverlay.java             |  83 ++++
 .../overlays/KeytabOverlay.java                 | 102 +++++
 .../overlays/Krb5ConfOverlay.java               | 111 +++++
 .../overlays/SSLStoreOverlay.java               | 124 +++++
 .../flink/runtime/security/SecurityUtils.java   |   4 +-
 .../overlays/ContainerOverlayTestBase.java      |  73 +++
 .../overlays/FlinkDistributionOverlayTest.java  | 117 +++++
 .../overlays/HadoopConfOverlayTest.java         | 119 +++++
 .../overlays/HadoopUserOverlayTest.java         |  73 +++
 .../overlays/KeytabOverlayTest.java             |  71 +++
 .../overlays/Krb5ConfOverlayTest.java           |  59 +++
 .../overlays/SSLStoreOverlayTest.java           |  78 ++++
 .../flink/core/testutils/CommonTestUtils.java   |  39 ++
 .../apache/flink/test/util/TestBaseUtils.java   |  38 +-
 36 files changed, 2450 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6bc5e2e..a515c33 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -440,6 +440,11 @@ public final class ConfigConstants {
        // ------------------------ Mesos Configuration ------------------------
 
        /**
+        * The initial number of Mesos tasks to allocate.
+        */
+       public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
+
+       /**
         * The maximum number of failed Mesos tasks before entirely stopping
         * the Mesos session / job on Mesos.
         *
@@ -484,6 +489,8 @@ public final class ConfigConstants {
 
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = 
"mesos.resourcemanager.framework.secret";
 
+       public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = 
"mesos.resourcemanager.framework.user";
+
        /**
         * The cpus to acquire from Mesos.
         *
@@ -1186,6 +1193,8 @@ public final class ConfigConstants {
 
        public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE 
= "*";
 
+       public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER 
= "";
+
        /** Default value to override SSL support for the Artifact Server */
        public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = 
true;
 
@@ -1405,6 +1414,12 @@ public final class ConfigConstants {
        /** The environment variable name which contains the location of the 
lib folder */
        public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+       /** The environment variable name which contains the location of the 
bin directory */
+       public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
+
+       /** The environment variable name which contains the Flink installation 
root directory */
+       public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+
        // -------------------------------- Security 
-------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ecfbc72..dca6307 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -39,12 +39,31 @@ public final class GlobalConfiguration {
 
        public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
+
        // 
--------------------------------------------------------------------------------------------
 
        private GlobalConfiguration() {}
 
        // 
--------------------------------------------------------------------------------------------
 
+       private static Configuration dynamicProperties = null;
+
+       /**
+        * Set the process-wide dynamic properties to be merged with the loaded 
configuration.
+     */
+       public static void setDynamicProperties(Configuration 
dynamicProperties) {
+               GlobalConfiguration.dynamicProperties = new 
Configuration(dynamicProperties);
+       }
+
+       /**
+        * Get the dynamic properties.
+     */
+       public static Configuration getDynamicProperties() {
+               return GlobalConfiguration.dynamicProperties;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Loads the global configuration from the environment. Fails if an 
error occurs during loading. Returns an
         * empty configuration object if the environment variable is not set. 
In production this variable is set but
@@ -90,7 +109,13 @@ public final class GlobalConfiguration {
                                        "' (" + confDirFile.getAbsolutePath() + 
") does not exist.");
                }
 
-               return loadYAMLResource(yamlConfigFile);
+               Configuration conf = loadYAMLResource(yamlConfigFile);
+
+               if(dynamicProperties != null) {
+                       conf.addAll(dynamicProperties);
+               }
+
+               return conf;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index b4291d3..901cac9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -82,6 +82,12 @@ under the License.
                        <outputDirectory>bin</outputDirectory>
                        <fileMode>0755</fileMode>
                </fileSet>
+               <!-- copy Mesos start scripts -->
+               <fileSet>
+                       <directory>src/main/flink-bin/mesos-bin</directory>
+                       <outputDirectory>bin</outputDirectory>
+                       <fileMode>0755</fileMode>
+               </fileSet>
                
                <!-- copy default configuration -->
                <fileSet>

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh 
b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
new file mode 100755
index 0000000..d65c6b0
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxilliary function to construct a lightweight classpath for the
+# Flink AppMaster
+constructAppMasterClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+
+log=flink-appmaster.log
+log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
"$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh 
b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
new file mode 100755
index 0000000..ff03abd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxilliary function to construct a lightweight classpath for the
+# Flink TaskManager
+constructTaskManagerClassPath() {
+
+    while read -d '' -r jarfile ; do
+        if [[ $CC_CLASSPATH = "" ]]; then
+            CC_CLASSPATH="$jarfile";
+        else
+            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+    FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" 
$log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager 
"$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 173ae33..7787e40 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.mesos;
 
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
+import scala.Option;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -46,6 +49,24 @@ public class Utils {
        }
 
        /**
+        * Construct a Mesos URI.
+        */
+       public static Protos.CommandInfo.URI uri(MesosArtifactResolver 
resolver, ContainerSpecification.Artifact artifact) {
+               Option<URL> url = resolver.resolve(artifact.dest);
+               if(url.isEmpty()) {
+                       throw new IllegalArgumentException("Unresolvable 
artifact: " + artifact.dest);
+               }
+
+               return Protos.CommandInfo.URI.newBuilder()
+                       .setValue(url.get().toExternalForm())
+                       .setOutputFile(artifact.dest.toString())
+                       .setExtract(artifact.extract)
+                       .setCache(artifact.cachable)
+                       .setExecutable(artifact.executable)
+                       .build();
+       }
+
+       /**
         * Construct a scalar resource value.
         */
        public static Protos.Resource scalar(String name, double value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 5f940b5..c6e51f1 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskAssignmentResult;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.mesos.Protos;
 
 import java.util.Collections;
@@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.ranges;
 import static org.apache.flink.mesos.Utils.scalar;
 
 /**
- * Specifies how to launch a Mesos worker.
+ * Implements the launch of a Mesos worker.
+ *
+ * Translates the abstract {@link ContainerSpecification} into a concrete
+ * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
 
@@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                "taskmanager.rpc.port",
                "taskmanager.data.port" };
 
+       private final MesosArtifactResolver resolver;
+       private final ContainerSpecification containerSpec;
        private final MesosTaskManagerParameters params;
-       private final Protos.TaskInfo.Builder template;
        private final Protos.TaskID taskID;
        private final Request taskRequest;
 
        /**
         * Construct a launchable Mesos worker.
         * @param params the TM parameters such as memory, cpu to acquire.
-        * @param template a template for the TaskInfo to be constructed at 
launch time.
+        * @param containerSpec an abstract container specification for launch 
time.
         * @param taskID the taskID for this worker.
         */
-       public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+       public LaunchableMesosWorker(
+               MesosArtifactResolver resolver, MesosTaskManagerParameters 
params,
+               ContainerSpecification containerSpec, Protos.TaskID taskID) {
+               this.resolver = resolver;
                this.params = params;
-               this.template = template;
+               this.containerSpec = containerSpec;
                this.taskID = taskID;
                this.taskRequest = new Request();
        }
@@ -157,17 +167,25 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
        @Override
        public Protos.TaskInfo launch(Protos.SlaveID slaveId, 
TaskAssignmentResult assignment) {
 
+               ContaineredTaskManagerParameters tmParams = 
params.containeredParameters();
+
                final Configuration dynamicProperties = new Configuration();
 
-               // specialize the TaskInfo template with assigned resources, 
environment variables, etc
-               final Protos.TaskInfo.Builder taskInfo = template
-                       .clone()
+               // incorporate the dynamic properties set by the template
+               
dynamicProperties.addAll(containerSpec.getDynamicConfiguration());
+
+               // build a TaskInfo with assigned resources, environment 
variables, etc
+               final Protos.TaskInfo.Builder taskInfo = 
Protos.TaskInfo.newBuilder()
                        .setSlaveId(slaveId)
                        .setTaskId(taskID)
                        .setName(taskID.getValue())
                        .addResources(scalar("cpus", 
assignment.getRequest().getCPUs()))
                        .addResources(scalar("mem", 
assignment.getRequest().getMemory()));
 
+               final Protos.CommandInfo.Builder cmd = 
taskInfo.getCommandBuilder();
+               final Protos.Environment.Builder env = 
cmd.getEnvironmentBuilder();
+               final StringBuilder jvmArgs = new StringBuilder();
+
                // use the assigned ports for the TM
                if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) 
{
                        throw new IllegalArgumentException("unsufficient # of 
ports assigned");
@@ -179,17 +197,69 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                        dynamicProperties.setInteger(key, port);
                }
 
-               // finalize environment variables
-               final Protos.Environment.Builder environmentBuilder = 
taskInfo.getCommandBuilder().getEnvironmentBuilder();
+               // ship additional files
+               for(ContainerSpecification.Artifact artifact : 
containerSpec.getArtifacts()) {
+                       cmd.addUris(Utils.uri(resolver, artifact));
+               }
 
-               // propagate the Mesos task ID to the TM
-               environmentBuilder
-                       
.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, 
taskInfo.getTaskId().getValue()));
+               // propagate environment variables
+               for (Map.Entry<String, String> entry : 
params.containeredParameters().taskManagerEnv().entrySet()) {
+                       env.addVariables(variable(entry.getKey(), 
entry.getValue()));
+               }
+               for (Map.Entry<String, String> entry : 
containerSpec.getEnvironmentVariables().entrySet()) {
+                       env.addVariables(variable(entry.getKey(), 
entry.getValue()));
+               }
 
-               // propagate the dynamic configuration properties to the TM
-               String dynamicPropertiesEncoded = 
FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
-               environmentBuilder
-                       
.addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded));
+               // propagate the Mesos task ID to the TM
+               
env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, 
taskInfo.getTaskId().getValue()));
+
+               // finalize the memory parameters
+               jvmArgs.append(" 
-Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+               jvmArgs.append(" 
-Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+               jvmArgs.append(" 
-XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+
+               // pass dynamic system properties
+               jvmArgs.append(' ').append(
+                       
ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties()));
+
+               // finalize JVM args
+               env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, 
jvmArgs.toString()));
+
+               // build the launch command w/ dynamic application properties
+               StringBuilder launchCommand = new 
StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+               
launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+               cmd.setValue(launchCommand.toString());
+
+               // build the container info
+               Protos.ContainerInfo.Builder containerInfo = null;
+               switch(params.containerType()) {
+                       case MESOS:
+                               if(params.containerImageName().isDefined()) {
+                                       containerInfo = 
Protos.ContainerInfo.newBuilder()
+                                               
.setType(Protos.ContainerInfo.Type.MESOS)
+                                               
.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
+                                               
.setImage(Protos.Image.newBuilder()
+                                                       
.setType(Protos.Image.Type.DOCKER)
+                                                       
.setDocker(Protos.Image.Docker.newBuilder()
+                                                               
.setName(params.containerImageName().get()))));
+                               }
+                               break;
+
+                       case DOCKER:
+                               assert(params.containerImageName().isDefined());
+                               containerInfo = 
Protos.ContainerInfo.newBuilder()
+                                       
.setType(Protos.ContainerInfo.Type.DOCKER)
+                                       
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+                                               
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
+                                               
.setImage(params.containerImageName().get()));
+                               break;
+
+                       default:
+                               throw new IllegalStateException("unsupported 
container type");
+               }
+               if(containerInfo != null) {
+                       taskInfo.setContainer(containerInfo);
+               }
 
                return taskInfo.build();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index ef58250..4b9bd82 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,14 +22,17 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
-
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
 import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
@@ -38,14 +41,20 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -53,21 +62,15 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import scala.Option;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -75,9 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.mesos.Utils.uri;
-import static org.apache.flink.mesos.Utils.variable;
-
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -106,6 +106,18 @@ public class MesosApplicationMasterRunner {
        private static final int ACTOR_DIED_EXIT_CODE = 32;
 
        // 
------------------------------------------------------------------------
+       //  Command-line options
+       // 
------------------------------------------------------------------------
+
+       private static final Options ALL_OPTIONS;
+
+       static {
+               ALL_OPTIONS =
+                       new Options()
+                       .addOption(BootstrapTools.newDynamicPropertiesOption());
+       }
+
+       // 
------------------------------------------------------------------------
        //  Program entry point
        // 
------------------------------------------------------------------------
 
@@ -126,36 +138,44 @@ public class MesosApplicationMasterRunner {
 
        /**
         * The instance entry point for the Mesos AppMaster. Obtains user group
-        * information and calls the main work method {@link #runPrivileged(Configuration)} as a
+        * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
         * privileged action.
         *
         * @param args The command line arguments.
         * @return The process exit code.
         */
-       protected int run(String[] args) {
+       protected int run(final String[] args) {
                try {
                        LOG.debug("All environment variables: {}", ENV);
 
-                       final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-                       checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+                       // loading all config values here has the advantage that the program fails fast, if any
+                       // configuration problem occurs
 
-                       // Flink configuration
-                       final Configuration dynamicProperties =
-                                       FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
-                       LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+                       CommandLineParser parser = new PosixParser();
+                       CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
-                       final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+                       final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+                       GlobalConfiguration.setDynamicProperties(dynamicProperties);
+                       final Configuration config = GlobalConfiguration.loadConfiguration();
 
-                       SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+                       // configure the default filesystem
+                       try {
+                               FileSystem.setDefaultScheme(config);
+                       } catch (IOException e) {
+                               throw new IOException("Error while setting the default " +
+                                       "filesystem scheme from configuration.", e);
+                       }
+
+                       // configure security
+                       SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
                        sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
                        SecurityUtils.install(sc);
 
-                       LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
-
+                       // run the actual work in the installed security context
                        return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                                @Override
-                               public Integer call() {
-                                       return runPrivileged(configuration);
+                               public Integer call() throws Exception {
+                                       return runPrivileged(config, dynamicProperties);
                                }
                        });
                }
@@ -175,78 +195,38 @@ public class MesosApplicationMasterRunner {
         *
         * @return The return code for the Java process.
         */
-       protected int runPrivileged(Configuration config) {
+       protected int runPrivileged(Configuration config, Configuration dynamicProperties) {
 
                ActorSystem actorSystem = null;
                WebMonitor webMonitor = null;
                MesosArtifactServer artifactServer = null;
-
-               // ------- (1) load and parse / validate all configurations 
-------
-
-               // loading all config values here has the advantage that the 
program fails fast, if any
-               // configuration problem occurs
-
-               final String workingDir = 
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-
-               final String sessionID = 
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
-               checkState(sessionID != null, "Session ID (%s) not set", 
MesosConfigKeys.ENV_SESSION_ID);
-
-               // Note that we use the "appMasterHostname" given by the 
system, to make sure
-               // we use the hostnames consistently throughout akka.
-               // for akka "localhost" and "localhost.localdomain" are 
different actors.
-               final String appMasterHostname;
+               ExecutorService futureExecutor = null;
+               ExecutorService ioExecutor = null;
 
                try {
-                       appMasterHostname = 
InetAddress.getLocalHost().getHostName();
-               } catch (UnknownHostException uhe) {
-                       LOG.error("Could not retrieve the local hostname.", 
uhe);
+                       // ------- (1) load and parse / validate all 
configurations -------
 
-                       return INIT_ERROR_EXIT_CODE;
-               }
+                       // Note that we use the "appMasterHostname" given by 
the system, to make sure
+                       // we use the hostnames consistently throughout akka.
+                       // for akka "localhost" and "localhost.localdomain" are 
different actors.
+                       final String appMasterHostname = 
InetAddress.getLocalHost().getHostName();
 
-               // Mesos configuration
-               final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
+                       // Mesos configuration
+                       final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
 
-               int numberProcessors = Hardware.getNumberCPUCores();
+                       // JM configuration
+                       int numberProcessors = Hardware.getNumberCPUCores();
 
-               final ExecutorService futureExecutor = 
Executors.newFixedThreadPool(
-                       numberProcessors,
-                       new NamedThreadFactory("mesos-jobmanager-future-", 
"-thread-"));
+                       futureExecutor = Executors.newFixedThreadPool(
+                               numberProcessors,
+                               new 
NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
 
-               final ExecutorService ioExecutor = Executors.newFixedThreadPool(
-                       numberProcessors,
-                       new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
+                       ioExecutor = Executors.newFixedThreadPool(
+                               numberProcessors,
+                               new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
 
-               try {
-                       // environment values related to TM
-                       final int taskManagerContainerMemory;
-                       final int numInitialTaskManagers;
-                       final int slotsPerTaskManager;
-
-                       try {
-                               taskManagerContainerMemory = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
-                       } catch (NumberFormatException e) {
-                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_MEMORY + " : "
-                                       + e.getMessage());
-                       }
-                       try {
-                               numInitialTaskManagers = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
-                       } catch (NumberFormatException e) {
-                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_COUNT + " : "
-                                       + e.getMessage());
-                       }
-                       try {
-                               slotsPerTaskManager = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
-                       } catch (NumberFormatException e) {
-                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_SLOTS + " : "
-                                       + e.getMessage());
-                       }
-
-                       final ContaineredTaskManagerParameters 
containeredParameters =
-                               ContaineredTaskManagerParameters.create(config, 
taskManagerContainerMemory, slotsPerTaskManager);
-
-                       final MesosTaskManagerParameters taskManagerParameters =
-                               MesosTaskManagerParameters.create(config, 
containeredParameters);
+                       // TM configuration
+                       final MesosTaskManagerParameters taskManagerParameters 
= MesosTaskManagerParameters.create(config);
 
                        LOG.info("TaskManagers will be created with {} task 
slots",
                                
taskManagerParameters.containeredParameters().numSlots());
@@ -257,7 +237,7 @@ public class MesosApplicationMasterRunner {
                                
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
                                taskManagerParameters.cpus());
 
-                       // JM endpoint, which should be explicitly configured 
by the dispatcher (based on acquired net resources)
+                       // JM endpoint, which should be explicitly configured 
based on acquired net resources
                        final int listeningPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
                                ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
                        checkState(listeningPort >= 0 && listeningPort <= 
65536, "Config parameter \"" +
@@ -279,18 +259,28 @@ public class MesosApplicationMasterRunner {
                        LOG.debug("Starting Artifact Server");
                        final int artifactServerPort = 
config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
                                
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
-                       artifactServer = new MesosArtifactServer(sessionID, 
akkaHostname, artifactServerPort, config);
+                       final String artifactServerPrefix = 
UUID.randomUUID().toString();
+                       artifactServer = new 
MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, 
config);
 
                        // ----------------- (3) Generate the configuration for 
the TaskManagers -------------------
 
+                       // generate a container spec which conveys the 
artifacts/vars needed to launch a TM
+                       ContainerSpecification taskManagerContainerSpec = new 
ContainerSpecification();
+
+                       // propagate the AM dynamic configuration to the TM
+                       
taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties);
+
+                       // propagate newly-generated configuration elements
                        final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(
-                               config, akkaHostname, akkaPort, 
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
-                       LOG.debug("TaskManager configuration: {}", 
taskManagerConfig);
+                               new Configuration(), akkaHostname, akkaPort, 
taskManagerParameters.containeredParameters().numSlots(),
+                               TASKMANAGER_REGISTRATION_TIMEOUT);
+                       
taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
+
+                       // apply the overlays
+                       applyOverlays(config, taskManagerContainerSpec);
 
-                       final Protos.TaskInfo.Builder taskManagerContext = 
createTaskManagerContext(
-                               config, ENV,
-                               taskManagerParameters, taskManagerConfig,
-                               workingDir, getTaskManagerClass(), 
artifactServer, LOG);
+                       // configure the artifact server to serve the specified 
artifacts
+                       configureArtifactServer(artifactServer, 
taskManagerContainerSpec);
 
                        // ----------------- (4) start the actors 
-------------------
 
@@ -341,8 +331,8 @@ public class MesosApplicationMasterRunner {
                                workerStore,
                                leaderRetriever,
                                taskManagerParameters,
-                               taskManagerContext,
-                               numInitialTaskManagers,
+                               taskManagerContainerSpec,
+                               artifactServer,
                                LOG);
 
                        ActorRef resourceMaster = 
actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
@@ -389,8 +379,21 @@ public class MesosApplicationMasterRunner {
                                }
                        }
 
-                       futureExecutor.shutdownNow();
-                       ioExecutor.shutdownNow();
+                       if(futureExecutor != null) {
+                               try {
+                                       futureExecutor.shutdownNow();
+                               } catch (Throwable tt) {
+                                       LOG.error("Error shutting down future 
executor", tt);
+                               }
+                       }
+
+                       if(ioExecutor != null) {
+                               try {
+                                       ioExecutor.shutdownNow();
+                               } catch (Throwable tt) {
+                                       LOG.error("Error shutting down io 
executor", tt);
+                               }
+                       }
 
                        return INIT_ERROR_EXIT_CODE;
                }
@@ -442,35 +445,12 @@ public class MesosApplicationMasterRunner {
                return MemoryArchivist.class;
        }
 
-       protected Class<? extends TaskManager> getTaskManagerClass() {
-               return MesosTaskManager.class;
-       }
-
-       /**
-        *
-        * @param baseDirectory
-        * @param additional
-        *
-        * @return The configuration to be used by the TaskManagers.
-        */
-       private static Configuration createConfiguration(String baseDirectory, 
Configuration additional) {
-               LOG.info("Loading config from directory {}", baseDirectory);
-
-               Configuration configuration = 
GlobalConfiguration.loadConfiguration();
-
-               // add dynamic properties to JobManager configuration.
-               configuration.addAll(additional);
-
-               return configuration;
-       }
-
        /**
         * Loads and validates the ResourceManager Mesos configuration from the 
given Flink configuration.
         */
        public static MesosConfiguration createMesosConfig(Configuration 
flinkConfig, String hostname) {
 
                Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
-                       .setUser("")
                        .setHostname(hostname);
                Protos.Credential.Builder credential = null;
 
@@ -494,6 +474,10 @@ public class MesosApplicationMasterRunner {
                        ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
                        
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
 
+               frameworkInfo.setUser(flinkConfig.getString(
+                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
+                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+
                
if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL))
 {
                        frameworkInfo.setPrincipal(flinkConfig.getString(
                                
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -501,15 +485,16 @@ public class MesosApplicationMasterRunner {
                        credential = Protos.Credential.newBuilder();
                        credential.setPrincipal(frameworkInfo.getPrincipal());
 
-                       
if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET))
 {
-                               throw new 
IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET
 + " must be configured.");
+                       // some environments use a side-channel to communicate 
the secret to Mesos,
+                       // and thus don't set the 'secret' configuration setting
+                       
if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET))
 {
+                               credential.setSecret(flinkConfig.getString(
+                                       
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
                        }
-                       credential.setSecret(flinkConfig.getString(
-                               
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
                }
 
                MesosConfiguration mesos =
-                       new MesosConfiguration(masterUrl, frameworkInfo, 
Option.apply(credential));
+                       new MesosConfiguration(masterUrl, frameworkInfo, 
scala.Option.apply(credential));
 
                return mesos;
        }
@@ -533,203 +518,34 @@ public class MesosApplicationMasterRunner {
        }
 
        /**
-        * Creates a Mesos task info template, which describes how to bring up a TaskManager process as
-        * a Mesos task.
+        * Generate a container specification as a TaskManager template.
         *
         * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
-        * needs (such as JAR file, config file, ...) and all environment variables in a task info record.
+        * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
         * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
         * A lightweight HTTP server serves the artifacts to the fetcher.
-        *
-        * <p>We do this work before we start the ResourceManager actor in 
order to fail early if
-        * any of the operations here fail.
-        *
-        * @param flinkConfig
-        *         The Flink configuration object.
-        * @param env
-        *         The environment variables.
-        * @param tmParams
-        *         The TaskManager container memory parameters.
-        * @param taskManagerConfig
-        *         The configuration for the TaskManagers.
-        * @param workingDirectory
-        *         The current application master container's working directory.
-        * @param taskManagerMainClass
-        *         The class with the main method.
-        * @param artifactServer
-        *         The artifact server.
-        * @param log
-        *         The logger.
-        *
-        * @return The task info template for the TaskManager processes.
-        *
-        * @throws Exception Thrown if the task info could not be created, for 
example if
-        *                   the resources could not be copied.
-        */
-       public static Protos.TaskInfo.Builder createTaskManagerContext(
-               Configuration flinkConfig,
-               Map<String, String> env,
-               MesosTaskManagerParameters tmParams,
-               Configuration taskManagerConfig,
-               String workingDirectory,
-               Class<?> taskManagerMainClass,
-               MesosArtifactServer artifactServer,
-               Logger log) throws Exception {
-
-
-               Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
-               Protos.CommandInfo.Builder cmd = 
Protos.CommandInfo.newBuilder();
-
-               log.info("Setting up artifacts for TaskManagers");
-
-               String shipListString = 
env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-               checkState(shipListString != null, "Environment variable %s not 
set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-               String classPathString = 
env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
-               checkState(classPathString != null, "Environment variable %s 
not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
-
-               // register the Flink jar
-               final File flinkJarFile = new File(workingDirectory, 
"flink.jar");
-               cmd.addUris(uri(artifactServer.addFile(flinkJarFile, 
"flink.jar"), true));
-
-               String hadoopConfDir = env.get("HADOOP_CONF_DIR");
-               LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
-
-               //upload Hadoop configurations to artifact server
-               boolean hadoopConf = false;
-               if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-                       File source = new File(hadoopConfDir);
-                       if(source.exists() && source.isDirectory()) {
-                               hadoopConf = true;
-                               File[] fileList = source.listFiles();
-                               for(File file: fileList) {
-                                       
if(file.getName().equals("core-site.xml") || 
file.getName().equals("hdfs-site.xml")) {
-                                               LOG.debug("Adding local file: 
[{}] to artifact server", file);
-                                               File f = new 
File(hadoopConfDir, file.getName());
-                                               
cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
-                                       }
-                               }
-                       }
-               }
-
-               //upload keytab to the artifact server
-               String keytabFileName = null;
-               String keytab = 
flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-               if(keytab != null) {
-                       File source = new File(keytab);
-                       if(source.exists()) {
-                               LOG.debug("Adding keytab file: [{}] to artifact 
server", source);
-                               keytabFileName = source.getName();
-                               cmd.addUris(uri(artifactServer.addFile(source, 
source.getName()), true));
-                       }
-               }
-
-               String principal = 
flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-               if(keytabFileName != null && principal != null) {
-                       //reset the configurations since we will use in-memory 
reference from within the TM instance
-                       
taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
-                       
taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
-               }
-
-               // register the TaskManager configuration
-               final File taskManagerConfigFile =
-                       new File(workingDirectory, UUID.randomUUID() + 
"-taskmanager-conf.yaml");
-               LOG.debug("Writing TaskManager configuration to {}", 
taskManagerConfigFile.getAbsolutePath());
-               BootstrapTools.writeConfiguration(taskManagerConfig, 
taskManagerConfigFile);
-               cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, 
GlobalConfiguration.FLINK_CONF_FILENAME), true));
-
-               // prepare additional files to be shipped
-               for (String pathStr : shipListString.split(",")) {
-                       if (!pathStr.isEmpty()) {
-                               File shipFile = new File(workingDirectory, 
pathStr);
-                               
cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
-                       }
-               }
-
-               log.info("Creating task info for TaskManagers");
-
-               // build the launch command
-               boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
-               boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
-               boolean hasKrb5 = false;
-
-               String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
-                       flinkConfig, tmParams.containeredParameters(), ".", ".",
-                       hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-               cmd.setValue(launchCommand);
-
-               // build the environment variables
-               Protos.Environment.Builder envBuilder = 
Protos.Environment.newBuilder();
-               for (Map.Entry<String, String> entry : 
tmParams.containeredParameters().taskManagerEnv().entrySet()) {
-                       envBuilder.addVariables(variable(entry.getKey(), 
entry.getValue()));
-               }
-               envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, 
classPathString));
-
-               //add hadoop config directory to the environment
-               if(hadoopConf) {
-                       
envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
-               }
-
-               //add keytab and principal to environment
-               if(keytabFileName != null && principal != null) {
-                       
envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
-                       
envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, 
principal));
-               }
-
-               
envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
-                               
UserGroupInformation.getCurrentUser().getUserName()));
-
-               cmd.setEnvironment(envBuilder);
-
-               info.setCommand(cmd);
-
-               // Set container for task manager if specified in configs.
-               String tmImageName = flinkConfig.getString(
-                       
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, "");
-
-               if (tmImageName.length() > 0) {
-                       String taskManagerContainerType = flinkConfig.getString(
-                               
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-                               
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE);
-
-                       Protos.ContainerInfo.Builder containerInfo;
-
-                       switch (taskManagerContainerType) {
-                               case 
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
-                                       containerInfo = 
Protos.ContainerInfo.newBuilder()
-                                               
.setType(Protos.ContainerInfo.Type.MESOS)
-                                               
.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
-                                                       
.setImage(Protos.Image.newBuilder()
-                                                               
.setType(Protos.Image.Type.DOCKER)
-                                                               
.setDocker(Protos.Image.Docker.newBuilder()
-                                                                       
.setName(tmImageName))));
-                                       break;
-                               case 
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
-                                       containerInfo = 
Protos.ContainerInfo.newBuilder()
-                                               
.setType(Protos.ContainerInfo.Type.DOCKER)
-                                               
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
-                                                       
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
-                                                       .setImage(tmImageName));
-                                       break;
-                               default:
-                                       LOG.warn(
-                                               "Invalid container type '{}' 
provided for setting {}. Valid values are '{}' or '{}'. " +
-                                                       "Starting task managers 
now without container.",
-                                               taskManagerContainerType,
-                                               
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
-                                               
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS,
-                                               
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER);
-
-                                       containerInfo = null;
-
-                                       break;
-                       }
+     */
+       private static void applyOverlays(
+               Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
+
+               // create the overlays that will produce the specification
+               CompositeContainerOverlay overlay = new CompositeContainerOverlay(
+                       FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+                       HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+                       HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+                       KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+                       Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+                       SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
+               );
+
+               // apply the overlays
+               overlay.configure(containerSpec);
+       }
 
-                       if (containerInfo != null) {
-                               info.setContainer(containerInfo);
-                       }
+       private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+               // serve the artifacts associated with the container environment
+               for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+                       server.addPath(artifact.source, artifact.dest);
                }
-
-               return info;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index bc6dde4..ebd9af5 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -26,21 +26,20 @@ public class MesosConfigKeys {
        //  Environment variable names
        // 
------------------------------------------------------------------------
 
-       public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-       public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-       public static final String ENV_SLOTS = "_SLOTS";
-       public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-       public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
-       public static final String ENV_DYNAMIC_PROPERTIES = 
"_DYNAMIC_PROPERTIES";
+       /**
+        * The Mesos task ID, used by the TM for informational purposes
+        */
        public static final String ENV_FLINK_CONTAINER_ID = 
"_FLINK_CONTAINER_ID";
+
+       /**
+        * Directories for temporary files, read from the environment by the TM runner. Reserved for future enhancement
+        */
        public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
-       public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
-       public static final String ENV_CLASSPATH = "CLASSPATH";
-       public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
-       public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
-       public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
-       public static final String ENV_KEYTAB = "_KEYTAB_FILE";
-       public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+
+       /**
+        * JVM arguments, used by the JM and TM
+        */
+       public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
        /** Private constructor to prevent instantiation */
        private MesosConfigKeys() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 6b24ee8..a7321a3 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -27,6 +27,7 @@ import com.netflix.fenzo.functions.Action1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
@@ -44,9 +45,11 @@ import 
org.apache.flink.mesos.scheduler.messages.ReRegistered;
 import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
        /** The TaskManager container parameters (like container memory size) */
        private final MesosTaskManagerParameters taskManagerParameters;
 
-       /** Context information used to start a TaskManager Java process */
-       private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+       /** Container specification for launching a TM */
+       private final ContainerSpecification taskManagerContainerSpec;
+
+       /** Resolver for HTTP artifacts */
+       private final MesosArtifactResolver artifactResolver;
 
        /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
        private final int maxFailedTasks;
@@ -112,7 +118,8 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                MesosWorkerStore workerStore,
                LeaderRetrievalService leaderRetrievalService,
                MesosTaskManagerParameters taskManagerParameters,
-               Protos.TaskInfo.Builder taskManagerLaunchContext,
+               ContainerSpecification taskManagerContainerSpec,
+               MesosArtifactResolver artifactResolver,
                int maxFailedTasks,
                int numInitialTaskManagers) {
 
@@ -121,9 +128,10 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                this.mesosConfig = requireNonNull(mesosConfig);
 
                this.workerStore = requireNonNull(workerStore);
+               this.artifactResolver = requireNonNull(artifactResolver);
 
                this.taskManagerParameters = 
requireNonNull(taskManagerParameters);
-               this.taskManagerLaunchContext = 
requireNonNull(taskManagerLaunchContext);
+               this.taskManagerContainerSpec = 
requireNonNull(taskManagerContainerSpec);
                this.maxFailedTasks = maxFailedTasks;
 
                this.workersInNew = new HashMap<>();
@@ -661,7 +669,7 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
 
        private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID) {
                LaunchableMesosWorker launchable =
-                       new LaunchableMesosWorker(taskManagerParameters, 
taskManagerLaunchContext, taskID);
+                       new LaunchableMesosWorker(artifactResolver, 
taskManagerParameters, taskManagerContainerSpec, taskID);
                return launchable;
        }
 
@@ -723,10 +731,10 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
         *             The Flink configuration object.
         * @param taskManagerParameters
         *             The parameters for launching TaskManager containers.
-        * @param taskManagerLaunchContext
-        *             The parameters for launching the TaskManager processes 
in the TaskManager containers.
-        * @param numInitialTaskManagers
-        *             The initial number of TaskManagers to allocate.
+        * @param taskManagerContainerSpec
+        *             The container specification for launching a TM.
+        * @param artifactResolver
+        *             The resolver used to locate HTTP artifacts.
         * @param log
         *             The logger to log to.
         *
@@ -738,10 +746,22 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                        MesosWorkerStore workerStore,
                        LeaderRetrievalService leaderRetrievalService,
                        MesosTaskManagerParameters taskManagerParameters,
-                       Protos.TaskInfo.Builder taskManagerLaunchContext,
-                       int numInitialTaskManagers,
+                       ContainerSpecification taskManagerContainerSpec,
+                       MesosArtifactResolver artifactResolver,
                        Logger log)
        {
+
+               final int numInitialTaskManagers = flinkConfig.getInteger(
+                       ConfigConstants.MESOS_INITIAL_TASKS, 0);
+               if (numInitialTaskManagers >= 0) {
+                       log.info("Mesos framework to allocate {} initial tasks",
+                               numInitialTaskManagers);
+               }
+               else {
+                       throw new IllegalConfigurationException("Invalid value 
for " +
+                               ConfigConstants.MESOS_INITIAL_TASKS + ", which 
must be at least zero.");
+               }
+
                final int maxFailedTasks = flinkConfig.getInteger(
                        ConfigConstants.MESOS_MAX_FAILED_TASKS, 
numInitialTaskManagers);
                if (maxFailedTasks >= 0) {
@@ -755,7 +775,8 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                        workerStore,
                        leaderRetrievalService,
                        taskManagerParameters,
-                       taskManagerLaunchContext,
+                       taskManagerContainerSpec,
+                       artifactResolver,
                        maxFailedTasks,
                        numInitialTaskManagers);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1b19d08..7fae58c 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -19,10 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import scala.Option;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class describes the Mesos-specific parameters for launching a 
TaskManager process.
@@ -32,13 +36,43 @@ import static java.util.Objects.requireNonNull;
  */
 public class MesosTaskManagerParameters {
 
-       private double cpus;
+       public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
+                       key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+                       .defaultValue(1);
 
-       private ContaineredTaskManagerParameters containeredParameters;
+       public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
+                       key("mesos.resourcemanager.tasks.mem")
+                       .defaultValue(1024);
 
-       public MesosTaskManagerParameters(double cpus, 
ContaineredTaskManagerParameters containeredParameters) {
+       public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
+                       key("mesos.resourcemanager.tasks.cpus")
+                       .defaultValue(0.0);
+
+       public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
+               key("mesos.resourcemanager.tasks.container.type")
+                       .defaultValue("mesos");
+
+       public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
+               key("mesos.resourcemanager.tasks.container.image.name")
+                       .noDefaultValue();
+
+       private final double cpus;
+
+       private final ContainerType containerType;
+
+       private final Option<String> containerImageName;
+
+       private final ContaineredTaskManagerParameters containeredParameters;
+
+       public MesosTaskManagerParameters(
+               double cpus,
+               ContainerType containerType,
+               Option<String> containerImageName,
+               ContaineredTaskManagerParameters containeredParameters) {
                requireNonNull(containeredParameters);
                this.cpus = cpus;
+               this.containerType = containerType;
+               this.containerImageName = containerImageName;
                this.containeredParameters = containeredParameters;
        }
 
@@ -50,6 +84,22 @@ public class MesosTaskManagerParameters {
        }
 
        /**
+        * Get the container type (Mesos or Docker).  The default is Mesos.
+        *
+        * Mesos provides a facility for a framework to specify which 
containerizer to use.
+     */
+       public ContainerType containerType() {
+               return containerType;
+       }
+
+       /**
+        * Get the container image name, if any; {@link #create(Configuration)} requires one for the Docker container type.
+     */
+       public Option<String> containerImageName() {
+               return containerImageName;
+       }
+
+       /**
         * Get the common containered parameters.
      */
        public ContaineredTaskManagerParameters containeredParameters() {
@@ -60,6 +110,8 @@ public class MesosTaskManagerParameters {
        public String toString() {
                return "MesosTaskManagerParameters{" +
                        "cpus=" + cpus +
+                       ", containerType=" + containerType +
+                       ", containerImageName=" + containerImageName +
                        ", containeredParameters=" + containeredParameters +
                        '}';
        }
@@ -67,15 +119,49 @@ public class MesosTaskManagerParameters {
        /**
         * Create the Mesos TaskManager parameters.
         * @param flinkConfig the TM configuration.
-        * @param containeredParameters additional containered parameters.
      */
-       public static MesosTaskManagerParameters create(
-               Configuration flinkConfig,
-               ContaineredTaskManagerParameters containeredParameters) {
+       public static MesosTaskManagerParameters create(Configuration 
flinkConfig) {
 
-               double cpus = 
flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
-                       Math.max(containeredParameters.numSlots(), 1.0));
+               // parse the common parameters
+               ContaineredTaskManagerParameters containeredParameters = 
ContaineredTaskManagerParameters.create(
+                       flinkConfig,
+                       flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
+                       flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
+
+               double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
+               if(cpus <= 0.0) {
+                       cpus = Math.max(containeredParameters.numSlots(), 1.0);
+               }
+
+               // parse the containerization parameters
+               String imageName = 
flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);
+
+               ContainerType containerType;
+               String containerTypeString = 
flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
+               switch(containerTypeString) {
+                       case 
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
+                               containerType = ContainerType.MESOS;
+                               break;
+                       case 
ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
+                               containerType = ContainerType.DOCKER;
+                               if(imageName == null || imageName.length() == 
0) {
+                                       throw new 
IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
+                                               " must be specified for docker 
container type");
+                               }
+                               break;
+                       default:
+                               throw new 
IllegalConfigurationException("invalid container type: " + containerTypeString);
+               }
+
+               return new MesosTaskManagerParameters(
+                       cpus,
+                       containerType,
+                       Option.apply(imageName),
+                       containeredParameters);
+       }
 
-               return new MesosTaskManagerParameters(cpus, 
containeredParameters);
+       public enum ContainerType {
+               MESOS,
+               DOCKER
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 5100deb..75b5043 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -35,7 +40,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,24 +51,33 @@ public class MesosTaskManagerRunner {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
 
+       private static final Options ALL_OPTIONS;
+
+       static {
+               ALL_OPTIONS =
+                       new Options()
+                               
.addOption(BootstrapTools.newDynamicPropertiesOption());
+       }
+
        /** The process environment variables */
        private static final Map<String, String> ENV = System.getenv();
 
-       public static void runTaskManager(String[] args, final Class<? extends 
TaskManager> taskManager) throws IOException {
+       public static void runTaskManager(String[] args, final Class<? extends 
TaskManager> taskManager) throws Exception {
                EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
                SignalHandler.register(LOG);
                JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // try to parse the command line arguments
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
                final Configuration configuration;
                try {
-                       configuration = 
TaskManager.parseArgsAndLoadConfig(args);
-
-                       // add dynamic properties to TaskManager configuration.
-                       final Configuration dynamicProperties =
-                               
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       final Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+                       
GlobalConfiguration.setDynamicProperties(dynamicProperties);
                        LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
-                       configuration.addAll(dynamicProperties);
+
+                       configuration = GlobalConfiguration.loadConfiguration();
                }
                catch (Throwable t) {
                        LOG.error("Failed to load the TaskManager configuration 
and dynamic properties.", t);
@@ -74,7 +87,6 @@ public class MesosTaskManagerRunner {
 
                // read the environment variables
                final Map<String, String> envs = System.getenv();
-               final String effectiveUsername = 
envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
                final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
 
                // configure local directory
@@ -88,20 +100,12 @@ public class MesosTaskManagerRunner {
                        
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
                }
 
-               final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
-               LOG.info("Keytab file:{}", keytab);
-
-               final String principal = 
envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
-               LOG.info("Keytab principal:{}", principal);
-
-               if(keytab != null && keytab.length() != 0) {
-                       File f = new File(".", keytab);
-                       if(!f.exists()) {
-                               LOG.error("Could not locate keytab file:[" + 
keytab + "]");
-                               
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-                       }
-                       
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
-                       
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
+               // configure the default filesystem
+               try {
+                       FileSystem.setDefaultScheme(configuration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default 
" +
+                               "filesystem scheme from configuration.", e);
                }
 
                // tell akka to die in case of an error
@@ -112,23 +116,17 @@ public class MesosTaskManagerRunner {
                final ResourceID resourceId = new ResourceID(containerID);
                LOG.info("ResourceID assigned for this container: {}", 
resourceId);
 
-               String hadoopConfDir = 
envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
-               LOG.info("hadoopConfDir: {}", hadoopConfDir);
-
+               // Run the TM in the security context
                SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(configuration);
-               if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
-                       
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
-               }
+               sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+               SecurityUtils.install(sc);
 
                try {
-                       SecurityUtils.install(sc);
-                       LOG.info("Mesos task runs as '{}', setting user to 
execute Flink TaskManager to '{}'",
-                                       
UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
-                       SecurityUtils.getInstalledContext().runSecured(new 
Callable<Object>() {
+                       SecurityUtils.getInstalledContext().runSecured(new 
Callable<Integer>() {
                                @Override
-                               public Object call() throws Exception {
+                               public Integer call() throws Exception {
                                        
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
-                                       return null;
+                                       return 0;
                                }
                        });
                }
@@ -136,6 +134,5 @@ public class MesosTaskManagerRunner {
                        LOG.error("Error while starting the TaskManager", t);
                        System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
                }
-
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
new file mode 100644
index 0000000..a6a26dc
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.core.fs.Path;
+import scala.Option;
+
+import java.net.URL;
+
+/**
+ * An interface for resolving artifact URIs.
+ */
+public interface MesosArtifactResolver {
+       Option<URL> resolve(Path remoteFile);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index fbf61ac..37cb260 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.DefaultFileRegion;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -43,24 +42,32 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.stream.ChunkedStream;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -82,7 +89,7 @@ import static 
io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
  * http://mesos.apache.org/documentation/latest/fetcher/
  * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
  */
-public class MesosArtifactServer {
+public class MesosArtifactServer implements MesosArtifactResolver {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
 
@@ -92,17 +99,19 @@ public class MesosArtifactServer {
 
        private Channel serverChannel;
 
-       private URL baseURL;
+       private final URL baseURL;
+
+       private final Map<Path,URL> paths = new HashMap<>();
 
        private final SSLContext serverSSLContext;
 
-       public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort, Configuration config)
-                       throws Exception {
+       public MesosArtifactServer(String prefix, String serverHostname, int 
configuredPort, Configuration config)
+               throws Exception {
                if (configuredPort < 0 || configuredPort > 0xFFFF) {
                        throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
                }
 
-               // Config to enable https access to the web-ui
+               // Config to enable https access to the artifact server
                boolean enableSSL = config.getBoolean(
                                
ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
                                
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
@@ -136,6 +145,7 @@ public class MesosArtifactServer {
 
                                ch.pipeline()
                                        .addLast(new HttpServerCodec())
+                                       .addLast(new ChunkedWriteHandler())
                                        .addLast(handler.name(), handler)
                                        .addLast(new UnknownFileHandler());
                        }
@@ -159,11 +169,15 @@ public class MesosArtifactServer {
 
                String httpProtocol = (serverSSLContext != null) ? "https": 
"http";
 
-               baseURL = new URL(httpProtocol, serverHostname, port, "/" + 
sessionID + "/");
+               baseURL = new URL(httpProtocol, serverHostname, port, "/" + 
prefix + "/");
 
                LOG.info("Mesos Artifact Server Base URL: {}, listening at 
{}:{}", baseURL, address, port);
        }
 
+       public URL baseURL() {
+               return baseURL;
+       }
+
        /**
         * Get the server port on which the artifact server is listening.
         */
@@ -185,13 +199,51 @@ public class MesosArtifactServer {
         * @param remoteFile the remote path with which to locate the file.
         * @return the fully-qualified remote path to the file.
         * @throws MalformedURLException if the remote path is invalid.
+     */
+       public synchronized URL addFile(File localFile, String remoteFile) 
throws IOException, MalformedURLException {
+               return addPath(new Path(localFile.toURI()), new 
Path(remoteFile));
+       }
+
+       /**
+        * Adds a path to the artifact server.
+        * @param path the qualified FS path to serve (local, hdfs, etc).
+        * @param remoteFile the remote path with which to locate the file.
+        * @return the fully-qualified remote path to the file.
+        * @throws MalformedURLException if the remote path is invalid.
         */
-       public synchronized URL addFile(File localFile, String remoteFile) 
throws MalformedURLException {
-               URL fileURL = new URL(baseURL, remoteFile);
-               router.ANY(fileURL.getPath(), new 
VirtualFileServerHandler(localFile));
+       public synchronized URL addPath(Path path, Path remoteFile) throws 
IOException, MalformedURLException {
+               if(paths.containsKey(remoteFile)) {
+                       throw new IllegalArgumentException("duplicate path 
registered");
+               }
+               if(remoteFile.isAbsolute()) {
+                       throw new IllegalArgumentException("not expecting an 
absolute path");
+               }
+               URL fileURL = new URL(baseURL, remoteFile.toString());
+               router.ANY(fileURL.getPath(), new 
VirtualFileServerHandler(path));
+
+               paths.put(remoteFile, fileURL);
+
                return fileURL;
        }
 
+       public synchronized void removePath(Path remoteFile) {
+               if(paths.containsKey(remoteFile)) {
+                       URL fileURL = null;
+                       try {
+                               fileURL = new URL(baseURL, 
remoteFile.toString());
+                       } catch (MalformedURLException e) {
+                               throw new RuntimeException(e);
+                       }
+                       router.removePath(fileURL.getPath());
+               }
+       }
+
+       @Override
+       public synchronized Option<URL> resolve(Path remoteFile) {
+               Option<URL> resolved = Option.apply(paths.get(remoteFile));
+               return resolved;
+       }
+
        /**
         * Stops the artifact server.
         * @throws Exception
@@ -215,12 +267,17 @@ public class MesosArtifactServer {
        @ChannelHandler.Sharable
        public static class VirtualFileServerHandler extends 
SimpleChannelInboundHandler<Routed> {
 
-               private final File file;
+               private FileSystem fs;
+               private Path path;
 
-               public VirtualFileServerHandler(File file) {
-                       this.file = file;
-                       if(!file.exists()) {
-                               throw new IllegalArgumentException("no such 
file: " + file.getAbsolutePath());
+               public VirtualFileServerHandler(Path path) throws IOException {
+                       this.path = path;
+                       if(!path.isAbsolute()) {
+                               throw new IllegalArgumentException("path must 
be absolute: " + path.toString());
+                       }
+                       this.fs = path.getFileSystem();
+                       if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+                               throw new IllegalArgumentException("no such 
file: " + path.toString());
                        }
                }
 
@@ -230,7 +287,7 @@ public class MesosArtifactServer {
                        HttpRequest request = routed.request();
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("{} request for file '{}'", 
request.getMethod(), file.getAbsolutePath());
+                               LOG.debug("{} request for file '{}'", 
request.getMethod(), path);
                        }
 
                        if(!(request.getMethod() == GET || request.getMethod() 
== HEAD)) {
@@ -238,47 +295,40 @@ public class MesosArtifactServer {
                                return;
                        }
 
-                       final RandomAccessFile raf;
+
+                       final FileStatus status;
                        try {
-                               raf = new RandomAccessFile(file, "r");
+                               status = fs.getFileStatus(path);
                        }
-                       catch (FileNotFoundException e) {
+                       catch (IOException e) {
+                               LOG.error("unable to stat file", e);
                                sendError(ctx, GONE);
                                return;
                        }
-                       try {
-                               long fileLength = raf.length();
 
-                               // compose the response
-                               HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
-                               if (HttpHeaders.isKeepAlive(request)) {
-                                       response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
-                               }
-                               HttpHeaders.setHeader(response, CACHE_CONTROL, 
"private");
-                               HttpHeaders.setHeader(response, CONTENT_TYPE, 
Mimetypes.MIMETYPE_OCTET_STREAM);
-                               HttpHeaders.setContentLength(response, 
fileLength);
+                       // compose the response
+                       HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
+                       HttpHeaders.setHeader(response, CONNECTION, 
HttpHeaders.Values.CLOSE);
+                       HttpHeaders.setHeader(response, CACHE_CONTROL, 
"private");
+                       HttpHeaders.setHeader(response, CONTENT_TYPE, 
Mimetypes.MIMETYPE_OCTET_STREAM);
+                       HttpHeaders.setContentLength(response, status.getLen());
 
-                               ctx.write(response);
+                       ctx.write(response);
 
-                               if (request.getMethod() == GET) {
-                                       // write the content.  Netty's 
DefaultFileRegion will close the file.
-                                       ctx.write(new 
DefaultFileRegion(raf.getChannel(), 0, fileLength), 
ctx.newProgressivePromise());
+                       if (request.getMethod() == GET) {
+                               // write the content.  Netty will close the 
stream.
+                               final FSDataInputStream stream = fs.open(path);
+                               try {
+                                       ctx.write(new ChunkedStream(stream));
                                }
-                               else {
-                                       // close the file immediately in HEAD 
case
-                                       raf.close();
+                               catch(Exception e) {
+                                       stream.close();
+                                       throw e;
                                }
-                               ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-                               // close the connection, if no keep-alive is 
needed
-                               if (!HttpHeaders.isKeepAlive(request)) {
-                                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-                               }
-                       }
-                       catch(Exception ex) {
-                               raf.close();
-                               throw ex;
                        }
+
+                       ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index f287e13..93ccf68 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -33,10 +33,12 @@ import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.messages.*;
 import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -79,6 +81,7 @@ public class MesosFlinkResourceManagerTest {
 
        private static Configuration config = new Configuration() {{
                setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+               setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
        }};
 
        @BeforeClass
@@ -107,12 +110,13 @@ public class MesosFlinkResourceManagerTest {
                        MesosWorkerStore workerStore,
                        LeaderRetrievalService leaderRetrievalService,
                        MesosTaskManagerParameters taskManagerParameters,
-                       Protos.TaskInfo.Builder taskManagerLaunchContext,
+                       ContainerSpecification taskManagerContainerSpec,
+                       MesosArtifactResolver artifactResolver,
                        int maxFailedTasks,
                        int numInitialTaskManagers) {
 
                        super(flinkConfig, mesosConfig, workerStore, 
leaderRetrievalService, taskManagerParameters,
-                               taskManagerLaunchContext, maxFailedTasks, 
numInitialTaskManagers);
+                               taskManagerContainerSpec, artifactResolver, 
maxFailedTasks, numInitialTaskManagers);
                }
 
                @Override
@@ -141,6 +145,7 @@ public class MesosFlinkResourceManagerTest {
                public LeaderRetrievalService retrievalService;
                public MesosConfiguration mesosConfig;
                public MesosWorkerStore workerStore;
+               public MesosArtifactResolver artifactResolver;
                public SchedulerDriver schedulerDriver;
                public TestingMesosFlinkResourceManager resourceManagerInstance;
                public ActorGateway resourceManager;
@@ -176,6 +181,9 @@ public class MesosFlinkResourceManagerTest {
                                // worker store
                                workerStore = mock(MesosWorkerStore.class);
                                
when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
+
+                               // artifact
+                               artifactResolver = 
mock(MesosArtifactResolver.class);
                        } catch (Exception ex) {
                                throw new RuntimeException(ex);
                        }
@@ -185,15 +193,16 @@ public class MesosFlinkResourceManagerTest {
                 * Initialize the resource manager.
                 */
                public void initialize() {
+                       ContainerSpecification containerSpecification = new 
ContainerSpecification();
                        ContaineredTaskManagerParameters containeredParams =
                                new ContaineredTaskManagerParameters(1024, 768, 
256, 4, new HashMap<String, String>());
-                       MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(1.0, containeredParams);
-                       Protos.TaskInfo.Builder taskInfo = 
Protos.TaskInfo.newBuilder();
+                       MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(
+                               1.0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams);
 
                        TestActorRef<TestingMesosFlinkResourceManager> 
resourceManagerRef =
                                TestActorRef.create(system, 
MesosFlinkResourceManager.createActorProps(
                                        TestingMesosFlinkResourceManager.class,
-                                       config, mesosConfig, workerStore, 
retrievalService, tmParams, taskInfo, 0, LOG));
+                                       config, mesosConfig, workerStore, 
retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
                        resourceManagerInstance = 
resourceManagerRef.underlyingActor();
                        resourceManager = new 
AkkaActorGateway(resourceManagerRef, null);
 

Reply via email to