[FLINK-8490] Allow custom docker parameters for docker tasks on Mesos

This closes #5346.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6969fe2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6969fe2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6969fe2f

Branch: refs/heads/master
Commit: 6969fe2fa823be7748cee002a32df02fd1cae09f
Parents: a6d7f2d
Author: Leonid Ishimnikov <[email protected]>
Authored: Tue Jan 23 13:55:17 2018 -0500
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:50:21 2018 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |  2 +
 docs/ops/deployment/mesos.md                    |  2 +
 .../clusterframework/LaunchableMesosWorker.java |  1 +
 .../clusterframework/MesosResourceManager.java  |  1 +
 .../MesosTaskManagerParameters.java             | 47 ++++++++++++++++++++
 .../MesosFlinkResourceManagerTest.java          |  1 +
 .../MesosResourceManagerTest.java               |  3 +-
 .../MesosTaskManagerParametersTest.java         | 36 +++++++++++++++
 8 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 8a4b1c1..43f281b 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -512,6 +512,8 @@ May be set to -1 to disable this feature.
 
 - `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of 
`[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters 
to be passed into docker run command when using the docker containerizer. Comma 
separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
+
 - `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path 
for persisting the Mesos worker information.
 
 ### High Availability (HA)

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 5771abe..4d08104 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -264,6 +264,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of 
`[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional 
volumes into your container. (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters 
to be passed into docker run command when using the docker containerizer. Comma 
separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
+
 `mesos.resourcemanager.tasks.hostname`: Optional value to define the 
TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of 
the Mesos task. This can be used to configure the TaskManager to use Mesos DNS 
(e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
 
 `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed 
before the TaskManager is started (**NO DEFAULT**).

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index e71c703..b4176a8 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -298,6 +298,7 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                                containerInfo
                                        
.setType(Protos.ContainerInfo.Type.DOCKER)
                                        
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+                                               
.addAllParameters(params.dockerParameters())
                                                
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
                                                
.setImage(params.containerImageName().get()));
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 8b67257..d76f2fe 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -672,6 +672,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                                1,
                                new 
HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
                        taskManagerParameters.containerVolumes(),
+                       taskManagerParameters.dockerParameters(),
                        taskManagerParameters.constraints(),
                        taskManagerParameters.command(),
                        taskManagerParameters.bootstrapCommand(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 3859913..f0ba113 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -86,6 +86,10 @@ public class MesosTaskManagerParameters {
                key("mesos.resourcemanager.tasks.container.volumes")
                .noDefaultValue();
 
+       public static final ConfigOption<String> 
MESOS_RM_CONTAINER_DOCKER_PARAMETERS =
+               key("mesos.resourcemanager.tasks.container.docker.parameters")
+               .noDefaultValue();
+
        public static final ConfigOption<String> 
MESOS_CONSTRAINTS_HARD_HOSTATTR =
                key("mesos.constraints.hard.hostattribute")
                .noDefaultValue();
@@ -109,6 +113,8 @@ public class MesosTaskManagerParameters {
 
        private final List<Protos.Volume> containerVolumes;
 
+       private final List<Protos.Parameter> dockerParameters;
+
        private final List<ConstraintEvaluator> constraints;
 
        private final String command;
@@ -123,6 +129,7 @@ public class MesosTaskManagerParameters {
                        Option<String> containerImageName,
                        ContaineredTaskManagerParameters containeredParameters,
                        List<Protos.Volume> containerVolumes,
+                       List<Protos.Parameter> dockerParameters,
                        List<ConstraintEvaluator> constraints,
                        String command,
                        Option<String> bootstrapCommand,
@@ -133,6 +140,7 @@ public class MesosTaskManagerParameters {
                this.containerImageName = 
Preconditions.checkNotNull(containerImageName);
                this.containeredParameters = 
Preconditions.checkNotNull(containeredParameters);
                this.containerVolumes = 
Preconditions.checkNotNull(containerVolumes);
+               this.dockerParameters = 
Preconditions.checkNotNull(dockerParameters);
                this.constraints = Preconditions.checkNotNull(constraints);
                this.command = Preconditions.checkNotNull(command);
                this.bootstrapCommand = 
Preconditions.checkNotNull(bootstrapCommand);
@@ -177,6 +185,13 @@ public class MesosTaskManagerParameters {
        }
 
        /**
+        * Get Docker runtime parameters.
+        */
+       public List<Protos.Parameter> dockerParameters() {
+               return dockerParameters;
+       }
+
+       /**
         * Get the placement constraints.
         */
        public List<ConstraintEvaluator> constraints() {
@@ -212,6 +227,7 @@ public class MesosTaskManagerParameters {
                        ", containerImageName=" + containerImageName +
                        ", containeredParameters=" + containeredParameters +
                        ", containerVolumes=" + containerVolumes +
+                       ", dockerParameters=" + dockerParameters +
                        ", constraints=" + constraints +
                        ", taskManagerHostName=" + taskManagerHostname +
                        ", command=" + command +
@@ -260,8 +276,12 @@ public class MesosTaskManagerParameters {
 
                Option<String> containerVolOpt = 
Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
 
+               Option<String> dockerParamsOpt = 
Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS));
+
                List<Protos.Volume> containerVolumes = 
buildVolumes(containerVolOpt);
 
+               List<Protos.Parameter> dockerParameters = 
buildDockerParameters(dockerParamsOpt);
+
                //obtain Task Manager Host Name from the configuration
                Option<String> taskManagerHostname = 
Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
@@ -275,6 +295,7 @@ public class MesosTaskManagerParameters {
                        Option.apply(imageName),
                        containeredParameters,
                        containerVolumes,
+                       dockerParameters,
                        constraints,
                        tmCommand,
                        tmBootstrapCommand,
@@ -365,6 +386,32 @@ public class MesosTaskManagerParameters {
                }
        }
 
+       public static List<Protos.Parameter> 
buildDockerParameters(Option<String> dockerParameters) {
+               if (dockerParameters.isEmpty()) {
+                       return Collections.emptyList();
+               } else {
+                       String[] dockerParameterSpecifications = 
dockerParameters.get().split(",");
+
+                       List<Protos.Parameter> parameters = new 
ArrayList<>(dockerParameterSpecifications.length);
+
+                       for (String dockerParameterSpecification : 
dockerParameterSpecifications) {
+                               if 
(!dockerParameterSpecification.trim().isEmpty()) {
+                                       // split with the limit of 2 in case 
the value includes '='
+                                       String[] match = 
dockerParameterSpecification.split("=", 2);
+                                       if (match.length != 2) {
+                                               throw new 
IllegalArgumentException("Docker parameter specification is invalid, given: "
+                                                       + 
dockerParameterSpecification);
+                                       }
+                                       Protos.Parameter.Builder parameter = 
Protos.Parameter.newBuilder();
+                                       parameter.setKey(match[0]);
+                                       parameter.setValue(match[1]);
+                                       parameters.add(parameter.build());
+                               }
+                       }
+                       return parameters;
+               }
+       }
+
        /**
         * The supported containerizers.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index ff32486..c2455ec 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -250,6 +250,7 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
                                Option.<String>empty(),
                                containeredParams,
                                Collections.<Protos.Volume>emptyList(),
+                               Collections.<Protos.Parameter>emptyList(),
                                Collections.<ConstraintEvaluator>emptyList(),
                                "",
                                Option.<String>empty(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 85ba142..14e8ed9 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -263,7 +263,8 @@ public class MesosResourceManagerTest extends TestLogger {
                                new ContaineredTaskManagerParameters(1024, 768, 
256, 4, new HashMap<String, String>());
                        MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(
                                1.0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams,
-                               Collections.<Protos.Volume>emptyList(), 
Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
+                               Collections.<Protos.Volume>emptyList(), 
Collections.<Protos.Parameter>emptyList(),
+                               Collections.<ConstraintEvaluator>emptyList(), 
"", Option.<String>empty(),
                                Option.<String>empty());
 
                        // resource manager

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index 1f33cb5..84b0ff2 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -85,6 +85,42 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
        }
 
        @Test
+       public void testContainerDockerParameter() throws Exception {
+               Configuration config = new Configuration();
+               
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "testKey=testValue");
+
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+               assertEquals(params.dockerParameters().size(), 1);
+               assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey");
+               assertEquals(params.dockerParameters().get(0).getValue(), 
"testValue");
+       }
+
+       @Test
+       public void testContainerDockerParameters() throws Exception {
+               Configuration config = new Configuration();
+               
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
+                               
"testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
+
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+               assertEquals(params.dockerParameters().size(), 4);
+               assertEquals(params.dockerParameters().get(0).getKey(), 
"testKey1");
+               assertEquals(params.dockerParameters().get(0).getValue(), 
"testValue1");
+               assertEquals(params.dockerParameters().get(1).getKey(), 
"testKey2");
+               assertEquals(params.dockerParameters().get(1).getValue(), 
"testValue2");
+               assertEquals(params.dockerParameters().get(2).getKey(), 
"testParam3");
+               assertEquals(params.dockerParameters().get(2).getValue(), 
"key3=value3");
+               assertEquals(params.dockerParameters().get(3).getKey(), 
"testParam4");
+               assertEquals(params.dockerParameters().get(3).getValue(), 
"\"key4=value4\"");
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testContainerDockerParametersMalformed() throws Exception {
+               Configuration config = new Configuration();
+               
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
 "badParam");
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+       }
+
+       @Test
        public void givenTwoConstraintsInConfigShouldBeParsed() throws 
Exception {
 
                MesosTaskManagerParameters mesosTaskManagerParameters = 
MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));

Reply via email to