[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14735270#comment-14735270
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/948#discussion_r38957167
--- Diff:
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration,
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+/**
+ * This code is borrowed and inspired from Apache Spark Project:
+ *
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+ */
+trait SchedulerUtils {
+
+ /**
+ * Converts the attributes from the resource offer into a Map of name ->
Attribute Value
+ * The attribute values are the mesos attribute types and they are
+ * @param offerAttributes List of attributes sent with an offer
+ * @return
+ */
+ def toAttributeMap(offerAttributes: JList[Attribute]): Map[String,
GeneratedMessage] = {
+ offerAttributes.map(attr => {
+ val attrValue = attr.getType match {
+ case SCALAR => attr.getScalar
+ case Value.Type.RANGES => attr.getRanges
+ case Value.Type.SET => attr.getSet
+ case Value.Type.TEXT => attr.getText
+ }
+ (attr.getName, attrValue)
+ }).toMap
+ }
+
+ def createJavaExecCommand(jvmArgs: String = "", classPath: String =
"flink-*.jar",
+ classToExecute: String, args: String = ""):
String = {
+ s"env; java $jvmArgs -cp $classPath $classToExecute $args"
+ }
+
+ def createExecutorInfo(id: String, role: String, artifactURIs:
Set[String], command: String,
+ nativeLibPath: String): ExecutorInfo = {
+ val uris = artifactURIs.map(uri =>
URI.newBuilder().setValue(uri).build())
+ ExecutorInfo.newBuilder()
+ .setExecutorId(ExecutorID
+ .newBuilder()
+ .setValue(s"executor_$id"))
+ .setName(s"Apache Flink Mesos Executor - $id")
+ .setCommand(CommandInfo.newBuilder()
+ .setValue(s"env; $command")
+ .addAllUris(uris)
+ .setEnvironment(Environment.newBuilder()
+ .addVariables(Environment.Variable.newBuilder()
+ .setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
+ .setValue(command))
+ .build()
+ }
+
+ def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID,
role: String, mem: Double,
+ cpus: Double, disk: Double, ports: Set[Int],
executorInfo: ExecutorInfo,
+ conf: Configuration): TaskInfo = {
+
+ val portRanges = Ranges.newBuilder().addAllRange(
+ ports.map(port =>
Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()
+
+ val taskConf = conf.clone()
+ val portsSeq = ports.toSeq
+ // set task manager ports
+ taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
portsSeq.get(0))
+ taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
portsSeq.get(1))
+
+ TaskInfo.newBuilder()
+ .setData(ByteString.copyFrom(Utils.serialize(taskConf)))
+ .setExecutor(executorInfo)
+ .setName(taskId.getValue)
+ .setTaskId(taskId)
+ .setSlaveId(slaveID)
+ .addResources(Resource.newBuilder()
+ .setName("ports").setType(RANGES)
+ .setRole(role)
+ .setRanges(portRanges))
+ .addResources(Resource.newBuilder()
+ .setName("mem").setType(SCALAR)
+ .setRole(role)
+ .setScalar(Value.Scalar.newBuilder().setValue(mem)))
+ .addResources(Resource.newBuilder()
+ .setName("cpus").setType(SCALAR)
+ .setRole(role)
+ .setScalar(Value.Scalar.newBuilder().setValue(cpus)))
+ .addResources(Resource.newBuilder()
+ .setName("disk").setType(SCALAR)
+ .setRole(role)
+ .setScalar(Value.Scalar.newBuilder().setValue(disk)))
+ .build()
+ }
+
+
+ def getResource(res: JList[Resource], name: String): Double = {
+ // A resource can have multiple values in the offer since it can
+ // either be from a specific role or wildcard.
+ res.filter(_.getName == name).map(_.getScalar.getValue).sum
+ }
+
+ /**
+ * Match the requirements (if any) to the offer attributes.
+ * if attribute requirements are not specified - return true
+ * else if attribute is defined and no values are given, simple
attribute presence is performed
+ * else if attribute name and value is specified, subset match is
performed on slave attributes
+ *
+ * This code is borrowed and inspired from: Apache Spark:
+ *
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+ */
+ def matchesAttributeRequirements(
+ slaveOfferConstraints: Map[String,
Set[String]],
+ offerAttributes: Map[String,
GeneratedMessage]): Boolean = {
+ slaveOfferConstraints.forall {
+ // offer has the required attribute and subsumes the required values
for that attribute
+ case (name, requiredValues) =>
+ offerAttributes.get(name) match {
+ case None => false
+ case Some(_) if requiredValues.isEmpty => true // empty value
matches presence
+ case Some(scalarValue: Value.Scalar) =>
+ // check if provided values is less than equal to the offered
values
+ requiredValues.map(_.toDouble).exists(_ <=
scalarValue.getValue)
+ case Some(rangeValue: Value.Range) =>
+ val offerRange = rangeValue.getBegin to rangeValue.getEnd
+ // Check if there is some required value that is between the
ranges specified
+ // Note: We only support the ability to specify discrete
values, in the future
+ // we may expand it to subsume ranges specified with a XX..YY
value or something
+ // similar to that.
+ requiredValues.map(_.toLong).exists(offerRange.contains(_))
+ case Some(offeredValue: Value.Set) =>
+ // check if the specified required values is a subset of
offered set
+ requiredValues.subsetOf(offeredValue.getItemList.toSet)
+ case Some(textValue: Value.Text) =>
+ // check if the specified value is equal, if multiple values
are specified
+ // we succeed if any of them match.
+ requiredValues.contains(textValue.getValue)
+ }
+ }
+ }
+
+ /**
+ * Parses the attributes constraints provided to spark and build a
matching data struct:
+ * Map[<attribute-name>, Set[values-to-match]]
+ * The constraints are specified as ';' separated key-value pairs where
keys and values
+ * are separated by ':'. The ':' implies equality (for singular values)
and "is one of" for
+ * multiple values (comma separated). For example:
+ * {{{
+ * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+ * // would result in
+ * <code>
+ * Map(
+ * "tachyon" -> Set("true"),
+ * "zone": -> Set("us-east-1a", "us-east-1b")
+ * )
+ * }}}
+ *
+ * Mesos documentation:
http://mesos.apache.org/documentation/attributes-resources/
+ * https://github.com/apache/mesos/blob/master/src/common/values.cpp
+ * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+ *
+ * This code is borrowed and inspired from: Apache Spark:
+ *
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+ *
+ * @param constraintsVal constaints string consisting of ';' separated
key-value pairs (separated
+ * by ':')
+ * @return Map of constraints to match resources offers.
+ */
+ def parseConstraintString(constraintsVal: String): Map[String,
Set[String]] = {
+ /*
+ Based on mesos docs:
+ attributes : attribute ( ";" attribute )*
+ attribute : labelString ":" ( labelString | "," )+
+ labelString : [a-zA-Z0-9_/.-]
+ */
+ val splitter =
Splitter.on(';').trimResults().withKeyValueSeparator(':')
+ // kv splitter
+ if (constraintsVal.isEmpty) {
+ Map()
+ } else {
+ try {
+ Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
+ case (k, v) =>
+ if (v == null || v.isEmpty) {
+ (k, Set[String]())
+ } else {
+ (k, v.split(',').toSet)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Bad constraint string:
$constraintsVal", e)
+ }
+ }
+ }
+
+ def createFrameworkInfoAndCredentials(cliConf: Conf): (FrameworkInfo,
Option[Credential]) = {
+ // start scheduler
+ val frameworkBuilder = FrameworkInfo.newBuilder()
+
+ frameworkBuilder.setUser(GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_USER_KEY, DEFAULT_MESOS_FRAMEWORK_USER))
+ val frameworkId = GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_ID_KEY, DEFAULT_MESOS_FRAMEWORK_ID)
+ if (frameworkId != null) {
+
frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId))
+ }
+
+ frameworkBuilder.setRole(GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_ROLE_KEY, DEFAULT_MESOS_FRAMEWORK_ROLE))
+ frameworkBuilder.setName(GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_NAME_KEY, DEFAULT_MESOS_FRAMEWORK_NAME))
+ val webUIPort =
GlobalConfiguration.getInteger(JOB_MANAGER_WEB_PORT_KEY, -1)
+ if (webUIPort > 0) {
+ val webUIHost = GlobalConfiguration.getString(
+ JOB_MANAGER_IPC_ADDRESS_KEY, cliConf.host)
+ frameworkBuilder.setWebuiUrl(s"http://$webUIHost:$webUIPort")
+ }
+
+ var credsBuilder: Credential.Builder = null
+ val principal = GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_PRINCIPAL_KEY, DEFAULT_MESOS_FRAMEWORK_PRINCIPAL)
+ if (principal != null) {
+ frameworkBuilder.setPrincipal(principal)
+
+ credsBuilder = Credential.newBuilder()
+ credsBuilder.setPrincipal(principal)
+ val secret = GlobalConfiguration.getString(
+ MESOS_FRAMEWORK_SECRET_KEY, DEFAULT_MESOS_FRAMEWORK_SECRET)
+ if (secret != null) {
+ credsBuilder.setSecret(ByteString.copyFromUtf8(secret))
+ }
+ }
+ val credential = if (credsBuilder == null) Some(credsBuilder.build)
else None
--- End diff --
If credsBuilder is null, it creates a Some(..)?
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: New Components
> Reporter: Robert Metzger
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink:
> https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to
> this JIRA in case somebody wants to pick it up in the future.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)