[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426395#comment-15426395
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r75303392
--- Diff:
flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
---
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{Collections, UUID}
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor.FSM.StateTimeout
+import akka.testkit._
+import com.netflix.fenzo.TaskRequest.{AssignedResources,
NamedResourceSetRequest}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.{Action1, Action2}
+import com.netflix.fenzo.plugins.VMLeaseObject
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.mesos.Protos.{SlaveID, TaskInfo}
+import org.apache.mesos.{SchedulerDriver, Protos}
+import org.junit.runner.RunWith
+import org.mockito.Mockito.{verify, _}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers => MM, Mockito}
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.collection.JavaConverters._
+
+import org.apache.flink.mesos.Utils.range
+import org.apache.flink.mesos.Utils.ranges
+import org.apache.flink.mesos.Utils.scalar
+
+@RunWith(classOf[JUnitRunner])
+class LaunchCoordinatorTest
+ extends TestKitBase
+ with ImplicitSender
+ with WordSpecLike
+ with Matchers
+ with BeforeAndAfterAll {
+
+ lazy val config = new Configuration()
+ implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ def randomFramework = {
+
Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID.toString).build
+ }
+
+ def randomTask = {
+ val taskID =
Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
+
+ def generateTaskRequest = {
+ new TaskRequest() {
+ private[mesos] val assignedResources = new
AtomicReference[TaskRequest.AssignedResources]
+ override def getId: String = taskID.getValue
+ override def taskGroupName: String = ""
+ override def getCPUs: Double = 1.0
+ override def getMemory: Double = 1024.0
+ override def getNetworkMbps: Double = 0.0
+ override def getDisk: Double = 0.0
+ override def getPorts: Int = 1
+ override def getCustomNamedResources: java.util.Map[String,
NamedResourceSetRequest] =
+ Collections.emptyMap[String, NamedResourceSetRequest]
+ override def getSoftConstraints: java.util.List[_ <:
VMTaskFitnessCalculator] = null
+ override def getHardConstraints: java.util.List[_ <:
ConstraintEvaluator] = null
+ override def getAssignedResources: AssignedResources =
assignedResources.get()
+ override def setAssignedResources(assignedResources:
AssignedResources): Unit = {
+ this.assignedResources.set(assignedResources)
+ }
+ }
+ }
+
+ val task: LaunchableTask = new LaunchableTask() {
+ override def taskRequest: TaskRequest = generateTaskRequest
+ override def launch(slaveId: SlaveID, taskAssignment:
TaskAssignmentResult): Protos.TaskInfo = {
+ Protos.TaskInfo.newBuilder
+ .setTaskId(taskID).setName(taskID.getValue)
+ .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
+ .setSlaveId(slaveId)
+ .build()
+ }
+ override def toString = taskRequest.getId
+ }
+
+ (taskID, task)
+ }
+
+ def randomSlave = {
+ val slaveID =
Protos.SlaveID.newBuilder.setValue(UUID.randomUUID.toString).build
+ val hostname = s"host-${slaveID.getValue}"
+ (slaveID, hostname)
+ }
+
+ def randomOffer(frameworkID: Protos.FrameworkID, slave: (Protos.SlaveID,
String)) = {
+ val offerID =
Protos.OfferID.newBuilder().setValue(UUID.randomUUID.toString)
+ Protos.Offer.newBuilder()
+ .setFrameworkId(frameworkID)
+ .setId(offerID)
+ .setSlaveId(slave._1)
+ .setHostname(slave._2)
+ .addResources(scalar("cpus", 0.75))
+ .addResources(scalar("mem", 4096.0))
+ .addResources(scalar("disk", 1024.0))
+ .addResources(ranges("ports", range(9000, 9001)))
+ .build()
+ }
+
+ def lease(offer: Protos.Offer) = {
+ new VMLeaseObject(offer)
+ }
+
+ /**
+ * Mock a successful task assignment result matching a task to an offer.
+ */
+ def taskAssignmentResult(lease: VirtualMachineLease, task: TaskRequest):
TaskAssignmentResult = {
+ val ports = lease.portRanges().get(0)
+ val r = mock(classOf[TaskAssignmentResult])
+ when(r.getTaskId).thenReturn(task.getId)
+ when(r.getHostname).thenReturn(lease.hostname())
+ when(r.getAssignedPorts).thenReturn(
+ (ports.getBeg to ports.getBeg +
task.getPorts).toList.asJava.asInstanceOf[java.util.List[Integer]])
+ when(r.getRequest).thenReturn(task)
+ when(r.isSuccessful).thenReturn(true)
+ when(r.getFitness).thenReturn(1.0)
+ r
+ }
+
+ /**
+ * Mock a VM assignment result with the given leases and tasks.
+ */
+ def vmAssignmentResult(hostname: String,
+ leasesUsed: Seq[VirtualMachineLease],
+ tasksAssigned: Set[TaskAssignmentResult]):
VMAssignmentResult = {
+ new VMAssignmentResult(hostname, leasesUsed.asJava,
tasksAssigned.asJava)
+ }
+
+ /**
+ * Mock a scheduling result with the given successes and failures.
+ */
+ def schedulingResult(successes: Seq[VMAssignmentResult],
+ failures: Seq[TaskAssignmentResult] = Nil,
+ exceptions: Seq[Exception] = Nil,
+ leasesAdded: Int = 0,
+ leasesRejected: Int = 0): SchedulingResult = {
+ val r = mock(classOf[SchedulingResult])
+ when(r.getResultMap).thenReturn(successes.map(r => r.getHostname ->
r).toMap.asJava)
+ when(r.getExceptions).thenReturn(exceptions.asJava)
+ val groupedFailures =
failures.groupBy(_.getRequest).mapValues(_.asJava)
+ when(r.getFailures).thenReturn(groupedFailures.asJava)
+ when(r.getLeasesAdded).thenReturn(leasesAdded)
+ when(r.getLeasesRejected).thenReturn(leasesRejected)
+ when(r.getRuntime).thenReturn(0)
+ when(r.getNumAllocations).thenThrow(new NotImplementedError())
+ when(r.getTotalVMsCount).thenThrow(new NotImplementedError())
+ when(r.getIdleVMsCount).thenThrow(new NotImplementedError())
+ r
+ }
+
+
+ /**
+ * Mock a task scheduler.
+ * The task assigner/unassigner is pre-wired.
+ */
+ def taskScheduler() = {
+ val optimizer = mock(classOf[TaskScheduler])
+ val taskAssigner = mock(classOf[Action2[TaskRequest, String]])
+ when[Action2[TaskRequest,
String]](optimizer.getTaskAssigner).thenReturn(taskAssigner)
+ val taskUnassigner = mock(classOf[Action2[String, String]])
+ when[Action2[String,
String]](optimizer.getTaskUnAssigner).thenReturn(taskUnassigner)
+ optimizer
+ }
+
+ /**
+ * Create a task scheduler builder.
+ */
+ def taskSchedulerBuilder(optimizer: TaskScheduler) = new
TaskSchedulerBuilder {
+ var leaseRejectAction: Action1[VirtualMachineLease] = null
+ override def withLeaseRejectAction(action:
Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
+ leaseRejectAction = action
+ this
+ }
+ override def build(): TaskScheduler = optimizer
+ }
+
+ /**
+ * Process a call to scheduleOnce with the given function.
+ */
+ def scheduleOnce(f: (Seq[TaskRequest],Seq[VirtualMachineLease]) =>
SchedulingResult) = {
+ new Answer[SchedulingResult] {
+ override def answer(invocationOnMock: InvocationOnMock):
SchedulingResult = {
+ val args = invocationOnMock.getArguments
+ val requests = args(0).asInstanceOf[java.util.List[TaskRequest]]
+ val newLeases =
args(1).asInstanceOf[java.util.List[VirtualMachineLease]]
+ f(requests.asScala, newLeases.asScala)
+ }
+ }
+ }
+
+ /**
+ * The context fixture.
+ */
+ class Context {
+ val optimizer = taskScheduler()
+ val optimizerBuilder = taskSchedulerBuilder(optimizer)
+ val schedulerDriver = mock(classOf[SchedulerDriver])
+ val trace = Mockito.inOrder(schedulerDriver)
+ val fsm = TestFSMRef(new LaunchCoordinator(testActor, config,
schedulerDriver, optimizerBuilder))
+
+ val framework = randomFramework
+ val task1 = randomTask
+ val task2 = randomTask
+ val task3 = randomTask
+
+ val slave1 = {
+ val slave = randomSlave
+ (slave._1, slave._2, randomOffer(framework, slave),
randomOffer(framework, slave), randomOffer(framework, slave))
+ }
+
+ val slave2 = {
+ val slave = randomSlave
+ (slave._1, slave._2, randomOffer(framework, slave),
randomOffer(framework, slave), randomOffer(framework, slave))
+ }
+ }
+
+ def inState = afterWord("in state")
+ def handle = afterWord("handle")
+
+ def handlesAssignments(state: TaskState) = {
+ "Unassign" which {
+ s"stays in $state with updated optimizer state" in new Context {
--- End diff --
Very nice testing although I'm not much of a fan of the Scala `WordSpec` :)
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)