http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala deleted file mode 100644 index 00bd408..0000000 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor._ -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.TestProbeUtil._ -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._ -import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped} - -class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - implicit var system: ActorSystem = null - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - "MasterWithExecutorSystemProvider" should - "forward request StartExecutorSystem to ExecutorSystemProvider" in { - - val client = TestProbe() - val master = TestProbe() - val provider = TestProbe() - val providerProps: Props = provider - val masterEnhanced = system.actorOf(Props( - new MasterWithExecutorSystemProvider(master.ref, providerProps))) - - val start = StartExecutorSystems(null, null) - client.send(masterEnhanced, start) - provider.expectMsg(start) - - val anyOtherMessage = "any other message" - client.send(masterEnhanced, anyOtherMessage) - master.expectMsg(anyOtherMessage) - system.stop(masterEnhanced) - } - - "LazyStartAppMaster" should "forward command to appmaster when app master started" in { - - val appMaster = TestProbe() - val appMasterProps: Props = appMaster - val lazyAppMaster = system.actorOf(Props(new LazyStartAppMaster(appId = 0, appMasterProps))) - val msg = "Some" - lazyAppMaster ! msg - lazyAppMaster ! StartAppMaster - appMaster.expectMsg(msg) - - system.stop(appMaster.ref) - val client = TestProbe() - client.watch(lazyAppMaster) - client.expectTerminated(lazyAppMaster) - } - - "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in { - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = - setupAppMasterRuntimeEnv() - - masterConnectionKeeper.send(runtimeEnv, MasterConnected) - appMaster.expectMsg(StartAppMaster) - } - - "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in { - - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = - setupAppMasterRuntimeEnv() - - masterConnectionKeeper.send(runtimeEnv, MasterStopped) - val client = TestProbe() - client.watch(runtimeEnv) - client.expectTerminated(runtimeEnv) - } - - "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in { - - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = - setupAppMasterRuntimeEnv() - - val client = TestProbe() - client.watch(runtimeEnv) - system.stop(appMaster.ref) - client.expectTerminated(runtimeEnv) - } - - private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = { - val appContext = AppMasterContext(0, null, null, null, null, null, null) - val app = AppDescription("app", "AppMasterClass", null, null) - val master = TestProbe() - val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master) - val appMaster = TestProbe() - val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster) - val masterConnectionKeeper = TestProbe() - val masterConnectionKeeperFactory = - (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) => - toProps(masterConnectionKeeper) - - val runtimeEnv = system.actorOf( - Props(new AppMasterRuntimeEnvironment( - appContext, app, List(master.ref.path), masterFactory, - appMasterFactory, masterConnectionKeeperFactory))) - - TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) - } -} - -object AppMasterRuntimeEnvironmentSpec { - - case class TestAppMasterEnv( - master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe, - appMasterRuntimeEnv: ActorRef) -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala deleted file mode 100644 index 3595960..0000000 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import com.typesafe.config.ConfigValueFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem} -import io.gearpump.util.Constants - -class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - implicit var system: ActorSystem = null - val workerId: WorkerId = WorkerId(0, 0L) - val appId = 0 - val executorId = 0 - val url = "akka.tcp://[email protected]:3000" - val session = Session(null, null) - val launchExecutorSystemTimeout = 3000 - val activeConfig = TestUtil.DEFAULT_CONFIG. - withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS, - ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout)) - - override def beforeAll(): Unit = { - system = ActorSystem("test", activeConfig) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "report success when worker launch the system successfully" in { - val worker = TestProbe() - val client = TestProbe() - - val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session))) - client.watch(launcher) - client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1))) - - worker.expectMsgType[LaunchExecutor] - worker.reply(RegisterActorSystem(url)) - - worker.expectMsgType[ActorSystemRegistered] - - client.expectMsgType[LaunchExecutorSystemSuccess] - client.expectTerminated(launcher) - } - - it should "report failure when worker refuse to launch the system explicitly" in { - val worker = TestProbe() - val client = TestProbe() - - val resource = Resource(4) - - val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session))) - client.watch(launcher) - client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, resource)) - - worker.expectMsgType[LaunchExecutor] - worker.reply(ExecutorLaunchRejected()) - - client.expectMsg(LaunchExecutorSystemRejected(resource, null, session)) - client.expectTerminated(launcher) - } - - it should "report timeout when trying to start a executor system on worker, " + - "and worker doesn't response" in { - val client = TestProbe() - val worker = TestProbe() - val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session))) - client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1))) - client.watch(launcher) - val waitFor = launchExecutorSystemTimeout + 10000 - client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds) - client.expectTerminated(launcher, waitFor.milliseconds) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala deleted file mode 100644 index c2f6ee8..0000000 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{Actor, ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._ -import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher} -import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppJar, TestUtil} -import io.gearpump.jarstore.FilePath - -class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - val appId = 0 - val workerId = WorkerId(0, 0L) - val resource = Resource(1) - val resourceRequest = ResourceRequest(resource, WorkerId.unspecified) - val mockJar = AppJar("for_test", FilePath("PATH")) - val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "") - val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig) - - implicit var system: ActorSystem = null - var worker: TestProbe = null - var workerInfo: WorkerInfo = null - var masterProxy: TestProbe = null - var launcher: TestProbe = null - var client: TestProbe = null - - override def beforeEach(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - worker = TestProbe() - workerInfo = WorkerInfo(workerId, worker.ref) - masterProxy = TestProbe() - launcher = TestProbe() - client = TestProbe() - - val scheduler = system.actorOf( - Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => { - Props(new MockExecutorSystemLauncher(launcher, session)) - }))) - - client.send(scheduler, start) - masterProxy.expectMsg(RequestResource(appId, resourceRequest)) - } - - override def afterEach(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = { - val launcherStarted = launcher.receiveOne(15.seconds) - - launcherStarted match { - case start: ExecutorSystemLauncherStarted => Some(start) - case x => - assert(false, "ExecutorSystemLauncherStarted == false") - None - } - } - - it should "schedule and launch an executor system on target worker successfully" in { - - masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId)))) - - val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get - - var systemId = 0 - launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource)) - - val executorSystemProbe = TestProbe() - val executorSystem = - ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo) - launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session)) - client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar))) - } - - it should "report failure when resource cannot be allocated" in { - client.expectMsg(30.seconds, StartExecutorSystemTimeout) - } - - it should "schedule new resouce on new worker " + - "when target worker reject creating executor system" in { - masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId)))) - val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get - - var systemId = 0 - launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource)) - launcher.reply(LaunchExecutorSystemRejected(resource, "", session)) - masterProxy.expectMsg(RequestResource(appId, resourceRequest)) - } - - it should "report failure when resource is allocated, but timeout " + - "when starting the executor system" in { - masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId)))) - val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get - - var systemId = 0 - launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource)) - launcher.reply(LaunchExecutorSystemTimeout(session)) - client.expectMsg(StartExecutorSystemTimeout) - } -} - -object ExecutorSystemSchedulerSpec { - class MockExecutorSystemLauncher(forwardTo: TestProbe, session: Session) extends Actor { - forwardTo.ref ! ExecutorSystemLauncherStarted(session) - - def receive: Receive = { - case msg => forwardTo.ref forward msg - } - } - - case class ExecutorSystemLauncherStarted(session: Session) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala deleted file mode 100644 index 3272b99..0000000 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _} -import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv -import io.gearpump.cluster.master.MasterProxy.WatchMaster - -class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - implicit var system: ActorSystem = null - val register = RegisterAppMaster(null, null) - val appId = 0 - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = { - val statusChangeSubscriber = TestProbe() - val master = TestProbe() - - val keeper = system.actorOf(Props( - new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref))) - statusChangeSubscriber.watch(keeper) - - master.expectMsgType[WatchMaster] - - // Master is alive, response to RegisterAppMaster - master.expectMsgType[RegisterAppMaster] - master.reply(AppMasterRegistered(appId)) - - // Notify listener that master is alive - statusChangeSubscriber.expectMsg(MasterConnected) - ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber) - } - - it should "start correctly and notify listener that master is alive" in { - startMasterConnectionKeeper - } - - it should "re-register the appmaster when master is restarted" in { - import io.gearpump.cluster.master.MasterProxy.MasterRestarted - val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper - - // Master is restarted - master.send(keeper, MasterRestarted) - master.expectMsgType[RegisterAppMaster] - master.reply(AppMasterRegistered(appId)) - masterChangeListener.expectMsg(MasterConnected) - - // Recovery from Master restart is transparent to listener - masterChangeListener.expectNoMsg() - } - - it should "notify listener and then shutdown itself when master is dead" in { - val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper - - // Master is dead - master.send(keeper, MasterStopped) - - // Keeper should tell the listener that master is stopped before shutting down itself - masterChangeListener.expectMsg(MasterStopped) - masterChangeListener.expectTerminated(keeper) - } - - it should "mark the master as dead when timeout" in { - val statusChangeSubscriber = TestProbe() - val master = TestProbe() - - // MasterConnectionKeeper register itself to master by sending RegisterAppMaster - val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, - master.ref, statusChangeSubscriber.ref))) - - // Master doesn't reply to keeper, - statusChangeSubscriber.watch(keeper) - - // Timeout, keeper notify listener, and then make suicide - statusChangeSubscriber.expectMsg(60.seconds, MasterStopped) - statusChangeSubscriber.expectTerminated(keeper, 60.seconds) - } -} - -object MasterConnectionKeeperSpec { - case class ConnectionKeeperTestEnv( - master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala deleted file mode 100644 index 7544f9a..0000000 --- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import org.scalatest.{FlatSpec, Matchers} - -class ArgumentParserSpec extends FlatSpec with Matchers { - it should "parse arguments correctly" in { - - val parser = new ArgumentsParser { - override val options = Array( - "flag" -> CLIOption[Any]("", required = true), - "opt1" -> CLIOption[Any]("", required = true), - "opt2" -> CLIOption[Any]("", required = true)) - } - - val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2")) - assert(result.getBoolean("flag")) - assert(result.getInt("opt1") == 1) - assert(result.getString("opt1") == "1") - assert(result.getInt("opt2") == 2) - - assert(result.remainArgs(0) == "arg1") - assert(result.remainArgs(1) == "arg2") - } - - it should "handle interleaved options and remain args" in { - - val parser = new ArgumentsParser { - override val options = Array( - "opt1" -> CLIOption[Any]("", required = true)) - } - - val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2")) - assert(result.getInt("opt1") == 1) - - assert(result.remainArgs.length == 3) - - intercept[Exception] { - parser.parse(Array("-opt2")) - } - - intercept[Exception] { - parser.parse(Array("-opt2", "2", "-opt1")) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala deleted file mode 100644 index 2a8dba1..0000000 --- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import scala.util.Success - -import akka.actor._ -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor} -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.util.ActorSystemBooter._ - -class AppMasterLauncherSpec extends FlatSpec with Matchers - with BeforeAndAfterEach with MasterHarness { - - override def config: Config = TestUtil.DEFAULT_CONFIG - - val appId = 1 - val executorId = 2 - var master: TestProbe = null - var client: TestProbe = null - var worker: TestProbe = null - var watcher: TestProbe = null - var appMasterLauncher: ActorRef = null - - override def beforeEach(): Unit = { - startActorSystem() - master = createMockMaster() - client = TestProbe()(getActorSystem) - worker = TestProbe()(getActorSystem) - watcher = TestProbe()(getActorSystem) - appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId, - TestUtil.dummyApp, None, "username", master.ref, Some(client.ref))) - watcher watch appMasterLauncher - master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) - val resource = ResourceAllocated( - Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) - master.reply(resource) - worker.expectMsgType[LaunchExecutor] - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "AppMasterLauncher" should "launch appmaster correctly" in { - worker.reply(RegisterActorSystem("systempath")) - worker.expectMsgType[ActorSystemRegistered] - - worker.expectMsgType[CreateActor] - worker.reply(ActorCreated(master.ref, "appmaster")) - - client.expectMsg(SubmitApplicationResult(Success(appId))) - watcher.expectTerminated(appMasterLauncher) - } - - "AppMasterLauncher" should "reallocate resource if executor launch rejected" in { - worker.reply(ExecutorLaunchRejected("")) - master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) - - val resource = ResourceAllocated( - Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) - master.reply(resource) - worker.expectMsgType[LaunchExecutor] - - worker.reply(RegisterActorSystem("systempath")) - worker.expectMsgType[ActorSystemRegistered] - - worker.expectMsgType[CreateActor] - worker.reply(CreateActorFailed("", new Exception)) - worker.expectMsgType[ShutdownExecutor] - assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) - watcher.expectTerminated(appMasterLauncher) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala deleted file mode 100644 index 99cfc37..0000000 --- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import io.gearpump.cluster.appmaster.ApplicationState - -class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - "ApplicationState" should "check equal with respect to only appId and attemptId" in { - val stateA = ApplicationState(0, "application0", 0, null, null, null, "A") - val stateB = ApplicationState(0, "application0", 0, null, null, null, "B") - val stateC = ApplicationState(0, "application1", 1, null, null, null, "A") - - assert(stateA == stateB) - assert(stateA.hashCode == stateB.hashCode) - assert(stateA != stateC) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala deleted file mode 100644 index b007120..0000000 --- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -class MasterProxySpec { - - // Master proxy retries multiple times to find the master -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala deleted file mode 100644 index e4122da..0000000 --- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -class MasterSpec { -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala deleted file mode 100644 index 3b3265f..0000000 --- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.metrics - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter} - -class MetricsSpec extends FlatSpec with Matchers with MockitoSugar { - - "Counter" should "handle sampleRate == 1" in { - - val mockBaseCounter = mock[CodaHaleCounter] - - val counter = new Counter("c", mockBaseCounter) - - counter.inc() - counter.inc() - - verify(mockBaseCounter, times(2)).inc(1) - } - - "Counter" should "handle sampleRate == 3" in { - - val mockBaseCounter = mock[CodaHaleCounter] - - val counter = new Counter("c", mockBaseCounter, 3) - - counter.inc(1) - counter.inc(1) - counter.inc(1) - counter.inc(1) - counter.inc(1) - counter.inc(1) - - verify(mockBaseCounter, times(2)).inc(3) - } - - "Histogram" should "handle sampleRate == 1" in { - - val mockBaseHistogram = mock[CodaHaleHistogram] - - val histogram = new Histogram("h", mockBaseHistogram) - - histogram.update(3) - histogram.update(7) - histogram.update(5) - histogram.update(9) - - verify(mockBaseHistogram, times(4)).update(anyLong()) - } - - "Histogram" should "handle sampleRate > 1" in { - - val mockBaseHistogram = mock[CodaHaleHistogram] - - val histogram = new Histogram("h", mockBaseHistogram, 2) - - histogram.update(3) - histogram.update(4) - histogram.update(5) - histogram.update(6) - - verify(mockBaseHistogram, times(1)).update(4L) - verify(mockBaseHistogram, times(1)).update(6L) - } - - "Meter" should "handle sampleRate == 1" in { - - val mockBaseMeter = mock[CodaHaleMeter] - - val meter = new Meter("m", mockBaseMeter) - - meter.mark() - meter.mark(3) - - verify(mockBaseMeter, times(1)).mark(1L) - verify(mockBaseMeter, times(1)).mark(3L) - } - - "Meter" should "handle sampleRate > 1" in { - - val mockBaseMeter = mock[CodaHaleMeter] - - val meter = new Meter("m", mockBaseMeter, 2) - - meter.mark(1) - meter.mark(3) - - meter.mark(5) - meter.mark(7) - - verify(mockBaseMeter, times(1)).mark(4L) - verify(mockBaseMeter, times(1)).mark(12L) - } - - "Metrics" should "have a name" in { - val metrics = new Metrics(3) - assert(metrics.counter("counter").name == "counter") - assert(metrics.histogram("histogram").name == "histogram") - assert(metrics.meter("meter").name == "meter") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala deleted file mode 100644 index 9509d94..0000000 --- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.partitioner - -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.Message - -class PartitionerSpec extends FlatSpec with Matchers { - val NUM = 10 - - "HashPartitioner" should "hash same key to same slots" in { - val partitioner = new HashPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } - - "ShufflePartitioner" should "hash same key randomly" in { - val partitioner = new ShufflePartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala deleted file mode 100644 index c38a555..0000000 --- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.security - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil - -class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers { - it should "authenticate correctly" in { - val config = TestUtil.UI_CONFIG - implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config) - implicit val ec = system.dispatcher - val timeout = 30.seconds - - val authenticator = new ConfigFileBasedAuthenticator(config) - val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout) - val admin = Await.result(authenticator.authenticate("admin", "admin", ec), timeout) - - val nonexist = Await.result(authenticator.authenticate("nonexist", "nonexist", ec), timeout) - - val failedGuest = Await.result(authenticator.authenticate("guest", "wrong", ec), timeout) - val failedAdmin = Await.result(authenticator.authenticate("admin", "wrong", ec), timeout) - - assert(guest == Authenticator.Guest) - assert(admin == Authenticator.Admin) - assert(nonexist == Authenticator.UnAuthenticated) - assert(failedGuest == Authenticator.UnAuthenticated) - assert(failedAdmin == Authenticator.UnAuthenticated) - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala deleted file mode 100644 index 4a3963b..0000000 --- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.security - -import org.scalatest.{FlatSpec, Matchers} - -class PasswordUtilSpec extends FlatSpec with Matchers { - - it should "verify the credential correctly" in { - val password = "password" - - val digest1 = PasswordUtil.hash(password) - val digest2 = PasswordUtil.hash(password) - - // Uses different salt each time, thus creating different hash. - assert(digest1 != digest2) - - // Both are valid hash. - assert(PasswordUtil.verify(password, digest1)) - assert(PasswordUtil.verify(password, digest2)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala deleted file mode 100644 index 3ed6ffa..0000000 --- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.serializer - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, ExtendedActorSystem} -import com.typesafe.config.{ConfigFactory, ConfigValueFactory} -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.esotericsoftware.kryo.io.{Input, Output} -import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} -import io.gearpump.serializer.SerializerSpec._ - -class SerializerSpec extends FlatSpec with Matchers with MockitoSugar { - val config = ConfigFactory.empty.withValue("gearpump.serializers", - ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName, - classOf[ClassB].getName -> classOf[ClassBSerializer].getName).asJava)) - - "GearpumpSerialization" should "register custom serializers" in { - val serialization = new GearpumpSerialization(config) - val kryo = new Kryo - serialization.customize(kryo) - - val forB = kryo.getRegistration(classOf[ClassB]) - assert(forB.getSerializer.isInstanceOf[ClassBSerializer]) - - val forA = kryo.getRegistration(classOf[ClassA]) - assert(forA.getSerializer.isInstanceOf[ClassASerializer]) - } - - "FastKryoSerializer" should "serialize correctly" in { - val myConfig = config.withFallback(TestUtil.DEFAULT_CONFIG.withoutPath("gearpump.serializers")) - val system = ActorSystem("my", myConfig) - - val serializer = new FastKryoSerializer(system.asInstanceOf[ExtendedActorSystem]) - - val bytes = serializer.serialize(new ClassA) - val anotherA = serializer.deserialize(bytes) - - assert(anotherA.isInstanceOf[ClassA]) - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} - -object SerializerSpec { - - class ClassA {} - - class ClassASerializer extends KryoSerializer[ClassA] { - override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = { - output.writeString(classOf[ClassA].getName.toString) - } - - override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = { - val className = input.readString() - Class.forName(className).newInstance().asInstanceOf[ClassA] - } - } - - class ClassB {} - - class ClassBSerializer extends KryoSerializer[ClassA] { - override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {} - - override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = { - null - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala deleted file mode 100644 index 71b4218..0000000 --- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.transport - -import java.io.{DataInput, DataOutput} - -import io.gearpump.transport.MockTransportSerializer.NettyMessage -import io.gearpump.transport.netty.ITransportMessageSerializer - -class MockTransportSerializer extends ITransportMessageSerializer { - override def getLength(obj: scala.Any): Int = 4 - - override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = { - transportMessage match { - case msg: NettyMessage => - dataOutput.writeInt(msg.num) - } - } - - override def deserialize(dataInput: DataInput, length: Int): AnyRef = { - NettyMessage(dataInput.readInt()) - } -} - -object MockTransportSerializer { - case class NettyMessage(num: Int) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/NettySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala deleted file mode 100644 index 6caf357..0000000 --- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.transport - -import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, ActorSystem} -import akka.testkit.TestProbe -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.transport.MockTransportSerializer.NettyMessage -import io.gearpump.transport.netty.{Context, TaskMessage} -import io.gearpump.util.Util - -class NettySpec extends FlatSpec with Matchers with MockitoSugar { - - "Netty Transport" should "send and receive message correctly " in { - val conf = TestUtil.DEFAULT_CONFIG - val system = ActorSystem("transport", conf) - val context = new Context(system, conf) - val serverActor = TestProbe()(system) - - val port = Util.findFreePort() - - import system.dispatcher - system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) { - context.bind("server", new ActorLookupById { - override def lookupLocalActor(id: Long): Option[ActorRef] = Some(serverActor.ref) - }, false, port.get) - } - val client = context.connect(HostPort("127.0.0.1", port.get)) - - val data = NettyMessage(0) - val msg = new TaskMessage(0, 1, 2, data) - client ! msg - serverActor.expectMsg(15.seconds, data) - - context.close() - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala deleted file mode 100644 index 13530ed..0000000 --- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _} -import io.gearpump.util.ActorSystemBooterSpec._ - -class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { - - "ActorSystemBooter" should "report its address back" in { - val boot = bootSystem() - boot.prob.expectMsgType[RegisterActorSystem] - boot.shutdown() - } - - "ActorSystemBooter" should "terminate itself when parent actor dies" in { - val boot = bootSystem() - boot.prob.expectMsgType[RegisterActorSystem] - - val dummy = boot.host.actorOf(Props(classOf[Dummy]), "dummy") - boot.prob.reply(ActorSystemRegistered(boot.prob.ref)) - boot.prob.reply(BindLifeCycle(dummy)) - boot.host.stop(dummy) - val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted) - assert(terminated) - boot.shutdown() - } - - "ActorSystemBooter" should "create new actor" in { - val boot = bootSystem() - boot.prob.expectMsgType[RegisterActorSystem] - boot.prob.reply(ActorSystemRegistered(boot.prob.ref)) - boot.prob.reply(CreateActor(Props(classOf[AcceptThreeArguments], 1, 2, 3), "three")) - boot.prob.expectMsgType[ActorCreated] - - boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero")) - boot.prob.expectMsgType[ActorCreated] - - boot.shutdown() - } - - private def bootSystem(): Boot = { - val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG) - - val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG) - - val receiver = TestProbe()(system) - val address = ActorUtil.getFullPath(system, receiver.ref.path) - - val bootSystem = booter.boot("booter", address) - - Boot(system, receiver, bootSystem) - } - - case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) { - def shutdown(): Unit = { - host.terminate() - bootedSystem.terminate() - Await.result(host.whenTerminated, Duration.Inf) - Await.result(bootedSystem.whenTerminated, Duration.Inf) - } - } - - def retry(seconds: Int)(fn: => Boolean): Boolean = { - val result = fn - if (result) { - result - } else { - Thread.sleep(1000) - retry(seconds - 1)(fn) - } - } -} - -object ActorSystemBooterSpec { - class Dummy extends Actor { - def receive: Receive = { - case _ => - } - } - - class AcceptZeroArguments extends Actor { - def receive: Receive = { - case _ => - } - } - - class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor { - def receive: Receive = { - case _ => - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala deleted file mode 100644 index 6ab5a2f..0000000 --- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import org.scalatest.FlatSpec - -import io.gearpump.transport.HostPort - -class ActorUtilSpec extends FlatSpec { - "masterActorPath" should "construct the ActorPath from HostPort" in { - import io.gearpump.util.Constants.MASTER - - val host = "127.0.0.1" - val port = 3000 - val master = HostPort("127.0.0.1", 3000) - val masterPath = ActorUtil.getMasterActorPath(master) - assert(masterPath.address.port == Some(port)) - assert(masterPath.address.system == MASTER) - assert(masterPath.address.host == Some(host)) - assert(masterPath.address.protocol == "akka.tcp") - assert(masterPath.toStringWithoutAddress == s"/user/$MASTER") - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala deleted file mode 100644 index 0c798f3..0000000 --- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.File -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig} - -class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { - "Typesafe Cluster Configs" should "follow the override rules" in { - - val conf = - """ - gearpump { - gear = "gearpump" - } - - gearpump-master { - conf = "master" - } - gearpump-worker { - conf = "worker" - } - conf = "base" - """ - - val file = File.createTempFile("test", ".conf") - FileUtils.write(file, conf) - - val raw = ClusterConfig.load(ClusterConfigSource(file.toString)) - - assert(raw.master.getString("conf") == "master", "master > base") - assert(raw.worker.getString("conf") == "worker", "worker > base") - assert(raw.default.getString("conf") == "base", "application > base") - - file.delete() - } - - "ClusterConfigSource" should "return empty for non-exist files" in { - val source = ClusterConfigSource("non-exist") - var config = source.getConfig - assert(config.isEmpty) - - val nullCheck = ClusterConfigSource(null) - config = nullCheck.getConfig - assert(config.isEmpty) - } - - "User Config" should "work" in { - - implicit val system = ActorSystem("forSerialization") - - val map = Map[String, String]("key1" -> "1", "key2" -> "value2") - - val user = new UserConfig(map) - .withLong("key3", 2L) - .withBoolean("key4", value = true) - .withFloat("key5", 3.14F) - .withDouble("key6", 2.718) - - assert(user.getInt("key1").get == 1) - assert(user.getString("key1").get == "1") - assert(user.getLong("key3").get == 2L) - assert(user.getBoolean("key4").get == true) - assert(user.getFloat("key5").get == 3.14F) - assert(user.getDouble("key6").get == 2.718) - - val data = new ConfigsSpec.Data(3) - assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get) - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} - -object ConfigsSpec { - case class Data(value: Int) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala deleted file mode 100644 index 30e42c7..0000000 --- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.File -import java.util - -import org.scalatest.FlatSpec - -import io.gearpump.google.common.io.Files - -class FileUtilsSpec extends FlatSpec { - val TXT = - """ - |This is a multiple line - |text - | - """.stripMargin - - it should "read/write string correctly" in { - val file = File.createTempFile("fileutilspec", ".test") - FileUtils.write(file, TXT) - assert(FileUtils.read(file) == TXT) - file.delete() - } - - it should "read/write bytes array correctly" in { - val file = File.createTempFile("fileutilspec", ".test") - val bytes = TXT.toCharArray.map(_.toByte) - FileUtils.writeByteArrayToFile(file, bytes) - util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file)) - file.delete() - } - - it should "create directory and all parents" in { - val temp = Files.createTempDir() - val parent = new File(temp, "sub1") - val child = new File(parent, "sub2" + File.separator) - FileUtils.forceMkdir(child) - assert(child.exists()) - assert(child.isDirectory) - child.delete() - parent.delete() - temp.delete() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/GraphSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala deleted file mode 100644 index 20eab6d..0000000 --- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.util.Graph.{Node, Path} - -class GraphSpec extends PropSpec with PropertyChecks with Matchers { - - case class Vertex(id: Int) - case class Edge(from: Int, to: Int) - - val vertexCount = 100 - - property("Graph with no edges should be built correctly") { - val vertexSet = Set("A", "B", "C") - val graph = Graph(vertexSet.toSeq.map(Node): _*) - graph.vertices.toSet shouldBe vertexSet - } - - property("Graph with vertices and edges should be built correctly") { - val vertices: Array[Vertex] = 0.until(vertexCount).map(Vertex).toArray - val genEdge = for { - from <- Gen.chooseNum[Int](0, vertexCount - 1) - to <- Gen.chooseNum[Int](0, vertexCount - 1) - } yield Edge(from, to) - - var graphElements = Array.empty[Path[Vertex, _ <: Edge]] - val outDegrees = new Array[Int](vertices.length) - val outGoingEdges = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)]) - val edgesOf = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)]) - vertices.foreach { v => - graphElements :+= Node(v) - } - - forAll(genEdge) { - e: Edge => - val from = vertices(e.from) - val to = vertices(e.to) - graphElements :+= from ~ e ~> to - outDegrees(e.from) += 1 - - val nodeEdgeNode = (from, e, to) - outGoingEdges(e.from) += nodeEdgeNode - - edgesOf(e.from) += nodeEdgeNode - edgesOf(e.to) += nodeEdgeNode - } - - val graph: Graph[Vertex, Edge] = Graph(graphElements: _*) - graph.vertices should contain theSameElementsAs vertices - - 0.until(vertices.size).foreach { i => - val v = vertices(i) - graph.outgoingEdgesOf(v) should contain theSameElementsAs outGoingEdges(i) - graph.edgesOf(v).sortBy(_._1.id) - graph.edgesOf(v) should contain theSameElementsAs edgesOf(i) - } - } - - property("Check empty graph") { - val graph = Graph.empty[String, String] - assert(graph.isEmpty) - } - - property("check level map for a graph") { - val graph = Graph.empty[String, String] - - val defaultEdge = "edge" - - graph.addVertex("A") - graph.addVertex("B") - graph.addVertex("C") - - graph.addEdge("A", defaultEdge, "B") - graph.addEdge("B", defaultEdge, "C") - graph.addEdge("A", defaultEdge, "C") - - graph.addVertex("D") - graph.addVertex("E") - graph.addVertex("F") - - graph.addEdge("D", defaultEdge, "E") - graph.addEdge("E", defaultEdge, "F") - graph.addEdge("D", defaultEdge, "F") - - graph.addEdge("C", defaultEdge, "E") - - val levelMap = graph.vertexHierarchyLevelMap() - - // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B) - levelMap("A") < levelMap("B") - levelMap("A") < levelMap("C") - levelMap("B") < levelMap("C") - - levelMap("D") < levelMap("E") - levelMap("D") < levelMap("F") - levelMap("E") < levelMap("F") - - levelMap("C") < levelMap("F") - } - - property("copy should return a immutalbe new Graph") { - val graph = Graph.empty[String, String] - val defaultEdge = "edge" - graph.addVertex("A") - graph.addVertex("B") - graph.addEdge("A", defaultEdge, "B") - - val newGraph = graph.copy - newGraph.addVertex("C") - - assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable") - } - - property("subGraph should return a sub-graph for certain vertex") { - val graph = Graph.empty[String, String] - val defaultEdge = "edge" - graph.addVertex("A") - graph.addVertex("B") - graph.addVertex("C") - graph.addEdge("A", defaultEdge, "B") - graph.addEdge("B", defaultEdge, "C") - graph.addEdge("A", defaultEdge, "C") - - val subGraph = graph.subGraph("C") - assert(subGraph.outDegreeOf("A") != graph.outDegreeOf("A")) - } - - property("replaceVertex should hold all upstream downstream relation for a vertex") { - val graph = Graph.empty[String, String] - val defaultEdge = "edge" - graph.addVertex("A") - graph.addVertex("B") - graph.addVertex("C") - graph.addEdge("A", defaultEdge, "B") - graph.addEdge("B", defaultEdge, "C") - - val newGraph = graph.copy.replaceVertex("B", "D") - assert(newGraph.inDegreeOf("D") == graph.inDegreeOf("B")) - assert(newGraph.outDegreeOf("D") == graph.outDegreeOf("B")) - } - - property("Cycle detecting should work properly") { - val graph = Graph.empty[String, String] - val defaultEdge = "edge" - graph.addVertex("A") - graph.addVertex("B") - graph.addVertex("C") - graph.addEdge("A", defaultEdge, "B") - graph.addEdge("B", defaultEdge, "C") - - assert(!graph.hasCycle()) - - graph.addEdge("C", defaultEdge, "B") - assert(graph.hasCycle()) - - graph.addEdge("C", defaultEdge, "A") - assert(graph.hasCycle()) - } - - property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " + - "return equal order of graph with no circle") { - val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5) - val topoNoCircles = graph.topologicalOrderIterator - val topoWithCircles = graph.topologicalOrderWithCirclesIterator - - assert(topoNoCircles.zip(topoWithCircles).forall(x => x._1 == x._2)) - } - - property("Topological sort of graph with circles should work properly") { - val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, - 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10) - val topoWithCircles = graph.topologicalOrderWithCirclesIterator - val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9) - - assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2)) - } - - property("Duplicated edges detecting should work properly") { - val graph = Graph.empty[String, String] - val defaultEdge = "edge" - val anotherEdge = "edge2" - graph.addVertex("A") - graph.addVertex("B") - graph.addEdge("A", defaultEdge, "B") - - assert(!graph.hasDuplicatedEdge()) - - graph.addEdge("A", anotherEdge, "B") - - assert(graph.hasDuplicatedEdge()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala deleted file mode 100644 index ef362ff..0000000 --- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import scala.concurrent.duration._ - -import akka.actor._ -import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import org.slf4j.Logger - -import io.gearpump.cluster.TestUtil - -class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender - with WordSpecLike with Matchers with BeforeAndAfterAll { - - def this() = this(ActorSystem("WorkerSpec", TestUtil.DEFAULT_CONFIG)) - val mockActor = TestProbe() - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "The TimeOutScheduler" should { - "handle the time out event" in { - val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref)) - val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor] - testActor.sendMsgToIgnore() - mockActor.expectMsg(30.seconds, MessageTimeOut) - } - } -} - -case object Echo -case object MessageTimeOut - -class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler { - private val LOG: Logger = LogUtil.getLogger(getClass) - - val target = context.actorOf(Props(classOf[EchoActor])) - - override def receive: Receive = { - case _ => - } - - def sendMsgToIgnore(): Unit = { - sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut()) - } - - private def sendMsgTimeOut(): Unit = { - mock ! MessageTimeOut - } -} - -class EchoActor extends Actor { - override def receive: Receive = { - case _ => - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/UtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala deleted file mode 100644 index b5bde04..0000000 --- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.transport.HostPort -import io.gearpump.util.Util._ - -class UtilSpec extends FlatSpec with Matchers with MockitoSugar { - it should "work" in { - - assert(findFreePort().isSuccess) - - assert(randInt() != randInt()) - - val hosts = parseHostList("host1:1,host2:2") - assert(hosts(1) == HostPort("host2", 2)) - - assert(Util.getCurrentClassPath.length > 0) - } - - it should "check application name properly" in { - assert(Util.validApplicationName("_application_1")) - assert(Util.validApplicationName("application_1_")) - assert(!Util.validApplicationName("0_application_1")) - assert(!Util.validApplicationName("_applicat&&ion_1")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala new file mode 100644 index 0000000..6ab7eed --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump + +import scala.language.implicitConversions + +import akka.actor.{Actor, Props, Terminated} +import akka.testkit.TestProbe + +object TestProbeUtil { + implicit def toProps(probe: TestProbe): Props = { + Props(new Actor { + val probeRef = probe.ref + context.watch(probeRef) + def receive: Receive = { + case Terminated(probeRef) => context.stop(self) + case x => probeRef.forward(x) + } + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala new file mode 100644 index 0000000..f2f374e --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import java.io.File +import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException} +import java.util.Properties +import java.util.concurrent.{Executors, TimeUnit} +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} + +import akka.actor.{Actor, ActorSystem, Address, Props} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory} + +import org.apache.gearpump.cluster.MasterHarness.MockMaster +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil} + +trait MasterHarness { + private val LOG = LogUtil.getLogger(getClass) + + implicit val pool = MasterHarness.cachedPool + + private var system: ActorSystem = null + private var systemAddress: Address = null + private var host: String = null + private var port: Int = 0 + private var masterProperties = new Properties() + val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS) + + def getActorSystem: ActorSystem = system + def getHost: String = host + def getPort: Int = port + + protected def config: Config + + def startActorSystem(): Unit = { + val systemConfig = config + system = ActorSystem(MASTER, systemConfig) + systemAddress = ActorUtil.getSystemAddress(system) + host = systemAddress.host.get + port = systemAddress.port.get + + masterProperties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$getHost:$getPort") + masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost") + + LOG.info(s"Actor system is started, $host, $port") + } + + def shutdownActorSystem(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + LOG.info(s"Actor system is stopped, $host, $port") + } + + def convertTestConf(host: String, port: Int): File = { + val test = ConfigFactory.parseResourcesAnySyntax("test.conf", + ConfigParseOptions.defaults.setAllowMissing(true)) + + val newConf = test.withValue(GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromAnyRef(Array(s"$host:$port").toList.asJava)) + + val confFile = File.createTempFile("main", ".conf") + val serialized = newConf.root().render() + FileUtils.write(confFile, serialized) + confFile + } + + def createMockMaster(): TestProbe = { + val masterReceiver = TestProbe()(system) + val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), MASTER) + masterReceiver + } + + def isPortUsed(host: String, port: Int): Boolean = { + + var isPortUsed = true + val socket = new Socket() + try { + socket.setReuseAddress(true) + socket.connect(new InetSocketAddress(host, port), 1000) + socket.isConnected + } catch { + case ex: SocketTimeoutException => + isPortUsed = false + case ex: UnknownHostException => + isPortUsed = false + case ex: Throwable => + // For other case, we think the port has been occupied. + isPortUsed = true + } finally { + socket.close() + } + isPortUsed + } + + def getContextClassPath: Array[String] = { + val contextLoader = Thread.currentThread().getContextClassLoader() + + val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) { + contextLoader.getParent.asInstanceOf[URLClassLoader] + } else { + contextLoader.asInstanceOf[URLClassLoader] + } + + val urls = urlLoader.getURLs() + val classPath = urls.map { url => + new File(url.getPath()).toString + } + classPath + } + + /** + * Remove trailing $ + */ + def getMainClassName(mainObj: Any): String = { + mainObj.getClass.getName.dropRight(1) + } + + def getMasterListOption(): Array[String] = { + masterProperties.asScala.toList.map { kv => + s"-D${kv._1}=${kv._2}" + }.toArray + } + + def masterConfig: Config = { + ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config) + } +} + +object MasterHarness { + + val cachedPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) + + class MockMaster(receiver: TestProbe) extends Actor { + def receive: Receive = { + case msg => { + receiver.ref forward msg + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala new file mode 100644 index 0000000..5d8727e --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster + +import akka.actor._ + +object TestUtil { + val DEFAULT_CONFIG = ClusterConfig.default("test.conf") + val MASTER_CONFIG = ClusterConfig.master("test.conf") + val UI_CONFIG = ClusterConfig.ui("test.conf") + + class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends ApplicationMaster { + context.masterProxy !(context, app) + + def receive: Receive = null + } + + val dummyApp: AppDescription = + AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty) +} \ No newline at end of file
