[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201019#comment-15201019
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user Astralidea commented on a diff in the pull request:
https://github.com/apache/flink/pull/948#discussion_r56613503
--- Diff:
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/FlinkScheduler.scala
---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.io.File
+import java.util.{List => JList}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.mesos.Protos.TaskState._
+import org.apache.mesos.Protos._
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.slf4j.LoggerFactory
+import scopt.OptionParser
+
+import scala.collection.JavaConversions._
+
+object FlinkScheduler extends Scheduler with SchedulerUtils {
+
+ val LOG = LoggerFactory.getLogger(FlinkScheduler.getClass)
+ var jobManager: Option[Thread] = None
+ var currentConfiguration: Option[Configuration] = None
+ var taskManagers: Set[RunningTaskManager] = Set()
+ var taskManagerCount = 0
+ // http port where http server is hosting the configuration files
+ var httpConfigServerAddress: Option[String] = None
+
+ override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = { }
+
+ override def disconnected(driver: SchedulerDriver): Unit = { }
+
+ override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { }
+
+ override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {
+ LOG.warn(s"Slave lost: ${slaveId.getValue}, removing all task managers matching slaveId")
+ taskManagers = taskManagers.filter(_.slaveId != slaveId)
+ }
+
+ override def error(driver: SchedulerDriver, message: String): Unit = { }
+
+ override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, data: Array[Byte]): Unit = { }
+
+ override def registered(driver: SchedulerDriver, frameworkId: FrameworkID,
+ masterInfo: MasterInfo): Unit = { }
+
+ override def executorLost(driver: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, status: Int): Unit = {
+ LOG.warn(s"Executor ${executorId.getValue} lost with status $status on slave $slaveId")
+ }
+
+ override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
+ val taskId = status.getTaskId.getValue
+ val slaveId = status.getSlaveId.getValue
+ LOG.info(
+ s"statusUpdate received from taskId: $taskId slaveId: $slaveId [${status.getState.name()}]")
+
+ status.getState match {
+ case TASK_FAILED | TASK_FINISHED | TASK_KILLED | TASK_LOST | TASK_ERROR =>
+ LOG.info(s"Lost taskManager with TaskId: $taskId on slave: $slaveId")
+ taskManagers = taskManagers.filter(_.taskId != status.getTaskId)
+ case _ =>
+ LOG.debug(s"No action to take for statusUpdate
${status.getState.name()}")
+ }
+ }
+
+ override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+ // We combine all resources from the same slave and launch a single task rather than
+ // one per offer; this gives better utilization and less memory wasted on overhead.
+ for((slaveId, offers) <- offers.groupBy(_.getSlaveId)) {
+ val tasks = constructTaskInfoFromOffers(slaveId, offers.toList)
+ driver.launchTasks(offers.map(_.getId), tasks)
+ }
+ }
+
+ def constructTaskInfoFromOffers(slaveId: SlaveID, offers: List[Offer]): Seq[TaskInfo] = {
+ val maxTaskManagers = GlobalConfiguration.getInteger(
+ TASK_MANAGER_COUNT_KEY, DEFAULT_TASK_MANAGER_COUNT)
+ val requiredMem = GlobalConfiguration.getFloat(
+ TASK_MANAGER_MEM_KEY, DEFAULT_TASK_MANAGER_MEM)
+ val requiredCPU = GlobalConfiguration.getFloat(
+ TASK_MANAGER_CPU_KEY, DEFAULT_TASK_MANAGER_CPU.toFloat)
+ val requiredDisk = GlobalConfiguration.getFloat(
+ TASK_MANAGER_DISK_KEY, DEFAULT_TASK_MANGER_DISK)
+ val attributeConstraints = parseConstraintString(GlobalConfiguration.getString(
+ TASK_MANAGER_OFFER_ATTRIBUTES_KEY, DEFAULT_TASK_MANAGER_OFFER_ATTRIBUTES))
+ val role = GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_ROLE_KEY, DEFAULT_MESOS_FRAMEWORK_ROLE)
+ val uberJarLocation = GlobalConfiguration.getString(
+ FLINK_UBERJAR_LOCATION_KEY, null)
+ val nativeLibPath = GlobalConfiguration.getString(
+ MESOS_NATIVE_JAVA_LIBRARY_KEY, DEFAULT_MESOS_NATIVE_JAVA_LIBRARY)
+
+ // combine offers into a single chunk
+ val totalMemory = offers.flatMap(_.getResourcesList
+ .filter(x => x.getName == "mem" && x.getRole == role)
+ .map(_.getScalar.getValue)).sum
+
+ val totalCPU = offers.flatMap(_.getResourcesList
+ .filter(x => x.getName == "cpus" && x.getRole == role)
+ .map(_.getScalar.getValue)).sum
+
+ val totalDisk = offers.flatMap(_.getResourcesList
+ .filter(x => x.getName == "disk" && x.getRole == role)
+ .map(_.getScalar.getValue)).sum
+
+ val portRanges = offers.flatMap(_.getResourcesList
+ .filter(x => x.getName == "ports" && x.getRole == role)
+ .flatMap(_.getRanges.getRangeList))
+
+ val ports = getNPortsFromPortRanges(2, portRanges)
+
+ val offerAttributes = toAttributeMap(offers.flatMap(_.getAttributesList))
+
+ // check if all constraints are satisfied
+ // 0. We need more task managers
+ // 1. Attribute constraints
+ // 2. Memory requirements
+ // 3. CPU requirements
+ // 4. Port requirements
+ val meetsRequirements =
+ taskManagers.size < maxTaskManagers &&
+ totalCPU >= requiredCPU &&
+ totalMemory >= requiredMem &&
+ totalDisk >= requiredDisk &&
+ ports.size == 2 &&
+ matchesAttributeRequirements(attributeConstraints, offerAttributes)
+
+ LOG.info((if (meetsRequirements) "Accepting " else "Declining ") +
+ s"offer(s) from slave ${slaveId.getValue} " +
+ s"offered [cpus: $totalCPU | mem: $totalMemory | disk: $totalDisk] " +
+ s"required [cpus: $requiredCPU | mem: $requiredMem | disk: $requiredDisk]")
+
+ if (meetsRequirements) {
+ // create task Id
+ taskManagerCount += 1
+
+ // create executor
+ val command = createTaskManagerCommand(requiredMem.toInt)
+ val log4jUrl = s"${httpConfigServerAddress.get}/log4j.properties"
+ val executorInfo = createExecutorInfo(s"$taskManagerCount", role,
+ Set(uberJarLocation, log4jUrl), command, nativeLibPath)
+
+ // create task
+ val taskId = TaskID.newBuilder().setValue(s"TaskManager_$taskManagerCount").build()
+ val taskInfo = createTaskInfo(
+ "taskManager", taskId, slaveId, role, requiredMem,
+ requiredCPU, requiredDisk, ports, executorInfo, currentConfiguration.get)
+
+ Seq(taskInfo)
+ } else {
+ Seq()
--- End diff --
There is a bug here: if you do not call driver.declineOffer(offer.id), Mesos will keep
holding this offer for the FlinkScheduler, so other frameworks cannot use it and
eventually no resources are left.
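A minimal sketch of the handling being suggested, assuming resourceOffers keeps the
group-by-slave structure from the diff above (the restructuring and names here are
illustrative, not the PR author's code):

  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
    for ((slaveId, slaveOffers) <- offers.groupBy(_.getSlaveId)) {
      val tasks = constructTaskInfoFromOffers(slaveId, slaveOffers.toList)
      if (tasks.nonEmpty) {
        // launch one TaskManager task backed by all offers from this slave
        driver.launchTasks(slaveOffers.map(_.getId), tasks)
      } else {
        // explicitly return unused offers so Mesos can re-offer them to other frameworks
        slaveOffers.foreach(offer => driver.declineOffer(offer.getId))
      }
    }
  }

Declining inside constructTaskInfoFromOffers when the requirements check fails would work
as well; the important part is that every offer is either launched against or declined.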
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: New Components
> Reporter: Robert Metzger
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink:
> https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to
> this JIRA in case somebody wants to pick it up in the future.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)