Repository: flink Updated Branches: refs/heads/master ead1a2074 -> d2de75e45
[FLINK-9660] Allow passing custom artifacts to Mesos workers This closes #6207. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2de75e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2de75e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2de75e4 Branch: refs/heads/master Commit: d2de75e4587b005914eac3489cd96929cde73cfc Parents: ead1a20 Author: Leonid Ishimnikov <lis...@fastmail.com> Authored: Thu Jun 21 15:13:09 2018 -0400 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Jun 30 23:19:47 2018 +0200 ---------------------------------------------------------------------- .../mesos_task_manager_configuration.html | 5 +++ docs/ops/deployment/mesos.md | 2 + .../clusterframework/LaunchableMesosWorker.java | 6 +++ .../clusterframework/MesosResourceManager.java | 3 +- .../MesosTaskManagerParameters.java | 43 +++++++++++++++++++- .../MesosFlinkResourceManagerTest.java | 3 +- .../MesosResourceManagerTest.java | 2 +- .../MesosTaskManagerParametersTest.java | 21 ++++++++++ 8 files changed, 80 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/docs/_includes/generated/mesos_task_manager_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html b/docs/_includes/generated/mesos_task_manager_configuration.html index a6ee78b..2c2ed17 100644 --- a/docs/_includes/generated/mesos_task_manager_configuration.html +++ b/docs/_includes/generated/mesos_task_manager_configuration.html @@ -63,6 +63,11 @@ <td></td> </tr> <tr> + <td><h5>mesos.resourcemanager.tasks.uris</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers.</td> + </tr> + <tr> <td><h5>taskmanager.numberOfTaskSlots</h5></td> <td style="word-wrap: break-word;">1</td> <td>The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).</td> http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/docs/ops/deployment/mesos.md ---------------------------------------------------------------------- diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md index 0b27e45..567d9b7 100644 --- a/docs/ops/deployment/mesos.md +++ b/docs/ops/deployment/mesos.md @@ -268,6 +268,8 @@ May be set to -1 to disable this feature. `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**) +`mesos.resourcemanager.tasks.uris`: A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers. (**NO DEFAULT**) + `mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**) `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**). http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 18c6b1c..0bf09f8 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -36,6 +36,7 @@ import com.netflix.fenzo.ConstraintEvaluator; import com.netflix.fenzo.TaskRequest; import com.netflix.fenzo.VMTaskFitnessCalculator; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.CommandInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,6 +248,11 @@ public class LaunchableMesosWorker implements LaunchableTask { cmd.addUris(Utils.uri(resolver, artifact)); } + // add user-specified URIs + for (String uri : params.uris()) { + cmd.addUris(CommandInfo.URI.newBuilder().setValue(uri)); + } + // propagate environment variables for (Map.Entry<String, String> entry : params.containeredParameters().taskManagerEnv().entrySet()) { env.addVariables(variable(entry.getKey(), entry.getValue())); http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 9b5d50e..4d62c04 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -669,7 +669,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN taskManagerParameters.constraints(), taskManagerParameters.command(), taskManagerParameters.bootstrapCommand(), - taskManagerParameters.getTaskManagerHostname() + taskManagerParameters.getTaskManagerHostname(), + taskManagerParameters.uris() ); LOG.debug("LaunchableMesosWorker parameters: {}", params); http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index 6dbfc93..1c19cf2 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -89,6 +89,12 @@ public class MesosTaskManagerParameters { key("mesos.resourcemanager.tasks.bootstrap-cmd") .noDefaultValue(); + public static final ConfigOption<String> MESOS_TM_URIS = + key("mesos.resourcemanager.tasks.uris") + .noDefaultValue() + .withDescription("A comma separated list of URIs of custom artifacts to be downloaded into the sandbox" + + " of Mesos workers."); + public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES = key("mesos.resourcemanager.tasks.container.volumes") .noDefaultValue() @@ -137,6 +143,8 @@ public class MesosTaskManagerParameters { private final Option<String> taskManagerHostname; + private final List<String> uris; + public MesosTaskManagerParameters( double cpus, int gpus, @@ -148,7 +156,8 @@ public class MesosTaskManagerParameters { List<ConstraintEvaluator> constraints, String command, Option<String> bootstrapCommand, - Option<String> taskManagerHostname) { + Option<String> taskManagerHostname, + List<String> uris) { this.cpus = cpus; this.gpus = gpus; @@ -161,6 +170,7 @@ public class MesosTaskManagerParameters { this.command = Preconditions.checkNotNull(command); this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand); this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname); + this.uris = Preconditions.checkNotNull(uris); } /** @@ -242,6 +252,13 @@ public class MesosTaskManagerParameters { return bootstrapCommand; } + /** + * Get custom artifact URIs. + */ + public List<String> uris() { + return uris; + } + @Override public String toString() { return "MesosTaskManagerParameters{" + @@ -256,6 +273,7 @@ public class MesosTaskManagerParameters { ", taskManagerHostName=" + taskManagerHostname + ", command=" + command + ", bootstrapCommand=" + bootstrapCommand + + ", uris=" + uris + '}'; } @@ -309,10 +327,14 @@ public class MesosTaskManagerParameters { Option<String> dockerParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS)); + Option<String> uriParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_TM_URIS)); + List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt); List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt); + List<String> uris = buildUris(uriParamsOpt); + //obtain Task Manager Host Name from the configuration Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME)); @@ -331,7 +353,8 @@ public class MesosTaskManagerParameters { constraints, tmCommand, tmBootstrapCommand, - taskManagerHostname); + taskManagerHostname, + uris); } private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) { @@ -445,6 +468,22 @@ public class MesosTaskManagerParameters { } /** + * Build a list of URIs for providing custom artifacts to Mesos tasks. + * @param uris a comma delimited optional string listing artifact URIs + */ + public static List<String> buildUris(Option<String> uris) { + if (uris.isEmpty()) { + return Collections.emptyList(); + } else { + List<String> urisList = new ArrayList<>(); + for (String uri : uris.get().split(",")) { + urisList.add(uri.trim()); + } + return urisList; + } + } + + /** * The supported containerizers. */ public enum ContainerType { http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index 330a2c6..7ee8808 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -254,7 +254,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger { Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), - Option.<String>empty()); + Option.<String>empty(), + Collections.<String>emptyList()); TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef = TestActorRef.create(system, MesosFlinkResourceManager.createActorProps( http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 56d2fbd..65576bb 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -281,7 +281,7 @@ public class MesosResourceManagerTest extends TestLogger { 1.0, 1, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), - Option.<String>empty()); + Option.<String>empty(), Collections.<String>emptyList()); // resource manager rmConfiguration = new ResourceManagerConfiguration( http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java index d5b5019..e002cb9 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java @@ -122,6 +122,27 @@ public class MesosTaskManagerParametersTest extends TestLogger { } @Test + public void testUriParameters() throws Exception { + Configuration config = new Configuration(); + config.setString(MesosTaskManagerParameters.MESOS_TM_URIS, + "file:///dev/null,http://localhost/test, test_url "); + + MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config); + assertEquals(params.uris().size(), 3); + assertEquals(params.uris().get(0), "file:///dev/null"); + assertEquals(params.uris().get(1), "http://localhost/test"); + assertEquals(params.uris().get(2), "test_url"); + } + + @Test + public void testUriParametersDefault() throws Exception { + Configuration config = new Configuration(); + + MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config); + assertEquals(params.uris().size(), 0); + } + + @Test public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception { MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));