http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala new file mode 100644 index 0000000..a41856d --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.gearpump.cluster.appmaster

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import akka.actor._
import akka.testkit.TestProbe
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import org.apache.gearpump.TestProbeUtil._
import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}

/**
 * Exercises MasterWithExecutorSystemProvider, LazyStartAppMaster and
 * AppMasterRuntimeEnvironment, with every collaborator replaced by a TestProbe.
 *
 * One ActorSystem is shared by all test cases of this suite.
 */
class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  implicit var system: ActorSystem = null

  override def beforeAll(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterAll(): Unit = {
    system.terminate()
    val terminated = system.whenTerminated
    Await.result(terminated, Duration.Inf)
  }

  "MasterWithExecutorSystemProvider" should
    "forward request StartExecutorSystem to ExecutorSystemProvider" in {

    val requester = TestProbe()
    val masterProbe = TestProbe()
    val providerProbe = TestProbe()
    // Explicit form of the TestProbe => Props conversion imported from TestProbeUtil.
    val providerProps: Props = toProps(providerProbe)
    val enhancedMaster = system.actorOf(
      Props(new MasterWithExecutorSystemProvider(masterProbe.ref, providerProps)))

    // StartExecutorSystems must reach the provider ...
    val startRequest = StartExecutorSystems(null, null)
    requester.send(enhancedMaster, startRequest)
    providerProbe.expectMsg(startRequest)

    // ... while anything else must reach the master.
    val unrelatedMessage = "any other message"
    requester.send(enhancedMaster, unrelatedMessage)
    masterProbe.expectMsg(unrelatedMessage)
    system.stop(enhancedMaster)
  }

  "LazyStartAppMaster" should "forward command to appmaster when app master started" in {

    val appMasterProbe = TestProbe()
    val appMasterProps: Props = toProps(appMasterProbe)
    val lazyStarter = system.actorOf(
      Props(new LazyStartAppMaster(appId = 0, appMasterProps)))

    // The payload is sent before StartAppMaster and is still expected at the app master.
    val payload = "Some"
    lazyStarter.tell(payload, ActorRef.noSender)
    lazyStarter.tell(StartAppMaster, ActorRef.noSender)
    appMasterProbe.expectMsg(payload)

    system.stop(appMasterProbe.ref)
    val watcher = TestProbe()
    watcher.watch(lazyStarter)
    watcher.expectTerminated(lazyStarter)
  }

  "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
    val env = setupAppMasterRuntimeEnv()

    env.connectionkeeper.send(env.appMasterRuntimeEnv, MasterConnected)
    env.appMaster.expectMsg(StartAppMaster)
  }

  "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
    val env = setupAppMasterRuntimeEnv()
    val runtimeEnv = env.appMasterRuntimeEnv

    env.connectionkeeper.send(runtimeEnv, MasterStopped)
    val watcher = TestProbe()
    watcher.watch(runtimeEnv)
    watcher.expectTerminated(runtimeEnv)
  }

  "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
    val env = setupAppMasterRuntimeEnv()
    val runtimeEnv = env.appMasterRuntimeEnv

    val watcher = TestProbe()
    watcher.watch(runtimeEnv)
    system.stop(env.appMaster.ref)
    watcher.expectTerminated(runtimeEnv)
  }

  /**
   * Starts an AppMasterRuntimeEnvironment whose master, app master and master connection
   * keeper factories all return Props backed by fresh TestProbes.
   *
   * @return the three probes together with the runtime environment actor
   */
  private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
    val context = AppMasterContext(0, null, null, null, null, null, null)
    val description = AppDescription("app", "AppMasterClass", null, null)

    val masterProbe = TestProbe()
    val appMasterProbe = TestProbe()
    val keeperProbe = TestProbe()

    val masterFactory = (_: AppId, _: MasterActorRef) => toProps(masterProbe)
    val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMasterProbe)
    val keeperFactory =
      (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) => toProps(keeperProbe)

    val runtimeEnvProps = Props(
      new AppMasterRuntimeEnvironment(
        context, description, List(masterProbe.ref.path), masterFactory,
        appMasterFactory, keeperFactory))
    val runtimeEnv = system.actorOf(runtimeEnvProps)

    TestAppMasterEnv(masterProbe, appMasterProbe, keeperProbe, runtimeEnv)
  }
}

object AppMasterRuntimeEnvironmentSpec {

  /** Bundle of the probes and the actor created by `setupAppMasterRuntimeEnv`. */
  case class TestAppMasterEnv(
      master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
      appMasterRuntimeEnv: ActorRef)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala new file mode 100644 index 0000000..d40e775 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.gearpump.cluster.appmaster

import org.apache.gearpump.cluster.worker.WorkerId

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigValueFactory
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
import org.apache.gearpump.util.Constants

/**
 * Tests ExecutorSystemLauncher against a worker played by a TestProbe: successful launch,
 * explicit rejection by the worker, and no answer from the worker at all.
 *
 * In every case the launcher is expected to terminate after reporting the outcome.
 */
class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  implicit var system: ActorSystem = null
  val workerId: WorkerId = WorkerId(0, 0L)
  val appId = 0
  val executorId = 0
  // Actor-system address the fake worker reports in RegisterActorSystem.
  // NOTE(review): restored from an e-mail-obfuscated literal ("[email protected]");
  // confirm the host part against the upstream file.
  val url = "akka.tcp://worker@127.0.0.1:3000"
  val session = Session(null, null)
  // Launch timeout in milliseconds, injected into the configuration below.
  val launchExecutorSystemTimeout = 3000
  val activeConfig = TestUtil.DEFAULT_CONFIG.
    withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
      ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))

  override def beforeAll(): Unit = {
    system = ActorSystem("test", activeConfig)
  }

  override def afterAll(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "report success when worker launch the system successfully" in {
    val worker = TestProbe()
    val client = TestProbe()

    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
    client.watch(launcher)
    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))

    // The worker acknowledges the launch by registering the new actor system.
    worker.expectMsgType[LaunchExecutor]
    worker.reply(RegisterActorSystem(url))

    worker.expectMsgType[ActorSystemRegistered]

    client.expectMsgType[LaunchExecutorSystemSuccess]
    client.expectTerminated(launcher)
  }

  it should "report failure when worker refuse to launch the system explicitly" in {
    val worker = TestProbe()
    val client = TestProbe()

    val resource = Resource(4)

    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
    client.watch(launcher)
    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, resource))

    worker.expectMsgType[LaunchExecutor]
    worker.reply(ExecutorLaunchRejected())

    client.expectMsg(LaunchExecutorSystemRejected(resource, null, session))
    client.expectTerminated(launcher)
  }

  it should "report timeout when trying to start a executor system on worker, " +
    "and worker doesn't response" in {
    val client = TestProbe()
    val worker = TestProbe()
    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
    // Watch before sending, consistent with the other test cases.
    client.watch(launcher)
    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))

    // The worker probe never replies; allow generous slack on top of the configured timeout.
    val waitFor = launchExecutorSystemTimeout + 10000
    client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
    client.expectTerminated(launcher, waitFor.milliseconds)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala new file mode 100644 index 0000000..6987af4 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.gearpump.cluster.appmaster

import org.apache.gearpump.cluster.worker.WorkerId

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
import org.apache.gearpump.cluster.{AppJar, TestUtil}
import org.apache.gearpump.jarstore.FilePath

/**
 * Tests ExecutorSystemScheduler with the master proxy, worker, launcher and client all
 * played by TestProbes.
 *
 * Every test case gets a fresh ActorSystem and scheduler. `beforeEach` already sends the
 * `start` request and consumes the resulting RequestResource on the master proxy, so each
 * test begins at the point where the master is expected to answer.
 */
class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
  val appId = 0
  val workerId = WorkerId(0, 0L)
  val resource = Resource(1)
  val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
  val mockJar = AppJar("for_test", FilePath("PATH"))
  val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
  val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)

  implicit var system: ActorSystem = null
  var worker: TestProbe = null
  var workerInfo: WorkerInfo = null
  var masterProxy: TestProbe = null
  var launcher: TestProbe = null
  var client: TestProbe = null

  override def beforeEach(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
    worker = TestProbe()
    workerInfo = WorkerInfo(workerId, worker.ref)
    masterProxy = TestProbe()
    launcher = TestProbe()
    client = TestProbe()

    // Launcher factory: the application id argument is not needed by the mock launcher.
    val launcherFactory = (_: Int, session: Session) =>
      Props(new MockExecutorSystemLauncher(launcher, session))
    val scheduler = system.actorOf(
      Props(new ExecutorSystemScheduler(appId, masterProxy.ref, launcherFactory)))

    client.send(scheduler, start)
    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
  }

  override def afterEach(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  /**
   * Waits up to 15 seconds for the mock launcher's start notification on the given probe.
   *
   * @param launcher probe the mock launcher reports to
   * @return the notification; the test is failed if anything else (or nothing) arrives
   */
  private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
    launcher.receiveOne(15.seconds) match {
      case started: ExecutorSystemLauncherStarted => Some(started)
      case other =>
        // receiveOne yields null on timeout, which also ends up here.
        fail(s"Expected ExecutorSystemLauncherStarted but received: $other")
    }
  }

  it should "schedule and launch an executor system on target worker successfully" in {

    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))

    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get

    val systemId = 0
    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))

    val executorSystemProbe = TestProbe()
    val executorSystem =
      ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
    launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
    client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
  }

  it should "report failure when resource cannot be allocated" in {
    // The master proxy never answers the resource request.
    client.expectMsg(30.seconds, StartExecutorSystemTimeout)
  }

  it should "schedule new resouce on new worker " +
    "when target worker reject creating executor system" in {
    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get

    val systemId = 0
    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
    launcher.reply(LaunchExecutorSystemRejected(resource, "", session))
    // A rejection must trigger a second resource request to the master.
    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
  }

  it should "report failure when resource is allocated, but timeout " +
    "when starting the executor system" in {
    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get

    val systemId = 0
    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
    launcher.reply(LaunchExecutorSystemTimeout(session))
    client.expectMsg(StartExecutorSystemTimeout)
  }
}

object ExecutorSystemSchedulerSpec {

  /**
   * Stand-in launcher: announces its own creation (with the session it was created for)
   * to `forwardTo`, then forwards every message it receives to that probe.
   */
  class MockExecutorSystemLauncher(forwardTo: TestProbe, session: Session) extends Actor {
    forwardTo.ref ! ExecutorSystemLauncherStarted(session)

    def receive: Receive = {
      case msg => forwardTo.ref forward msg
    }
  }

  /** Notification sent by MockExecutorSystemLauncher when it is constructed. */
  case class ExecutorSystemLauncherStarted(session: Session)
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala new file mode 100644 index 0000000..163da0a --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.cluster.appmaster

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
import org.apache.gearpump.cluster.master.MasterProxy.WatchMaster

/**
 * Tests MasterConnectionKeeper with the master and the status listener both played by
 * TestProbes: initial registration, re-registration after a master restart, and shutdown
 * when the master stops or never answers.
 */
class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

  implicit var system: ActorSystem = null
  val register = RegisterAppMaster(null, null)
  val appId = 0

  override def beforeAll(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterAll(): Unit = {
    system.terminate()
    val terminated = system.whenTerminated
    Await.result(terminated, Duration.Inf)
  }

  /**
   * Starts a keeper and walks it through a successful registration.
   *
   * @return the master probe, the keeper, and the listener probe (which already watches
   *         the keeper and has consumed the first MasterConnected)
   */
  private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
    val listener = TestProbe()
    val masterProbe = TestProbe()

    val keeperProps = Props(
      new MasterConnectionKeeper(register, masterProbe.ref, listener.ref))
    val keeper = system.actorOf(keeperProps)
    listener.watch(keeper)

    masterProbe.expectMsgType[WatchMaster]

    // The master is alive and answers the registration request.
    masterProbe.expectMsgType[RegisterAppMaster]
    masterProbe.reply(AppMasterRegistered(appId))

    // The listener is told that the master is connected.
    listener.expectMsg(MasterConnected)
    ConnectionKeeperTestEnv(masterProbe, keeper, listener)
  }

  it should "start correctly and notify listener that master is alive" in {
    startMasterConnectionKeeper
  }

  it should "re-register the appmaster when master is restarted" in {
    import org.apache.gearpump.cluster.master.MasterProxy.MasterRestarted
    val env = startMasterConnectionKeeper
    val masterProbe = env.master
    val listener = env.masterChangeListener

    // Simulate a master restart and answer the second registration.
    masterProbe.send(env.keeper, MasterRestarted)
    masterProbe.expectMsgType[RegisterAppMaster]
    masterProbe.reply(AppMasterRegistered(appId))
    listener.expectMsg(MasterConnected)

    // Nothing further may reach the listener after the reconnect notification.
    listener.expectNoMsg()
  }

  it should "notify listener and then shutdown itself when master is dead" in {
    val env = startMasterConnectionKeeper
    val listener = env.masterChangeListener

    // Simulate the master going away.
    env.master.send(env.keeper, MasterStopped)

    // The listener hears about it first, then sees the keeper terminate.
    listener.expectMsg(MasterStopped)
    listener.expectTerminated(env.keeper)
  }

  it should "mark the master as dead when timeout" in {
    val listener = TestProbe()
    val masterProbe = TestProbe()

    // The keeper tries to register, but the master probe never replies.
    val keeper = system.actorOf(
      Props(new MasterConnectionKeeper(register, masterProbe.ref, listener.ref)))

    listener.watch(keeper)

    // After the timeout the keeper notifies the listener and stops itself.
    val patience = 60.seconds
    listener.expectMsg(patience, MasterStopped)
    listener.expectTerminated(keeper, patience)
  }
}

object MasterConnectionKeeperSpec {

  /** Bundle returned by `startMasterConnectionKeeper`. */
  case class ConnectionKeeperTestEnv(
      master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala new file mode 100644 index 0000000..0c7c0c3 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.gearpump.cluster.main

import org.scalatest.{FlatSpec, Matchers}

/**
 * Tests ArgumentsParser: typed access to parsed options, collection of the remaining
 * positional arguments, and rejection of malformed command lines.
 */
class ArgumentParserSpec extends FlatSpec with Matchers {
  it should "parse arguments correctly" in {

    val parser = new ArgumentsParser {
      // Three options, all declared as required.
      override val options = Array("flag", "opt1", "opt2").map { name =>
        name -> CLIOption[Any]("", required = true)
      }
    }

    val commandLine = Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2")
    val parsed = parser.parse(commandLine)

    parsed.getBoolean("flag") shouldBe true
    // The same option can be read both as an Int and as a String.
    parsed.getInt("opt1") shouldBe 1
    parsed.getString("opt1") shouldBe "1"
    parsed.getInt("opt2") shouldBe 2

    parsed.remainArgs(0) shouldBe "arg1"
    parsed.remainArgs(1) shouldBe "arg2"
  }

  it should "handle interleaved options and remain args" in {

    val parser = new ArgumentsParser {
      override val options = Array("opt1").map { name =>
        name -> CLIOption[Any]("", required = true)
      }
    }

    val parsed = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
    parsed.getInt("opt1") shouldBe 1

    // Everything from the first positional argument onwards counts as a remaining argument.
    parsed.remainArgs.length shouldBe 3

    intercept[Exception] {
      parser.parse(Array("-opt2"))
    }

    intercept[Exception] {
      parser.parse(Array("-opt2", "2", "-opt1"))
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala new file mode 100644 index 0000000..ac08276 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.cluster.master

import org.apache.gearpump.cluster.worker.WorkerId

import scala.util.Success

import akka.actor._
import akka.testkit.TestProbe
import com.typesafe.config.Config
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
import org.apache.gearpump.util.ActorSystemBooter._

/**
 * Tests AppMasterLauncher with the master, worker and client all played by TestProbes.
 *
 * `beforeEach` starts a fresh actor system and launcher and drives the exchange up to the
 * point where the worker has received LaunchExecutor; each test continues from there.
 */
class AppMasterLauncherSpec extends FlatSpec with Matchers
  with BeforeAndAfterEach with MasterHarness {

  override def config: Config = TestUtil.DEFAULT_CONFIG

  val appId = 1
  val executorId = 2
  var master: TestProbe = null
  var client: TestProbe = null
  var worker: TestProbe = null
  var watcher: TestProbe = null
  var appMasterLauncher: ActorRef = null

  /** The resource request the launcher is expected to send to the master. */
  private def expectedResourceRequest: RequestResource =
    RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))

  /** The master's answer: one unit of resource on the worker probe. */
  private def allocationOnWorker: ResourceAllocated =
    ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))

  override def beforeEach(): Unit = {
    startActorSystem()
    master = createMockMaster()
    client = TestProbe()(getActorSystem)
    worker = TestProbe()(getActorSystem)
    watcher = TestProbe()(getActorSystem)
    appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
      TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
    watcher watch appMasterLauncher
    master.expectMsg(expectedResourceRequest)
    master.reply(allocationOnWorker)
    worker.expectMsgType[LaunchExecutor]
  }

  override def afterEach(): Unit = {
    shutdownActorSystem()
  }

  "AppMasterLauncher" should "launch appmaster correctly" in {
    worker.reply(RegisterActorSystem("systempath"))
    worker.expectMsgType[ActorSystemRegistered]

    worker.expectMsgType[CreateActor]
    worker.reply(ActorCreated(master.ref, "appmaster"))

    client.expectMsg(SubmitApplicationResult(Success(appId)))
    watcher.expectTerminated(appMasterLauncher)
  }

  "AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
    // First attempt is rejected: the launcher must ask the master for resource again.
    worker.reply(ExecutorLaunchRejected(""))
    master.expectMsg(expectedResourceRequest)

    master.reply(allocationOnWorker)
    worker.expectMsgType[LaunchExecutor]

    worker.reply(RegisterActorSystem("systempath"))
    worker.expectMsgType[ActorSystemRegistered]

    // Second attempt gets as far as actor creation, which then fails.
    worker.expectMsgType[CreateActor]
    worker.reply(CreateActorFailed("", new Exception))
    worker.expectMsgType[ShutdownExecutor]

    // expectMsgType reports an unexpected message type as a test failure instead of
    // surfacing as a ClassCastException from an unchecked cast.
    val submitResult = client.expectMsgType[SubmitApplicationResult]
    assert(submitResult.appId.isFailure)
    watcher.expectTerminated(appMasterLauncher)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala new file mode 100644 index 0000000..a8adaf0 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.gearpump.cluster.master

import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

import org.apache.gearpump.cluster.appmaster.ApplicationState

/**
 * Checks the equality contract of ApplicationState: two instances that differ only in
 * their last constructor argument are equal and share a hash code, while an instance
 * that differs in its second and third arguments is not equal.
 */
class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {

  "ApplicationState" should "check equal with respect to only appId and attemptId" in {
    // NOTE(review): presumably the first argument is the appId and the third the
    // attemptId, as the test name suggests - confirm against ApplicationState.
    val reference = ApplicationState(0, "application0", 0, null, null, null, "A")
    val sameIdentity = ApplicationState(0, "application0", 0, null, null, null, "B")
    val otherAttempt = ApplicationState(0, "application1", 1, null, null, null, "A")

    reference shouldBe sameIdentity
    reference.hashCode shouldBe sameIdentity.hashCode
    reference should not be otherAttempt
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala new file mode 100644 index 0000000..ff1e7b1 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+/**
+ * Placeholder spec: it declares no test cases.
+ *
+ * Original note kept from the class body:
+ * Master proxy retries multiple times to find the master
+ */
+class MasterProxySpec
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
new file mode 100644
index 0000000..4070d8c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+/** Empty placeholder: no test cases are defined in this class. */
+class MasterSpec
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
new file mode 100644
index 0000000..aa27d8f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
+/**
+ * Verifies which calls the Counter, Histogram and Meter wrappers make on the mocked
+ * CodaHale metric they are constructed with, with and without an explicit sample rate.
+ */
+class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "Counter" should "handle sampleRate == 1" in {
+    val underlying = mock[CodaHaleCounter]
+    val sampled = new Counter("c", underlying)
+
+    // Two increments; each is expected to reach the mock as inc(1).
+    (1 to 2).foreach(_ => sampled.inc())
+
+    verify(underlying, times(2)).inc(1)
+  }
+
+  "Counter" should "handle sampleRate == 3" in {
+    val underlying = mock[CodaHaleCounter]
+    val sampled = new Counter("c", underlying, 3)
+
+    // Six unit increments are expected to reach the mock as two inc(3) calls.
+    (1 to 6).foreach(_ => sampled.inc(1))
+
+    verify(underlying, times(2)).inc(3)
+  }
+
+  "Histogram" should "handle sampleRate == 1" in {
+    val underlying = mock[CodaHaleHistogram]
+    val sampled = new Histogram("h", underlying)
+
+    Seq(3, 7, 5, 9).foreach(value => sampled.update(value))
+
+    verify(underlying, times(4)).update(anyLong())
+  }
+
+  "Histogram" should "handle sampleRate > 1" in {
+    val underlying = mock[CodaHaleHistogram]
+    val sampled = new Histogram("h", underlying, 2)
+
+    Seq(3, 4, 5, 6).foreach(value => sampled.update(value))
+
+    // Only the 2nd and 4th values are expected on the mock, once each.
+    verify(underlying).update(4L)
+    verify(underlying).update(6L)
+  }
+
+  "Meter" should "handle sampleRate == 1" in {
+    val underlying = mock[CodaHaleMeter]
+    val sampled = new Meter("m", underlying)
+
+    sampled.mark()
+    sampled.mark(3)
+
+    verify(underlying).mark(1L)
+    verify(underlying).mark(3L)
+  }
+
+  "Meter" should "handle sampleRate > 1" in {
+    val underlying = mock[CodaHaleMeter]
+    val sampled = new Meter("m", underlying, 2)
+
+    Seq(1, 3, 5, 7).foreach(count => sampled.mark(count))
+
+    // Marks are expected on the mock as pair sums: 1 + 3 and 5 + 7.
+    verify(underlying).mark(4L)
+    verify(underlying).mark(12L)
+  }
+
+  "Metrics" should "have a name" in {
+    val registry = new Metrics(3)
+
+    registry.counter("counter").name shouldBe "counter"
+    registry.histogram("histogram").name shouldBe "histogram"
+    registry.meter("meter").name shouldBe "meter"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
new file mode 100644
index 0000000..14be887
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+
+/** Checks the range and repeatability of the partitions chosen by the partitioners. */
+class PartitionerSpec extends FlatSpec with Matchers {
+  // Number of partitions passed to every getPartition call below.
+  val NUM = 10
+
+  "HashPartitioner" should "hash same key to same slots" in {
+    val partitioner = new HashPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, s"Partition Id should be in [0, $NUM)")
+
+    // Same partitioner, same message: a second call must give the same answer.
+    assert(partition == partitioner.getPartition(msg, NUM),
+      "multiple run should return consistent result")
+  }
+
+  "ShufflePartitioner" should "hash same key randomly" in {
+    val partitioner = new ShufflePartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, s"Partition Id should be in [0, $NUM)")
+
+    // The assertion requires two consecutive calls to disagree, so the message says so.
+    assert(partition != partitioner.getPartition(msg, NUM),
+      "consecutive runs should return different partitions")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
new file mode 100644
index 0000000..ffe47be
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+
+/**
+ * Authenticates several user/password pairs against a ConfigFileBasedAuthenticator
+ * built from TestUtil.UI_CONFIG and checks the result returned for each.
+ */
+class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
+  it should "authenticate correctly" in {
+    val config = TestUtil.UI_CONFIG
+    implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
+    implicit val ec = system.dispatcher
+    val timeout = 30.seconds
+
+    // try/finally: the actor system must be shut down even when an Await or an
+    // assertion throws, otherwise it outlives the failed test.
+    try {
+      val authenticator = new ConfigFileBasedAuthenticator(config)
+      val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
+      val admin = Await.result(authenticator.authenticate("admin", "admin", ec), timeout)
+
+      val nonexist = Await.result(authenticator.authenticate("nonexist", "nonexist", ec), timeout)
+
+      val failedGuest = Await.result(authenticator.authenticate("guest", "wrong", ec), timeout)
+      val failedAdmin = Await.result(authenticator.authenticate("admin", "wrong", ec), timeout)
+
+      assert(guest == Authenticator.Guest)
+      assert(admin == Authenticator.Admin)
+      assert(nonexist == Authenticator.UnAuthenticated)
+      assert(failedGuest == Authenticator.UnAuthenticated)
+      assert(failedAdmin == Authenticator.UnAuthenticated)
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
new file mode 100644
index 0000000..1cec360
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import org.scalatest.{FlatSpec, Matchers}
+
+/** Hashes one password twice and checks that both digests verify against it. */
+class PasswordUtilSpec extends FlatSpec with Matchers {
+
+  it should "verify the credential correctly" in {
+    val plainText = "password"
+    val firstDigest = PasswordUtil.hash(plainText)
+    val secondDigest = PasswordUtil.hash(plainText)
+
+    // The two digests of the same password are expected to differ (different salt
+    // each time, per the original comment).
+    assert(!(firstDigest == secondDigest))
+
+    // Each digest must still verify against the original password.
+    Seq(firstDigest, secondDigest).foreach { digest =>
+      assert(PasswordUtil.verify(plainText, digest))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
new file mode 100644
index 0000000..d4f4167
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import org.apache.gearpump.serializer.SerializerSpec._
+
+/**
+ * Checks that serializers listed under "gearpump.serializers" are registered with Kryo
+ * and that FastKryoSerializer round-trips an instance through them.
+ */
+class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
+  // Maps each test class name to the name of its custom serializer class.
+  val config = ConfigFactory.empty.withValue("gearpump.serializers",
+    ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
+      classOf[ClassB].getName -> classOf[ClassBSerializer].getName).asJava))
+
+  "GearpumpSerialization" should "register custom serializers" in {
+    val serialization = new GearpumpSerialization(config)
+    val kryo = new Kryo
+    serialization.customize(kryo)
+
+    val forB = kryo.getRegistration(classOf[ClassB])
+    assert(forB.getSerializer.isInstanceOf[ClassBSerializer])
+
+    val forA = kryo.getRegistration(classOf[ClassA])
+    assert(forA.getSerializer.isInstanceOf[ClassASerializer])
+  }
+
+  "FastKryoSerializer" should "serialize correctly" in {
+    val myConfig = config.withFallback(TestUtil.DEFAULT_CONFIG.withoutPath("gearpump.serializers"))
+    val system = ActorSystem("my", myConfig)
+
+    // try/finally: shut the actor system down even if serialization or the assertion fails.
+    try {
+      val serializer = new FastKryoSerializer(system.asInstanceOf[ExtendedActorSystem])
+
+      val bytes = serializer.serialize(new ClassA)
+      val anotherA = serializer.deserialize(bytes)
+
+      assert(anotherA.isInstanceOf[ClassA])
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
+
+/** Test-only classes and the Kryo serializers registered for them in the config above. */
+object SerializerSpec {
+
+  class ClassA {}
+
+  /** Writes only the class name; reading instantiates that class reflectively. */
+  class ClassASerializer extends KryoSerializer[ClassA] {
+    override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
+      output.writeString(classOf[ClassA].getName)
+    }
+
+    override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
+      val className = input.readString()
+      Class.forName(className).newInstance().asInstanceOf[ClassA]
+    }
+  }
+
+  class ClassB {}
+
+  /**
+   * No-op serializer for ClassB: writes nothing and reads back null.
+   * It is typed for ClassB, the class the config registers it for.
+   */
+  class ClassBSerializer extends KryoSerializer[ClassB] {
+    override def write(kryo: Kryo, output: Output, `object`: ClassB): Unit = {}
+
+    override def read(kryo: Kryo, input: Input, `type`: Class[ClassB]): ClassB = {
+      null
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
new file mode 100644
index 0000000..ddeb72d
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import java.io.{DataInput, DataOutput}
+import org.apache.gearpump.transport.MockTransportSerializer.NettyMessage
+import org.apache.gearpump.transport.netty.ITransportMessageSerializer
+
+/**
+ * Test serializer whose wire format is the single Int carried by a NettyMessage.
+ */
+class MockTransportSerializer extends ITransportMessageSerializer {
+
+  /** Always 4, whatever `obj` is. */
+  override def getLength(obj: scala.Any): Int = {
+    4
+  }
+
+  /** Writes the message's `num`; anything other than a NettyMessage fails with a MatchError. */
+  override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
+    transportMessage match {
+      case NettyMessage(num) => dataOutput.writeInt(num)
+    }
+  }
+
+  /** Reads one Int and wraps it in a NettyMessage; `length` is not consulted. */
+  override def deserialize(dataInput: DataInput, length: Int): AnyRef = {
+    val num = dataInput.readInt()
+    NettyMessage(num)
+  }
+}
+
+object MockTransportSerializer {
+  /** Payload exchanged by the transport tests. */
+  case class NettyMessage(num: Int)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala b/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
new file mode 100644
index 0000000..b1a429c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.transport.MockTransportSerializer.NettyMessage
+import org.apache.gearpump.transport.netty.{TaskMessage, Context}
+import org.apache.gearpump.util.Util
+
+/** Sends one TaskMessage through the transport Context and expects its payload on a probe. */
+class NettySpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "Netty Transport" should "send and receive message correctly " in {
+    val conf = TestUtil.DEFAULT_CONFIG
+    val system = ActorSystem("transport", conf)
+
+    // Nested try/finally: the transport context and the actor system are both released
+    // even when the probe expectation fails.
+    try {
+      val context = new Context(system, conf)
+      try {
+        val serverActor = TestProbe()(system)
+
+        val port = Util.findFreePort()
+
+        import system.dispatcher
+        // The server is bound one second after the client connects, so the client
+        // is connecting before the server side exists.
+        system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
+          context.bind("server", new ActorLookupById {
+            override def lookupLocalActor(id: Long): Option[ActorRef] = Some(serverActor.ref)
+          }, false, port.get)
+        }
+        val client = context.connect(HostPort("127.0.0.1", port.get))
+
+        val data = NettyMessage(0)
+        val msg = new TaskMessage(0, 1, 2, data)
+        client ! msg
+        serverActor.expectMsg(15.seconds, data)
+      } finally {
+        context.close()
+      }
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
new file mode 100644
index 0000000..0d33d44
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
+import org.apache.gearpump.util.ActorSystemBooterSpec._
+
+/**
+ * Boots an actor system through ActorSystemBooter and checks the messages it exchanges
+ * with a TestProbe living in a separate "host" actor system.
+ */
+class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "ActorSystemBooter" should "report its address back" in {
+    val boot = bootSystem()
+    // try/finally in every test: both actor systems are shut down even when an
+    // expectation fails.
+    try {
+      boot.prob.expectMsgType[RegisterActorSystem]
+    } finally {
+      boot.shutdown()
+    }
+  }
+
+  "ActorSystemBooter" should "terminate itself when parent actor dies" in {
+    val boot = bootSystem()
+    try {
+      boot.prob.expectMsgType[RegisterActorSystem]
+
+      val dummy = boot.host.actorOf(Props(classOf[Dummy]), "dummy")
+      boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
+      boot.prob.reply(BindLifeCycle(dummy))
+      boot.host.stop(dummy)
+      val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
+      assert(terminated)
+    } finally {
+      boot.shutdown()
+    }
+  }
+
+  "ActorSystemBooter" should "create new actor" in {
+    val boot = bootSystem()
+    try {
+      boot.prob.expectMsgType[RegisterActorSystem]
+      boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
+      boot.prob.reply(CreateActor(Props(classOf[AcceptThreeArguments], 1, 2, 3), "three"))
+      boot.prob.expectMsgType[ActorCreated]
+
+      boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
+      boot.prob.expectMsgType[ActorCreated]
+    } finally {
+      boot.shutdown()
+    }
+  }
+
+  /** Starts the host system with its probe and boots a second system reporting to it. */
+  private def bootSystem(): Boot = {
+    val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
+
+    val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
+
+    val receiver = TestProbe()(system)
+    val address = ActorUtil.getFullPath(system, receiver.ref.path)
+
+    val bootSystem = booter.boot("booter", address)
+
+    Boot(system, receiver, bootSystem)
+  }
+
+  /** The host system, its probe and the booted system, with a joint shutdown. */
+  case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
+    def shutdown(): Unit = {
+      host.terminate()
+      bootedSystem.terminate()
+      Await.result(host.whenTerminated, Duration.Inf)
+      Await.result(bootedSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  /**
+   * Polls `fn` about once per second until it returns true or the budget is used up.
+   *
+   * @param seconds number of one-second waits still allowed between attempts
+   * @param fn condition re-evaluated on every attempt
+   * @return true as soon as `fn` holds, false once `seconds` reaches zero without success
+   */
+  def retry(seconds: Int)(fn: => Boolean): Boolean = {
+    if (fn) {
+      true
+    } else if (seconds <= 0) {
+      // Budget exhausted: report failure instead of sleeping and recursing forever.
+      false
+    } else {
+      Thread.sleep(1000)
+      retry(seconds - 1)(fn)
+    }
+  }
+}
+
+/** Actors that ignore every message, used as creation targets in the tests above. */
+object ActorSystemBooterSpec {
+  class Dummy extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+
+  class AcceptZeroArguments extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+
+  class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
new file mode 100644
index 0000000..534bf00
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.scalatest.FlatSpec
+
+import org.apache.gearpump.transport.HostPort
+
+/** Checks each component of the ActorPath that ActorUtil builds for a master HostPort. */
+class ActorUtilSpec extends FlatSpec {
+  "masterActorPath" should "construct the ActorPath from HostPort" in {
+    import org.apache.gearpump.util.Constants.MASTER
+
+    val host = "127.0.0.1"
+    val port = 3000
+    // Built from the same values the assertions compare against, so the two cannot drift.
+    val master = HostPort(host, port)
+    val masterPath = ActorUtil.getMasterActorPath(master)
+    assert(masterPath.address.port == Some(port))
+    assert(masterPath.address.system == MASTER)
+    assert(masterPath.address.host == Some(host))
+    assert(masterPath.address.protocol == "akka.tcp")
+    assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
new file mode 100644
index 0000000..e7b9d18
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
+
+/** Covers ClusterConfig loading, ClusterConfigSource fallbacks and UserConfig accessors. */
+class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
+  "Typesafe Cluster Configs" should "follow the override rules" in {
+
+    val conf =
+      """
+      gearpump {
+        gear = "gearpump"
+      }
+
+      gearpump-master {
+        conf = "master"
+      }
+      gearpump-worker {
+        conf = "worker"
+      }
+      conf = "base"
+      """
+
+    val file = File.createTempFile("test", ".conf")
+    // try/finally: the temp file is removed even when loading or an assertion fails.
+    try {
+      FileUtils.write(file, conf)
+
+      val raw = ClusterConfig.load(ClusterConfigSource(file.toString))
+
+      assert(raw.master.getString("conf") == "master", "master > base")
+      assert(raw.worker.getString("conf") == "worker", "worker > base")
+      assert(raw.default.getString("conf") == "base", "application > base")
+    } finally {
+      file.delete()
+    }
+  }
+
+  "ClusterConfigSource" should "return empty for non-exist files" in {
+    val source = ClusterConfigSource("non-exist")
+    var config = source.getConfig
+    assert(config.isEmpty)
+
+    val nullCheck = ClusterConfigSource(null)
+    config = nullCheck.getConfig
+    assert(config.isEmpty)
+  }
+
+  "User Config" should "work" in {
+
+    // The system is named "forSerialization" and is implicit so calls below can pick it up.
+    // NOTE(review): presumably getValue/withValue need it for serialization - confirm.
+    implicit val system = ActorSystem("forSerialization")
+
+    // try/finally: the actor system is shut down even when an assertion fails.
+    try {
+      val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
+
+      val user = new UserConfig(map)
+        .withLong("key3", 2L)
+        .withBoolean("key4", value = true)
+        .withFloat("key5", 3.14F)
+        .withDouble("key6", 2.718)
+
+      assert(user.getInt("key1").get == 1)
+      assert(user.getString("key1").get == "1")
+      assert(user.getLong("key3").get == 2L)
+      assert(user.getBoolean("key4").get)
+      assert(user.getFloat("key5").get == 3.14F)
+      assert(user.getDouble("key6").get == 2.718)
+
+      val data = new ConfigsSpec.Data(3)
+      assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
+    } finally {
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
+
+object ConfigsSpec {
+  /** Arbitrary value stored in and read back from a UserConfig. */
+  case class Data(value: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
new file mode 100644
index 0000000..7ad83ce
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.gearpump.util + +import java.io.File +import java.util + +import org.scalatest.FlatSpec + +import io.gearpump.google.common.io.Files +// Round-trip tests for the FileUtils string, byte-array and directory helpers. +class FileUtilsSpec extends FlatSpec { + val TXT = + """ + |This is a multiple line + |text + | + """.stripMargin + // Text written with FileUtils.write must be read back unchanged. + it should "read/write string correctly" in { + val file = File.createTempFile("fileutilspec", ".test") + FileUtils.write(file, TXT) + assert(FileUtils.read(file) == TXT) + file.delete() + } + // Bytes written with writeByteArrayToFile must be read back unchanged. + it should "read/write bytes array correctly" in { + val file = File.createTempFile("fileutilspec", ".test") + val bytes = TXT.toCharArray.map(_.toByte) + FileUtils.writeByteArrayToFile(file, bytes) + assert(util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))) + file.delete() + } + // Neither "sub1" nor "sub2" exists beforehand; forceMkdir is expected to create both. + it should "create directory and all parents" in { + val temp = Files.createTempDir() + val parent = new File(temp, "sub1") + val child = new File(parent, "sub2" + File.separator) + FileUtils.forceMkdir(child) + assert(child.exists()) + assert(child.isDirectory) + child.delete() + parent.delete() + temp.delete() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala new file mode 100644 index 0000000..2b6df78 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.util.Graph.{Node, Path} +// Property tests for Graph: construction, level map, copy, sub-graph, cycles and topological order. +class GraphSpec extends PropSpec with PropertyChecks with Matchers { + // Vertex and edge payloads used by the generated-graph property below. + case class Vertex(id: Int) + case class Edge(from: Int, to: Int) + + val vertexCount = 100 + + property("Graph with no edges should be built correctly") { + val vertexSet = Set("A", "B", "C") + val graph = Graph(vertexSet.toSeq.map(Node): _*) + graph.vertices.toSet shouldBe vertexSet + } + + property("Graph with vertices and edges should be built correctly") { + val vertices: Array[Vertex] = 0.until(vertexCount).map(Vertex).toArray + val genEdge = for { + from <- Gen.chooseNum[Int](0, vertexCount - 1) + to <- Gen.chooseNum[Int](0, vertexCount - 1) + } yield Edge(from, to) + // Expected per-vertex edge sets; they are filled in while the random edges are drawn below. + var graphElements = Array.empty[Path[Vertex, _ <: Edge]] + val outDegrees = new Array[Int](vertices.length) + val outGoingEdges = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)]) + val edgesOf = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)]) + vertices.foreach { v => + graphElements :+= Node(v) + } + // forAll only serves to draw random edges here; each one is recorded and queued for the graph. + forAll(genEdge) { + e: Edge => + val from = vertices(e.from) + val to = vertices(e.to) + graphElements :+= from ~ e ~> to + outDegrees(e.from) += 1 + + val nodeEdgeNode = (from, e, to) + outGoingEdges(e.from) += nodeEdgeNode + + edgesOf(e.from) += nodeEdgeNode + edgesOf(e.to) += nodeEdgeNode + } + + val graph: Graph[Vertex, Edge] = Graph(graphElements: _*) + graph.vertices should contain
theSameElementsAs vertices + + 0.until(vertices.size).foreach { i => + val v = vertices(i) + graph.outgoingEdgesOf(v) should contain theSameElementsAs outGoingEdges(i) + // theSameElementsAs ignores ordering, so edgesOf(v) is compared without sorting it first. + graph.edgesOf(v) should contain theSameElementsAs edgesOf(i) + } + } + + property("Check empty graph") { + val graph = Graph.empty[String, String] + assert(graph.isEmpty) + } + + property("check level map for a graph") { + val graph = Graph.empty[String, String] + + val defaultEdge = "edge" + + graph.addVertex("A") + graph.addVertex("B") + graph.addVertex("C") + + graph.addEdge("A", defaultEdge, "B") + graph.addEdge("B", defaultEdge, "C") + graph.addEdge("A", defaultEdge, "C") + + graph.addVertex("D") + graph.addVertex("E") + graph.addVertex("F") + + graph.addEdge("D", defaultEdge, "E") + graph.addEdge("E", defaultEdge, "F") + graph.addEdge("D", defaultEdge, "F") + + graph.addEdge("C", defaultEdge, "E") + + val levelMap = graph.vertexHierarchyLevelMap() + + // Check whether the rule holds: if vertex A -> B, then level(A) < level(B) + assert(levelMap("A") < levelMap("B")) + assert(levelMap("A") < levelMap("C")) + assert(levelMap("B") < levelMap("C")) + + assert(levelMap("D") < levelMap("E")) + assert(levelMap("D") < levelMap("F")) + assert(levelMap("E") < levelMap("F")) + + assert(levelMap("C") < levelMap("F")) + } + + property("copy should return a immutalbe new Graph") { + val graph = Graph.empty[String, String] + val defaultEdge = "edge" + graph.addVertex("A") + graph.addVertex("B") + graph.addEdge("A", defaultEdge, "B") + + val newGraph = graph.copy + newGraph.addVertex("C") + + assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable") + } + + property("subGraph should return a sub-graph for certain vertex") { + val graph = Graph.empty[String, String] + val defaultEdge = "edge" + graph.addVertex("A") + graph.addVertex("B") + graph.addVertex("C") + graph.addEdge("A", defaultEdge, "B") + graph.addEdge("B", defaultEdge, "C") + graph.addEdge("A", defaultEdge, "C") + + val subGraph = graph.subGraph("C")
+ assert(subGraph.outDegreeOf("A") != graph.outDegreeOf("A")) + } + + property("replaceVertex should hold all upstream downstream relation for a vertex") { + val graph = Graph.empty[String, String] + val defaultEdge = "edge" + graph.addVertex("A") + graph.addVertex("B") + graph.addVertex("C") + graph.addEdge("A", defaultEdge, "B") + graph.addEdge("B", defaultEdge, "C") + + val newGraph = graph.copy.replaceVertex("B", "D") + assert(newGraph.inDegreeOf("D") == graph.inDegreeOf("B")) + assert(newGraph.outDegreeOf("D") == graph.outDegreeOf("B")) + } + + property("Cycle detecting should work properly") { + val graph = Graph.empty[String, String] + val defaultEdge = "edge" + graph.addVertex("A") + graph.addVertex("B") + graph.addVertex("C") + graph.addEdge("A", defaultEdge, "B") + graph.addEdge("B", defaultEdge, "C") + + assert(!graph.hasCycle()) + + graph.addEdge("C", defaultEdge, "B") + assert(graph.hasCycle()) + + graph.addEdge("C", defaultEdge, "A") + assert(graph.hasCycle()) + } + + property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " + + "return equal order of graph with no circle") { + val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5) + val topoNoCircles = graph.topologicalOrderIterator + val topoWithCircles = graph.topologicalOrderWithCirclesIterator + // Compare the whole sequences: zipping the iterators would hide a length mismatch. + assert(topoNoCircles.toList == topoWithCircles.toList) + } + + property("Topological sort of graph with circles should work properly") { + val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, + 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10) + val topoWithCircles = graph.topologicalOrderWithCirclesIterator + val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9) + // Compare the whole sequences: zipping the iterators would hide a length mismatch. + assert(topoWithCircles.toList == trueTopoWithCircles.toList) + } + + property("Duplicated edges detecting should work properly") { + val graph = Graph.empty[String, String] + val defaultEdge = "edge" + val anotherEdge = "edge2" + graph.addVertex("A") +
graph.addVertex("B") + graph.addEdge("A", defaultEdge, "B") + + assert(!graph.hasDuplicatedEdge()) + + graph.addEdge("A", anotherEdge, "B") + + assert(graph.hasDuplicatedEdge()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala new file mode 100644 index 0000000..6f18497 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.util + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import org.slf4j.Logger + +import org.apache.gearpump.cluster.TestUtil +// Checks that the timeout callback fires when the target actor never answers. +class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpecLike with Matchers with BeforeAndAfterAll { + + def this() = this(ActorSystem("WorkerSpec", TestUtil.DEFAULT_CONFIG)) + val mockActor = TestProbe() + // Shut the test actor system down once all tests have run. + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "The TimeOutScheduler" should { + "handle the time out event" in { + val testActorRef = TestActorRef[TestActor](Props(classOf[TestActor], mockActor.ref)) + val testActor = testActorRef.underlyingActor + testActor.sendMsgToIgnore() + mockActor.expectMsg(30.seconds, MessageTimeOut) + } + } +} +// Messages exchanged by the test actors below. +case object Echo +case object MessageTimeOut +// Sends Echo to a silent child actor and reports MessageTimeOut to `mock` from the timeout callback. +class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler { + private val LOG: Logger = LogUtil.getLogger(getClass) + + val target = context.actorOf(Props(classOf[EchoActor])) + + override def receive: Receive = { + case _ => + } + // NOTE(review): assumes the last argument is by-name so the callback runs only on timeout - confirm in TimeOutScheduler. + def sendMsgToIgnore(): Unit = { + sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut()) + } + + private def sendMsgTimeOut(): Unit = { + mock !
MessageTimeOut + } +} +// Swallows every message without replying, so a request sent to it is never answered. +class EchoActor extends Actor { + override def receive: Receive = { + case _ => + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala b/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala new file mode 100644 index 0000000..6e6cec1 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.util + +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Util._ +// Smoke tests for the helpers in Util: free port lookup, random ints, host parsing, name validation. +class UtilSpec extends FlatSpec with Matchers with MockitoSugar { + it should "work" in { + // Each assertion below exercises one independent helper. + assert(findFreePort().isSuccess) + + assert(randInt() != randInt()) // two consecutive draws are expected to differ + + val hosts = parseHostList("host1:1,host2:2") + assert(hosts(0) == HostPort("host1", 1)) + assert(hosts(1) == HostPort("host2", 2)) + assert(Util.getCurrentClassPath.length > 0) + } + // Names with a leading digit or with '&' characters are expected to be rejected. + it should "check application name properly" in { + assert(Util.validApplicationName("_application_1")) + assert(Util.validApplicationName("application_1_")) + assert(!Util.validApplicationName("0_application_1")) + assert(!Util.validApplicationName("_applicat&&ion_1")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService deleted file mode 100644 index d226af9..0000000 --- a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -io.gearpump.jarstore.local.LocalJarStoreService -io.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService new file mode 100644 index 0000000..bf37316 --- /dev/null +++ b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +org.apache.gearpump.jarstore.local.LocalJarStoreService +org.apache.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala deleted file mode 100644 index 420f1b6..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster - -import akka.actor.ActorRef - -import io.gearpump.cluster.master.Master.MasterInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.WorkerId - -/** - * Cluster Bootup Flow - */ -object WorkerToMaster { - - /** When an worker is started, it sends RegisterNewWorker */ - case object RegisterNewWorker - - /** When worker lose connection with master, it tries to register itself again with old Id. 
*/ - case class RegisterWorker(workerId: WorkerId) - - /** Worker is responsible to broadcast its current status to master */ - case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) -} - -object MasterToWorker { - - /** Master confirm the reception of RegisterNewWorker or RegisterWorker */ - case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo) - - /** Worker have not received reply from master */ - case class UpdateResourceFailed(reason: String = null, ex: Throwable = null) - - /** Master is synced with worker on resource slots managed by current worker */ - case object UpdateResourceSucceed -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala deleted file mode 100644 index 53da645..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.embedded - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorRef, ActorSystem, Props} -import com.typesafe.config.{Config, ConfigValueFactory} - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.master.{Master => MasterActor} -import io.gearpump.cluster.worker.{Worker => WorkerActor} -import io.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} -import io.gearpump.util.{LogUtil, Util} - -/** - * Create a in-process cluster with single worker - */ -class EmbeddedCluster(inputConfig: Config) { - - private val workerCount: Int = 1 - private var _master: ActorRef = null - private var _system: ActorSystem = null - private var _config: Config = null - - private val LOG = LogUtil.getLogger(getClass) - - def start(): Unit = { - val port = Util.findFreePort().get - val akkaConf = getConfig(inputConfig, port) - _config = akkaConf - val system = ActorSystem(MASTER, akkaConf) - - val master = system.actorOf(Props[MasterActor], MASTER) - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - this._master = master - this._system = system - - LOG.info("=================================") - LOG.info("Local Cluster is started at: ") - LOG.info(s" 127.0.0.1:$port") - LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") - } - - private def getConfig(inputConfig: Config, port: Int): Config = { - val config = inputConfig. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). 
- withValue(GEARPUMP_CLUSTER_MASTERS, - ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). - withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, - ConfigValueFactory.fromAnyRef(true)). - withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)). - withValue("akka.actor.provider", - ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) - config - } - - def newClientContext: ClientContext = { - ClientContext(_config, _system, _master) - } - - def stop(): Unit = { - _system.stop(_master) - _system.terminate() - Await.result(_system.whenTerminated, Duration.Inf) - } -} - -object EmbeddedCluster { - def apply(): EmbeddedCluster = { - new EmbeddedCluster(ClusterConfig.master()) - } -} \ No newline at end of file
