http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala deleted file mode 100644 index a48f887..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.task - -import akka.actor.{ExtendedActorSystem, Props} -import akka.testkit._ -import com.typesafe.config.{Config, ConfigFactory} -import org.mockito.Mockito.{mock, times, verify, when} -import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.serializer.{FastKryoSerializer, SerializationFramework} -import io.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} -import io.gearpump.streaming.task.TaskActorSpec.TestTask -import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} -import io.gearpump.util.Graph._ -import io.gearpump.util.{Graph, Util} - -class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - protected override def config: Config = { - ConfigFactory.parseString( - """ akka.loggers = ["akka.testkit.TestEventListener"] - | akka.test.filter-leeway = 20000 - """.stripMargin). 
- withFallback(TestUtil.DEFAULT_CONFIG) - } - - val appId = 0 - val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask].getName, parallelism = 1) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1) - val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) - val taskId1 = TaskId(0, 0) - val taskId2 = TaskId(1, 0) - val executorId1 = 1 - val executorId2 = 2 - - var mockMaster: TestProbe = null - var taskContext1: TaskContextData = null - - var mockSerializerPool: SerializationFramework = null - - override def beforeEach(): Unit = { - startActorSystem() - mockMaster = TestProbe()(getActorSystem) - - mockSerializerPool = mock(classOf[SerializationFramework]) - val serializer = new FastKryoSerializer(getActorSystem.asInstanceOf[ExtendedActorSystem]) - when(mockSerializerPool.get()).thenReturn(serializer) - - taskContext1 = TaskContextData(executorId1, appId, - "appName", mockMaster.ref, 1, - LifeTime.Immortal, - Subscriber.of(processorId = 0, dag)) - } - - "TaskActor" should { - "register itself to AppMaster when started" in { - val mockTask = mock(classOf[TaskWrapper]) - val testActor = TestActorRef[TaskActor](Props( - new TaskActor(taskId1, taskContext1, UserConfig.empty, - mockTask, mockSerializerPool)))(getActorSystem) - testActor ! TaskRegistered(taskId1, 0, Util.randInt()) - testActor ! StartTask(taskId1) - - implicit val system = getActorSystem - val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId) - EventFilter[MsgLostException](occurrences = 1) intercept { - testActor ! ack - } - } - - "respond to ChangeTask" in { - val mockTask = mock(classOf[TaskWrapper]) - val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, - UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) - testActor ! TaskRegistered(taskId1, 0, Util.randInt()) - testActor ! 
StartTask(taskId1) - mockMaster.expectMsgType[GetUpstreamMinClock] - - mockMaster.send(testActor, ChangeTask(taskId1, 1, LifeTime.Immortal, List.empty[Subscriber])) - mockMaster.expectMsgType[TaskChanged] - } - - "handle received message correctly" in { - val mockTask = mock(classOf[TaskWrapper]) - val msg = Message("test") - - val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1, - UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem) - testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), mockMaster.ref) - testActor.tell(StartTask(taskId1), mockMaster.ref) - - testActor.tell(msg, testActor) - - verify(mockTask, times(1)).onNext(msg) - } - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } -} - -object TaskActorSpec { - class TestTask -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala new file mode 100644 index 0000000..5f4faee --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming + +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.task.TaskId +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph.Node +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class DAGSpec extends PropSpec with PropertyChecks with Matchers { + + val parallelismGen = Gen.chooseNum[Int](1, 100) + + property("DAG should be built correctly for a single task") { + forAll(parallelismGen) { (parallelism: Int) => + val task = ProcessorDescription(id = 0, taskClass = "task", parallelism = parallelism) + val graph = Graph[ProcessorDescription, PartitionerDescription](task) + val dag = DAG(graph) + dag.processors.size shouldBe 1 + assert(dag.taskCount == parallelism) + dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index)) + dag.graph.edges shouldBe empty + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala new file mode 100644 index 0000000..f6f6af2 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming + +import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers} +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.streaming.task._ +import org.apache.gearpump.transport.netty.WrappedChannelBuffer + +class MessageSerializerSpec extends WordSpec with Matchers { + + def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = { + val length = taskMessageSerializer.getLength(obj) + val bout = new ChannelBufferOutputStream(ChannelBuffers.buffer(length)) + taskMessageSerializer.write(bout, obj) + val input = new WrappedChannelBuffer(ChannelBuffers.wrappedBuffer(bout.buffer().array())) + taskMessageSerializer.read(input) + } + + "SerializedMessageSerializer" should { + "serialize and deserialize SerializedMessage properly" in { + val serializer = new SerializedMessageSerializer + val data = new Array[Byte](256) + new java.util.Random().nextBytes(data) + val msg = SerializedMessage(1024, data) + val result = testSerializer(msg, serializer) + assert(result.timeStamp == msg.timeStamp && result.bytes.sameElements(msg.bytes)) + } + } + + "TaskIdSerializer" should { + "serialize and deserialize TaskId properly" in { + val taskIdSerializer = new TaskIdSerializer + val taskId = TaskId(1, 3) + assert(testSerializer(taskId, taskIdSerializer).equals(taskId)) + } + } + + "AckRequestSerializer" should { + "serialize and deserialize AckRequest properly" in { + val serializer = new AckRequestSerializer + val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024) + 
assert(testSerializer(ackRequest, serializer).equals(ackRequest)) + } + } + + "InitialAckRequestSerializer" should { + "serialize and deserialize AckRequest properly" in { + val serializer = new InitialAckRequestSerializer + val ackRequest = InitialAckRequest(TaskId(1, 2), 1024) + assert(testSerializer(ackRequest, serializer).equals(ackRequest)) + } + } + + "AckSerializer" should { + "serialize and deserialize Ack properly" in { + val serializer = new AckSerializer + val ack = Ack(TaskId(1, 2), 1024, 1023, 1799) + assert(testSerializer(ack, serializer).equals(ack)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala new file mode 100644 index 0000000..f24c024 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MockUtil.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming + +import akka.actor.{Actor, ActorSystem} +import akka.testkit.TestActorRef +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.streaming.task.{TaskContext, TaskId} +import org.mockito.{ArgumentMatcher, Matchers, Mockito} + +object MockUtil { + + lazy val system: ActorSystem = ActorSystem("mockUtil", TestUtil.DEFAULT_CONFIG) + + def mockTaskContext: TaskContext = { + val context = Mockito.mock(classOf[TaskContext]) + Mockito.when(context.self).thenReturn(Mockito.mock(classOf[TestActorRef[Actor]])) + Mockito.when(context.system).thenReturn(system) + Mockito.when(context.parallelism).thenReturn(1) + Mockito.when(context.taskId).thenReturn(TaskId(0, 0)) + context + } + + def argMatch[T](func: T => Boolean): T = { + Matchers.argThat(new ArgumentMatcher[T] { + override def matches(param: Any): Boolean = { + val mesage = param.asInstanceOf[T] + func(mesage) + } + }) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala new file mode 100644 index 0000000..f4fffb5 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming + +import akka.actor._ +import akka.testkit.TestActorRef +import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster +import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig} +import org.apache.gearpump.streaming.appmaster.AppMaster +import org.apache.gearpump.util.Graph + +object StreamingTestUtil { + private var executorId = 0 + val testUserName = "testuser" + + def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = { + + implicit val actorSystem = miniCluster.system + val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, + None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString)) + + val app = StreamApplication("test", Graph.empty, UserConfig.empty) + val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig) + val props = Props(new AppMaster(masterConf, appDescription)) + val appMaster = miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]] + val registerAppMaster = RegisterAppMaster(appMaster, masterConf.registerData) + miniCluster.mockMaster.tell(registerAppMaster, appMaster) + + appMaster + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala new file mode 100644 index 0000000..38a5cf1 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.appmaster + +import akka.actor.{ActorRef, Props} +import akka.testkit.{TestActorRef, TestProbe} +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.AppMasterToMaster._ +import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor +import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated} +import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected +import org.apache.gearpump.cluster._ +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.jarstore.FilePath +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _} +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.scalatest._ + +import scala.concurrent.duration._ + +class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { + protected override def config = TestUtil.DEFAULT_CONFIG + + var appMaster: ActorRef = null + + val appId = 0 + val workerId = WorkerId(1, 0L) + val resource = Resource(1) + val taskDescription1 = Processor[TaskA](2) + val taskDescription2 = Processor[TaskB](2) + val partitioner = new HashPartitioner + var conf: UserConfig = null + + var mockTask: TestProbe = null + + var mockMaster: TestProbe = null + var mockMasterProxy: ActorRef = null + + var mockWorker: TestProbe = null + var appDescription: AppDescription = null + var appMasterContext: AppMasterContext = null + var appMasterRuntimeInfo: AppMasterRuntimeInfo = null + + 
override def beforeEach(): Unit = { + startActorSystem() + + mockTask = TestProbe()(getActorSystem) + + mockMaster = TestProbe()(getActorSystem) + mockWorker = TestProbe()(getActorSystem) + mockMaster.ignoreMsg(ignoreSaveAppData) + appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) + + implicit val system = getActorSystem + conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref) + val mockJar = AppJar("for_test", FilePath("path")) + appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), + mockMaster.ref, appMasterRuntimeInfo) + val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2) + val streamApp = StreamApplication("test", graph, conf) + appDescription = Application.ApplicationToAppDescription(streamApp) + import scala.concurrent.duration._ + mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), + 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) + TestActorRef[AppMaster]( + AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, + appMasterContext))(getActorSystem) + + val registerAppMaster = mockMaster.receiveOne(15.seconds) + assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) + appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster + + mockMaster.reply(AppMasterRegistered(appId)) + mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG")) + mockMaster.reply(GetAppDataResult("DAG", null)) + mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock")) + + mockMaster.reply(GetAppDataResult("startClock", 0L)) + + mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4), + workerId = WorkerId.unspecified))) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "AppMaster" should { + "kill it self when allocate resource time out" in { + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), + mockWorker.ref, workerId)))) + 
+ mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) + } + + "reschedule the resource when the worker reject to start executor" in { + val resource = Resource(4) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, + mockWorker.ref, workerId)))) + mockWorker.expectMsgClass(classOf[LaunchExecutor]) + mockWorker.reply(ExecutorLaunchRejected("")) + mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) + } + + "find a new master when lost connection with master" in { + + val watcher = TestProbe()(getActorSystem) + watcher.watch(mockMasterProxy) + getActorSystem.stop(mockMasterProxy) + watcher.expectTerminated(mockMasterProxy) + // Make sure the parent of mockMasterProxy has received the Terminated message. + // Issue address: https://github.com/gearpump/gearpump/issues/1919 + Thread.sleep(2000) + + import scala.concurrent.duration._ + mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), + 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) + mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster]) + } + + // // TODO: This test is failing on Travis randomly + // // We have not identified the root cause. 
+ // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843 + // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733 + // + // "launch executor and task properly" in { + // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, + // workerId)))) + // mockWorker.expectMsgClass(classOf[LaunchExecutor]) + // + // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) + // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) + // for (i <- 1 to 4) { + // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted) + // } + // + // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) + // + // // there is no further upstream, so the upstreamMinClock is Long.MaxValue + // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // + // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) + // + // // there is no further upstream, so the upstreamMinClock is Long.MaxValue + // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) + // + // // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) + // mockTask.expectMsg(UpstreamMinClock(1)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 + // appMaster.tell(UpdateClock(TaskId(1, 1), 1), 
mockTask.ref) + // + // // min clock of processor 0 (Task(0, 0) and Task(0, 1)) + // mockTask.expectMsg(UpstreamMinClock(1)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(1)) + // + // // shutdown worker and all executor on this work, expect appmaster to ask + // // for new resources + // workerSystem.shutdown() + // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = + // Relaxation.ONEWORKER))) + // } + } + + def ignoreSaveAppData: PartialFunction[Any, Boolean] = { + case msg: SaveAppData => true + } +} + +object AppMasterSpec { + val MASTER = "master" + case object TaskStarted + + val MOCK_MASTER_PROXY = "mockMasterProxy" +} + +class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + + val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get + override def onStart(startTime: StartTime): Unit = { + master ! AppMasterSpec.TaskStarted + } + + override def onNext(msg: Message): Unit = {} +} + +class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + + val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get + override def onStart(startTime: StartTime): Unit = { + master ! 
AppMasterSpec.TaskStarted + } + + override def onNext(msg: Message): Unit = {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala new file mode 100644 index 0000000..2bb33b7 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.appmaster + +import akka.actor.{ActorSystem, Props} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock} +import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store +import org.apache.gearpump.streaming.storage.AppDataStore +import org.apache.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _} +import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.{Future, Promise} + +class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG)) + + val hash = Partitioner[HashPartitioner] + val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) + val dag = DAG(Graph(task1 ~ hash ~> task2)) + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "The ClockService" should { + "maintain a global view of message timestamp in the application" in { + val store = new Store() + val startClock = 100L + store.put(ClockService.START_CLOCK, startClock) + val clockService = system.actorOf(Props(new ClockService(dag, store))) + clockService ! GetLatestMinClock + expectMsg(LatestMinClock(startClock)) + + // task(0,0): clock(101); task(1,0): clock(100) + clockService ! 
UpdateClock(TaskId(0, 0), 101) + + // There is no upstream, so pick Long.MaxValue + expectMsg(UpstreamMinClock(Long.MaxValue)) + + // Min clock is updated + clockService ! GetLatestMinClock + expectMsg(LatestMinClock(100)) + + // task(0,0): clock(101); task(1,0): clock(101) + clockService ! UpdateClock(TaskId(1, 0), 101) + + // Upstream is Task(0, 0), 101 + expectMsg(UpstreamMinClock(101)) + + // Min clock is updated + clockService ! GetLatestMinClock + expectMsg(LatestMinClock(101)) + } + + "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in { + val store = new Store() + val startClock = 100L + store.put(ClockService.START_CLOCK, startClock) + val clockService = system.actorOf(Props(new ClockService(dag, store))) + val task = TestProbe() + clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref) + task.expectMsgType[UpstreamMinClock] + + val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, + parallelism = 1) + val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, + parallelism = 1) + val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, + parallelism = 1) + val dagAddMiddleNode = DAG(Graph( + task1 ~ hash ~> task2, + task1 ~ hash ~> task3, + task3 ~ hash ~> task2, + task2 ~ hash ~> task4, + task5 ~ hash ~> task1 + )) + val user = TestProbe() + clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref) + + val clocks = user.expectMsgPF() { + case ChangeToNewDAGSuccess(clocks) => + clocks + } + + // For intermediate task, pick its upstream as initial clock + assert(clocks(task3.id) == clocks(task1.id)) + + // For sink task, pick its upstream as initial clock + assert(clocks(task4.id) == clocks(task2.id)) + + // For source task, set the initial clock as startClock + assert(clocks(task5.id) == startClock) + } + + "maintain global checkpoint time" in { + val store = new Store() + val startClock = 100L + store.put(ClockService.START_CLOCK, startClock) + 
val clockService = system.actorOf(Props(new ClockService(dag, store))) + clockService ! UpdateClock(TaskId(0, 0), 200L) + expectMsgType[UpstreamMinClock] + clockService ! UpdateClock(TaskId(1, 0), 200L) + expectMsgType[UpstreamMinClock] + + clockService ! GetStartClock + expectMsg(StartClock(200L)) + + val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true) + val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, + parallelism = 1, taskConf = conf) + val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, + parallelism = 1, taskConf = conf) + val dagWithStateTasks = DAG(Graph( + task1 ~ hash ~> task2, + task1 ~ hash ~> task3, + task3 ~ hash ~> task2, + task2 ~ hash ~> task4 + ), version = 1) + + val taskId3 = TaskId(3, 0) + val taskId4 = TaskId(4, 0) + + clockService ! ChangeToNewDAG(dagWithStateTasks) + expectMsgType[ChangeToNewDAGSuccess] + + clockService ! ReportCheckpointClock(taskId3, startClock) + clockService ! ReportCheckpointClock(taskId4, startClock) + clockService ! GetStartClock + expectMsg(StartClock(startClock)) + + clockService ! ReportCheckpointClock(taskId3, 200L) + clockService ! ReportCheckpointClock(taskId4, 300L) + clockService ! GetStartClock + expectMsg(StartClock(startClock)) + + clockService ! ReportCheckpointClock(taskId3, 300L) + clockService ! 
GetStartClock + expectMsg(StartClock(300L)) + } + } + + "ProcessorClock" should { + "maintain the min clock of current processor" in { + val processorId = 0 + val parallism = 3 + val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism) + clock.init(100L) + clock.updateMinClock(0, 101) + assert(clock.min == 100L) + + clock.updateMinClock(1, 102) + assert(clock.min == 100L) + + clock.updateMinClock(2, 103) + assert(clock.min == 101L) + } + } + + "HealthChecker" should { + "report stalling if the clock is not advancing" in { + val healthChecker = new HealthChecker(stallingThresholdSeconds = 1) + val source = ProcessorDescription(id = 0, taskClass = null, parallelism = 1) + val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1) + sourceClock.init(0L) + val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1) + val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1) + sinkClock.init(0L) + val graph = Graph.empty[ProcessorDescription, PartitionerDescription] + graph.addVertex(source) + graph.addVertex(sink) + graph.addEdge(source, PartitionerDescription(null), sink) + val dag = DAG(graph) + val clocks = Map( + 0 -> sourceClock, + 1 -> sinkClock + ) + + sourceClock.updateMinClock(0, 100L) + sinkClock.updateMinClock(0, 100L) + + // Clock advances from 0 to 100, there is no stalling. + healthChecker.check(currentMinClock = 100, clocks, dag, 200) + healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId] + + // Clock not advancing. 
+ // Elapsed time exceeds the stalling threshold, report stalling + healthChecker.check(currentMinClock = 100, clocks, dag, 1300) + + // The source task is stalling the clock + healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0)) + + // Advance the source clock + sourceClock.updateMinClock(0, 101L) + healthChecker.check(currentMinClock = 100, clocks, dag, 1300) + // The sink task is stalling the clock + healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0)) + } + } +} + +object ClockServiceSpec { + // In-memory AppDataStore used by the specs; entries are kept in an immutable Map. + class Store extends AppDataStore { + // Current contents; replaced by an updated copy on every put. + private var map = Map.empty[String, Any] + // Stores value under key and returns an already-completed Future holding value. + def put(key: String, value: Any): Future[Any] = { + map = map + (key -> value) + Future.successful(value) + } + // Returns an already-completed Future with the stored value, or null when key is absent. + def get(key: String): Future[Any] = { + Future.successful(map.getOrElse(key, null)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala new file mode 100644 index 0000000..258c7ff --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} +import org.apache.gearpump.streaming.task.{Subscriber, TaskActor} +import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +// Checks that DagManager serves the latest DAG and task launch data, handles ReplaceProcessor, and prefers a DAG found in the store. +class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { + + val hash = Partitioner[HashPartitioner] + val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) + val task2 = ProcessorDescription(id = 2, taskClass = classOf[TaskActor].getName, parallelism = 1) + val graph = Graph(task1 ~ hash ~> task2) + val dag = DAG(graph) + implicit var system: ActorSystem = null + val appId = 0 + lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph) + + "DagManager" should { + import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store + "maintain the dags properly" in { + val store = new Store + + val dagManager = 
system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag)))) + val client = TestProbe() + client.send(dagManager, GetLatestDAG) + client.expectMsg(LatestDAG(dag)) + + client.send(dagManager, GetTaskLaunchData(dag.version, task1.id, null)) + val task1LaunchData = TaskLaunchData(task1, Subscriber.of(task1.id, dag)) + client.expectMsg(task1LaunchData) + + val task2LaunchData = TaskLaunchData(task2, Subscriber.of(task2.id, dag)) + client.send(dagManager, GetTaskLaunchData(dag.version, task2.id, null)) + client.expectMsg(task2LaunchData) + + val watcher = TestProbe() + client.send(dagManager, WatchChange(watcher.ref)) + val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue)) + + client.send(dagManager, ReplaceProcessor(task2.id, task3)) + client.expectMsg(DAGOperationSuccess) + + client.send(dagManager, GetLatestDAG) + val newDag = client.expectMsgPF() { + case LatestDAG(latest) => latest + } + assert(newDag.processors.contains(task3.id)) + watcher.expectMsgType[LatestDAG] + // A further replacement is expected to fail until the new DAG version is reported as deployed + val task4 = task3.copy(id = 4) + client.send(dagManager, ReplaceProcessor(task3.id, task4)) + client.expectMsgType[DAGOperationFailed] + // After NewDAGDeployed for the new version, the same replacement succeeds + client.send(dagManager, NewDAGDeployed(newDag.version)) + client.send(dagManager, ReplaceProcessor(task3.id, task4)) + client.expectMsg(DAGOperationSuccess) + } + + "retrieve last stored dag properly" in { + val store = new Store + val newGraph = Graph(task1 ~ hash ~> task2 ~> task2) + val newDag = DAG(newGraph) + store.put(StreamApplication.DAG, newDag) + val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag)))) + val client = TestProbe() + client.send(dagManager, GetLatestDAG) + client.expectMsg(LatestDAG(newDag)) + } + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + override def beforeAll(): Unit = { + this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG) + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala new file mode 100644 index 0000000..42b2618 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.appmaster + +import akka.actor._ +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.apache.gearpump.TestProbeUtil +import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource +import org.apache.gearpump.cluster._ +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} +import org.apache.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo} +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.jarstore.FilePath +import org.apache.gearpump.streaming.ExecutorId +import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor +import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _} +import org.apache.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease +import org.apache.gearpump.util.ActorSystemBooter.BindLifeCycle +import org.apache.gearpump.util.LogUtil +import org.scalatest._ + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +// Drives an ExecutorManager with TestProbes standing in for the master, the task manager, the worker and the executor. +class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit var system: ActorSystem = null + + private val LOG = LogUtil.getLogger(getClass) + private val appId = 0 + private val resource = Resource(10) + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + // Starts an ExecutorManager, requests executors and checks the StartExecutorSystems sent to the master; returns the probes and the manager. + private def startExecutorSystems = { + val master = TestProbe() + val taskManager = TestProbe() + val executor = TestProbe() + val userConfig = UserConfig.empty + + val username = "user" + val appName = "app" + val appJar = Some(AppJar("for_test", FilePath("path"))) + + val appMasterContext = AppMasterContext(appId, username, null, 
null, appJar, master.ref, null) + + val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: ExecutorId) => { + executor.ref ! StartExecutorActorPlease + TestProbeUtil.toProps(executor) + } + val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, + executorFactory, ConfigFactory.empty, appName))) + + taskManager.send(executorManager, SetTaskManager(taskManager.ref)) + val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified)) + + // Starts executors + taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get)) + + // Asks master to start executor systems + import scala.concurrent.duration._ + val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems] + assert(startExecutorSystem.resources.deep == resourceRequest.deep) + import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName} + // The forwarded config should carry only the app jar and the user name + + assert(classPath.length == 0) + assert(jvmArguments.length == 0) + assert(jar == appJar) + assert(returnedUserName == username) + assert(executorAkkaConfig.isEmpty) + + (master, executor, taskManager, executorManager) + } + + it should "report timeout to taskManager" in { + import org.apache.gearpump.streaming.appmaster.ExecutorManager._ + val (master, executor, taskManager, executorManager) = startExecutorSystems + master.reply(StartExecutorSystemTimeout) + taskManager.expectMsg(StartExecutorsTimeOut) + } + + it should "start executor actor correctly" in { + val (master, executor, taskManager, executorManager) = startExecutorSystems + val executorSystemDaemon = TestProbe() + val worker = TestProbe() + val workerId = WorkerId(0, 0L) + val workerInfo = WorkerInfo(workerId, worker.ref) + val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref, + resource, workerInfo) + master.reply(ExecutorSystemStarted(executorSystem, None)) + import 
scala.concurrent.duration._ + val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle] + val proxyExecutor = bindLifeWith.actor + executor.expectMsg(StartExecutorActorPlease) + + val executorId = 0 + + // Registers executor + executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, + resource, workerInfo)) + taskManager.expectMsgType[ExecutorStarted] + + // Broadcasts message to children + taskManager.send(executorManager, BroadCast("broadcast")) + executor.expectMsg("broadcast") + + // Unicast + taskManager.send(executorManager, UniCast(executorId, "unicast")) + executor.expectMsg("unicast") + + // Updates executor resource status + val usedResource = Resource(5) + executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource)) + worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource)) + + // Watches for executor termination + system.stop(executor.ref) + LOG.info("Shutting down executor, and wait taskManager to get notified") + taskManager.expectMsg(ExecutorStopped(executorId)) + } +} + +object ExecutorManagerSpec { + case object StartExecutorActorPlease +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala new file mode 100644 index 0000000..5f3905f --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import org.apache.gearpump.streaming.executor.ExecutorRestartPolicy +import org.apache.gearpump.streaming.task.TaskId +import org.scalatest.{Matchers, WordSpec} + +import scala.concurrent.duration._ + +class ExecutorRestartPolicySpec extends WordSpec with Matchers { + // NOTE(review): both executors host the same task and only the fourth restart request is denied - presumably retries are counted per task; confirm against ExecutorRestartPolicy. + "ExecutorRestartPolicy" should { + "decide whether to restart the executor" in { + val firstExecutor = 1 + val secondExecutor = 2 + val sharedTask = TaskId(0, 0) + val policy = new ExecutorRestartPolicy( + maxNrOfRetries = 3, withinTimeRange = 1.seconds) + policy.addTaskToExecutor(firstExecutor, sharedTask) + policy.allowRestartExecutor(firstExecutor) shouldBe true + policy.allowRestartExecutor(firstExecutor) shouldBe true + policy.addTaskToExecutor(secondExecutor, sharedTask) + policy.allowRestartExecutor(secondExecutor) shouldBe true + policy.allowRestartExecutor(secondExecutor) shouldBe false + Thread.sleep(1000) + policy.allowRestartExecutor(secondExecutor) shouldBe true + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala ---------------------------------------------------------------------- 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala new file mode 100644 index 0000000..5157284 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.appmaster + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import org.apache.gearpump.cluster.ClientToMaster.QueryHistoryMetrics +import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.metrics.Metrics.{Counter, Histogram, Meter} +import org.apache.gearpump.util.HistoryMetricsService +import org.apache.gearpump.util.HistoryMetricsService._ +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.concurrent.Await +// Covers the per-type metrics stores and the HistoryMetricsService actor's path-based queries. +class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach { + + val count = 2 + val intervalMs = 10 + + val config = HistoryMetricsConfig( + retainHistoryDataHours = 72, + retainHistoryDataIntervalMs = 3600 * 1000, + retainRecentDataSeconds = 300, + retainRecentDataIntervalMs = 15 * 1000) + + "SingleValueMetricsStore" should "retain metrics and expire old value" in { + + val store = new SingleValueMetricsStore(count, intervalMs) + + var now = 0L + // Only 1 data point will be kept in @intervalMs + store.add(Counter("count", 1), now) + store.add(Counter("count", 2), now) + + now = now + intervalMs + 1 + + // Only 1 data point will be kept in @intervalMs + store.add(Counter("count", 3), now) + store.add(Counter("count", 4), now) + + now = now + intervalMs + 1 + + // Only 1 data point will be kept in @intervalMs + // expire oldest data point, because we only keep @count records + store.add(Counter("count", 5), now) + store.add(Counter("count", 6), now) + + val result = store.read + assert(result.size == count) + + // The oldest value is expired + assert(result.head.value.asInstanceOf[Counter].value == 3L) + + // The newest value is inserted + assert(result.last.value.asInstanceOf[Counter].value == 5L) + } + + val meterTemplate = Meter("meter", 0, 0, 0, "s") + + "HistogramMetricsStore" should "retain corse-grain history and fine-grain recent data" in { + val 
store = new HistogramMetricsStore(config) + val a = Histogram(null, 100, 0, 0, 0, 0, 0) + val b = Histogram(null, 200, 0, 0, 0, 0, 0) + val c = Histogram(null, 50, 0, 0, 0, 0, 0) + + store.add(a) + store.add(b) + store.add(c) + + assert(store.readLatest.map(_.value) == List(c)) + assert(store.readRecent.map(_.value) == List(a)) + assert(store.readHistory.map(_.value) == List(a)) + } + + "MeterMetricsStore" should "retain corse-grain history and fine-grain recent data" in { + val store = new MeterMetricsStore(config) + + val a = Meter(null, 1, 100, 0, null) + val b = Meter(null, 1, 200, 0, null) + val c = Meter(null, 1, 50, 0, null) + + store.add(a) + store.add(b) + store.add(c) + + assert(store.readLatest.map(_.value) == List(c)) + assert(store.readRecent.map(_.value) == List(a)) + assert(store.readHistory.map(_.value) == List(a)) + } + + "CounterMetricsStore" should "retain corse-grain history and fine-grain recent data" in { + val store = new CounterMetricsStore(config) + val a = Counter(null, 50) + val b = Counter(null, 100) + val c = Counter(null, 150) + + store.add(a) + store.add(b) + store.add(c) + + assert(store.readLatest.map(_.value) == List(c)) + assert(store.readRecent.map(_.value) == List(a)) + assert(store.readHistory.map(_.value) == List(a)) + } + + "HistoryMetricsService" should + "retain lastest metrics data and allow user to query metrics by path" in { + implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + // The service under test is created with the name "app0" and the shared retention config + val service = system.actorOf(Props(new HistoryMetricsService("app0", config))) + service ! Counter("metric.counter", 0) + service ! Meter("metric.meter", 0, 0, 0, null) + service ! 
Histogram("metric.histogram", 0, 0, 0, 0, 0, 0) + + val client = TestProbe() + + // Filters metrics with path "metric.counter" + client.send(service, QueryHistoryMetrics("metric.counter")) + import scala.concurrent.duration._ + client.expectMsgPF(3.seconds) { + case history: HistoryMetrics => + assert(history.path == "metric.counter") + val metricList = history.metrics + metricList.foreach(metricItem => + assert(metricItem.value.isInstanceOf[Counter]) + ) + } + + // Filters metrics with path "metric.meter" + client.send(service, QueryHistoryMetrics("metric.meter")) + client.expectMsgPF(3.seconds) { + case history: HistoryMetrics => + assert(history.path == "metric.meter") + val metricList = history.metrics + metricList.foreach(metricItem => + assert(metricItem.value.isInstanceOf[Meter]) + ) + } + + // Filters metrics with path "metric.histogram" + client.send(service, QueryHistoryMetrics("metric.histogram")) + client.expectMsgPF(3.seconds) { + case history: HistoryMetrics => + assert(history.path == "metric.histogram") + val metricList = history.metrics + metricList.foreach(metricItem => + assert(metricItem.value.isInstanceOf[Histogram]) + ) + } + + // Filters metrics with path prefix "metric", all metrics which can + // match the path prefix will be retained. + client.send(service, QueryHistoryMetrics("metric")) + client.expectMsgPF(3.seconds) { + case history: HistoryMetrics => + val metricList = history.metrics + + var counterFound = false + var meterFound = false + var histogramFound = false + + metricList.foreach(metricItem => + metricItem.value match { + case _: Counter => counterFound = true + case _: Meter => meterFound = true + case _: Histogram => histogramFound = true + case _ => // Skip + } + ) + + // Every metric type must be present in the result. 
+ assert(counterFound && meterFound && histogramFound) + } + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala new file mode 100644 index 0000000..5f6dd04 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.appmaster + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.cluster.{AppJar, TestUtil} +import org.apache.gearpump.jarstore.FilePath +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} +import org.apache.gearpump.streaming.task.TaskId +import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.scalatest.{Matchers, WordSpec} + +import scala.concurrent.{Await, Future} +// Checks that JarScheduler issues one resource request per application jar and schedules the tasks of a jar together. +class JarSchedulerSpec extends WordSpec with Matchers { + val mockJar1 = AppJar("jar1", FilePath("path")) + val mockJar2 = AppJar("jar2", FilePath("path")) + val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, + jar = mockJar1) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, + jar = mockJar1) + val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, + jar = mockJar2) + val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) + + import scala.concurrent.duration._ + + "JarScheduler" should { + "schedule tasks depends on app jar" in { + val system = ActorSystem("JarSchedulerSpec") + implicit val dispatcher = system.dispatcher + val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system) + manager.setDag( + dag, + Future.successful(0L)) + val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified)) + val result = Await.result(manager.getResourceRequestDetails(), 15.seconds) + assert(result.length == 1) + assert(result.head.jar == mockJar1) + assert(result.head.requests.deep == requests.deep) + + val tasks = Await.result(manager.scheduleTask(mockJar1, 
WorkerId(0, 0L), 0, + Resource(2)), 15.seconds) + assert(tasks.contains(TaskId(0, 0))) + assert(tasks.contains(TaskId(1, 0))) + + val newDag = replaceDAG(dag, 1, task3, 1) + + manager.setDag( + newDag, + Future.successful(0L)) + val requestDetails = Await.result(manager.getResourceRequestDetails(). + map(_.sortBy(_.jar.name)), 15.seconds) + assert(requestDetails.length == 2) + assert(requestDetails.last.jar == mockJar2) + assert(requestDetails.last.requests.deep == requests.deep) + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + // Builds DAG version newVersion: the old processor's LifeTime gets newProcessor's birth as its second argument, and newProcessor is added to the processor map and the graph. + def replaceDAG( + dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) + : DAG = { + val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, + newProcessor.life.birth) + val newProcessorMap = dag.processors ++ + Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), + newProcessor.id -> newProcessor) + val newGraph = dag.graph.subGraph(oldProcessorId). + replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) + new DAG(newVersion, newProcessorMap, newGraph) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala new file mode 100644 index 0000000..4341041 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskLocatorSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.appmaster + +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities +import org.apache.gearpump.streaming.task.TaskId +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + it should "serialize/deserialize correctly" in { + val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2)))) + val json = Localities.toJson(localities) + // The task arrays are converted to Lists so the round trip is compared by content. + localities.localities.mapValues(_.toList) shouldBe + Localities.fromJson(json).localities.mapValues(_.toList) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala new file mode 100644 index 0000000..9765278 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.appmaster

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig}
import org.apache.gearpump.jarstore.FilePath
import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
import org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _}
import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
import org.apache.gearpump.streaming.executor.Executor.RestartTasks
import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
import org.apache.gearpump.{Message, TimeStamp}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

/**
 * Exercises `TaskManager` against test probes that stand in for the executor manager,
 * clock service, DAG manager and app master, plus a mocked `JarScheduler`.
 *
 * Each test first calls `bootUp`, which replays the start-up message exchange and hands
 * back the probes wrapped in an [[TaskManagerSpec.Env]].
 */
class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {

  // Re-created in beforeEach and terminated in afterEach; implicit so that TestProbe()
  // picks it up.
  implicit var system: ActorSystem = null

  val task1Class = classOf[Task1].getName
  val task2Class = classOf[Task2].getName

  val mockJar = AppJar("jar_for_test", FilePath("path"))
  val task1 = ProcessorDescription(id = 0, taskClass = task1Class, parallelism = 1, jar = mockJar)
  val task2 = ProcessorDescription(id = 1, taskClass = task2Class, parallelism = 1, jar = mockJar)

  val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
  val dagVersion = 0

  val task1LaunchData = TaskLaunchData(task1, Subscriber.of(processorId = 0, dag))
  val task2LaunchData = TaskLaunchData(task2, Subscriber.of(processorId = 1, dag))

  val appId = 0

  val resource = Resource(2)
  val workerId = WorkerId(0, 0L)
  val executorId = 0

  override def beforeEach(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterEach(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "recover by requesting new executors when executor stopped unexpectedly" in {
    val env = bootUp
    import env._

    val resourceRequest = Array(ResourceRequest(resource, workerId))
    // The stubbed value is already known, so no execution context is needed to build it.
    when(scheduler.executorFailed(executorId))
      .thenReturn(Future.successful(Some(ResourceRequestDetail(mockJar, resourceRequest))))

    taskManager ! ExecutorStopped(executorId)

    // When one executor stops, it also triggers the recovery by restarting the
    // existing executors.
    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))

    // Asks for new executors; any other message type fails here as a test assertion.
    val returned = executorManager.expectMsgType[StartExecutors]
    assert(returned.resources.deep == resourceRequest.deep)
    executorManager.reply(StartExecutorsTimeOut)

    // TaskManager cannot handle the timeout error itself, so it escalates to the AppMaster.
    appMaster.expectMsg(AllocateResourceTimeOut)
  }

  it should "recover by restarting existing executors when message loss happen" in {
    val env = bootUp
    import env._

    taskManager ! ReplayFromTimestampWindowTrailingEdge(appId)

    // Restart the executors so that we can replay from minClock
    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
  }

  import org.apache.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
  "TaskChangeRegistry" should "track all modified task registration" in {
    val tasks = List(TaskId(0, 0), TaskId(0, 1))
    val registry = new TaskChangeRegistry(tasks)
    tasks.foreach(registry.taskChanged)
    assert(registry.allTaskChanged)
  }

  "DAGDiff" should "track all the DAG migration impact" in {
    val defaultEdge = PartitionerDescription(null)
    val a = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
    val b = ProcessorDescription(id = 2, taskClass = null, parallelism = 1)
    val c = ProcessorDescription(id = 3, taskClass = null, parallelism = 1)
    val left = Graph(a ~ defaultEdge ~> b, a ~ defaultEdge ~> c)

    // The new DAG adds processor d downstream of c and replaces a with a copy that has a
    // different life time.
    val d = ProcessorDescription(id = 4, taskClass = null, parallelism = 1)
    val right = left.copy
    right.addVertex(d)
    right.addEdge(c, defaultEdge, d)
    val e = a.copy(life = LifeTime(0, 0))
    right.replaceVertex(a, e)

    val diff = TaskManager.migrate(DAG(left), DAG(right, version = 1))
    diff.addedProcessors shouldBe List(d.id)

    diff.modifiedProcessors shouldBe List(a.id)

    diff.impactedUpstream shouldBe List(c.id)
  }

  /**
   * Creates a `TaskManager` wired to fresh probes and a mocked scheduler, then walks it
   * through the start-up message exchange below (steps 1 to 14).
   *
   * @return the probes, the mocked scheduler and the `TaskManager` reference
   */
  private def bootUp: Env = {
    import scala.concurrent.duration._

    val executorManager = TestProbe()
    val clockService = TestProbe()
    val appMaster = TestProbe()
    val executor = TestProbe()

    val scheduler = mock(classOf[JarScheduler])

    val dagManager = TestProbe()

    val taskManager = system.actorOf(
      Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref,
        appMaster.ref, "appName")))

    dagManager.expectMsgType[WatchChange]
    executorManager.expectMsgType[SetTaskManager]

    // Step1: first transition from Uninitialized to ApplicationReady
    executorManager.expectMsgType[ExecutorResourceUsageSummary]
    dagManager.expectMsgType[NewDAGDeployed]

    // Step2: Get Additional Resource Request
    when(scheduler.getResourceRequestDetails())
      .thenReturn(Future.successful(
        Array(ResourceRequestDetail(mockJar,
          Array(ResourceRequest(resource, WorkerId.unspecified))))))

    // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
    dagManager.expectMsg(GetLatestDAG)
    dagManager.reply(LatestDAG(dag))

    // Step4: Start remote Executors.
    // received Broadcast
    executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
    executorManager.expectMsgType[StartExecutors]

    when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
      .thenReturn(Future.successful(List(TaskId(0, 0), TaskId(1, 0))))

    // Step5: Executor is started.
    executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar)))

    // Step6: Prepare to start Task. First GetTaskLaunchData.
    val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
      case GetTaskLaunchData(_, 0, executorStarted) =>
        task1LaunchData.copy(context = executorStarted)
      case GetTaskLaunchData(_, 1, executorStarted) =>
        task2LaunchData.copy(context = executorStarted)
    }

    val launchData1 = dagManager.expectMsgPF()(taskLaunchData)
    dagManager.reply(launchData1)

    val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
    dagManager.reply(launchData2)

    // Step7: Launch Task
    val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
      case UniCast(targetExecutor, launch: LaunchTasks) =>
        RegisterTask(launch.taskId.head, targetExecutor, HostPort("127.0.0.1:3000"))
    }

    // TaskManager should return the latest start clock to task(0,0)
    clockService.expectMsg(GetStartClock)
    clockService.reply(StartClock(0))

    // Step8: Task is started. registerTask.
    val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
    taskManager.tell(registerTask1, executor.ref)
    executor.expectMsgType[TaskRegistered]

    val registerTask2 = executorManager.expectMsgPF()(launchTaskMatch)
    taskManager.tell(registerTask2, executor.ref)
    executor.expectMsgType[TaskRegistered]

    // Step9: start broadcasting TaskLocations.
    assert(executorManager.expectMsgPF(5.seconds) {
      case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
    })

    // Step10: Executor confirms it has received the task locations by sending
    // TaskLocationsReceived(version, executorId)
    taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref)

    // Step11: Tell ClockService to update DAG.
    clockService.expectMsgType[ChangeToNewDAG]
    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))

    // Step12: start all tasks
    assert(executorManager.expectMsgPF(5.seconds) {
      case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
    })

    // Step13, Tell executor Manager the updated usage status of executors.
    executorManager.expectMsgType[ExecutorResourceUsageSummary]

    // Step14: transition from DynamicDAG to ApplicationReady
    Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
  }
}

object TaskManagerSpec {

  /** Bundle of the probes, the `TaskManager` reference and the mocked scheduler of one test. */
  case class Env(
      executorManager: TestProbe,
      clockService: TestProbe,
      appMaster: TestProbe,
      executor: TestProbe,
      taskManager: ActorRef,
      scheduler: JarScheduler)

  /** Task whose callbacks do nothing; only its class name is used by the spec. */
  class Task1(taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {
    override def onStart(startTime: StartTime): Unit = {}
    override def onNext(msg: Message): Unit = {}
  }

  /** Task whose callbacks do nothing; only its class name is used by the spec. */
  class Task2(taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {
    override def onStart(startTime: StartTime): Unit = {}
    override def onNext(msg: Message): Unit = {}
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
new file mode 100644
index 0000000..6f85834
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskRegistrySpec.scala
@@ -0,0 +1,63 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.appmaster

import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations}
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.transport.HostPort
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

/**
 * Checks the bookkeeping of `TaskRegistry`: accepting expected tasks, rejecting an
 * unexpected one, and reporting locations, executors and used resources.
 */
class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {

  it should "maintain registered tasks" in {
    val task0 = TaskId(0, 0)
    val task1 = TaskId(0, 1)
    val task2 = TaskId(0, 2)

    val register = new TaskRegistry(expectedTasks = List(task0, task1, task2))
    val host1 = HostPort("127.0.0.1:3000")
    val host2 = HostPort("127.0.0.1:3001")

    val executorId = 0
    assert(Accept == register.registerTask(task0, TaskLocation(executorId, host1)))
    assert(Accept == register.registerTask(task1, TaskLocation(executorId, host1)))
    assert(Accept == register.registerTask(task2, TaskLocation(executorId, host2)))

    // TaskId(100, 0) is not among the expected tasks.
    assert(Reject == register.registerTask(TaskId(100, 0), TaskLocation(executorId, host2)))

    assert(register.isAllTasksRegistered)

    // A missing host fails the lookup directly instead of going through Option.get.
    val TaskLocations(taskLocations) = register.getTaskLocations
    val tasksOnHost1 = taskLocations(host1)
    val tasksOnHost2 = taskLocations(host2)
    assert(tasksOnHost1.contains(task0))
    assert(tasksOnHost1.contains(task1))
    assert(tasksOnHost2.contains(task2))

    register.getExecutorId(task0) shouldBe Some(executorId)
    assert(register.isTaskRegisteredForExecutor(executorId))

    register.processorExecutors(0) shouldBe Map(
      executorId -> List(task0, task1, task2)
    )

    register.usedResource.resources shouldBe Map(
      executorId -> Resource(3)
    )
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
new file mode 100644
index 0000000..4a532dd
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.gearpump.streaming.appmaster + +import com.typesafe.config.ConfigFactory +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities +import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import org.apache.gearpump.streaming.{DAG, ProcessorDescription} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.scalatest.{Matchers, WordSpec} + +import scala.collection.mutable.ArrayBuffer + +class TaskSchedulerSpec extends WordSpec with Matchers { + val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 2) + + val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) + + val config = TestUtil.DEFAULT_CONFIG + + "TaskScheduler" should { + "schedule tasks on different workers properly according user's configuration" in { + + val localities = Localities( + Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)), + WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3)) + )) + + val localityConfig = ConfigFactory.parseString(Localities.toJson(localities)) + + import org.apache.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES + val appName = "app" + val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, + config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root)) + + val expectedRequests = + Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = 
Relaxation.SPECIFICWORKER), + ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) + + taskScheduler.setDAG(dag) + val resourceRequests = taskScheduler.getResourceRequests() + + val acturalRequests = resourceRequests.sortBy(_.resource.slots) + assert(acturalRequests.sameElements(expectedRequests.sortBy(_.resource.slots))) + + val tasksOnWorker1 = ArrayBuffer[Int]() + val tasksOnWorker2 = ArrayBuffer[Int]() + for (i <- 0 until 4) { + tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), + executorId = 0, Resource(1)).head.processorId) + } + for (i <- 0 until 2) { + tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, + Resource(1)).head.processorId) + } + + // Allocates more resource, and no tasks to launch + assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, + Resource(1)) == List.empty[TaskId]) + + // On worker1, executor 0 + assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1))) + + // On worker2, executor 1, Task(0, 0), Task(0, 1) + assert(tasksOnWorker2.sorted.sameElements(Array(0, 0))) + + val rescheduledResources = taskScheduler.executorFailed(executorId = 1) + + assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), + WorkerId.unspecified, relaxation = Relaxation.ONEWORKER)))) + + val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2)) + + // Starts the failed 2 tasks Task(0, 0) and Task(0, 1) + assert(launchedTask.length == 2) + } + + "schedule task fairly" in { + val appName = "app" + val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config) + + val expectedRequests = + Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), + ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) + + taskScheduler.setDAG(dag) + val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4)) + assert(tasks.filter(_.processorId == 0).length 
== 2) + assert(tasks.filter(_.processorId == 1).length == 2) + } + } +} + +object TaskSchedulerSpec { + class TestTask1(taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { + override def onStart(startTime: StartTime): Unit = Unit + override def onNext(msg: Message): Unit = Unit + } + + class TestTask2(taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { + override def onStart(startTime: StartTime): Unit = Unit + override def onNext(msg: Message): Unit = Unit + } +}
