[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422764#comment-15422764
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r74941466
--- Diff:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+ /**
+ * The set of configuration keys to be dynamically configured with a
port allocated from Mesos.
+ */
+ private static String[] TM_PORT_KEYS = {
+ "taskmanager.rpc.port",
+ "taskmanager.data.port" };
+
+ /** The TM parameters (such as memory and cpu to acquire) supplied to the constructor. */
+ private final MesosTaskManagerParameters params;
+ /** Template for the TaskInfo to be constructed at launch time. */
+ private final Protos.TaskInfo.Builder template;
+ /** The task ID of this worker; its value is also reported as the ID of the task request. */
+ private final Protos.TaskID taskID;
+ /** The task request describing this worker; created once in the constructor. */
+ private final Request taskRequest;
+
+ /**
+  * Creates a launchable Mesos worker together with the task request that describes it.
+  *
+  * @param params the TM parameters such as memory, cpu to acquire.
+  * @param template a template for the TaskInfo to be constructed at launch time.
+  * @param taskID the taskID for this worker.
+  */
+ public LaunchableMesosWorker(
+         MesosTaskManagerParameters params,
+         Protos.TaskInfo.Builder template,
+         Protos.TaskID taskID) {
+     this.taskID = taskID;
+     this.template = template;
+     this.params = params;
+     // The request reads params/taskID lazily through its accessors, so it is created last.
+     this.taskRequest = new Request();
+ }
+
+ /**
+  * Returns the task ID that was passed to the constructor.
+  *
+  * @return the task ID of this worker.
+  */
+ public Protos.TaskID taskID() {
+     return this.taskID;
+ }
+
+ /**
+  * Returns the task request created for this worker at construction time.
+  *
+  * @return the task request describing this worker.
+  */
+ @Override
+ public TaskRequest taskRequest() {
+     return this.taskRequest;
+ }
+
+ /**
+  * The task request describing the resources this worker needs.
+  *
+  * CPU and memory are taken from the worker's parameters and one port is requested per
+  * entry of {@code TM_PORT_KEYS}. No network bandwidth, disk, custom named resources or
+  * constraints are requested. This is an inner (non-static) class because it reads the
+  * enclosing worker's {@code params} and {@code taskID}.
+  */
+ class Request implements TaskRequest {
+     /** The resources assigned to this request; unset (null) until an assignment is stored. */
+     private final AtomicReference<TaskRequest.AssignedResources> assignedResources =
+         new AtomicReference<>();
+
+     /** @return the string value of the enclosing worker's task ID. */
+     @Override
+     public String getId() {
+         return taskID.getValue();
+     }
+
+     /** @return the empty string; no task group name is set. */
+     @Override
+     public String taskGroupName() {
+         return "";
+     }
+
+     /** @return the number of CPUs given by the worker's parameters. */
+     @Override
+     public double getCPUs() {
+         return params.cpus();
+     }
+
+     /** @return the total TaskManager memory in MB given by the containered parameters. */
+     @Override
+     public double getMemory() {
+         return params.containeredParameters().taskManagerTotalMemoryMB();
+     }
+
+     /** @return always 0.0; no network bandwidth is requested. */
+     @Override
+     public double getNetworkMbps() {
+         return 0.0;
+     }
+
+     /** @return always 0.0; no disk is requested. */
+     @Override
+     public double getDisk() {
+         return 0.0;
+     }
+
+     /** @return the number of ports to request, one per dynamically configured port key. */
+     @Override
+     public int getPorts() {
+         return TM_PORT_KEYS.length;
+     }
+
+     /** @return an empty map; no custom named resources are requested. */
+     @Override
+     public Map<String, NamedResourceSetRequest> getCustomNamedResources() {
+         return Collections.emptyMap();
+     }
+
+     /**
+      * @return {@code null}, i.e. no hard constraints are supplied.
+      */
+     @Override
+     public List<? extends ConstraintEvaluator> getHardConstraints() {
+         // NOTE(review): null is kept as in the original; presumably the scheduler treats it
+         // as "no constraints" - confirm before switching to an empty list.
+         return null;
+     }
+
+     /**
+      * @return {@code null}, i.e. no soft constraints are supplied.
+      */
+     @Override
+     public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
+         // NOTE(review): null is kept as in the original; presumably the scheduler treats it
+         // as "no constraints" - confirm before switching to an empty list.
+         return null;
+     }
+
+     /**
+      * Stores the resources assigned to this request.
+      *
+      * @param assignedResources the assigned resources to remember.
+      */
+     @Override
+     public void setAssignedResources(AssignedResources assignedResources) {
+         this.assignedResources.set(assignedResources);
+     }
+
+     /** @return the most recently stored assigned resources, or null if none were stored. */
+     @Override
+     public AssignedResources getAssignedResources() {
+         return assignedResources.get();
+     }
+
+     /** @return a short description of the form {@code Request{cpus=..., memory=...}}. */
+     @Override
+     public String toString() {
+         // The separator before "memory=" was missing, which fused the two values together.
+         return "Request{" +
+             "cpus=" + getCPUs() +
+             ", memory=" + getMemory() +
+             '}';
+     }
+ }
+
+ /**
+ * Construct the TaskInfo needed to launch the worker.
+ * @param slaveId the assigned slave.
+ * @param assignment the assignment details.
+ * @return a fully-baked TaskInfo.
--- End diff --
indentation
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)