http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala new file mode 100644 index 0000000..616894a --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.{Message, TimeStamp} + +import scala.util.Try + +/** + * Filters offsets and stores the mapping from timestamp to offset + */ +trait MessageFilter { + def filter(messageAndOffset: (Message, Long)): Option[Message] +} + +/** + * Resolves timestamp to offset by looking up the underlying storage + */ +trait OffsetTimeStampResolver { + def resolveOffset(time: TimeStamp): Try[Long] +} + +/** + * Manages message's offset on TimeReplayableSource and timestamp + */ +trait OffsetManager extends MessageFilter with OffsetTimeStampResolver { + def close(): Unit +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala new file mode 100644 index 0000000..40fc088 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.TimeStamp + +import scala.util.Try + +object OffsetStorage { + + /** + * StorageEmpty means no data has been stored + */ + case object StorageEmpty extends Throwable + + /** + * Overflow means the looked up time is + * larger than the maximum stored TimeStamp + */ + case class Overflow(maxTimestamp: Array[Byte]) extends Throwable + + /** + * Underflow means the looked up time is + * smaller than the minimum stored TimeStamp + */ + case class Underflow(minTimestamp: Array[Byte]) extends Throwable +} + +/** + * OffsetStorage stores the mapping from TimeStamp to Offset + */ +trait OffsetStorage { + /** + * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is + * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow, + * Underflow) + * + * @param time the time to look for + * @return the corresponding offset if the time is in the range, otherwise failure + */ + def lookUp(time: TimeStamp): Try[Array[Byte]] + + def append(time: TimeStamp, offset: Array[Byte]): Unit + + def close(): Unit +} + +trait OffsetStorageFactory extends java.io.Serializable { + def getOffsetStorage(dir: String): OffsetStorage +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala new file mode 100644 index 0000000..16f98d5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.streaming.source.DataSource + +/** + * AT-LEAST-ONCE API. Represents a data source which allows replaying. + * + * Subclasses should be able to replay messages on recovery from the time + * when an application crashed. + */ +trait TimeReplayableSource extends DataSource + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala new file mode 100644 index 0000000..2ddca3a --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.transaction.api + +import org.apache.gearpump.{Message, TimeStamp} + +/** + * TimeStampFilter filters out messages that are obsolete. + */ +trait TimeStampFilter extends java.io.Serializable { + def filter(msg: Message, predicate: TimeStamp): Option[Message] +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala new file mode 100644 index 0000000..97c9385 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.util + +import akka.actor.{ActorPath, ActorRef} + +import org.apache.gearpump.streaming.task.TaskId + +object ActorPathUtil { + + def executorActorName(executorId: Int): String = executorId.toString + + def taskActorName(taskId: TaskId): String = { + s"processor_${taskId.processorId}_task_${taskId.index}" + } + + def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = { + val executorManager = appMaster.path.child(executorManagerActorName) + val executor = executorManager.child(executorActorName(executorId)) + val task = executor.child(taskActorName(taskId)) + task + } + + def executorManagerActorName: String = "executors" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala deleted file mode 100644 index 13b8e34..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.partitioner.PartitionerDescription -import io.gearpump.streaming.task.TaskId -import io.gearpump.util.Graph -import io.gearpump.util.Graph.Node - -class DAGSpec extends PropSpec with PropertyChecks with Matchers { - - val parallelismGen = Gen.chooseNum[Int](1, 100) - - property("DAG should be built correctly for a single task") { - forAll(parallelismGen) { (parallelism: Int) => - val task = ProcessorDescription(id = 0, taskClass = "task", parallelism = parallelism) - val graph = Graph[ProcessorDescription, PartitionerDescription](task) - val dag = DAG(graph) - dag.processors.size shouldBe 1 - assert(dag.taskCount == parallelism) - dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index)) - dag.graph.edges shouldBe empty - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala deleted file mode 100644 index 7938415..0000000 --- 
a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming - -import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers} -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.streaming.task._ -import io.gearpump.transport.netty.WrappedChannelBuffer - -class MessageSerializerSpec extends WordSpec with Matchers { - - def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = { - val length = taskMessageSerializer.getLength(obj) - val bout = new ChannelBufferOutputStream(ChannelBuffers.buffer(length)) - taskMessageSerializer.write(bout, obj) - val input = new WrappedChannelBuffer(ChannelBuffers.wrappedBuffer(bout.buffer().array())) - taskMessageSerializer.read(input) - } - - "SerializedMessageSerializer" should { - "serialize and deserialize SerializedMessage properly" in { - val serializer = new SerializedMessageSerializer - val data = new Array[Byte](256) - new java.util.Random().nextBytes(data) - val msg = SerializedMessage(1024, data) - val result = testSerializer(msg, serializer) - assert(result.timeStamp == msg.timeStamp && 
result.bytes.sameElements(msg.bytes)) - } - } - - "TaskIdSerializer" should { - "serialize and deserialize TaskId properly" in { - val taskIdSerializer = new TaskIdSerializer - val taskId = TaskId(1, 3) - assert(testSerializer(taskId, taskIdSerializer).equals(taskId)) - } - } - - "AckRequestSerializer" should { - "serialize and deserialize AckRequest properly" in { - val serializer = new AckRequestSerializer - val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024) - assert(testSerializer(ackRequest, serializer).equals(ackRequest)) - } - } - - "InitialAckRequestSerializer" should { - "serialize and deserialize AckRequest properly" in { - val serializer = new InitialAckRequestSerializer - val ackRequest = InitialAckRequest(TaskId(1, 2), 1024) - assert(testSerializer(ackRequest, serializer).equals(ackRequest)) - } - } - - "AckSerializer" should { - "serialize and deserialize Ack properly" in { - val serializer = new AckSerializer - val ack = Ack(TaskId(1, 2), 1024, 1023, 1799) - assert(testSerializer(ack, serializer).equals(ack)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala deleted file mode 100644 index 40310f7..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming - -import akka.actor.{Actor, ActorSystem} -import akka.testkit.TestActorRef -import org.mockito.{ArgumentMatcher, Matchers, Mockito} - -import io.gearpump.cluster.TestUtil -import io.gearpump.streaming.task.{TaskContext, TaskId} - -object MockUtil { - - lazy val system: ActorSystem = ActorSystem("mockUtil", TestUtil.DEFAULT_CONFIG) - - def mockTaskContext: TaskContext = { - val context = Mockito.mock(classOf[TaskContext]) - Mockito.when(context.self).thenReturn(Mockito.mock(classOf[TestActorRef[Actor]])) - Mockito.when(context.system).thenReturn(system) - Mockito.when(context.parallelism).thenReturn(1) - Mockito.when(context.taskId).thenReturn(TaskId(0, 0)) - context - } - - def argMatch[T](func: T => Boolean): T = { - Matchers.argThat(new ArgumentMatcher[T] { - override def matches(param: Any): Boolean = { - val mesage = param.asInstanceOf[T] - func(mesage) - } - }) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala deleted file mode 100644 index 2ea8b84..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming - -import akka.actor._ -import akka.testkit.TestActorRef - -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig} -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.util.Graph - -object StreamingTestUtil { - private var executorId = 0 - val testUserName = "testuser" - - def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = { - - implicit val actorSystem = miniCluster.system - val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, - None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString)) - - val app = StreamApplication("test", Graph.empty, UserConfig.empty) - val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig) - val props = Props(new AppMaster(masterConf, appDescription)) - val appMaster = miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]] - val registerAppMaster = RegisterAppMaster(appMaster, masterConf.registerData) - miniCluster.mockMaster.tell(registerAppMaster, appMaster) - - 
appMaster - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala deleted file mode 100644 index 90fb8ab..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.appmaster - -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, Props} -import akka.testkit.{TestActorRef, TestProbe} -import org.scalatest._ - -import io.gearpump.Message -import io.gearpump.cluster.AppMasterToMaster._ -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.ClientToMaster.ShutdownApplication -import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated} -import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.jarstore.FilePath -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.task.{StartTime, TaskContext, _} -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - protected override def config = TestUtil.DEFAULT_CONFIG - - var appMaster: ActorRef = null - - val appId = 0 - val workerId = WorkerId(1, 0L) - val resource = Resource(1) - val taskDescription1 = Processor[TaskA](2) - val taskDescription2 = Processor[TaskB](2) - val partitioner = new HashPartitioner - var conf: UserConfig = null - - var mockTask: TestProbe = null - - var mockMaster: TestProbe = null - var mockMasterProxy: ActorRef = null - - var mockWorker: TestProbe = null - var appDescription: AppDescription = null - var appMasterContext: AppMasterContext = null - var appMasterRuntimeInfo: AppMasterRuntimeInfo = null - - override def beforeEach(): Unit = { - startActorSystem() - - mockTask = TestProbe()(getActorSystem) - - mockMaster = TestProbe()(getActorSystem) 
- mockWorker = TestProbe()(getActorSystem) - mockMaster.ignoreMsg(ignoreSaveAppData) - appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) - - implicit val system = getActorSystem - conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref) - val mockJar = AppJar("for_test", FilePath("path")) - appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), - mockMaster.ref, appMasterRuntimeInfo) - val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2) - val streamApp = StreamApplication("test", graph, conf) - appDescription = Application.ApplicationToAppDescription(streamApp) - import scala.concurrent.duration._ - mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), - 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) - TestActorRef[AppMaster]( - AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, - appMasterContext))(getActorSystem) - - val registerAppMaster = mockMaster.receiveOne(15.seconds) - assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) - appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster - - mockMaster.reply(AppMasterRegistered(appId)) - mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG")) - mockMaster.reply(GetAppDataResult("DAG", null)) - mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock")) - - mockMaster.reply(GetAppDataResult("startClock", 0L)) - - mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4), - workerId = WorkerId.unspecified))) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "AppMaster" should { - "kill it self when allocate resource time out" in { - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), - mockWorker.ref, workerId)))) - mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) - } - - "reschedule the resource when the worker reject to start executor" in { - val resource = 
Resource(4) - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, - mockWorker.ref, workerId)))) - mockWorker.expectMsgClass(classOf[LaunchExecutor]) - mockWorker.reply(ExecutorLaunchRejected("")) - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) - } - - "find a new master when lost connection with master" in { - - val watcher = TestProbe()(getActorSystem) - watcher.watch(mockMasterProxy) - getActorSystem.stop(mockMasterProxy) - watcher.expectTerminated(mockMasterProxy) - // Make sure the parent of mockMasterProxy has received the Terminated message. - // Issus address: https://github.com/gearpump/gearpump/issues/1919 - Thread.sleep(2000) - - import scala.concurrent.duration._ - mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), - 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) - mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster]) - } - - // // TODO: This test is failing on Travis randomly - // // We have not identifed the root cause. 
- // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843 - // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733 - // - // "launch executor and task properly" in { - // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, - // workerId)))) - // mockWorker.expectMsgClass(classOf[LaunchExecutor]) - // - // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) - // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) - // for (i <- 1 to 4) { - // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted) - // } - // - // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) - // - // // there is no further upstream, so the upstreamMinClock is Long.MaxValue - // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // - // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) - // - // // there is no further upstream, so the upstreamMinClock is Long.MaxValue - // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 - // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) - // - // // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) - // mockTask.expectMsg(UpstreamMinClock(1)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(0)) - // - // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 - // appMaster.tell(UpdateClock(TaskId(1, 1), 1), 
mockTask.ref) - // - // // min clock of processor 0 (Task(0, 0) and Task(0, 1)) - // mockTask.expectMsg(UpstreamMinClock(1)) - // - // // check min clock - // appMaster.tell(GetLatestMinClock, mockTask.ref) - // mockTask.expectMsg(LatestMinClock(1)) - // - // // shutdown worker and all executor on this work, expect appmaster to ask - // // for new resources - // workerSystem.shutdown() - // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = - // Relaxation.ONEWORKER))) - // } - } - - def ignoreSaveAppData: PartialFunction[Any, Boolean] = { - case msg: SaveAppData => true - } -} - -object AppMasterSpec { - val MASTER = "master" - case object TaskStarted - - val MOCK_MASTER_PROXY = "mockMasterProxy" -} - -class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get - override def onStart(startTime: StartTime): Unit = { - master ! AppMasterSpec.TaskStarted - } - - override def onNext(msg: Message): Unit = {} -} - -class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get - override def onStart(startTime: StartTime): Unit = { - master ! 
AppMasterSpec.TaskStarted - } - - override def onNext(msg: Message): Unit = {} -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala deleted file mode 100644 index 01744a3..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.appmaster - -import scala.concurrent.{Future, Promise} - -import akka.actor.{ActorSystem, Props} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} -import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock} -import io.gearpump.streaming.appmaster.ClockServiceSpec.Store -import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _} -import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll { - - def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG)) - - val hash = Partitioner[HashPartitioner] - val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) - val dag = DAG(Graph(task1 ~ hash ~> task2)) - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "The ClockService" should { - "maintain a global view of message timestamp in the application" in { - val store = new Store() - val startClock = 100L - store.put(ClockService.START_CLOCK, startClock) - val clockService = system.actorOf(Props(new ClockService(dag, store))) - clockService ! GetLatestMinClock - expectMsg(LatestMinClock(startClock)) - - // task(0,0): clock(101); task(1,0): clock(100) - clockService ! 
UpdateClock(TaskId(0, 0), 101) - - // There is no upstream, so pick Long.MaxValue - expectMsg(UpstreamMinClock(Long.MaxValue)) - - // Min clock is updated - clockService ! GetLatestMinClock - expectMsg(LatestMinClock(100)) - - // task(0,0): clock(101); task(1,0): clock(101) - clockService ! UpdateClock(TaskId(1, 0), 101) - - // Upstream is Task(0, 0), 101 - expectMsg(UpstreamMinClock(101)) - - // Min clock is updated - clockService ! GetLatestMinClock - expectMsg(LatestMinClock(101)) - } - - "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in { - val store = new Store() - val startClock = 100L - store.put(ClockService.START_CLOCK, startClock) - val clockService = system.actorOf(Props(new ClockService(dag, store))) - val task = TestProbe() - clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref) - task.expectMsgType[UpstreamMinClock] - - val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, - parallelism = 1) - val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, - parallelism = 1) - val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, - parallelism = 1) - val dagAddMiddleNode = DAG(Graph( - task1 ~ hash ~> task2, - task1 ~ hash ~> task3, - task3 ~ hash ~> task2, - task2 ~ hash ~> task4, - task5 ~ hash ~> task1 - )) - val user = TestProbe() - clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref) - - val clocks = user.expectMsgPF() { - case ChangeToNewDAGSuccess(clocks) => - clocks - } - - // For intermediate task, pick its upstream as initial clock - assert(clocks(task3.id) == clocks(task1.id)) - - // For sink task, pick its upstream as initial clock - assert(clocks(task4.id) == clocks(task2.id)) - - // For source task, set the initial clock as startClock - assert(clocks(task5.id) == startClock) - } - - "maintain global checkpoint time" in { - val store = new Store() - val startClock = 100L - store.put(ClockService.START_CLOCK, startClock) - 
val clockService = system.actorOf(Props(new ClockService(dag, store))) - clockService ! UpdateClock(TaskId(0, 0), 200L) - expectMsgType[UpstreamMinClock] - clockService ! UpdateClock(TaskId(1, 0), 200L) - expectMsgType[UpstreamMinClock] - - clockService ! GetStartClock - expectMsg(StartClock(200L)) - - val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true) - val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, - parallelism = 1, taskConf = conf) - val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, - parallelism = 1, taskConf = conf) - val dagWithStateTasks = DAG(Graph( - task1 ~ hash ~> task2, - task1 ~ hash ~> task3, - task3 ~ hash ~> task2, - task2 ~ hash ~> task4 - ), version = 1) - - val taskId3 = TaskId(3, 0) - val taskId4 = TaskId(4, 0) - - clockService ! ChangeToNewDAG(dagWithStateTasks) - expectMsgType[ChangeToNewDAGSuccess] - - clockService ! ReportCheckpointClock(taskId3, startClock) - clockService ! ReportCheckpointClock(taskId4, startClock) - clockService ! GetStartClock - expectMsg(StartClock(startClock)) - - clockService ! ReportCheckpointClock(taskId3, 200L) - clockService ! ReportCheckpointClock(taskId4, 300L) - clockService ! GetStartClock - expectMsg(StartClock(startClock)) - - clockService ! ReportCheckpointClock(taskId3, 300L) - clockService ! 
GetStartClock - expectMsg(StartClock(300L)) - } - } - - "ProcessorClock" should { - "maintain the min clock of current processor" in { - val processorId = 0 - val parallism = 3 - val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism) - clock.init(100L) - clock.updateMinClock(0, 101) - assert(clock.min == 100L) - - clock.updateMinClock(1, 102) - assert(clock.min == 100L) - - clock.updateMinClock(2, 103) - assert(clock.min == 101L) - } - } - - "HealthChecker" should { - "report stalling if the clock is not advancing" in { - val healthChecker = new HealthChecker(stallingThresholdSeconds = 1) - val source = ProcessorDescription(id = 0, taskClass = null, parallelism = 1) - val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1) - sourceClock.init(0L) - val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1) - val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1) - sinkClock.init(0L) - val graph = Graph.empty[ProcessorDescription, PartitionerDescription] - graph.addVertex(source) - graph.addVertex(sink) - graph.addEdge(source, PartitionerDescription(null), sink) - val dag = DAG(graph) - val clocks = Map( - 0 -> sourceClock, - 1 -> sinkClock - ) - - sourceClock.updateMinClock(0, 100L) - sinkClock.updateMinClock(0, 100L) - - // Clock advances from 0 to 100, there is no stalling. - healthChecker.check(currentMinClock = 100, clocks, dag, 200) - healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId] - - // Clock not advancing. 
- // Pasted time exceed the stalling threshold, report stalling - healthChecker.check(currentMinClock = 100, clocks, dag, 1300) - - // The source task is stalling the clock - healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0)) - - // Advance the source clock - sourceClock.updateMinClock(0, 101L) - healthChecker.check(currentMinClock = 100, clocks, dag, 1300) - // The sink task is stalling the clock - healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0)) - } - } -} - -object ClockServiceSpec { - - class Store extends AppDataStore { - - private var map = Map.empty[String, Any] - - def put(key: String, value: Any): Future[Any] = { - map = map + (key -> value) - Promise.successful(value).future - } - - def get(key: String): Future[Any] = { - Promise.successful(map.get(key).getOrElse(null)).future - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala deleted file mode 100644 index fb633f9..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} -import io.gearpump.streaming.task.{Subscriber, TaskActor} -import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { - - val hash = Partitioner[HashPartitioner] - val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) - val task2 = ProcessorDescription(id = 2, taskClass = classOf[TaskActor].getName, parallelism = 1) - val graph = Graph(task1 ~ hash ~> task2) - val dag = DAG(graph) - implicit var system: ActorSystem = null - val appId = 0 - lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph) - - "DagManager" should { - import io.gearpump.streaming.appmaster.ClockServiceSpec.Store - "maintain the dags properly" in { - val store = new Store - - val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag)))) - val 
client = TestProbe() - client.send(dagManager, GetLatestDAG) - client.expectMsg(LatestDAG(dag)) - - client.send(dagManager, GetTaskLaunchData(dag.version, task1.id, null)) - val task1LaunchData = TaskLaunchData(task1, Subscriber.of(task1.id, dag)) - client.expectMsg(task1LaunchData) - - val task2LaunchData = TaskLaunchData(task2, Subscriber.of(task2.id, dag)) - client.send(dagManager, GetTaskLaunchData(dag.version, task2.id, null)) - client.expectMsg(task2LaunchData) - - val watcher = TestProbe() - client.send(dagManager, WatchChange(watcher.ref)) - val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue)) - - client.send(dagManager, ReplaceProcessor(task2.id, task3)) - client.expectMsg(DAGOperationSuccess) - - client.send(dagManager, GetLatestDAG) - val newDag = client.expectMsgPF() { - case LatestDAG(dag) => dag - } - assert(newDag.processors.contains(task3.id)) - watcher.expectMsgType[LatestDAG] - - val task4 = task3.copy(id = 4) - client.send(dagManager, ReplaceProcessor(task3.id, task4)) - client.expectMsgType[DAGOperationFailed] - - client.send(dagManager, NewDAGDeployed(newDag.version)) - client.send(dagManager, ReplaceProcessor(task3.id, task4)) - client.expectMsg(DAGOperationSuccess) - } - - "retrieve last stored dag properly" in { - val store = new Store - val newGraph = Graph(task1 ~ hash ~> task2 ~> task2) - val newDag = DAG(newGraph) - store.put(StreamApplication.DAG, newDag) - val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag)))) - val client = TestProbe() - client.send(dagManager, GetLatestDAG) - client.expectMsg(LatestDAG(newDag)) - } - } - - override def afterAll { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - override def beforeAll { - this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala deleted file mode 100644 index a57a1ae..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor._ -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import org.scalatest._ - -import io.gearpump.TestProbeUtil -import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource -import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} -import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo} -import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.jarstore.FilePath -import io.gearpump.streaming.ExecutorId -import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor -import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _} -import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease -import io.gearpump.util.ActorSystemBooter.BindLifeCycle -import io.gearpump.util.LogUtil - -class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - implicit var system: ActorSystem = null - - private val LOG = LogUtil.getLogger(getClass) - private val appId = 0 - private val resource = Resource(10) - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - private def startExecutorSystems = { - val master = TestProbe() - val taskManager = TestProbe() - val executor = TestProbe() - val userConfig = UserConfig.empty - - val username = "user" - val appName = "app" - val appJar = Some(AppJar("for_test", FilePath("path"))) - - val appMasterContext = AppMasterContext(appId, username, null, null, appJar, master.ref, null) - - val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: 
ExecutorId) => { - executor.ref ! StartExecutorActorPlease - TestProbeUtil.toProps(executor) - } - val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, - executorFactory, ConfigFactory.empty, appName))) - - taskManager.send(executorManager, SetTaskManager(taskManager.ref)) - val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified)) - - // Starts executors - taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get)) - - // Asks master to start executor systems - import scala.concurrent.duration._ - val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems] - assert(startExecutorSystem.resources == resourceRequest) - import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName} - assert(startExecutorSystem.resources == resourceRequest) - - assert(classPath.length == 0) - assert(jvmArguments.length == 0) - assert(jar == appJar) - assert(returnedUserName == username) - assert(executorAkkaConfig.isEmpty) - - (master, executor, taskManager, executorManager) - } - - it should "report timeout to taskManager" in { - import io.gearpump.streaming.appmaster.ExecutorManager._ - val (master, executor, taskManager, executorManager) = startExecutorSystems - master.reply(StartExecutorSystemTimeout) - taskManager.expectMsg(StartExecutorsTimeOut) - } - - it should "start executor actor correctly" in { - val (master, executor, taskManager, executorManager) = startExecutorSystems - val executorSystemDaemon = TestProbe() - val worker = TestProbe() - val workerId = WorkerId(0, 0L) - val workerInfo = WorkerInfo(workerId, worker.ref) - val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref, - resource, workerInfo) - master.reply(ExecutorSystemStarted(executorSystem, None)) - import scala.concurrent.duration._ - val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle] - 
val proxyExecutor = bindLifeWith.actor - executor.expectMsg(StartExecutorActorPlease) - - val executorId = 0 - - // Registers executor - executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, - resource, workerInfo)) - taskManager.expectMsgType[ExecutorStarted] - - // Broadcasts message to childs - taskManager.send(executorManager, BroadCast("broadcast")) - executor.expectMsg("broadcast") - - // Unicast - taskManager.send(executorManager, UniCast(executorId, "unicast")) - executor.expectMsg("unicast") - - // Updates executor resource status - val usedResource = Resource(5) - executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource)) - worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource)) - - // Watches for executor termination - system.stop(executor.ref) - LOG.info("Shutting down executor, and wait taskManager to get notified") - taskManager.expectMsg(ExecutorStopped(executorId)) - } -} - -object ExecutorManagerSpec { - case object StartExecutorActorPlease -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala deleted file mode 100644 index 9d4432a..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.duration._ - -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.streaming.executor.ExecutorRestartPolicy -import io.gearpump.streaming.task.TaskId - -class ExecutorRestartPolicySpec extends WordSpec with Matchers { - - "ExecutorRestartPolicy" should { - "decide whether to restart the executor" in { - val executorId1 = 1 - val executorId2 = 2 - val taskId = TaskId(0, 0) - val executorSupervisor = new ExecutorRestartPolicy( - maxNrOfRetries = 3, withinTimeRange = 1.seconds) - executorSupervisor.addTaskToExecutor(executorId1, taskId) - assert(executorSupervisor.allowRestartExecutor(executorId1)) - assert(executorSupervisor.allowRestartExecutor(executorId1)) - executorSupervisor.addTaskToExecutor(executorId2, taskId) - assert(executorSupervisor.allowRestartExecutor(executorId2)) - assert(!executorSupervisor.allowRestartExecutor(executorId2)) - Thread.sleep(1000) - assert(executorSupervisor.allowRestartExecutor(executorId2)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala deleted file mode 100644 index 6cd70d9..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.Await - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics -import io.gearpump.cluster.MasterToClient.HistoryMetrics -import io.gearpump.cluster.TestUtil -import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter} -import io.gearpump.util.HistoryMetricsService -import io.gearpump.util.HistoryMetricsService._ - -class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - val count = 2 - val intervalMs = 10 - - val config = HistoryMetricsConfig( - retainHistoryDataHours = 72, - retainHistoryDataIntervalMs = 3600 * 1000, - retainRecentDataSeconds = 300, - retainRecentDataIntervalMs = 15 * 1000) - - "SingleValueMetricsStore" should "retain metrics and expire old value" in { - - val store = new SingleValueMetricsStore(count, intervalMs) - - var now = 0L - // Only 1 data point will be kept in @intervalMs - store.add(Counter("count", 1), now) - store.add(Counter("count", 2), now) - - now = now + intervalMs + 1 - - // Only 1 data point will be kept in @intervalMs - store.add(Counter("count", 3), now) - store.add(Counter("count", 4), now) - - now = now + intervalMs + 1 - - // Only 1 data point will be kept in @intervalMs - // expire oldest data point, because we only keep @count records - store.add(Counter("count", 5), now) - store.add(Counter("count", 6), now) - - val result = store.read - assert(result.size == count) - - // The oldest value is expired - assert(result.head.value.asInstanceOf[Counter].value == 3L) - - // The newest value is inserted - assert(result.last.value.asInstanceOf[Counter].value == 5L) - } - - val meterTemplate = Meter("meter", 0, 0, 0, "s") - - "HistogramMetricsStore" should "retain corse-grain history and fine-grain recent data" in { - val store = new HistogramMetricsStore(config) - val a = 
Histogram(null, 100, 0, 0, 0, 0, 0) - val b = Histogram(null, 200, 0, 0, 0, 0, 0) - val c = Histogram(null, 50, 0, 0, 0, 0, 0) - - store.add(a) - store.add(b) - store.add(c) - - assert(store.readLatest.map(_.value) == List(c)) - assert(store.readRecent.map(_.value) == List(a)) - assert(store.readHistory.map(_.value) == List(a)) - } - - "MeterMetricsStore" should "retain corse-grain history and fine-grain recent data" in { - val store = new MeterMetricsStore(config) - - val a = Meter(null, 1, 100, 0, null) - val b = Meter(null, 1, 200, 0, null) - val c = Meter(null, 1, 50, 0, null) - - store.add(a) - store.add(b) - store.add(c) - - assert(store.readLatest.map(_.value) == List(c)) - assert(store.readRecent.map(_.value) == List(a)) - assert(store.readHistory.map(_.value) == List(a)) - } - - "CounterMetricsStore" should "retain corse-grain history and fine-grain recent data" in { - val store = new CounterMetricsStore(config) - val a = Counter(null, 50) - val b = Counter(null, 100) - val c = Counter(null, 150) - - store.add(a) - store.add(b) - store.add(c) - - assert(store.readLatest.map(_.value) == List(c)) - assert(store.readRecent.map(_.value) == List(a)) - assert(store.readHistory.map(_.value) == List(a)) - } - - "HistoryMetricsService" should - "retain lastest metrics data and allow user to query metrics by path" in { - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val appId = 0 - val service = system.actorOf(Props(new HistoryMetricsService("app0", config))) - service ! Counter("metric.counter", 0) - service ! Meter("metric.meter", 0, 0, 0, null) - service ! 
Histogram("metric.histogram", 0, 0, 0, 0, 0, 0) - - val client = TestProbe() - - // Filters metrics with path "metric.counter" - client.send(service, QueryHistoryMetrics("metric.counter")) - import scala.concurrent.duration._ - client.expectMsgPF(3.seconds) { - case history: HistoryMetrics => - assert(history.path == "metric.counter") - val metricList = history.metrics - metricList.foreach(metricItem => - assert(metricItem.value.isInstanceOf[Counter]) - ) - } - - // Filters metrics with path "metric.meter" - client.send(service, QueryHistoryMetrics("metric.meter")) - client.expectMsgPF(3.seconds) { - case history: HistoryMetrics => - assert(history.path == "metric.meter") - val metricList = history.metrics - metricList.foreach(metricItem => - assert(metricItem.value.isInstanceOf[Meter]) - ) - } - - // Filters metrics with path "metric.histogram" - client.send(service, QueryHistoryMetrics("metric.histogram")) - client.expectMsgPF(3.seconds) { - case history: HistoryMetrics => - assert(history.path == "metric.histogram") - val metricList = history.metrics - metricList.foreach(metricItem => - assert(metricItem.value.isInstanceOf[Histogram]) - ) - } - - // Filters metrics with path prefix "metric", all metrics which can - // match the path prefix will be retained. - client.send(service, QueryHistoryMetrics("metric")) - client.expectMsgPF(3.seconds) { - case history: HistoryMetrics => - val metricList = history.metrics - - var counterFound = false - var meterFound = false - var histogramFound = false - - metricList.foreach(metricItem => - metricItem.value match { - case v: Counter => counterFound = true - case v: Meter => meterFound = true - case v: Histogram => histogramFound = true - case _ => // Skip - } - ) - - // All kinds of metric type are reserved. 
- assert(counterFound && meterFound && histogramFound) - } - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala deleted file mode 100644 index b391196..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.appmaster - -import scala.concurrent.{Await, Future} - -import akka.actor.ActorSystem -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppJar, TestUtil} -import io.gearpump.jarstore.FilePath -import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} -import io.gearpump.streaming.task.TaskId -import io.gearpump.streaming.{DAG, ProcessorDescription, _} -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ - -class JarSchedulerSpec extends WordSpec with Matchers { - val mockJar1 = AppJar("jar1", FilePath("path")) - val mockJar2 = AppJar("jar2", FilePath("path")) - val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, - jar = mockJar1) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, - jar = mockJar1) - val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, - jar = mockJar2) - val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) - - import scala.concurrent.duration._ - - "JarScheduler" should { - "schedule tasks depends on app jar" in { - val system = ActorSystem("JarSchedulerSpec") - implicit val dispatcher = system.dispatcher - val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system) - manager.setDag(dag, Future { - 0L - }) - val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified)) - val result = Await.result(manager.getResourceRequestDetails(), 15.seconds) - assert(result.length == 1) - assert(result.head.jar == mockJar1) - assert(result.head.requests.deep == requests.deep) - - val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, - Resource(2)), 15.seconds) - assert(tasks.contains(TaskId(0, 0))) - 
assert(tasks.contains(TaskId(1, 0))) - - val newDag = replaceDAG(dag, 1, task3, 1) - - manager.setDag(newDag, Future { - 0 - }) - val requestDetails = Await.result(manager.getResourceRequestDetails(). - map(_.sortBy(_.jar.name)), 15.seconds) - assert(requestDetails.length == 2) - assert(requestDetails.last.jar == mockJar2) - assert(requestDetails.last.requests.deep == requests.deep) - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } - - def replaceDAG( - dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) - : DAG = { - val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, - newProcessor.life.birth) - val newProcessorMap = dag.processors ++ - Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), - newProcessor.id -> newProcessor) - val newGraph = dag.graph.subGraph(oldProcessorId). - replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) - new DAG(newVersion, newProcessorMap, newGraph) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala deleted file mode 100644 index 2e07def..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.streaming.appmaster.TaskLocator.Localities -import io.gearpump.streaming.task.TaskId - -class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - it should "serialize/deserialize correctly" in { - val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2)))) - Localities.toJson(localities) - - localities.localities.mapValues(_.toList) shouldBe - Localities.fromJson(Localities.toJson(localities)).localities.mapValues(_.toList) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala deleted file mode 100644 index 8153fce..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge -import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppJar, TestUtil, UserConfig} -import io.gearpump.jarstore.FilePath -import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} -import io.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered} -import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask -import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut -import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess} -import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange} -import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _} -import io.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail 
-import io.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2} -import io.gearpump.streaming.executor.Executor.RestartTasks -import io.gearpump.streaming.task.{StartTime, TaskContext, _} -import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId} -import io.gearpump.transport.HostPort -import io.gearpump.util.Graph -import io.gearpump.util.Graph._ -import io.gearpump.{Message, TimeStamp} - -class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - implicit var system: ActorSystem = null - - val task1Class = classOf[Task1].getName - val task2Class = classOf[Task2].getName - - val mockJar = AppJar("jar_for_test", FilePath("path")) - val task1 = ProcessorDescription(id = 0, taskClass = task1Class, parallelism = 1, jar = mockJar) - val task2 = ProcessorDescription(id = 1, taskClass = task2Class, parallelism = 1, jar = mockJar) - - val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) - val dagVersion = 0 - - val task1LaunchData = TaskLaunchData(task1, Subscriber.of(processorId = 0, dag)) - val task2LaunchData = TaskLaunchData(task2, Subscriber.of(processorId = 1, dag)) - - val appId = 0 - - val resource = Resource(2) - val workerId = WorkerId(0, 0L) - val executorId = 0 - - override def beforeEach(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterEach(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "recover by requesting new executors when executor stopped unexpectedly" in { - val env = bootUp - import env._ - implicit val dispatcher = system.dispatcher - - val resourceRequest = Array(ResourceRequest(resource, workerId)) - when(scheduler.executorFailed(executorId)).thenReturn(Future { - Some(ResourceRequestDetail(mockJar, - resourceRequest)) - }) - - taskManager ! 
ExecutorStopped(executorId) - - // When one executor stop, it will also trigger the recovery by restart - // existing executors - executorManager.expectMsg(BroadCast(RestartTasks(dagVersion))) - - // Asks for new executors - val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors] - assert(returned.resources.deep == resourceRequest.deep) - executorManager.reply(StartExecutorsTimeOut) - - // TaskManager cannot handle the TimeOut error itself, escalate to appmaster. - appMaster.expectMsg(AllocateResourceTimeOut) - } - - it should "recover by restarting existing executors when message loss happen" in { - val env = bootUp - import env._ - - taskManager ! ReplayFromTimestampWindowTrailingEdge(appId) - - // Restart the executors so that we can replay from minClock - executorManager.expectMsg(BroadCast(RestartTasks(dagVersion))) - } - - import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry - "TaskChangeRegistry" should "track all modified task registration" in { - val tasks = List(TaskId(0, 0), TaskId(0, 1)) - val registry = new TaskChangeRegistry(tasks) - registry.taskChanged(TaskId(0, 0)) - registry.taskChanged(TaskId(0, 1)) - assert(registry.allTaskChanged) - } - - "DAGDiff" should "track all the DAG migration impact" in { - val defaultEdge = PartitionerDescription(null) - val a = ProcessorDescription(id = 1, taskClass = null, parallelism = 1) - val b = ProcessorDescription(id = 2, taskClass = null, parallelism = 1) - val c = ProcessorDescription(id = 3, taskClass = null, parallelism = 1) - val left = Graph(a ~ defaultEdge ~> b, a ~ defaultEdge ~> c) - - val d = ProcessorDescription(id = 4, taskClass = null, parallelism = 1) - val right = left.copy - right.addVertex(d) - right.addEdge(c, defaultEdge, d) - val e = a.copy(life = LifeTime(0, 0)) - right.replaceVertex(a, e) - - val diff = TaskManager.migrate(DAG(left), DAG(right, version = 1)) - diff.addedProcessors shouldBe List(d.id) - - diff.modifiedProcessors shouldBe List(a.id) - - 
diff.impactedUpstream shouldBe List(c.id) - } - - private def bootUp: Env = { - - implicit val dispatcher = system.dispatcher - - val executorManager = TestProbe() - val clockService = TestProbe() - val appMaster = TestProbe() - val executor = TestProbe() - - val scheduler = mock(classOf[JarScheduler]) - - val dagManager = TestProbe() - - val taskManager = system.actorOf( - Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref, - appMaster.ref, "appName"))) - - dagManager.expectMsgType[WatchChange] - executorManager.expectMsgType[SetTaskManager] - - // Step1: first transition from Unitialized to ApplicationReady - executorManager.expectMsgType[ExecutorResourceUsageSummary] - dagManager.expectMsgType[NewDAGDeployed] - - // Step2: Get Additional Resource Request - when(scheduler.getResourceRequestDetails()) - .thenReturn(Future { - Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, - WorkerId.unspecified)))) - }) - - // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG - dagManager.expectMsg(GetLatestDAG) - dagManager.reply(LatestDAG(dag)) - - // Step4: Start remote Executors. - // received Broadcast - executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version))) - executorManager.expectMsgType[StartExecutors] - - when(scheduler.scheduleTask(mockJar, workerId, executorId, resource)) - .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0)))) - - // Step5: Executor is started. - executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar))) - - // Step6: Prepare to start Task. First GetTaskLaunchData. 
- val taskLaunchData: PartialFunction[Any, TaskLaunchData] = { - case GetTaskLaunchData(_, 0, executorStarted) => - task1LaunchData.copy(context = executorStarted) - case GetTaskLaunchData(_, 1, executorStarted) => - task2LaunchData.copy(context = executorStarted) - } - - val launchData1 = dagManager.expectMsgPF()(taskLaunchData) - dagManager.reply(launchData1) - - val launchData2 = dagManager.expectMsgPF()(taskLaunchData) - dagManager.reply(launchData2) - - // Step7: Launch Task - val launchTaskMatch: PartialFunction[Any, RegisterTask] = { - case UniCast(executorId, launch: LaunchTasks) => - RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000")) - } - - // Taskmanager should return the latest start clock to task(0,0) - clockService.expectMsg(GetStartClock) - clockService.reply(StartClock(0)) - - // Step8: Task is started. registerTask. - val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch) - taskManager.tell(registerTask1, executor.ref) - executor.expectMsgType[TaskRegistered] - - val registerTask2 = executorManager.expectMsgPF()(launchTaskMatch) - taskManager.tell(registerTask2, executor.ref) - executor.expectMsgType[TaskRegistered] - - // Step9: start broadcasting TaskLocations. - import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5.seconds) { - case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady] - }) - - // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId) - taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref) - - // Step11: Tell ClockService to update DAG. 
- clockService.expectMsgType[ChangeToNewDAG] - clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp])) - - // Step12: start all tasks - import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5.seconds) { - case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks] - }) - - // Step13, Tell executor Manager the updated usage status of executors. - executorManager.expectMsgType[ExecutorResourceUsageSummary] - - // Step14: transition from DynamicDAG to ApplicationReady - Env(executorManager, clockService, appMaster, executor, taskManager, scheduler) - } -} - -object TaskManagerSpec { - case class Env( - executorManager: TestProbe, - clockService: TestProbe, - appMaster: TestProbe, - executor: TestProbe, - taskManager: ActorRef, - scheduler: JarScheduler) - - class Task1(taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = {} - override def onNext(msg: Message): Unit = {} - } - - class Task2(taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = {} - override def onNext(msg: Message): Unit = {} - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala deleted file mode 100644 index e8417ea..0000000 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.appmaster - -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations} -import io.gearpump.streaming.task.TaskId -import io.gearpump.transport.HostPort -class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - it should "maintain registered tasks" in { - val task0 = TaskId(0, 0) - val task1 = TaskId(0, 1) - val task2 = TaskId(0, 2) - - val register = new TaskRegistry(expectedTasks = List(task0, task1, task2)) - val host1 = HostPort("127.0.0.1:3000") - val host2 = HostPort("127.0.0.1:3001") - - val executorId = 0 - assert(Accept == register.registerTask(task0, TaskLocation(executorId, host1))) - assert(Accept == register.registerTask(task1, TaskLocation(executorId, host1))) - assert(Accept == register.registerTask(task2, TaskLocation(executorId, host2))) - - assert(Reject == register.registerTask(TaskId(100, 0), TaskLocation(executorId, host2))) - - assert(register.isAllTasksRegistered) - val TaskLocations(taskLocations) = register.getTaskLocations - val tasksOnHost1 = taskLocations.get(host1).get - val tasksOnHost2 = taskLocations.get(host2).get - 
assert(tasksOnHost1.contains(task0)) - assert(tasksOnHost1.contains(task1)) - assert(tasksOnHost2.contains(task2)) - - assert(register.getExecutorId(task0) == Some(executorId)) - assert(register.isTaskRegisteredForExecutor(executorId)) - - register.processorExecutors(0) shouldBe Map( - executorId -> List(task0, task1, task2) - ) - - register.usedResource.resources shouldBe Map( - executorId -> Resource(3) - ) - } -}
