http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala new file mode 100644 index 0000000..3a0faad --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/util/FileServer.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.util + +import java.io.File +import scala.concurrent.{ExecutionContext, Future} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{FileIO, Sink, Source} +import spray.json.DefaultJsonProtocol._ +import spray.json.JsonFormat + +import org.apache.gearpump.jarstore.FilePath +import org.apache.gearpump.util.FileDirective._ +import org.apache.gearpump.util.FileServer.Port + +/** + * A simple file server implemented with akka-http to store/fetch large + * binary files. + */ +class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) { + import system.dispatcher + implicit val actorSystem = system + implicit val materializer = ActorMaterializer() + implicit def ec: ExecutionContext = system.dispatcher + + val route: Route = { + path("upload") { + uploadFileTo(rootDirectory) { form => + val fileName = form.fields.headOption.flatMap { pair => + val (_, fileInfo) = pair + fileInfo match { + case Left(file) => Option(file.file).map(_.getName) + case Right(_) => None + } + } + + if (fileName.isDefined) { + complete(fileName.get) + } else { + failWith(new Exception("File not found in the uploaded form")) + } + } + } ~ + path("download") { + parameters("file") { file: String => + downloadFile(new File(rootDirectory, file)) + } + } ~ + pathEndOrSingleSlash { + extractUri { uri => + val upload = uri.withPath(Uri.Path("/upload")).toString() + val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, + s""" + | + |<h2>Please specify a file to upload:</h2> + |<form action="$upload" enctype="multipart/form-data" method="post"> + |<input type="file" name="datafile" size="40"> + |</p> + |<div> + |<input type="submit" value="Submit"> + |</div> + |</form> + """.stripMargin) + complete(entity) + } + } + } + + private var connection: Future[ServerBinding] = null + + def start: Future[Port] = { + connection = Http().bindAndHandle(Route.handlerFlow(route), host, port) + connection.map(address => Port(address.localAddress.getPort)) + } + + def stop: Future[Unit] = { + connection.flatMap(_.unbind()) + } +} + +object FileServer { + + implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply) + + case class Port(port: Int) + + /** + * Client of [[org.apache.gearpump.util.FileServer]] + */ + class Client(system: ActorSystem, host: String, port: Int) { + + def this(system: ActorSystem, url: String) = { + this(system, Uri(url).authority.host.address(), Uri(url).authority.port) + } + + private implicit val actorSystem = system + private implicit val materializer = ActorMaterializer() + private implicit val ec = system.dispatcher + + val server = Uri(s"http://$host:$port") + val httpClient = Http(system).outgoingConnection(server.authority.host.address(), + server.authority.port) + + def upload(file: File): Future[FilePath] = { + val target = server.withPath(Path("/upload")) + + val request = entity(file).map { entity => + HttpRequest(HttpMethods.POST, uri = target, entity = entity) + } + + val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head) + response.flatMap { some => + Unmarshal(some).to[String] + }.map { path => + FilePath(path) + } + } + + def download(remoteFile: FilePath, saveAs: File): Future[Unit] = { + val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path)) + // Download file to local + val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head) + val downloaded = response.flatMap { response => + response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) + } + downloaded.map(written => Unit) + } + + private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), + FileIO.fromFile(file, chunkSize = 100000)) + val body = Source.single( + Multipart.FormData.BodyPart( + "uploadfile", + entity, + Map("filename" -> file.getName))) + val form = Multipart.FormData(body) + + Marshal(form).to[RequestEntity] + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService deleted file mode 100644 index d226af9..0000000 --- a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -io.gearpump.jarstore.local.LocalJarStoreService -io.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService new file mode 100644 index 0000000..bf37316 --- /dev/null +++ b/daemon/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.gearpump.jarstore.local.LocalJarStoreService +org.apache.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala deleted file mode 100644 index c6dbbfe..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.pattern.ask -import akka.testkit.TestActorRef -import com.typesafe.config.ConfigValueFactory - -import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers -import io.gearpump.cluster.MasterToAppMaster.WorkerList -import io.gearpump.cluster.master.Master -import io.gearpump.cluster.worker.Worker -import io.gearpump.util.Constants - -class MiniCluster { - private val mockMasterIP = "127.0.0.1" - - implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG. - withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP))) - - val (mockMaster, worker) = { - val master = system.actorOf(Props(classOf[Master]), "master") - val worker = system.actorOf(Props(classOf[Worker], master), "worker") - - // Wait until worker register itself to master - waitUtilWorkerIsRegistered(master) - (master, worker) - } - - def launchActor(props: Props): TestActorRef[Actor] = { - TestActorRef(props) - } - - private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = { - while (!isWorkerRegistered(master)) {} - } - - private def isWorkerRegistered(master: ActorRef): Boolean = { - import scala.concurrent.duration._ - implicit val dispatcher = system.dispatcher - - implicit val futureTimeout = Constants.FUTURE_TIMEOUT - - val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]] - - // Waits until the worker is registered. - val workers = Await.result[WorkerList](workerListFuture, 15.seconds) - workers.workers.size > 0 - } - - def shutDown(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala deleted file mode 100644 index 30347d2..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import com.typesafe.config.Config -import org.scalatest._ - -import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} -import io.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} -import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} -import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.util.Constants._ -import io.gearpump.util.{Constants, LogUtil, Util} - -class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "Worker" should "register worker address to master when started." in { - - val masterReceiver = createMockMaster() - - val tempTestConf = convertTestConf(getHost, getPort) - - val options = Array( - s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", - s"-D${PREFER_IPV4}=true" - ) ++ getMasterListOption() - - val worker = Util.startProcess(options, - getContextClassPath, - getMainClassName(Worker), - Array.empty) - - try { - masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) - - tempTestConf.delete() - } finally { - worker.destroy() - } - } - - // This UT fails a lot on Travis, temporarily delete it. - // "Master" should "accept worker RegisterNewWorker when started" in { - // val worker = TestProbe()(getActorSystem) - // - // val port = Util.findFreePort.get - // - // val masterConfig = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port", - // s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1") - // - // val masterProcess = Util.startProcess(masterConfig, - // getContextClassPath, - // getMainClassName(io.gearpump.cluster.main.Master), - // Array("-ip", "127.0.0.1", "-port", port.toString)) - // - // //wait for master process to be started - // - // try { - // - // val masterProxy = getActorSystem.actorOf( - // MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec") - // - // worker.send(masterProxy, RegisterNewWorker) - // worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) - // } finally { - // masterProcess.destroy() - // } - // } - - "Info" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - io.gearpump.cluster.main.Info.main(masterConfig, Array.empty) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) - masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) - } - - "Kill" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - Kill.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) - masterReceiver.reply(ShutdownApplicationResult(Success(0))) - } - - "Replay" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - Replay.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) - masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) - masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) - masterReceiver.reply(ReplayApplicationResult(Success(0))) - } - - "Local" should "be started without exception" in { - val port = Util.findFreePort().get - val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", - s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", - s"-D${PREFER_IPV4}=true") - - val local = Util.startProcess(options, - getContextClassPath, - getMainClassName(Local), - Array.empty) - - def retry(times: Int)(fn: => Boolean): Boolean = { - - LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") - - val result = fn - if (result || times <= 0) { - result - } else { - Thread.sleep(1000) - retry(times - 1)(fn) - } - } - - try { - assert(retry(10)(isPortUsed("127.0.0.1", port)), - "local is not started successfully, as port is not used " + port) - } finally { - local.destroy() - } - } - - "Gear" should "support app|info|kill|shell|replay" in { - - val commands = Array("app", "info", "kill", "shell", "replay") - - assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") - - for (command <- commands) { - assert(Try(Gear.main(Array("-noexist"))).isFailure, - "pass unknown option, throw, command: " + command) - } - - assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") - - val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) - assert(tryThis.isFailure, "unknown command, throw") - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala deleted file mode 100644 index 66b9ea8..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.main - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil - -class MasterWatcherSpec extends FlatSpec with Matchers { - def config: Config = TestUtil.MASTER_CONFIG - - "MasterWatcher" should "kill itself when can not get a quorum" in { - val system = ActorSystem("ForMasterWatcher", config) - - val actorWatcher = TestProbe()(system) - - val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) - actorWatcher watch masterWatcher - actorWatcher.expectTerminated(masterWatcher, 5.seconds) - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala deleted file mode 100644 index ee6e0e2..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import scala.util.Success - -import akka.actor.{Actor, ActorRef, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} -import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} -import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} -import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} -import io.gearpump.cluster.master.AppManager._ -import io.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} -import io.gearpump.cluster.{TestUtil, _} -import io.gearpump.util.LogUtil - -class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - var kvService: TestProbe = null - var haService: TestProbe = null - var appLauncher: TestProbe = null - var appManager: ActorRef = null - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - kvService = TestProbe()(getActorSystem) - appLauncher = TestProbe()(getActorSystem) - - appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, - new DummyAppMasterLauncherFactory(appLauncher)))) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Map.empty))) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "AppManager" should "handle appmaster message correctly" in { - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - - val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, "appName")) - appMaster.send(appManager, register) - appMaster.expectMsgType[AppMasterRegistered] - } - - "DataStoreService" should "support Put and Get" in { - val appMaster = TestProbe()(getActorSystem) - appMaster.send(appManager, SaveAppData(0, "key", 1)) - kvService.expectMsgType[PutKV] - kvService.reply(PutKVSuccess) - appMaster.expectMsg(AppDataSaved) - - appMaster.send(appManager, GetAppData(0, "key")) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess("key", 1)) - appMaster.expectMsg(GetAppDataResult("key", 1)) - } - - "AppManager" should "support application submission and shutdown" in { - testClientSubmission(withRecover = false) - } - - "AppManager" should "support application submission and recover if appmaster dies" in { - LOG.info("=================testing recover==============") - testClientSubmission(withRecover = true) - } - - "AppManager" should "handle client message correctly" in { - val mockClient = TestProbe()(getActorSystem) - mockClient.send(appManager, ShutdownApplication(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) - - mockClient.send(appManager, ResolveAppId(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) - - mockClient.send(appManager, AppMasterDataRequest(1)) - mockClient.expectMsg(AppMasterData(AppMasterNonExist)) - } - - "AppManager" should "reject the application submission if the app name already existed" in { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, submit) - assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) - } - - def testClientSubmission(withRecover: Boolean): Unit = { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - kvService.expectMsgType[PutKV] - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, ResolveAppId(appId)) - client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) - - client.send(appManager, AppMastersDataRequest) - client.expectMsgType[AppMastersData] - - client.send(appManager, AppMasterDataRequest(appId, false)) - client.expectMsgType[AppMasterData] - - if (!withRecover) { - client.send(appManager, ShutdownApplication(appId)) - client.expectMsg(ShutdownApplicationResult(Success(appId))) - } else { - // Do recovery - getActorSystem.stop(appMaster.ref) - kvService.expectMsgType[GetKV] - val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) - kvService.reply(GetKVSuccess(APP_STATE, appState)) - appLauncher.expectMsg(LauncherStarted(appId)) - } - } -} - -class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { - - override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], - username: String, master: ActorRef, client: Option[ActorRef]): Props = { - Props(new DummyAppMasterLauncher(test, appId)) - } -} - -class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { - - test.ref ! LauncherStarted(appId) - override def receive: Receive = { - case any: Any => test.ref forward any - } -} - -case class LauncherStarted(appId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala deleted file mode 100644 index b929349..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import scala.concurrent.duration._ - -import akka.actor.Props -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.master.InMemoryKVService._ -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class InMemoryKVServiceSpec - extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - override def config: Config = TestUtil.MASTER_CONFIG - - "KVService" should "get, put, delete correctly" in { - val system = getActorSystem - val kvService = system.actorOf(Props(new InMemoryKVService())) - val group = "group" - - val client = TestProbe()(system) - - client.send(kvService, PutKV(group, "key", 1)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, PutKV(group, "key", 2)) - client.expectMsg(PutKVSuccess) - - client.send(kvService, GetKV(group, "key")) - client.expectMsg(GetKVSuccess("key", 2)) - - client.send(kvService, DeleteKVGroup(group)) - - // After DeleteGroup, it no longer accept Get and Put message for this group. - client.send(kvService, GetKV(group, "key")) - client.expectNoMsg(3.seconds) - - client.send(kvService, PutKV(group, "key", 3)) - client.expectNoMsg(3.seconds) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala deleted file mode 100644 index a75ade2..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.scheduler - -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.WorkerToMaster.ResourceUpdate -import io.gearpump.cluster.master.Master.MasterInfo -import io.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} -import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished -import io.gearpump.cluster.worker.WorkerId - -class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll{ - - def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) - val appId = 0 - val workerId1: WorkerId = WorkerId(1, 0L) - val workerId2: WorkerId = WorkerId(2, 0L) - val mockAppMaster = TestProbe() - val mockWorker1 = TestProbe() - val mockWorker2 = TestProbe() - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "The scheduler" should { - "update resource only when the worker is registered" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) - expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " + - s"registered into master")) - } - - "drop application's resource requests when the application is removed" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - mockAppMaster.expectNoMsg(5.seconds) - } - } - - def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { - left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) - } - - "The resource request with higher priority" should { - "be handled first" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) - val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The resource request which delivered earlier" should { - "be handled first if the priorities are the same" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different relaxation" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) - val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) - - scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - - var expect = ResourceAllocated( - Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - val request3 = ResourceRequest( - Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - - expect = ResourceAllocated(Array( - ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), - ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - - // We have to manually update the resource on each worker - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - - expect = ResourceAllocated( - Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) - mockAppMaster.expectMsgPF(5.seconds) { - case request: ResourceAllocated if sameElement(request, expect) => Unit - } - } - } - - "The PriorityScheduler" should { - "handle the resource request with different executor number" in { - val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) - scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - - // By default, the request requires only one executor - val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) - scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) - val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations2.allocations.length == 1) - assert(allocations2.allocations.head.resource == Resource(20)) - - val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) - val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations3.allocations.length == 3) - assert(allocations3.allocations.forall(_.resource == Resource(8))) - - // The total available resource can not satisfy the requirements with executor number - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) - scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) - scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) - val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations4.allocations.length == 2) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - - // When new resources are available, the remaining request will be satisfied - scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) - val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] - assert(allocations5.allocations.length == 1) - assert(allocations4.allocations.forall(_.resource == Resource(20))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala deleted file mode 100644 index 46e8d37..0000000 --- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.worker - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, PoisonPill, Props} -import akka.testkit.TestProbe -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest._ - -import io.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} -import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} -import io.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} -import io.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} -import io.gearpump.cluster.master.Master.MasterInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} -import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} - -class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { - override def config: Config = TestUtil.DEFAULT_CONFIG - - val appId = 1 - val workerId: WorkerId = WorkerId(1, 0L) - val executorId = 1 - var masterProxy: TestProbe = null - var mockMaster: TestProbe = null - var client: TestProbe = null - val workerSlots = 50 - - override def beforeEach(): Unit = { - startActorSystem() - mockMaster = TestProbe()(getActorSystem) - masterProxy = TestProbe()(getActorSystem) - client = TestProbe()(getActorSystem) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "The new started worker" should { - "kill itself if no response from Master after registering" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - mockMaster.expectTerminated(worker, 60.seconds) - } - } - - "Worker" should { - "init its resource from the gearpump config" in { - val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). - withFallback(TestUtil.DEFAULT_CONFIG) - val workerSystem = ActorSystem("WorkerSystem", config) - val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) - mockMaster watch worker - mockMaster.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) - - worker.tell( - UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) - mockMaster.expectTerminated(worker, 5.seconds) - workerSystem.terminate() - Await.result(workerSystem.whenTerminated, Duration.Inf) - } - } - - "Worker" should { - "update its remaining resource when launching and shutting down executors" in { - val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) - masterProxy.expectMsg(RegisterNewWorker) - - worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - - val executorName = ActorUtil.actorNameForExecutor(appId, executorId) - // This is an actor path which the ActorSystemBooter will report back to, - // not needed in this test - val reportBack = "dummy" - val executionContext = ExecutorJVMConfig(Array.empty[String], - getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), - classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, - username = "user") - - // Test LaunchExecutor - worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), - mockMaster.ref) - mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) - - worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) - - worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) - - // Test terminationWatch - worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) - mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) - client.expectMsg(ShutdownExecutorSucceed(1, 1)) - - worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) - client.expectMsg(ShutdownExecutorFailed( - s"Can not find executor ${executorId + 1} for app $appId")) - - mockMaster.ref ! PoisonPill - masterProxy.expectMsg(RegisterWorker(workerId)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala deleted file mode 100644 index 66c7c1d..0000000 --- a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.File -import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import io.gearpump.cluster.TestUtil -import io.gearpump.google.common.io.Files -import io.gearpump.jarstore.FilePath -import io.gearpump.util.FileServer._ - -class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { - - implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS) - val host = "localhost" - private val LOG = LogUtil.getLogger(getClass) - - var system: ActorSystem = null - - override def afterAll { - if (null != system) { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } - - override def beforeAll { - val config = TestUtil.DEFAULT_CONFIG - system = ActorSystem("FileServerSpec", config) - } - - private def save(client: Client, data: Array[Byte]): FilePath = { - val file = File.createTempFile("fileserverspec", "test") - FileUtils.writeByteArrayToFile(file, data) - val future = client.upload(file) - import scala.concurrent.duration._ - val path = Await.result(future, 30.seconds) - file.delete() - path - } - - private def get(client: Client, remote: FilePath): Array[Byte] = { - val file = File.createTempFile("fileserverspec", "test") - val future = client.download(remote, file) - import scala.concurrent.duration._ - val data = Await.result(future, 10.seconds) - - val bytes = FileUtils.readFileToByteArray(file) - file.delete() - bytes - } - - "The file server" should { - "serve the data previously stored" in { - - val rootDir = Files.createTempDir() - - val server = new FileServer(system, host, 0, rootDir) - val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) - - LOG.info("start test web server on port " + port) - - val sizes = List(1, 100, 1000000, 50000000) - val client = new Client(system, host, port.port) - - sizes.foreach { size => - val bytes = randomBytes(size) - val url = s"http://$host:${port.port}/$size" - val remote = save(client, bytes) - val fetchedBytes = get(client, remote) - assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir") - } - server.stop - rootDir.delete() - } - } - - "The file server" should { - "handle missed file" in { - - val rootDir = Files.createTempDir() - - val server = new FileServer(system, host, 0, rootDir) - val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) - - val client = new Client(system, host, port.port) - val fetchedBytes = get(client, FilePath("noexist")) - assert(fetchedBytes.length == 0) - rootDir.delete() - } - } - - private def randomBytes(size: Int): Array[Byte] = { - val bytes = new Array[Byte](size) - (new java.util.Random()).nextBytes(bytes) - bytes - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala new file mode 100644 index 0000000..a6b75cb --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.testkit.TestActorRef +import com.typesafe.config.ConfigValueFactory + +import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers +import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList +import org.apache.gearpump.cluster.master.Master +import org.apache.gearpump.cluster.worker.Worker +import org.apache.gearpump.util.Constants + +class MiniCluster { + private val mockMasterIP = "127.0.0.1" + + implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG. + withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP))) + + val (mockMaster, worker) = { + val master = system.actorOf(Props(classOf[Master]), "master") + val worker = system.actorOf(Props(classOf[Worker], master), "worker") + + // Wait until worker register itself to master + waitUtilWorkerIsRegistered(master) + (master, worker) + } + + def launchActor(props: Props): TestActorRef[Actor] = { + TestActorRef(props) + } + + private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = { + while (!isWorkerRegistered(master)) {} + } + + private def isWorkerRegistered(master: ActorRef): Boolean = { + import scala.concurrent.duration._ + implicit val dispatcher = system.dispatcher + + implicit val futureTimeout = Constants.FUTURE_TIMEOUT + + val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]] + + // Waits until the worker is registered. + val workers = Await.result[WorkerList](workerListFuture, 15.seconds) + workers.workers.size > 0 + } + + def shutDown(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala new file mode 100644 index 0000000..205bb49 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import scala.concurrent.Future +import scala.util.{Success, Try} + +import com.typesafe.config.Config +import org.scalatest._ + +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} +import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{Constants, LogUtil, Util} + +class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "Worker" should "register worker address to master when started." in { + + val masterReceiver = createMockMaster() + + val tempTestConf = convertTestConf(getHost, getPort) + + val options = Array( + s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", + s"-D${PREFER_IPV4}=true" + ) ++ getMasterListOption() + + val worker = Util.startProcess(options, + getContextClassPath, + getMainClassName(Worker), + Array.empty) + + try { + masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) + + tempTestConf.delete() + } finally { + worker.destroy() + } + } + + // This UT fails a lot on Travis, temporarily delete it. + // "Master" should "accept worker RegisterNewWorker when started" in { + // val worker = TestProbe()(getActorSystem) + // + // val port = Util.findFreePort.get + // + // val masterConfig = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port", + // s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1") + // + // val masterProcess = Util.startProcess(masterConfig, + // getContextClassPath, + // getMainClassName(org.apache.gearpump.cluster.main.Master), + // Array("-ip", "127.0.0.1", "-port", port.toString)) + // + // //wait for master process to be started + // + // try { + // + // val masterProxy = getActorSystem.actorOf( + // MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec") + // + // worker.send(masterProxy, RegisterNewWorker) + // worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) + // } finally { + // masterProcess.destroy() + // } + // } + + "Info" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) + masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) + } + + "Kill" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + Kill.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) + masterReceiver.reply(ShutdownApplicationResult(Success(0))) + } + + "Replay" should "be started without exception" in { + + val masterReceiver = createMockMaster() + + Future { + Replay.main(masterConfig, Array("-appid", "0")) + } + + masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) + masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) + masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) + masterReceiver.reply(ReplayApplicationResult(Success(0))) + } + + "Local" should "be started without exception" in { + val port = Util.findFreePort().get + val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", + s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", + s"-D${PREFER_IPV4}=true") + + val local = Util.startProcess(options, + getContextClassPath, + getMainClassName(Local), + Array.empty) + + def retry(times: Int)(fn: => Boolean): Boolean = { + + LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") + + val result = fn + if (result || times <= 0) { + result + } else { + Thread.sleep(1000) + retry(times - 1)(fn) + } + } + + try { + assert(retry(10)(isPortUsed("127.0.0.1", port)), + "local is not started successfully, as port is not used " + port) + } finally { + local.destroy() + } + } + + "Gear" should "support app|info|kill|shell|replay" in { + + val commands = Array("app", "info", "kill", "shell", "replay") + + assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") + + for (command <- commands) { + assert(Try(Gear.main(Array("-noexist"))).isFailure, + "pass unknown option, throw, command: " + command) + } + + assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") + + val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) + assert(tryThis.isFailure, "unknown command, throw") + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala new file mode 100644 index 0000000..e1ba8f6 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.cluster.TestUtil + +class MasterWatcherSpec extends FlatSpec with Matchers { + def config: Config = TestUtil.MASTER_CONFIG + + "MasterWatcher" should "kill itself when can not get a quorum" in { + val system = ActorSystem("ForMasterWatcher", config) + + val actorWatcher = TestProbe()(system) + + val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) + actorWatcher watch masterWatcher + actorWatcher.expectTerminated(masterWatcher, 5.seconds) + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala new file mode 100644 index 0000000..ae0ebcd --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.master + +import scala.util.Success + +import akka.actor.{Actor, ActorRef, Props} +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} +import org.apache.gearpump.cluster.master.AppManager._ +import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} +import org.apache.gearpump.cluster.{TestUtil, _} +import org.apache.gearpump.util.LogUtil + +class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + var kvService: TestProbe = null + var haService: TestProbe = null + var appLauncher: TestProbe = null + var appManager: ActorRef = null + private val LOG = LogUtil.getLogger(getClass) + + override def config: Config = TestUtil.DEFAULT_CONFIG + + override def beforeEach(): Unit = { + startActorSystem() + kvService = TestProbe()(getActorSystem) + appLauncher = TestProbe()(getActorSystem) + + appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, + new DummyAppMasterLauncherFactory(appLauncher)))) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Map.empty))) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "AppManager" should "handle appmaster message correctly" in { + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + + val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(0, "appName")) + appMaster.send(appManager, register) + appMaster.expectMsgType[AppMasterRegistered] + } + + "DataStoreService" should "support Put and Get" in { + val appMaster = TestProbe()(getActorSystem) + appMaster.send(appManager, SaveAppData(0, "key", 1)) + kvService.expectMsgType[PutKV] + kvService.reply(PutKVSuccess) + appMaster.expectMsg(AppDataSaved) + + appMaster.send(appManager, GetAppData(0, "key")) + kvService.expectMsgType[GetKV] + kvService.reply(GetKVSuccess("key", 1)) + appMaster.expectMsg(GetAppDataResult("key", 1)) + } + + "AppManager" should "support application submission and shutdown" in { + testClientSubmission(withRecover = false) + } + + "AppManager" should "support application submission and recover if appmaster dies" in { + LOG.info("=================testing recover==============") + testClientSubmission(withRecover = true) + } + + "AppManager" should "handle client message correctly" in { + val mockClient = TestProbe()(getActorSystem) + mockClient.send(appManager, ShutdownApplication(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) + + mockClient.send(appManager, ResolveAppId(1)) + assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) + + mockClient.send(appManager, AppMasterDataRequest(1)) + mockClient.expectMsg(AppMasterData(AppMasterNonExist)) + } + + "AppManager" should "reject the application submission if the app name already existed" in { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, submit) + assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) + } + + def testClientSubmission(withRecover: Boolean): Unit = { + val app = TestUtil.dummyApp + val submit = SubmitApplication(app, None, "username") + val client = TestProbe()(getActorSystem) + val appMaster = TestProbe()(getActorSystem) + val worker = TestProbe()(getActorSystem) + val appId = 1 + + client.send(appManager, submit) + + kvService.expectMsgType[PutKV] + appLauncher.expectMsg(LauncherStarted(appId)) + appMaster.send(appManager, RegisterAppMaster(appMaster.ref, + AppMasterRuntimeInfo(appId, app.name))) + kvService.expectMsgType[PutKV] + appMaster.expectMsgType[AppMasterRegistered] + + client.send(appManager, ResolveAppId(appId)) + client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) + + client.send(appManager, AppMastersDataRequest) + client.expectMsgType[AppMastersData] + + client.send(appManager, AppMasterDataRequest(appId, false)) + client.expectMsgType[AppMasterData] + + if (!withRecover) { + client.send(appManager, ShutdownApplication(appId)) + client.expectMsg(ShutdownApplicationResult(Success(appId))) + } else { + // Do recovery + getActorSystem.stop(appMaster.ref) + kvService.expectMsgType[GetKV] + val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) + kvService.reply(GetKVSuccess(APP_STATE, appState)) + appLauncher.expectMsg(LauncherStarted(appId)) + } + } +} + +class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { + + override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], + username: String, master: ActorRef, client: Option[ActorRef]): Props = { + Props(new DummyAppMasterLauncher(test, appId)) + } +} + +class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { + + test.ref ! LauncherStarted(appId) + override def receive: Receive = { + case any: Any => test.ref forward any + } +} + +case class LauncherStarted(appId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala new file mode 100644 index 0000000..325a484 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.master + +import scala.concurrent.duration._ + +import akka.actor.Props +import akka.testkit.TestProbe +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import org.apache.gearpump.cluster.master.InMemoryKVService._ +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class InMemoryKVServiceSpec + extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { + + override def beforeEach(): Unit = { + startActorSystem() + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + override def config: Config = TestUtil.MASTER_CONFIG + + "KVService" should "get, put, delete correctly" in { + val system = getActorSystem + val kvService = system.actorOf(Props(new InMemoryKVService())) + val group = "group" + + val client = TestProbe()(system) + + client.send(kvService, PutKV(group, "key", 1)) + client.expectMsg(PutKVSuccess) + + client.send(kvService, PutKV(group, "key", 2)) + client.expectMsg(PutKVSuccess) + + client.send(kvService, GetKV(group, "key")) + client.expectMsg(GetKVSuccess("key", 2)) + + client.send(kvService, DeleteKVGroup(group)) + + // After DeleteGroup, it no longer accept Get and Put message for this group. + client.send(kvService, GetKV(group, "key")) + client.expectNoMsg(3.seconds) + + client.send(kvService, PutKV(group, "key", 3)) + client.expectNoMsg(3.seconds) + } +}
