http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala index 1869eab..7039b71 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,8 +21,7 @@ package io.gearpump.streaming.transaction.api import io.gearpump.Message /** - * MessageDecoder decodes raw bytes to Message - * It is usually written by end user and + * MessageDecoder decodes raw bytes to Message It is usually written by end user and * passed into TimeReplayableSource */ trait MessageDecoder extends java.io.Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala index 29656dd..412ddcc 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,28 +18,27 @@ package io.gearpump.streaming.transaction.api -import io.gearpump.{Message, TimeStamp} - import scala.util.Try +import io.gearpump.{Message, TimeStamp} + /** - * filter offsets and store the mapping from timestamp to offset + * Filters offsets and store the mapping from timestamp to offset */ trait MessageFilter { def filter(messageAndOffset: (Message, Long)): Option[Message] } /** - * resolve timestamp to offset by look up the underlying storage + * Resolves timestamp to offset by look up the underlying storage */ trait OffsetTimeStampResolver { def resolveOffset(time: TimeStamp): Try[Long] } /** - * manages message's offset on TimeReplayableSource and timestamp + * Manages message's offset on TimeReplayableSource and timestamp */ trait OffsetManager extends MessageFilter with OffsetTimeStampResolver { def close(): Unit -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala index aa9966e..fa7161c 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,10 @@ package io.gearpump.streaming.transaction.api -import io.gearpump.TimeStamp - import scala.util.Try +import io.gearpump.TimeStamp + object OffsetStorage { /** @@ -47,15 +47,17 @@ object OffsetStorage { */ trait OffsetStorage { /** - * try to look up the time in the OffsetStorage - * return the corresponding Offset if the time is - * in the range of stored TimeStamps or one of the - * failure info (StorageEmpty, Overflow, Underflow) + * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is + * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow, + * Underflow) + * * @param time the time to look for * @return the corresponding offset if the time is in the range, otherwise failure */ def lookUp(time: TimeStamp): Try[Array[Byte]] + def append(time: TimeStamp, offset: Array[Byte]): Unit + def close(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala index b6c1e6c..50711ee 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,11 +21,10 @@ package io.gearpump.streaming.transaction.api import io.gearpump.streaming.source.DataSource /** - * AT-LEAST-ONCE API + * AT-LEAST-ONCE API. Represents a data source which allow replaying. * - * subclass should be able to replay messages on recovery from the time - * when an application crashed + * Subclass should be able to replay messages on recovery from the time + * when an application crashed. */ trait TimeReplayableSource extends DataSource - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala index 7df508c..7c34e1a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,10 +21,9 @@ package io.gearpump.streaming.transaction.api import io.gearpump.{Message, TimeStamp} /** - * TimeStampFilter filters message comparing its TimeStamp with the predicate. + * TimeStampFilter filters out messages that are obsolete. */ trait TimeStampFilter extends java.io.Serializable { def filter(msg: Message, predicate: TimeStamp): Option[Message] } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala index 5edcf38..c2ac32a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,16 +19,22 @@ package io.gearpump.streaming.util import akka.actor.{ActorPath, ActorRef} + import io.gearpump.streaming.task.TaskId object ActorPathUtil { - def executorActorName(executorId: Int) = executorId.toString + def executorActorName(executorId: Int): String = executorId.toString - def taskActorName(taskId: TaskId) = s"processor_${taskId.processorId}_task_${taskId.index}" + def taskActorName(taskId: TaskId): String = { + s"processor_${taskId.processorId}_task_${taskId.index}" + } def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = { - appMaster.path.child(executorManagerActorName).child(executorActorName(executorId)).child(taskActorName(taskId)) + val executorManager = appMaster.path.child(executorManagerActorName) + val executor = executorManager.child(executorActorName(executorId)) + val task = executor.child(taskActorName(taskId)) + task } def executorManagerActorName: String = "executors" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala index 0207fb3..13b8e34 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,13 +18,14 @@ package io.gearpump.streaming +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + import io.gearpump.partitioner.PartitionerDescription import io.gearpump.streaming.task.TaskId import io.gearpump.util.Graph import io.gearpump.util.Graph.Node -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} class DAGSpec extends PropSpec with PropertyChecks with Matchers { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala index a5067f2..7938415 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,12 @@ */ package io.gearpump.streaming -import io.gearpump.streaming.task._ -import io.gearpump.transport.netty.WrappedChannelBuffer import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers} import org.scalatest.{Matchers, WordSpec} +import io.gearpump.streaming.task._ +import io.gearpump.transport.netty.WrappedChannelBuffer + class MessageSerializerSpec extends WordSpec with Matchers { def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = { @@ -32,7 +33,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { taskMessageSerializer.read(input) } - "SerializedMessageSerializer" should { + "SerializedMessageSerializer" should { "serialize and deserialize SerializedMessage properly" in { val serializer = new SerializedMessageSerializer val data = new Array[Byte](256) @@ -43,7 +44,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { } } - "TaskIdSerializer" should { + "TaskIdSerializer" should { "serialize and deserialize TaskId properly" in { val taskIdSerializer = new TaskIdSerializer val taskId = TaskId(1, 3) @@ -51,7 +52,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { } } - "AckRequestSerializer" should { + "AckRequestSerializer" should { "serialize and deserialize AckRequest properly" in { val serializer = new AckRequestSerializer val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024) @@ -59,7 +60,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { } } - "InitialAckRequestSerializer" should { + "InitialAckRequestSerializer" should { "serialize and deserialize AckRequest properly" in { val serializer = new InitialAckRequestSerializer val ackRequest = InitialAckRequest(TaskId(1, 2), 1024) @@ -67,7 +68,7 @@ class MessageSerializerSpec extends WordSpec with Matchers { } } - "AckSerializer" should { + "AckSerializer" should { "serialize and deserialize Ack properly" in { val serializer = new AckSerializer val ack = Ack(TaskId(1, 2), 1024, 1023, 1799) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala index 6577718..40310f7 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,12 @@ */ package io.gearpump.streaming -import akka.actor.{ActorSystem, Actor} -import akka.testkit.{TestProbe, TestActorRef} -import io.gearpump.streaming.task.TaskContext -import io.gearpump.streaming.task.TaskId +import akka.actor.{Actor, ActorSystem} +import akka.testkit.TestActorRef +import org.mockito.{ArgumentMatcher, Matchers, Mockito} + import io.gearpump.cluster.TestUtil -import org.mockito.{Mockito, ArgumentMatcher} -import org.mockito.Mockito -import org.mockito.Matchers +import io.gearpump.streaming.task.{TaskContext, TaskId} object MockUtil { @@ -39,7 +37,7 @@ object MockUtil { context } - def argMatch[T](func: T => Boolean) : T = { + def argMatch[T](func: T => Boolean): T = { Matchers.argThat(new ArgumentMatcher[T] { override def matches(param: Any): Boolean = { val mesage = param.asInstanceOf[T] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala index 6866907..2ea8b84 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,10 +19,11 @@ package io.gearpump.streaming import akka.actor._ import akka.testkit.TestActorRef + import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.{MiniCluster, AppDescription, AppMasterContext, UserConfig} +import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig} import io.gearpump.streaming.appmaster.AppMaster import io.gearpump.util.Graph @@ -33,7 +34,8 @@ object StreamingTestUtil { def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = { implicit val actorSystem = miniCluster.system - val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, None,miniCluster.mockMaster,AppMasterRuntimeInfo(appId, appName = appId.toString)) + val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, + None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString)) val app = StreamApplication("test", Graph.empty, UserConfig.empty) val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala index d1d7006..90fb8ab 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,9 +17,13 @@ */ package io.gearpump.streaming.appmaster +import scala.concurrent.duration._ + import akka.actor.{ActorRef, Props} import akka.testkit.{TestActorRef, TestProbe} -import io.gearpump.{WorkerId, Message} +import org.scalatest._ + +import io.gearpump.Message import io.gearpump.cluster.AppMasterToMaster._ import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.ClientToMaster.ShutdownApplication @@ -29,19 +33,16 @@ import io.gearpump.cluster._ import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} import io.gearpump.cluster.master.MasterProxy import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.jarstore.FilePath import io.gearpump.partitioner.HashPartitioner import io.gearpump.streaming.task.{StartTime, TaskContext, _} import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest._ - -import scala.concurrent.duration._ -import scala.language.postfixOps class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG var appMaster: ActorRef = null @@ -63,7 +64,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with var appMasterContext: AppMasterContext = null var appMasterRuntimeInfo: AppMasterRuntimeInfo = null - override def beforeEach() = { + override def beforeEach(): Unit = { startActorSystem() mockTask = TestProbe()(getActorSystem) @@ -76,50 +77,54 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with implicit val system = getActorSystem conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref) val mockJar = AppJar("for_test", FilePath("path")) - appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), mockMaster.ref, appMasterRuntimeInfo) + appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), + mockMaster.ref, appMasterRuntimeInfo) val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2) val streamApp = StreamApplication("test", graph, conf) appDescription = Application.ApplicationToAppDescription(streamApp) import scala.concurrent.duration._ - mockMasterProxy = getActorSystem.actorOf( - Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY) + mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), + 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) TestActorRef[AppMaster]( - AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, appMasterContext))(getActorSystem) + AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, + appMasterContext))(getActorSystem) - val registerAppMaster = mockMaster.receiveOne(15 seconds) + val registerAppMaster = mockMaster.receiveOne(15.seconds) assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster mockMaster.reply(AppMasterRegistered(appId)) - mockMaster.expectMsg(15 seconds, GetAppData(appId, "DAG")) + mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG")) mockMaster.reply(GetAppDataResult("DAG", null)) - mockMaster.expectMsg(15 seconds, GetAppData(appId, "startClock")) + mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock")) mockMaster.reply(GetAppDataResult("startClock", 0L)) - mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4), workerId = WorkerId.unspecified))) + mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4), + workerId = WorkerId.unspecified))) } - override def afterEach() = { + override def afterEach(): Unit = { shutdownActorSystem() } "AppMaster" should { "kill it self when allocate resource time out" in { - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), mockWorker.ref, workerId)))) - mockMaster.expectMsg(60 seconds, ShutdownApplication(appId)) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), + mockWorker.ref, workerId)))) + mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) } "reschedule the resource when the worker reject to start executor" in { val resource = Resource(4) - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker.ref, workerId)))) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, + mockWorker.ref, workerId)))) mockWorker.expectMsgClass(classOf[LaunchExecutor]) mockWorker.reply(ExecutorLaunchRejected("")) mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) } "find a new master when lost connection with master" in { - println(config.getList("akka.loggers")) val watcher = TestProbe()(getActorSystem) watcher.watch(mockMasterProxy) @@ -130,74 +135,74 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with Thread.sleep(2000) import scala.concurrent.duration._ - mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY) - mockMaster.expectMsgClass(15 seconds, classOf[RegisterAppMaster]) - } - - /* - - TODO: This test is failing on Travis randomly - We have not identifed the root cause. - Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843 - Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733 - - "launch executor and task properly" in { - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, workerId)))) - mockWorker.expectMsgClass(classOf[LaunchExecutor]) - - val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) - mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) - for (i <- 1 to 4) { - mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted) - } - - //clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0 - appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) - - //there is no further upstream, so the upstreamMinClock is Long.MaxValue - mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - - // check min clock - appMaster.tell(GetLatestMinClock, mockTask.ref) - mockTask.expectMsg(LatestMinClock(0)) - - - //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 - appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) - - //there is no further upstream, so the upstreamMinClock is Long.MaxValue - mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) - - // check min clock - appMaster.tell(GetLatestMinClock, mockTask.ref) - mockTask.expectMsg(LatestMinClock(0)) - - //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 - appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) - - // min clock of processor 0 (Task(0, 0) and Task(0, 1)) - mockTask.expectMsg(UpstreamMinClock(1)) - - // check min clock - appMaster.tell(GetLatestMinClock, mockTask.ref) - mockTask.expectMsg(LatestMinClock(0)) - - //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 - appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref) - - // min clock of processor 0 (Task(0, 0) and Task(0, 1)) - mockTask.expectMsg(UpstreamMinClock(1)) - - // check min clock - appMaster.tell(GetLatestMinClock, mockTask.ref) - mockTask.expectMsg(LatestMinClock(1)) - - //shutdown worker and all executor on this work, expect appmaster to ask for new resources - workerSystem.shutdown() - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = Relaxation.ONEWORKER))) + mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), + 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY) + mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster]) } -**/ + // // TODO: This test is failing on Travis randomly + // // We have not identifed the root cause. + // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843 + // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733 + // + // "launch executor and task properly" in { + // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, + // workerId)))) + // mockWorker.expectMsgClass(classOf[LaunchExecutor]) + // + // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG) + // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString)) + // for (i <- 1 to 4) { + // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted) + // } + // + // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) + // + // // there is no further upstream, so the upstreamMinClock is Long.MaxValue + // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // + // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) + // + // // there is no further upstream, so the upstreamMinClock is Long.MaxValue + // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0 + // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) + // + // // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) + // mockTask.expectMsg(UpstreamMinClock(1)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(0)) + // + // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1 + // appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref) + // + // // min clock of processor 0 (Task(0, 0) and Task(0, 1)) + // mockTask.expectMsg(UpstreamMinClock(1)) + // + // // check min clock + // appMaster.tell(GetLatestMinClock, mockTask.ref) + // mockTask.expectMsg(LatestMinClock(1)) + // + // // shutdown worker and all executor on this work, expect appmaster to ask + // // for new resources + // workerSystem.shutdown() + // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = + // Relaxation.ONEWORKER))) + // } } def ignoreSaveAppData: PartialFunction[Any, Boolean] = { @@ -212,7 +217,7 @@ object AppMasterSpec { val MOCK_MASTER_PROXY = "mockMasterProxy" } -class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { +class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get override def onStart(startTime: StartTime): Unit = { @@ -222,7 +227,7 @@ class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskC override def onNext(msg: Message): Unit = {} } -class TaskB(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { +class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get override def onStart(startTime: StartTime): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala index a451053..01744a3 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,24 +17,24 @@ */ package io.gearpump.streaming.appmaster +import scala.concurrent.{Future, Promise} + import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import io.gearpump.streaming.task.{GetStartClock, UpstreamMinClock, GetLatestMinClock} -import io.gearpump.cluster.{UserConfig, TestUtil} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock} import io.gearpump.streaming.appmaster.ClockServiceSpec.Store import io.gearpump.streaming.storage.AppDataStore -import io.gearpump.streaming.task._ -import io.gearpump.streaming.{LifeTime, DAG, ProcessorDescription} +import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _} +import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.concurrent.{Future, Promise} class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll{ + with WordSpecLike with Matchers with BeforeAndAfterAll { def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG)) @@ -50,46 +50,48 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli "The ClockService" should { "maintain a global view of message timestamp in the application" in { val store = new Store() - val startClock = 100L + val startClock = 100L store.put(ClockService.START_CLOCK, startClock) val clockService = system.actorOf(Props(new ClockService(dag, store))) clockService ! GetLatestMinClock expectMsg(LatestMinClock(startClock)) - //task(0,0): clock(101); task(1,0): clock(100) + // task(0,0): clock(101); task(1,0): clock(100) clockService ! UpdateClock(TaskId(0, 0), 101) - // there is no upstream, so pick Long.MaxValue + // There is no upstream, so pick Long.MaxValue expectMsg(UpstreamMinClock(Long.MaxValue)) - // min clock is updated + // Min clock is updated clockService ! GetLatestMinClock expectMsg(LatestMinClock(100)) - - //task(0,0): clock(101); task(1,0): clock(101) + // task(0,0): clock(101); task(1,0): clock(101) clockService ! UpdateClock(TaskId(1, 0), 101) - //upstream is Task(0, 0), 101 + // Upstream is Task(0, 0), 101 expectMsg(UpstreamMinClock(101)) - // min clock is updated + // Min clock is updated clockService ! GetLatestMinClock expectMsg(LatestMinClock(101)) } "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in { val store = new Store() - val startClock = 100L + val startClock = 100L store.put(ClockService.START_CLOCK, startClock) val clockService = system.actorOf(Props(new ClockService(dag, store))) val task = TestProbe() clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref) task.expectMsgType[UpstreamMinClock] - val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1) - val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1) - val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, parallelism = 1) + val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, + parallelism = 1) + val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, + parallelism = 1) + val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, + parallelism = 1) val dagAddMiddleNode = DAG(Graph( task1 ~ hash ~> task2, task1 ~ hash ~> task3, @@ -100,12 +102,12 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val user = TestProbe() clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref) - val clocks = user.expectMsgPF(){ + val clocks = user.expectMsgPF() { case ChangeToNewDAGSuccess(clocks) => clocks } - // for intermediate task, pick its upstream as initial clock + // For intermediate task, pick its upstream as initial clock assert(clocks(task3.id) == clocks(task1.id)) // For sink task, pick its upstream as initial clock @@ -117,7 +119,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli "maintain global checkpoint time" in { val store = new Store() - val startClock = 100L + val startClock = 100L store.put(ClockService.START_CLOCK, startClock) val clockService = system.actorOf(Props(new ClockService(dag, store))) clockService ! UpdateClock(TaskId(0, 0), 200L) @@ -129,8 +131,10 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli expectMsg(StartClock(200L)) val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true) - val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf) - val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf) + val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, + parallelism = 1, taskConf = conf) + val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, + parallelism = 1, taskConf = conf) val dagWithStateTasks = DAG(Graph( task1 ~ hash ~> task2, task1 ~ hash ~> task3, @@ -184,14 +188,14 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1) sourceClock.init(0L) val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1) - val sinkClock = new ProcessorClock(1,LifeTime.Immortal, 1) + val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1) sinkClock.init(0L) val graph = Graph.empty[ProcessorDescription, PartitionerDescription] graph.addVertex(source) graph.addVertex(sink) graph.addEdge(source, PartitionerDescription(null), sink) val dag = DAG(graph) - val clocks = Map ( + val clocks = Map( 0 -> sourceClock, 1 -> sinkClock ) @@ -199,30 +203,29 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli sourceClock.updateMinClock(0, 100L) sinkClock.updateMinClock(0, 100L) - // clock advance from 0 to 100, there is no stalling. + // Clock advances from 0 to 100, there is no stalling. healthChecker.check(currentMinClock = 100, clocks, dag, 200) healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId] - // clock not advancing. - // pasted time exceed the stalling threshold, report stalling + // Clock not advancing. + // Pasted time exceed the stalling threshold, report stalling healthChecker.check(currentMinClock = 100, clocks, dag, 1300) - // the source task is stalling the clock + // The source task is stalling the clock healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0)) - // advance the source clock + // Advance the source clock sourceClock.updateMinClock(0, 101L) healthChecker.check(currentMinClock = 100, clocks, dag, 1300) - // the sink task is stalling the clock + // The sink task is stalling the clock healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0)) } } - } object ClockServiceSpec { - class Store extends AppDataStore{ + class Store extends AppDataStore { private var map = Map.empty[String, Any] @@ -231,7 +234,7 @@ object ClockServiceSpec { Promise.successful(value).future } - def get(key: String) : Future[Any] = { + def get(key: String): Future[Any] = { Promise.successful(map.get(key).getOrElse(null)).future } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala index 2750cc5..fb633f9 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -18,8 +18,13 @@ package io.gearpump.streaming.appmaster +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} @@ -27,7 +32,6 @@ import io.gearpump.streaming.task.{Subscriber, TaskActor} import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { @@ -94,8 +98,8 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { } override def afterAll { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } override def beforeAll { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala index 9121a22..a57a1ae 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,39 +18,43 @@ package io.gearpump.streaming.appmaster +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor._ import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory -import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorStarted -import io.gearpump.{WorkerId, TestProbeUtil} +import org.scalatest._ + +import io.gearpump.TestProbeUtil import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo} import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} +import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo} import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.jarstore.FilePath import io.gearpump.streaming.ExecutorId import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor -import io.gearpump.streaming.appmaster.ExecutorManager._ +import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _} import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease import io.gearpump.util.ActorSystemBooter.BindLifeCycle import io.gearpump.util.LogUtil -import org.scalatest._ -class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null private val LOG = LogUtil.getLogger(getClass) private val appId = 0 private val resource = Resource(10) - override def beforeAll = { + override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) } - override def afterAll = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } private def startExecutorSystems = { @@ -69,17 +73,18 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll executor.ref ! StartExecutorActorPlease TestProbeUtil.toProps(executor) } - val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, executorFactory, ConfigFactory.empty, appName))) + val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, + executorFactory, ConfigFactory.empty, appName))) taskManager.send(executorManager, SetTaskManager(taskManager.ref)) val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified)) - //start executors + // Starts executors taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get)) - //ask master to start executor systems + // Asks master to start executor systems import scala.concurrent.duration._ - val startExecutorSystem = master.receiveOne(5 seconds).asInstanceOf[StartExecutorSystems] + val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems] assert(startExecutorSystem.resources == resourceRequest) import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName} assert(startExecutorSystem.resources == resourceRequest) @@ -94,7 +99,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll } it should "report timeout to taskManager" in { - import ExecutorManager._ + import io.gearpump.streaming.appmaster.ExecutorManager._ val (master, executor, taskManager, executorManager) = startExecutorSystems master.reply(StartExecutorSystemTimeout) taskManager.expectMsg(StartExecutorsTimeOut) @@ -110,30 +115,31 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll resource, workerInfo) master.reply(ExecutorSystemStarted(executorSystem, None)) import scala.concurrent.duration._ - val bindLifeWith = executorSystemDaemon.receiveOne(3 seconds).asInstanceOf[BindLifeCycle] + val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle] val proxyExecutor = bindLifeWith.actor executor.expectMsg(StartExecutorActorPlease) val executorId = 0 - //register executor - executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, resource, workerInfo)) + // Registers executor + executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, + resource, workerInfo)) taskManager.expectMsgType[ExecutorStarted] - //broad message to childs + // Broadcasts message to childs taskManager.send(executorManager, BroadCast("broadcast")) executor.expectMsg("broadcast") - //unicast + // Unicast taskManager.send(executorManager, UniCast(executorId, "unicast")) executor.expectMsg("unicast") - //update executor resource status + // Updates executor resource status val usedResource = Resource(5) executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource)) worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource)) - //watch for executor termination + // Watches for executor termination system.stop(executor.ref) LOG.info("Shutting down executor, and wait taskManager to get notified") taskManager.expectMsg(ExecutorStopped(executorId)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala index 35659d6..9d4432a 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.streaming.appmaster +import scala.concurrent.duration._ + +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.streaming.executor.ExecutorRestartPolicy import io.gearpump.streaming.task.TaskId -import org.scalatest.{Matchers, WordSpec} -import scala.concurrent.duration._ class ExecutorRestartPolicySpec extends WordSpec with Matchers { @@ -29,7 +32,8 @@ class ExecutorRestartPolicySpec extends WordSpec with Matchers { val executorId1 = 1 val executorId2 = 2 val taskId = TaskId(0, 0) - val executorSupervisor = new ExecutorRestartPolicy(maxNrOfRetries = 3, withinTimeRange = 1 seconds) + val executorSupervisor = new ExecutorRestartPolicy( + maxNrOfRetries = 3, withinTimeRange = 1.seconds) executorSupervisor.addTaskToExecutor(executorId1, taskId) assert(executorSupervisor.allowRestartExecutor(executorId1)) assert(executorSupervisor.allowRestartExecutor(executorId1)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala index d023be8..6cd70d9 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package io.gearpump.streaming.appmaster -import akka.actor.{Props, ActorSystem} +import scala.concurrent.Await + +import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} +import io.gearpump.cluster.MasterToClient.HistoryMetrics import io.gearpump.cluster.TestUtil -import io.gearpump.metrics.Metrics.{Histogram, Meter, Counter} +import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter} import io.gearpump.util.HistoryMetricsService -import HistoryMetricsService._ -import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec} +import io.gearpump.util.HistoryMetricsService._ -class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach { +class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val count = 2 val intervalMs = 10 @@ -44,30 +47,30 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf val store = new SingleValueMetricsStore(count, intervalMs) var now = 0L - //only 1 data point will be kept in @intervalMs + // Only 1 data point will be kept in @intervalMs store.add(Counter("count", 1), now) store.add(Counter("count", 2), now) now = now + intervalMs + 1 - //only 1 data point will be kept in @intervalMs + // Only 1 data point will be kept in @intervalMs store.add(Counter("count", 3), now) store.add(Counter("count", 4), now) now = now + intervalMs + 1 - //only 1 data point will be kept in @intervalMs - //expire oldest data point, because we only keep @count records + // Only 1 data point will be kept in @intervalMs + // expire oldest data point, because we only keep @count records store.add(Counter("count", 5), now) store.add(Counter("count", 6), now) val result = store.read assert(result.size == count) - //the oldest value is expired + // The oldest value is expired assert(result.head.value.asInstanceOf[Counter].value == 3L) - //the newest value is inserted + // The newest value is inserted assert(result.last.value.asInstanceOf[Counter].value == 5L) } @@ -119,7 +122,8 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf assert(store.readHistory.map(_.value) == List(a)) } - "HistoryMetricsService" should "retain lastest metrics data and allow user to query metrics by path" in { + "HistoryMetricsService" should + "retain lastest metrics data and allow user to query metrics by path" in { implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val appId = 0 val service = system.actorOf(Props(new HistoryMetricsService("app0", config))) @@ -129,10 +133,10 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf val client = TestProbe() - // filter metrics with path "metric.counter" + // Filters metrics with path "metric.counter" client.send(service, QueryHistoryMetrics("metric.counter")) import scala.concurrent.duration._ - client.expectMsgPF(3 seconds) { + client.expectMsgPF(3.seconds) { case history: HistoryMetrics => assert(history.path == "metric.counter") val metricList = history.metrics @@ -141,9 +145,9 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf ) } - // filter metrics with path "metric.meter" + // Filters metrics with path "metric.meter" client.send(service, QueryHistoryMetrics("metric.meter")) - client.expectMsgPF(3 seconds) { + client.expectMsgPF(3.seconds) { case history: HistoryMetrics => assert(history.path == "metric.meter") val metricList = history.metrics @@ -152,9 +156,9 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf ) } - // filter metrics with path "metric.histogram" + // Filters metrics with path "metric.histogram" client.send(service, QueryHistoryMetrics("metric.histogram")) - client.expectMsgPF(3 seconds) { + client.expectMsgPF(3.seconds) { case history: HistoryMetrics => assert(history.path == "metric.histogram") val metricList = history.metrics @@ -163,10 +167,10 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf ) } - // filter metrics with path prefix "metric", all metrics which can + // Filters metrics with path prefix "metric", all metrics which can // match the path prefix will be retained. client.send(service, QueryHistoryMetrics("metric")) - client.expectMsgPF(3 seconds) { + client.expectMsgPF(3.seconds) { case history: HistoryMetrics => val metricList = history.metrics @@ -179,7 +183,7 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf case v: Counter => counterFound = true case v: Meter => meterFound = true case v: Histogram => histogramFound = true - case _ => //skip + case _ => // Skip } ) @@ -187,8 +191,7 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf assert(counterFound && meterFound && histogramFound) } - system.shutdown() - system.awaitTermination() - + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala index 12128c4..b391196 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,29 +17,31 @@ */ package io.gearpump.streaming.appmaster +import scala.concurrent.{Await, Future} + import akka.actor.ActorSystem -import com.typesafe.config.ConfigFactory -import io.gearpump.WorkerId -import io.gearpump.streaming.{ProcessorDescription, DAG} -import io.gearpump.cluster.{TestUtil, AppJar} +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{AppJar, TestUtil} import io.gearpump.jarstore.FilePath import io.gearpump.partitioner.{HashPartitioner, Partitioner} -import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask2, TestTask1} +import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import io.gearpump.streaming.task.TaskId -import io.gearpump.streaming._ +import io.gearpump.streaming.{DAG, ProcessorDescription, _} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest.{Matchers, WordSpec} - -import scala.concurrent.{Await, Future} class JarSchedulerSpec extends WordSpec with Matchers { val mockJar1 = AppJar("jar1", FilePath("path")) val mockJar2 = AppJar("jar2", FilePath("path")) - val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, jar = mockJar1) - val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, jar = mockJar1) - val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, jar = mockJar2) + val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, + jar = mockJar1) + val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, + jar = mockJar1) + val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, + jar = mockJar2) val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2)) import scala.concurrent.duration._ @@ -49,37 +51,46 @@ class JarSchedulerSpec extends WordSpec with Matchers { val system = ActorSystem("JarSchedulerSpec") implicit val dispatcher = system.dispatcher val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system) - manager.setDag(dag, Future{0L}) + manager.setDag(dag, Future { + 0L + }) val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified)) - val result = Await.result(manager.getRequestDetails(), 15 seconds) + val result = Await.result(manager.getResourceRequestDetails(), 15.seconds) assert(result.length == 1) assert(result.head.jar == mockJar1) assert(result.head.requests.deep == requests.deep) - val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, Resource(2)), 15 seconds) + val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, + Resource(2)), 15.seconds) assert(tasks.contains(TaskId(0, 0))) assert(tasks.contains(TaskId(1, 0))) val newDag = replaceDAG(dag, 1, task3, 1) - manager.setDag(newDag, Future{0}) - val requestDetails = Await.result(manager.getRequestDetails().map(_.sortBy(_.jar.name)), 15 seconds) + manager.setDag(newDag, Future { + 0 + }) + val requestDetails = Await.result(manager.getResourceRequestDetails(). + map(_.sortBy(_.jar.name)), 15.seconds) assert(requestDetails.length == 2) assert(requestDetails.last.jar == mockJar2) assert(requestDetails.last.requests.deep == requests.deep) - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } - def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = { - val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth) + def replaceDAG( + dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int) + : DAG = { + val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, + newProcessor.life.birth) val newProcessorMap = dag.processors ++ - Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), - newProcessor.id -> newProcessor) + Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife), + newProcessor.id -> newProcessor) val newGraph = dag.graph.subGraph(oldProcessorId). - replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) + replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph) new DAG(newVersion, newProcessorMap, newGraph) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala index c55be84..2e07def 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.streaming.appmaster -import io.gearpump.WorkerId +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import io.gearpump.cluster.worker.WorkerId import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.task.TaskId -import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { it should "serialize/deserialize correctly" in { - val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1,2)))) + val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2)))) Localities.toJson(localities) localities.localities.mapValues(_.toList) shouldBe http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala index 8105df3..8153fce 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,17 @@ package io.gearpump.streaming.appmaster +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.cluster.{AppJar, TestUtil, UserConfig} import io.gearpump.jarstore.FilePath import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} @@ -39,11 +46,7 @@ import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId} import io.gearpump.transport.HostPort import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import io.gearpump.{WorkerId, Message, TimeStamp} -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import scala.concurrent.Future +import io.gearpump.{Message, TimeStamp} class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { @@ -73,8 +76,8 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { } override def afterEach(): Unit = { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "recover by requesting new executors when executor stopped unexpectedly" in { @@ -83,15 +86,18 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { implicit val dispatcher = system.dispatcher val resourceRequest = Array(ResourceRequest(resource, workerId)) - when(scheduler.executorFailed(executorId)).thenReturn(Future{Some(ResourceRequestDetail(mockJar, resourceRequest))}) + when(scheduler.executorFailed(executorId)).thenReturn(Future { + Some(ResourceRequestDetail(mockJar, + resourceRequest)) + }) taskManager ! ExecutorStopped(executorId) - // when one executor stop, it will also trigger the recovery by restart + // When one executor stop, it will also trigger the recovery by restart // existing executors executorManager.expectMsg(BroadCast(RestartTasks(dagVersion))) - // ask for new executors + // Asks for new executors val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors] assert(returned.resources.deep == resourceRequest.deep) executorManager.reply(StartExecutorsTimeOut) @@ -110,7 +116,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { executorManager.expectMsg(BroadCast(RestartTasks(dagVersion))) } - import TaskManager.TaskChangeRegistry + import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry "TaskChangeRegistry" should "track all modified task registration" in { val tasks = List(TaskId(0, 0), TaskId(0, 1)) val registry = new TaskChangeRegistry(tasks) @@ -155,24 +161,28 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val dagManager = TestProbe() val taskManager = system.actorOf( - Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref, appMaster.ref, "appName"))) + Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref, + appMaster.ref, "appName"))) dagManager.expectMsgType[WatchChange] executorManager.expectMsgType[SetTaskManager] - // step1: first transition from Unitialized to ApplicationReady + // Step1: first transition from Unitialized to ApplicationReady executorManager.expectMsgType[ExecutorResourceUsageSummary] dagManager.expectMsgType[NewDAGDeployed] - // step2: Get Additional Resource Request - when(scheduler.getRequestDetails()) - .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, WorkerId.unspecified))))}) + // Step2: Get Additional Resource Request + when(scheduler.getResourceRequestDetails()) + .thenReturn(Future { + Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, + WorkerId.unspecified)))) + }) - // step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG + // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG dagManager.expectMsg(GetLatestDAG) dagManager.reply(LatestDAG(dag)) - // step4: Start remote Executors. + // Step4: Start remote Executors. // received Broadcast executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version))) executorManager.expectMsgType[StartExecutors] @@ -180,10 +190,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { when(scheduler.scheduleTask(mockJar, workerId, executorId, resource)) .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0)))) - // step5: Executor is started. + // Step5: Executor is started. executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar))) - // step6: Prepare to start Task. First GetTaskLaunchData. + // Step6: Prepare to start Task. First GetTaskLaunchData. val taskLaunchData: PartialFunction[Any, TaskLaunchData] = { case GetTaskLaunchData(_, 0, executorStarted) => task1LaunchData.copy(context = executorStarted) @@ -197,18 +207,17 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val launchData2 = dagManager.expectMsgPF()(taskLaunchData) dagManager.reply(launchData2) - // step7: Launch Task + // Step7: Launch Task val launchTaskMatch: PartialFunction[Any, RegisterTask] = { case UniCast(executorId, launch: LaunchTasks) => - Console.println("Launch Task " + launch.processorDescription.id) RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000")) } - // taskmanager should return the latest start clock to task(0,0) + // Taskmanager should return the latest start clock to task(0,0) clockService.expectMsg(GetStartClock) clockService.reply(StartClock(0)) - // step8: Task is started. registerTask. + // Step8: Task is started. registerTask. val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch) taskManager.tell(registerTask1, executor.ref) executor.expectMsgType[TaskRegistered] @@ -217,53 +226,51 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { taskManager.tell(registerTask2, executor.ref) executor.expectMsgType[TaskRegistered] - // step9: start broadcasting TaskLocations. + // Step9: start broadcasting TaskLocations. import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5 seconds) { + assert(executorManager.expectMsgPF(5.seconds) { case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady] }) - //step10: Executor confirm it has received TaskLocationsReceived(version, executorId) + // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId) taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref) - - // step11: Tell ClockService to update DAG. + // Step11: Tell ClockService to update DAG. clockService.expectMsgType[ChangeToNewDAG] clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp])) - - //step12: start all tasks + // Step12: start all tasks import scala.concurrent.duration._ - assert(executorManager.expectMsgPF(5 seconds) { + assert(executorManager.expectMsgPF(5.seconds) { case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks] }) - // step13, Tell executor Manager the updated usage status of executors. + // Step13, Tell executor Manager the updated usage status of executors. executorManager.expectMsgType[ExecutorResourceUsageSummary] - // step14: transition from DynamicDAG to ApplicationReady + // Step14: transition from DynamicDAG to ApplicationReady Env(executorManager, clockService, appMaster, executor, taskManager, scheduler) } } object TaskManagerSpec { case class Env( - executorManager: TestProbe, - clockService: TestProbe, - appMaster: TestProbe, - executor: TestProbe, - taskManager: ActorRef, - scheduler: JarScheduler) - - class Task1(taskContext : TaskContext, userConf : UserConfig) + executorManager: TestProbe, + clockService: TestProbe, + appMaster: TestProbe, + executor: TestProbe, + taskManager: ActorRef, + scheduler: JarScheduler) + + class Task1(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = ??? - override def onNext(msg: Message): Unit = ??? + override def onStart(startTime: StartTime): Unit = {} + override def onNext(msg: Message): Unit = {} } - class Task2 (taskContext : TaskContext, userConf : UserConfig) + class Task2(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = ??? - override def onNext(msg: Message): Unit = ??? + override def onStart(startTime: StartTime): Unit = {} + override def onNext(msg: Message): Unit = {} } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala index ecac824..e8417ea 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala @@ -18,12 +18,12 @@ package io.gearpump.streaming.appmaster -import io.gearpump.streaming.appmaster.TaskRegistry.{Reject, Accept} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.scheduler.Resource import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations} import io.gearpump.streaming.task.TaskId import io.gearpump.transport.HostPort -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach { it should "maintain registered tasks" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index d2373ea..2c64133 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,23 +17,22 @@ */ package io.gearpump.streaming.appmaster +import scala.collection.mutable.ArrayBuffer + import com.typesafe.config.ConfigFactory -import io.gearpump.streaming.Constants -import io.gearpump.streaming.appmaster.TaskLocator.Localities -import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId} -import io.gearpump.{WorkerId, Message} +import org.scalatest.{Matchers, WordSpec} + +import io.gearpump.Message import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} -import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{TestUtil, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.streaming.{DAG, ProcessorDescription} +import io.gearpump.streaming.{Constants, DAG, ProcessorDescription} import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import org.scalatest.{Matchers, WordSpec} - -import scala.collection.mutable.ArrayBuffer class TaskSchedulerSpec extends WordSpec with Matchers { val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4) @@ -47,19 +46,19 @@ class TaskSchedulerSpec extends WordSpec with Matchers { "schedule tasks on different workers properly according user's configuration" in { val localities = Localities( - Map(WorkerId(1, 0L) -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)), - WorkerId(2, 0L) -> Array(TaskId(0,2), TaskId(0,3)) - )) + Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)), + WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3)) + )) val localityConfig = ConfigFactory.parseString(Localities.toJson(localities)) - import Constants.GEARPUMP_STREAMING_LOCALITIES + import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES val appName = "app" val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root)) val expectedRequests = - Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), + Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) taskScheduler.setDAG(dag) @@ -71,28 +70,32 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val tasksOnWorker1 = ArrayBuffer[Int]() val tasksOnWorker2 = ArrayBuffer[Int]() for (i <- 0 until 4) { - tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(1)).head.processorId) + tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), + executorId = 0, Resource(1)).head.processorId) } for (i <- 0 until 2) { - tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, Resource(1)).head.processorId) + tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, + Resource(1)).head.processorId) } - //allocate more resource, and no tasks to launch - assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(1)) == List.empty[TaskId]) + // Allocates more resource, and no tasks to launch + assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, + Resource(1)) == List.empty[TaskId]) - //on worker1, executor 0 + // On worker1, executor 0 assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1))) - //on worker2, executor 1, Task(0, 0), Task(0, 1) + // On worker2, executor 1, Task(0, 0), Task(0, 1) assert(tasksOnWorker2.sorted.sameElements(Array(0, 0))) val rescheduledResources = taskScheduler.executorFailed(executorId = 1) - assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), WorkerId.unspecified, relaxation = Relaxation.ONEWORKER)))) + assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), + WorkerId.unspecified, relaxation = Relaxation.ONEWORKER)))) val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2)) - //start the failed 2 tasks Task(0, 0) and Task(0, 1) + // Starts the failed 2 tasks Task(0, 0) and Task(0, 1) assert(launchedTask.length == 2) } @@ -101,7 +104,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config) val expectedRequests = - Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), + Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) taskScheduler.setDAG(dag) @@ -112,16 +115,16 @@ class TaskSchedulerSpec extends WordSpec with Matchers { } } -object TaskSchedulerSpec{ - class TestTask1(taskContext : TaskContext, userConf : UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = ??? - override def onNext(msg: Message): Unit = ??? +object TaskSchedulerSpec { + class TestTask1(taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { + override def onStart(startTime: StartTime): Unit = Unit + override def onNext(msg: Message): Unit = Unit } - class TestTask2(taskContext : TaskContext, userConf : UserConfig) - extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = ??? - override def onNext(msg: Message): Unit = ??? + class TestTask2(taskContext: TaskContext, userConf: UserConfig) + extends Task(taskContext, userConf) { + override def onStart(startTime: StartTime): Unit = Unit + override def onNext(msg: Message): Unit = Unit } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala index 89d2d64..132d46c 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,28 +18,30 @@ package io.gearpump.streaming.dsl +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem -import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.dsl.plan.OpTranslator._ import org.mockito.Mockito.when import org.scalatest._ import org.scalatest.mock.MockitoSugar -class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + +import io.gearpump.cluster.TestUtil +import io.gearpump.cluster.client.ClientContext +import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask +class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { implicit var system: ActorSystem = null - override def beforeAll: Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) } - override def afterAll: Unit = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } - it should "be able to generate multiple new streams" in { val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) @@ -57,7 +59,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with val app = StreamApp("dsl", context) val parallism = 3 - app.source(List("A","B","C"), parallism, "").flatMap(Array(_)).reduce(_+_) + app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) val task = app.plan.dag.vertices.iterator.next() assert(task.taskClass == classOf[SourceTask[_, _]].getName) assert(task.parallelism == parallism) @@ -72,7 +74,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with "1", "2" ) - val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_+_) + val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _) val task = app.plan.dag.vertices.iterator.next() /* val task = app.plan.dag.vertices.iterator.map(desc => {
