Repository: flink
Updated Branches:
  refs/heads/master ead1a2074 -> d2de75e45


[FLINK-9660] Allow passing custom artifacts to Mesos workers

This closes #6207.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2de75e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2de75e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2de75e4

Branch: refs/heads/master
Commit: d2de75e4587b005914eac3489cd96929cde73cfc
Parents: ead1a20
Author: Leonid Ishimnikov <lis...@fastmail.com>
Authored: Thu Jun 21 15:13:09 2018 -0400
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Jun 30 23:19:47 2018 +0200

----------------------------------------------------------------------
 .../mesos_task_manager_configuration.html       |  5 +++
 docs/ops/deployment/mesos.md                    |  2 +
 .../clusterframework/LaunchableMesosWorker.java |  6 +++
 .../clusterframework/MesosResourceManager.java  |  3 +-
 .../MesosTaskManagerParameters.java             | 43 +++++++++++++++++++-
 .../MesosFlinkResourceManagerTest.java          |  3 +-
 .../MesosResourceManagerTest.java               |  2 +-
 .../MesosTaskManagerParametersTest.java         | 21 ++++++++++
 8 files changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/docs/_includes/generated/mesos_task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html 
b/docs/_includes/generated/mesos_task_manager_configuration.html
index a6ee78b..2c2ed17 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -63,6 +63,11 @@
             <td></td>
         </tr>
         <tr>
+            <td><h5>mesos.resourcemanager.tasks.uris</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A comma separated list of URIs of custom artifacts to be 
downloaded into the sandbox of Mesos workers.</td>
+        </tr>
+        <tr>
             <td><h5>taskmanager.numberOfTaskSlots</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>The number of parallel operator or user function instances 
that a single TaskManager can run. If this value is larger than 1, a single 
TaskManager takes multiple instances of a function or operator. That way, the 
TaskManager can utilize multiple CPU cores, but at the same time, the available 
memory is divided between the different operator or function instances. This 
value is typically proportional to the number of physical CPU cores that the 
TaskManager's machine has (e.g., equal to the number of cores, or half the 
number of cores).</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 0b27e45..567d9b7 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -268,6 +268,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters 
to be passed into docker run command when using the docker containerizer. Comma 
separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.uris`: A comma separated list of URIs of custom 
artifacts to be downloaded into the sandbox of Mesos workers. (**NO DEFAULT**)
+
 `mesos.resourcemanager.tasks.hostname`: Optional value to define the 
TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of 
the Mesos task. This can be used to configure the TaskManager to use Mesos DNS 
(e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
 
 `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed 
before the TaskManager is started (**NO DEFAULT**).

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 18c6b1c..0bf09f8 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -36,6 +36,7 @@ import com.netflix.fenzo.ConstraintEvaluator;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -247,6 +248,11 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                        cmd.addUris(Utils.uri(resolver, artifact));
                }
 
+               // add user-specified URIs
+               for (String uri : params.uris()) {
+                       cmd.addUris(CommandInfo.URI.newBuilder().setValue(uri));
+               }
+
                // propagate environment variables
                for (Map.Entry<String, String> entry : 
params.containeredParameters().taskManagerEnv().entrySet()) {
                        env.addVariables(variable(entry.getKey(), 
entry.getValue()));

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 9b5d50e..4d62c04 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -669,7 +669,8 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        taskManagerParameters.constraints(),
                        taskManagerParameters.command(),
                        taskManagerParameters.bootstrapCommand(),
-                       taskManagerParameters.getTaskManagerHostname()
+                       taskManagerParameters.getTaskManagerHostname(),
+                       taskManagerParameters.uris()
                );
 
                LOG.debug("LaunchableMesosWorker parameters: {}", params);

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 6dbfc93..1c19cf2 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -89,6 +89,12 @@ public class MesosTaskManagerParameters {
                key("mesos.resourcemanager.tasks.bootstrap-cmd")
                .noDefaultValue();
 
+       public static final ConfigOption<String> MESOS_TM_URIS =
+               key("mesos.resourcemanager.tasks.uris")
+               .noDefaultValue()
+               .withDescription("A comma separated list of URIs of custom 
artifacts to be downloaded into the sandbox" +
+                       " of Mesos workers.");
+
        public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
                key("mesos.resourcemanager.tasks.container.volumes")
                .noDefaultValue()
@@ -137,6 +143,8 @@ public class MesosTaskManagerParameters {
 
        private final Option<String> taskManagerHostname;
 
+       private final List<String> uris;
+
        public MesosTaskManagerParameters(
                        double cpus,
                        int gpus,
@@ -148,7 +156,8 @@ public class MesosTaskManagerParameters {
                        List<ConstraintEvaluator> constraints,
                        String command,
                        Option<String> bootstrapCommand,
-                       Option<String> taskManagerHostname) {
+                       Option<String> taskManagerHostname,
+                       List<String> uris) {
 
                this.cpus = cpus;
                this.gpus = gpus;
@@ -161,6 +170,7 @@ public class MesosTaskManagerParameters {
                this.command = Preconditions.checkNotNull(command);
                this.bootstrapCommand = 
Preconditions.checkNotNull(bootstrapCommand);
                this.taskManagerHostname = 
Preconditions.checkNotNull(taskManagerHostname);
+               this.uris = Preconditions.checkNotNull(uris);
        }
 
        /**
@@ -242,6 +252,13 @@ public class MesosTaskManagerParameters {
                return bootstrapCommand;
        }
 
+       /**
+        * Get custom artifact URIs.
+        */
+       public List<String> uris() {
+               return uris;
+       }
+
        @Override
        public String toString() {
                return "MesosTaskManagerParameters{" +
@@ -256,6 +273,7 @@ public class MesosTaskManagerParameters {
                        ", taskManagerHostName=" + taskManagerHostname +
                        ", command=" + command +
                        ", bootstrapCommand=" + bootstrapCommand +
+                       ", uris=" + uris +
                        '}';
        }
 
@@ -309,10 +327,14 @@ public class MesosTaskManagerParameters {
 
                Option<String> dockerParamsOpt = 
Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS));
 
+               Option<String> uriParamsOpt = 
Option.<String>apply(flinkConfig.getString(MESOS_TM_URIS));
+
                List<Protos.Volume> containerVolumes = 
buildVolumes(containerVolOpt);
 
                List<Protos.Parameter> dockerParameters = 
buildDockerParameters(dockerParamsOpt);
 
+               List<String> uris = buildUris(uriParamsOpt);
+
                //obtain Task Manager Host Name from the configuration
                Option<String> taskManagerHostname = 
Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
@@ -331,7 +353,8 @@ public class MesosTaskManagerParameters {
                        constraints,
                        tmCommand,
                        tmBootstrapCommand,
-                       taskManagerHostname);
+                       taskManagerHostname,
+                       uris);
        }
 
        private static List<ConstraintEvaluator> parseConstraints(String 
mesosConstraints) {
@@ -445,6 +468,22 @@ public class MesosTaskManagerParameters {
        }
 
        /**
+        * Build a list of URIs for providing custom artifacts to Mesos tasks.
+        * @param uris a comma delimited optional string listing artifact URIs
+        */
+       public static List<String> buildUris(Option<String> uris) {
+               if (uris.isEmpty()) {
+                       return Collections.emptyList();
+               } else {
+                       List<String> urisList = new ArrayList<>();
+                       for (String uri : uris.get().split(",")) {
+                               urisList.add(uri.trim());
+                       }
+                       return urisList;
+               }
+       }
+
+       /**
         * The supported containerizers.
         */
        public enum ContainerType {

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 330a2c6..7ee8808 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -254,7 +254,8 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
                                Collections.<ConstraintEvaluator>emptyList(),
                                "",
                                Option.<String>empty(),
-                               Option.<String>empty());
+                               Option.<String>empty(),
+                               Collections.<String>emptyList());
 
                        TestActorRef<TestingMesosFlinkResourceManager> 
resourceManagerRef =
                                TestActorRef.create(system, 
MesosFlinkResourceManager.createActorProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 56d2fbd..65576bb 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -281,7 +281,7 @@ public class MesosResourceManagerTest extends TestLogger {
                                1.0, 1, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams,
                                Collections.<Protos.Volume>emptyList(), 
Collections.<Protos.Parameter>emptyList(),
                                Collections.<ConstraintEvaluator>emptyList(), 
"", Option.<String>empty(),
-                               Option.<String>empty());
+                               Option.<String>empty(), 
Collections.<String>emptyList());
 
                        // resource manager
                        rmConfiguration = new ResourceManagerConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/d2de75e4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index d5b5019..e002cb9 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -122,6 +122,27 @@ public class MesosTaskManagerParametersTest extends 
TestLogger {
        }
 
        @Test
+       public void testUriParameters() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(MesosTaskManagerParameters.MESOS_TM_URIS,
+                               "file:///dev/null,http://localhost/test,  
test_url ");
+
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+               assertEquals(params.uris().size(), 3);
+               assertEquals(params.uris().get(0), "file:///dev/null");
+               assertEquals(params.uris().get(1), "http://localhost/test";);
+               assertEquals(params.uris().get(2), "test_url");
+       }
+
+       @Test
+       public void testUriParametersDefault() throws Exception {
+               Configuration config = new Configuration();
+
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+               assertEquals(params.uris().size(), 0);
+       }
+
+       @Test
        public void givenTwoConstraintsInConfigShouldBeParsed() throws 
Exception {
 
                MesosTaskManagerParameters mesosTaskManagerParameters = 
MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));

Reply via email to