[FLINK-8490] Allow custom docker parameters for docker tasks on Mesos This closes #5346.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6969fe2f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6969fe2f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6969fe2f Branch: refs/heads/master Commit: 6969fe2fa823be7748cee002a32df02fd1cae09f Parents: a6d7f2d Author: Leonid Ishimnikov <[email protected]> Authored: Tue Jan 23 13:55:17 2018 -0500 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 13:50:21 2018 +0100 ---------------------------------------------------------------------- docs/ops/config.md | 2 + docs/ops/deployment/mesos.md | 2 + .../clusterframework/LaunchableMesosWorker.java | 1 + .../clusterframework/MesosResourceManager.java | 1 + .../MesosTaskManagerParameters.java | 47 ++++++++++++++++++++ .../MesosFlinkResourceManagerTest.java | 1 + .../MesosResourceManagerTest.java | 3 +- .../MesosTaskManagerParametersTest.java | 36 +++++++++++++++ 8 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index 8a4b1c1..43f281b 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -512,6 +512,8 @@ May be set to -1 to disable this feature. - `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**) +- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**) + - `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information. ### High Availability (HA) http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/deployment/mesos.md ---------------------------------------------------------------------- diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md index 5771abe..4d08104 100644 --- a/docs/ops/deployment/mesos.md +++ b/docs/ops/deployment/mesos.md @@ -264,6 +264,8 @@ May be set to -1 to disable this feature. `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**) +`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**) + `mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**) `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**). http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index e71c703..b4176a8 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -298,6 +298,7 @@ public class LaunchableMesosWorker implements LaunchableTask { containerInfo .setType(Protos.ContainerInfo.Type.DOCKER) .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder() + .addAllParameters(params.dockerParameters()) .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST) .setImage(params.containerImageName().get())); break; http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 8b67257..d76f2fe 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -672,6 +672,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN 1, new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), taskManagerParameters.containerVolumes(), + taskManagerParameters.dockerParameters(), taskManagerParameters.constraints(), taskManagerParameters.command(), taskManagerParameters.bootstrapCommand(), http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index 3859913..f0ba113 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -86,6 +86,10 @@ public class MesosTaskManagerParameters { key("mesos.resourcemanager.tasks.container.volumes") .noDefaultValue(); + public static final ConfigOption<String> MESOS_RM_CONTAINER_DOCKER_PARAMETERS = + key("mesos.resourcemanager.tasks.container.docker.parameters") + .noDefaultValue(); + public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR = key("mesos.constraints.hard.hostattribute") .noDefaultValue(); @@ -109,6 +113,8 @@ public class MesosTaskManagerParameters { private final List<Protos.Volume> containerVolumes; + private final List<Protos.Parameter> dockerParameters; + private final List<ConstraintEvaluator> constraints; private final String command; @@ -123,6 +129,7 @@ public class MesosTaskManagerParameters { Option<String> containerImageName, ContaineredTaskManagerParameters containeredParameters, List<Protos.Volume> containerVolumes, + List<Protos.Parameter> dockerParameters, List<ConstraintEvaluator> constraints, String command, Option<String> bootstrapCommand, @@ -133,6 +140,7 @@ public class MesosTaskManagerParameters { this.containerImageName = Preconditions.checkNotNull(containerImageName); this.containeredParameters = Preconditions.checkNotNull(containeredParameters); this.containerVolumes = Preconditions.checkNotNull(containerVolumes); + this.dockerParameters = Preconditions.checkNotNull(dockerParameters); this.constraints = Preconditions.checkNotNull(constraints); this.command = Preconditions.checkNotNull(command); this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand); @@ -177,6 +185,13 @@ public class MesosTaskManagerParameters { } /** + * Get Docker runtime parameters. + */ + public List<Protos.Parameter> dockerParameters() { + return dockerParameters; + } + + /** * Get the placement constraints. */ public List<ConstraintEvaluator> constraints() { @@ -212,6 +227,7 @@ public class MesosTaskManagerParameters { ", containerImageName=" + containerImageName + ", containeredParameters=" + containeredParameters + ", containerVolumes=" + containerVolumes + + ", dockerParameters=" + dockerParameters + ", constraints=" + constraints + ", taskManagerHostName=" + taskManagerHostname + ", command=" + command + @@ -260,8 +276,12 @@ public class MesosTaskManagerParameters { Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES)); + Option<String> dockerParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS)); + List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt); + List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt); + //obtain Task Manager Host Name from the configuration Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME)); @@ -275,6 +295,7 @@ public class MesosTaskManagerParameters { Option.apply(imageName), containeredParameters, containerVolumes, + dockerParameters, constraints, tmCommand, tmBootstrapCommand, @@ -365,6 +386,32 @@ public class MesosTaskManagerParameters { } } + public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) { + if (dockerParameters.isEmpty()) { + return Collections.emptyList(); + } else { + String[] dockerParameterSpecifications = dockerParameters.get().split(","); + + List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length); + + for (String dockerParameterSpecification : dockerParameterSpecifications) { + if (!dockerParameterSpecification.trim().isEmpty()) { + // split with the limit of 2 in case the value includes '=' + String[] match = dockerParameterSpecification.split("=", 2); + if (match.length != 2) { + throw new IllegalArgumentException("Docker parameter specification is invalid, given: " + + dockerParameterSpecification); + } + Protos.Parameter.Builder parameter = Protos.Parameter.newBuilder(); + parameter.setKey(match[0]); + parameter.setValue(match[1]); + parameters.add(parameter.build()); + } + } + return parameters; + } + } + /** * The supported containerizers. */ http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index ff32486..c2455ec 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -250,6 +250,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger { Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), + Collections.<Protos.Parameter>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 85ba142..14e8ed9 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -263,7 +263,8 @@ public class MesosResourceManagerTest extends TestLogger { new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>()); MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters( 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, - Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), + Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), + Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), Option.<String>empty()); // resource manager http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java index 1f33cb5..84b0ff2 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java @@ -85,6 +85,42 @@ public class MesosTaskManagerParametersTest extends TestLogger { } @Test + public void testContainerDockerParameter() throws Exception { + Configuration config = new Configuration(); + config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue"); + + MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config); + assertEquals(params.dockerParameters().size(), 1); + assertEquals(params.dockerParameters().get(0).getKey(), "testKey"); + assertEquals(params.dockerParameters().get(0).getValue(), "testValue"); + } + + @Test + public void testContainerDockerParameters() throws Exception { + Configuration config = new Configuration(); + config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, + "testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\""); + + MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config); + assertEquals(params.dockerParameters().size(), 4); + assertEquals(params.dockerParameters().get(0).getKey(), "testKey1"); + assertEquals(params.dockerParameters().get(0).getValue(), "testValue1"); + assertEquals(params.dockerParameters().get(1).getKey(), "testKey2"); + assertEquals(params.dockerParameters().get(1).getValue(), "testValue2"); + assertEquals(params.dockerParameters().get(2).getKey(), "testParam3"); + assertEquals(params.dockerParameters().get(2).getValue(), "key3=value3"); + assertEquals(params.dockerParameters().get(3).getKey(), "testParam4"); + assertEquals(params.dockerParameters().get(3).getValue(), "\"key4=value4\""); + } + + @Test(expected = IllegalArgumentException.class) + public void testContainerDockerParametersMalformed() throws Exception { + Configuration config = new Configuration(); + config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam"); + MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config); + } + + @Test public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception { MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));
