Repository: flink Updated Branches: refs/heads/master 4656350fc -> e8318d6f4
[FLINK-4900] flink-master: Allow to deploy TM with container Allows via a setting to deploy a base image on that a task manager runs. This closes #2703. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8318d6f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8318d6f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8318d6f Branch: refs/heads/master Commit: e8318d6f449b0b4720ca2377eae4f98c59f5e494 Parents: 4656350 Author: Makman2 <[email protected]> Authored: Thu Oct 27 15:18:37 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Nov 23 17:44:00 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 25 +++++++++++ .../MesosApplicationMasterRunner.java | 47 ++++++++++++++++++++ 2 files changed, 72 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e8318d6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index fb5a760..dd17f00 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -491,6 +491,29 @@ public final class ConfigConstants { */ public static final String MESOS_RESOURCEMANAGER_TASKS_CPUS = "mesos.resourcemanager.tasks.cpus"; + /** + * The container image to use for task managers. + */ + public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME = + "mesos.resourcemanager.tasks.container.image.name"; + + /** + * The type of container to use for task managers. Valid values are + * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS} or + * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER}. + */ + public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE = + "mesos.resourcemanager.tasks.container.type"; + + /** + * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer. + */ + public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS = "mesos"; + /** + * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Docker containerizer. + */ + public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER = "docker"; + // ------------------------ Hadoop Configuration ------------------------ /** @@ -1158,6 +1181,8 @@ public final class ConfigConstants { public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*"; + public static final String DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE = "mesos"; + // ------------------------ File System Behavior ------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/e8318d6f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 3695578..c35fa82 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -636,6 +636,53 @@ public class MesosApplicationMasterRunner { info.setCommand(cmd); + // Set container for task manager if specified in configs. + String tmImageName = flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, ""); + + if (tmImageName.length() > 0) { + String taskManagerContainerType = flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE, + ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE); + + Protos.ContainerInfo.Builder containerInfo; + + switch (taskManagerContainerType) { + case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS: + containerInfo = Protos.ContainerInfo.newBuilder() + .setType(Protos.ContainerInfo.Type.MESOS) + .setMesos(Protos.ContainerInfo.MesosInfo.newBuilder() + .setImage(Protos.Image.newBuilder() + .setType(Protos.Image.Type.DOCKER) + .setDocker(Protos.Image.Docker.newBuilder() + .setName(tmImageName)))); + break; + case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER: + containerInfo = Protos.ContainerInfo.newBuilder() + .setType(Protos.ContainerInfo.Type.DOCKER) + .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder() + .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST) + .setImage(tmImageName)); + break; + default: + LOG.warn( + "Invalid container type '{}' provided for setting {}. Valid values are '{}' or '{}'. " + + "Starting task managers now without container.", + taskManagerContainerType, + ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE, + ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS, + ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER); + + containerInfo = null; + + break; + } + + if (containerInfo != null) { + info.setContainer(containerInfo); + } + } + return info; } }
