[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14735281#comment-14735281
 ] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/948#discussion_r38958292
  
    --- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
    @@ -0,0 +1,358 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.{Failure, Success, Try}
    +import scala.util.control.NonFatal
    +
    +import com.google.common.base.Splitter
    +import com.google.protobuf.{ByteString, GeneratedMessage}
    +import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
    +import org.apache.flink.configuration.ConfigConstants._
    +import org.apache.flink.mesos._
    +import org.apache.flink.mesos.scheduler.FlinkScheduler._
    +import org.apache.flink.runtime.StreamingMode
    +import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
    +import org.apache.flink.runtime.util.EnvironmentInformation
    +import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
    +import org.apache.mesos.Protos._
    +import org.apache.mesos.Protos.CommandInfo.URI
    +import org.apache.mesos.Protos.Value.Ranges
    +import org.apache.mesos.Protos.Value.Type._
    +
    +/**
    + * This code is borrowed and inspired from Apache Spark Project:
    + *   
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
    + */
    +trait SchedulerUtils {
    +
    +  /**
    +   * Converts the attributes from the resource offer into a Map of name -> 
Attribute Value
    +   * The attribute values are the mesos attribute types and they are
    +   * @param offerAttributes List of attributes sent with an offer
    +   * @return
    +   */
    +  def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, 
GeneratedMessage] = {
    +    offerAttributes.map(attr => {
    +      val attrValue = attr.getType match {
    +        case SCALAR => attr.getScalar
    +        case Value.Type.RANGES => attr.getRanges
    +        case Value.Type.SET => attr.getSet
    +        case Value.Type.TEXT => attr.getText
    +      }
    +      (attr.getName, attrValue)
    +    }).toMap
    +  }
    +
    +  def createJavaExecCommand(jvmArgs: String = "", classPath: String = 
"flink-*.jar",
    +                            classToExecute: String, args: String = ""): 
String = {
    +    s"env; java $jvmArgs -cp $classPath $classToExecute $args"
    +  }
    +
    +  def createExecutorInfo(id: String, role: String, artifactURIs: 
Set[String], command: String,
    +                         nativeLibPath: String): ExecutorInfo = {
    +    val uris = artifactURIs.map(uri => 
URI.newBuilder().setValue(uri).build())
    +    ExecutorInfo.newBuilder()
    +      .setExecutorId(ExecutorID
    +        .newBuilder()
    +        .setValue(s"executor_$id"))
    +      .setName(s"Apache Flink Mesos Executor - $id")
    +      .setCommand(CommandInfo.newBuilder()
    +        .setValue(s"env; $command")
    +        .addAllUris(uris)
    +        .setEnvironment(Environment.newBuilder()
    +          .addVariables(Environment.Variable.newBuilder()
    +            .setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
    +        .setValue(command))
    +      .build()
    +  }
    +
    +  def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID, 
role: String, mem: Double,
    +                     cpus: Double, disk: Double, ports: Set[Int], 
executorInfo: ExecutorInfo,
    +                     conf: Configuration): TaskInfo = {
    +
    +    val portRanges = Ranges.newBuilder().addAllRange(
    +      ports.map(port => 
Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()
    +
    +    val taskConf = conf.clone()
    +    val portsSeq = ports.toSeq
    +    // set task manager ports
    +    taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 
portsSeq.get(0))
    +    taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
portsSeq.get(1))
    +
    +    TaskInfo.newBuilder()
    +      .setData(ByteString.copyFrom(Utils.serialize(taskConf)))
    +      .setExecutor(executorInfo)
    +      .setName(taskId.getValue)
    +      .setTaskId(taskId)
    +      .setSlaveId(slaveID)
    +      .addResources(Resource.newBuilder()
    +        .setName("ports").setType(RANGES)
    +        .setRole(role)
    +        .setRanges(portRanges))
    +      .addResources(Resource.newBuilder()
    +        .setName("mem").setType(SCALAR)
    +        .setRole(role)
    +        .setScalar(Value.Scalar.newBuilder().setValue(mem)))
    +      .addResources(Resource.newBuilder()
    +        .setName("cpus").setType(SCALAR)
    +        .setRole(role)
    +        .setScalar(Value.Scalar.newBuilder().setValue(cpus)))
    +      .addResources(Resource.newBuilder()
    +        .setName("disk").setType(SCALAR)
    +        .setRole(role)
    +        .setScalar(Value.Scalar.newBuilder().setValue(disk)))
    +      .build()
    +  }
    +
    +
    +  def getResource(res: JList[Resource], name: String): Double = {
    +    // A resource can have multiple values in the offer since it can
    +    // either be from a specific role or wildcard.
    +    res.filter(_.getName == name).map(_.getScalar.getValue).sum
    +  }
    +
    +  /**
    +   * Match the requirements (if any) to the offer attributes.
    +   * if attribute requirements are not specified - return true
    +   * else if attribute is defined and no values are given, simple 
attribute presence is performed
    +   * else if attribute name and value is specified, subset match is 
performed on slave attributes
    +   *
    +   * This code is borrowed and inspired from: Apache Spark:
    +   *  
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
    +   */
    +  def matchesAttributeRequirements(
    +                                    slaveOfferConstraints: Map[String, 
Set[String]],
    +                                    offerAttributes: Map[String, 
GeneratedMessage]): Boolean = {
    +    slaveOfferConstraints.forall {
    +      // offer has the required attribute and subsumes the required values 
for that attribute
    +      case (name, requiredValues) =>
    +        offerAttributes.get(name) match {
    +          case None => false
    +          case Some(_) if requiredValues.isEmpty => true // empty value 
matches presence
    +          case Some(scalarValue: Value.Scalar) =>
    +            // check if provided values is less than equal to the offered 
values
    +            requiredValues.map(_.toDouble).exists(_ <= 
scalarValue.getValue)
    +          case Some(rangeValue: Value.Range) =>
    +            val offerRange = rangeValue.getBegin to rangeValue.getEnd
    +            // Check if there is some required value that is between the 
ranges specified
    +            // Note: We only support the ability to specify discrete 
values, in the future
    +            // we may expand it to subsume ranges specified with a XX..YY 
value or something
    +            // similar to that.
    +            requiredValues.map(_.toLong).exists(offerRange.contains(_))
    +          case Some(offeredValue: Value.Set) =>
    +            // check if the specified required values is a subset of 
offered set
    +            requiredValues.subsetOf(offeredValue.getItemList.toSet)
    +          case Some(textValue: Value.Text) =>
    +            // check if the specified value is equal, if multiple values 
are specified
    +            // we succeed if any of them match.
    +            requiredValues.contains(textValue.getValue)
    +        }
    +    }
    +  }
    +
    +  /**
    +   * Parses the attributes constraints provided to spark and build a 
matching data struct:
    +   * Map[<attribute-name>, Set[values-to-match]]
    +   * The constraints are specified as ';' separated key-value pairs where 
keys and values
    +   * are separated by ':'. The ':' implies equality (for singular values) 
and "is one of" for
    +   * multiple values (comma separated). For example:
    +   * {{{
    +   *  parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
    +   *  // would result in
    +   * <code>
    +   * Map(
    +   * "tachyon" -> Set("true"),
    +   * "zone":   -> Set("us-east-1a", "us-east-1b")
    +   * )
    +   * }}}
    +   *
    +   * Mesos documentation: 
http://mesos.apache.org/documentation/attributes-resources/
    +   * https://github.com/apache/mesos/blob/master/src/common/values.cpp
    +   * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
    +   *
    +   * This code is borrowed and inspired from: Apache Spark:
    +   *  
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
    +   *
    +   * @param constraintsVal constaints string consisting of ';' separated 
key-value pairs (separated
    +   * by ':')
    +   * @return  Map of constraints to match resources offers.
    +   */
    +  def parseConstraintString(constraintsVal: String): Map[String, 
Set[String]] = {
    +    /*
    +      Based on mesos docs:
    +      attributes : attribute ( ";" attribute )*
    +      attribute : labelString ":" ( labelString | "," )+
    +      labelString : [a-zA-Z0-9_/.-]
    +    */
    +    val splitter = 
Splitter.on(';').trimResults().withKeyValueSeparator(':')
    +    // kv splitter
    +    if (constraintsVal.isEmpty) {
    +      Map()
    +    } else {
    +      try {
    +        Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
    +          case (k, v) =>
    +            if (v == null || v.isEmpty) {
    +              (k, Set[String]())
    +            } else {
    +              (k, v.split(',').toSet)
    +            }
    +        }
    +      } catch {
    +        case NonFatal(e) =>
    +          throw new IllegalArgumentException(s"Bad constraint string: 
$constraintsVal", e)
    +      }
    +    }
    +  }
    +
    +  def createFrameworkInfoAndCredentials(cliConf: Conf): (FrameworkInfo, 
Option[Credential]) = {
    +    // start scheduler
    +    val frameworkBuilder = FrameworkInfo.newBuilder()
    +
    +    frameworkBuilder.setUser(GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_USER_KEY, DEFAULT_MESOS_FRAMEWORK_USER))
    +    val frameworkId = GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_ID_KEY, DEFAULT_MESOS_FRAMEWORK_ID)
    +    if (frameworkId != null) {
    +      
frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId))
    +    }
    +
    +    frameworkBuilder.setRole(GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_ROLE_KEY, DEFAULT_MESOS_FRAMEWORK_ROLE))
    +    frameworkBuilder.setName(GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_NAME_KEY, DEFAULT_MESOS_FRAMEWORK_NAME))
    +    val webUIPort = 
GlobalConfiguration.getInteger(JOB_MANAGER_WEB_PORT_KEY, -1)
    +    if (webUIPort > 0) {
    +      val webUIHost = GlobalConfiguration.getString(
    +        JOB_MANAGER_IPC_ADDRESS_KEY, cliConf.host)
    +      frameworkBuilder.setWebuiUrl(s"http://$webUIHost:$webUIPort";)
    +    }
    +
    +    var credsBuilder: Credential.Builder = null
    +    val principal = GlobalConfiguration.getString(
    +      MESOS_FRAMEWORK_PRINCIPAL_KEY, DEFAULT_MESOS_FRAMEWORK_PRINCIPAL)
    +    if (principal != null) {
    +      frameworkBuilder.setPrincipal(principal)
    +
    +      credsBuilder = Credential.newBuilder()
    +      credsBuilder.setPrincipal(principal)
    +      val secret = GlobalConfiguration.getString(
    +        MESOS_FRAMEWORK_SECRET_KEY, DEFAULT_MESOS_FRAMEWORK_SECRET)
    +      if (secret != null) {
    +        credsBuilder.setSecret(ByteString.copyFromUtf8(secret))
    +      }
    +    }
    +    val credential = if (credsBuilder == null) Some(credsBuilder.build) 
else None
    --- End diff --
    
    I guess that's the cause of this exception
    ```
    Exception in thread "main" java.lang.NullPointerException
        at 
org.apache.flink.mesos.scheduler.SchedulerUtils$class.createFrameworkInfoAndCredentials(SchedulerUtils.scala:265)
        at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.createFrameworkInfoAndCredentials(FlinkScheduler.scala:33)
        at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.main(FlinkScheduler.scala:209)
        at 
org.apache.flink.mesos.scheduler.FlinkScheduler.main(FlinkScheduler.scala)
    
    ```


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: New Components
>            Reporter: Robert Metzger
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink: 
> https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to 
> this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to