This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cf10e70 [FLINK-12139][Mesos] Add disk space parameter.
cf10e70 is described below
commit cf10e70088645b0fcc54ab03ecec3ef372ddb652
Author: Juan Gentile <[email protected]>
AuthorDate: Tue Apr 9 13:48:36 2019 +0200
[FLINK-12139][Mesos] Add disk space parameter.
This closes #8224.
---
.../generated/mesos_task_manager_configuration.html | 5 +++++
.../flink/mesos/entrypoint/MesosEntrypointUtils.java | 5 +++--
.../clusterframework/LaunchableMesosWorker.java | 6 +++++-
.../clusterframework/MesosTaskManagerParameters.java | 19 +++++++++++++++++++
.../clusterframework/MesosResourceManagerTest.java | 2 +-
5 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html
b/docs/_includes/generated/mesos_task_manager_configuration.html
index 1e67f84..338acc6 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -48,6 +48,11 @@
<td>CPUs to assign to the Mesos workers.</td>
</tr>
<tr>
+ <td><h5>mesos.resourcemanager.tasks.disk</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Disk space to assign to the Mesos workers in MB.</td>
+ </tr>
+ <tr>
<td><h5>mesos.resourcemanager.tasks.gpus</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>GPUs to assign to the Mesos workers.</td>
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 2059c8e..4d7a485 100755
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -109,12 +109,13 @@ public class MesosEntrypointUtils {
log.info("TaskManagers will be created with {} task slots",
taskManagerParameters.containeredParameters().numSlots());
log.info("TaskManagers will be started with container size {}
MB, JVM heap size {} MB, " +
- "JVM direct memory limit {} MB, {} cpus, {}
gpus",
+ "JVM direct memory limit {} MB, {} cpus, {}
gpus, disk space {} MB",
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
taskManagerParameters.cpus(),
- taskManagerParameters.gpus());
+ taskManagerParameters.gpus(),
+ taskManagerParameters.disk());
return taskManagerParameters;
}
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 637442c..22f6a30 100644
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -145,7 +145,7 @@ public class LaunchableMesosWorker implements
LaunchableTask {
@Override
public double getDisk() {
- return 0.0;
+ return params.disk();
}
@Override
@@ -221,6 +221,10 @@ public class LaunchableMesosWorker implements
LaunchableTask {
taskInfo.addAllResources(allocation.takeScalar("gpus",
taskRequest.getGPUs(), roles));
taskInfo.addAllResources(allocation.takeScalar("mem",
taskRequest.getMemory(), roles));
+ if (taskRequest.getDisk() > 0.0) {
+ taskInfo.addAllResources(allocation.takeScalar("disk",
taskRequest.getDisk(), roles));
+ }
+
final Protos.CommandInfo.Builder cmd =
taskInfo.getCommandBuilder();
final Protos.Environment.Builder env =
cmd.getEnvironmentBuilder();
final StringBuilder jvmArgs = new StringBuilder();
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 0315629..1d49000 100644
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -59,6 +59,11 @@ public class MesosTaskManagerParameters {
.defaultValue(1024)
.withDescription("Memory to assign to the Mesos workers in
MB.");
+ public static final ConfigOption<Integer> MESOS_RM_TASKS_DISK_MB =
+ key("mesos.resourcemanager.tasks.disk")
+ .defaultValue(0)
+ .withDescription(Description.builder().text("Disk space to
assign to the Mesos workers in MB.").build());
+
public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
key("mesos.resourcemanager.tasks.cpus")
.defaultValue(0.0)
@@ -145,6 +150,8 @@ public class MesosTaskManagerParameters {
private final int gpus;
+ private final int disk;
+
private final ContainerType containerType;
private final Option<String> containerImageName;
@@ -170,6 +177,7 @@ public class MesosTaskManagerParameters {
public MesosTaskManagerParameters(
double cpus,
int gpus,
+ int disk,
ContainerType containerType,
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
@@ -184,6 +192,7 @@ public class MesosTaskManagerParameters {
this.cpus = cpus;
this.gpus = gpus;
+ this.disk = disk;
this.containerType = Preconditions.checkNotNull(containerType);
this.containerImageName =
Preconditions.checkNotNull(containerImageName);
this.containeredParameters =
Preconditions.checkNotNull(containeredParameters);
@@ -212,6 +221,13 @@ public class MesosTaskManagerParameters {
}
/**
+ * Get the disk space in MB to use for the TaskManager Process.
+ */
+ public int disk() {
+ return disk;
+ }
+
+ /**
* Get the container type (Mesos or Docker). The default is Mesos.
*
* <p>Mesos provides a facility for a framework to specify which
containerizer to use.
@@ -335,6 +351,8 @@ public class MesosTaskManagerParameters {
" cannot be negative");
}
+ int disk = flinkConfig.getInteger(MESOS_RM_TASKS_DISK_MB);
+
// parse the containerization parameters
String imageName =
flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);
@@ -379,6 +397,7 @@ public class MesosTaskManagerParameters {
return new MesosTaskManagerParameters(
cpus,
gpus,
+ disk,
containerType,
Option.apply(imageName),
containeredParameters,
diff --git
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index c5d053c..617ad41 100644
---
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -279,7 +279,7 @@ public class MesosResourceManagerTest extends TestLogger {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768,
256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new
MesosTaskManagerParameters(
- 1.0, 1,
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(),
containeredParams,
+ 1.0, 1, 0,
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(),
containeredParams,
Collections.<Protos.Volume>emptyList(),
Collections.<Protos.Parameter>emptyList(), false,
Collections.<ConstraintEvaluator>emptyList(),
"", Option.<String>empty(),
Option.<String>empty(),
Collections.<String>emptyList());