http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala index 01d3474..00bd408 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala @@ -18,8 +18,13 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor._ import akka.testkit.TestProbe +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.TestProbeUtil._ import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster import io.gearpump.cluster._ @@ -27,27 +32,28 @@ import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._ import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) } - override def afterAll() = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } - "MasterWithExecutorSystemProvider" should "forward request StartExecutorSystem to ExecutorSystemProvider" in { + "MasterWithExecutorSystemProvider" should + "forward request StartExecutorSystem to ExecutorSystemProvider" in { val client = TestProbe() val master = TestProbe() val provider = TestProbe() val providerProps: Props = provider - val masterEnhanced = system.actorOf(Props(new MasterWithExecutorSystemProvider(master.ref, providerProps))) + val masterEnhanced = system.actorOf(Props( + new MasterWithExecutorSystemProvider(master.ref, providerProps))) val start = StartExecutorSystems(null, null) client.send(masterEnhanced, start) @@ -76,7 +82,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before } "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in { - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv + val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = + setupAppMasterRuntimeEnv() masterConnectionKeeper.send(runtimeEnv, MasterConnected) appMaster.expectMsg(StartAppMaster) @@ -84,7 +91,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in { - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv + val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = + setupAppMasterRuntimeEnv() masterConnectionKeeper.send(runtimeEnv, MasterStopped) val client = TestProbe() @@ -94,7 +102,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in { - val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv + val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = + setupAppMasterRuntimeEnv() val client = TestProbe() client.watch(runtimeEnv) @@ -102,13 +111,13 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before client.expectTerminated(runtimeEnv) } - private def setupAppMasterRuntimeEnv: TestAppMasterEnv = { - val appContext = AppMasterContext(0, null, null, null, null, null, null) + private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = { + val appContext = AppMasterContext(0, null, null, null, null, null, null) val app = AppDescription("app", "AppMasterClass", null, null) val master = TestProbe() val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master) val appMaster = TestProbe() - val appMasterFactory = (_: AppMasterContext, _: AppDescription)=> toProps(appMaster) + val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster) val masterConnectionKeeper = TestProbe() val masterConnectionKeeperFactory = (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) => @@ -125,5 +134,7 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before object AppMasterRuntimeEnvironmentSpec { - case class TestAppMasterEnv(master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe, appMasterRuntimeEnv: ActorRef) + case class TestAppMasterEnv( + master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe, + appMasterRuntimeEnv: ActorRef) } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala index dc8e624..3595960 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala @@ -18,24 +18,25 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import com.typesafe.config.ConfigValueFactory -import io.gearpump.WorkerId -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.TestUtil import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session import io.gearpump.cluster.scheduler.Resource +import io.gearpump.cluster.worker.WorkerId import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem} import io.gearpump.util.Constants -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import scala.concurrent.duration._ -class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null val workerId: WorkerId = WorkerId(0, 0L) val appId = 0 @@ -45,15 +46,15 @@ class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndA val launchExecutorSystemTimeout = 3000 val activeConfig = TestUtil.DEFAULT_CONFIG. withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS, - ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout)) + ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout)) - override def beforeAll() = { + override def beforeAll(): Unit = { system = ActorSystem("test", activeConfig) } - override def afterAll() = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } it should "report success when worker launch the system successfully" in { @@ -90,14 +91,15 @@ class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndA client.expectTerminated(launcher) } - it should "report timeout when trying to start a executor system on worker, and worker doesn't response" in { + it should "report timeout when trying to start a executor system on worker, " + + "and worker doesn't response" in { val client = TestProbe() val worker = TestProbe() val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session))) client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1))) client.watch(launcher) val waitFor = launchExecutorSystemTimeout + 10000 - client.expectMsgType[LaunchExecutorSystemTimeout](waitFor milliseconds) - client.expectTerminated(launcher, waitFor milliseconds) + client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds) + client.expectTerminated(launcher, waitFor.milliseconds) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala index 106d389..c2f6ee8 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala @@ -18,20 +18,22 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.{Actor, ActorSystem, Props} import akka.testkit.TestProbe -import io.gearpump.WorkerId +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil} import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._ import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher} import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{AppJar, TestUtil} import io.gearpump.jarstore.FilePath -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import scala.concurrent.duration._ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val appId = 0 @@ -43,7 +45,7 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig) implicit var system: ActorSystem = null - var worker:TestProbe = null + var worker: TestProbe = null var workerInfo: WorkerInfo = null var masterProxy: TestProbe = null var launcher: TestProbe = null @@ -58,20 +60,21 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA client = TestProbe() val scheduler = system.actorOf( - Props(new ExecutorSystemScheduler(appId, masterProxy.ref, - (appId: Int, session:Session) => Props(new MockExecutorSystemLauncher(launcher, session))))) + Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => { + Props(new MockExecutorSystemLauncher(launcher, session)) + }))) client.send(scheduler, start) masterProxy.expectMsg(RequestResource(appId, resourceRequest)) } override def afterEach(): Unit = { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = { - val launcherStarted = launcher.receiveOne(15 seconds) + val launcherStarted = launcher.receiveOne(15.seconds) launcherStarted match { case start: ExecutorSystemLauncherStarted => Some(start) @@ -91,16 +94,18 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource)) val executorSystemProbe = TestProbe() - val executorSystem = ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo) + val executorSystem = + ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo) launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session)) client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar))) } it should "report failure when resource cannot be allocated" in { - client.expectMsg(30 seconds, StartExecutorSystemTimeout) + client.expectMsg(30.seconds, StartExecutorSystemTimeout) } - it should "schedule new resouce on new worker when target worker reject creating executor system" in { + it should "schedule new resouce on new worker " + + "when target worker reject creating executor system" in { masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId)))) val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get @@ -110,7 +115,8 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA masterProxy.expectMsg(RequestResource(appId, resourceRequest)) } - it should "report failure when resource is allocated, but timeout when starting the executor system" in { + it should "report failure when resource is allocated, but timeout " + + "when starting the executor system" in { masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId)))) val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala index 9221ca3..3272b99 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala @@ -18,51 +18,50 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.TestProbe -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.MasterConnected -import io.gearpump.cluster.master.MasterProxy.WatchMaster +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._ +import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _} import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv -import io.gearpump.cluster.master.MasterProxy import io.gearpump.cluster.master.MasterProxy.WatchMaster -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import scala.concurrent.duration._ - -class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll { +class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null val register = RegisterAppMaster(null, null) val appId = 0 - override def beforeAll: Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } - override def afterAll: Unit = { - system.shutdown() - system.awaitTermination() + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = { val statusChangeSubscriber = TestProbe() val master = TestProbe() - val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref))) + val keeper = system.actorOf(Props( + new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref))) statusChangeSubscriber.watch(keeper) master.expectMsgType[WatchMaster] - //master is alive, response to RegisterAppMaster + // Master is alive, response to RegisterAppMaster master.expectMsgType[RegisterAppMaster] master.reply(AppMasterRegistered(appId)) - //notify listener that master is alive + // Notify listener that master is alive statusChangeSubscriber.expectMsg(MasterConnected) ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber) } @@ -75,23 +74,23 @@ class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndA import io.gearpump.cluster.master.MasterProxy.MasterRestarted val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper - //master is restarted + // Master is restarted master.send(keeper, MasterRestarted) master.expectMsgType[RegisterAppMaster] master.reply(AppMasterRegistered(appId)) masterChangeListener.expectMsg(MasterConnected) - //Recovery from Master restart is transparent to listener + // Recovery from Master restart is transparent to listener masterChangeListener.expectNoMsg() } it should "notify listener and then shutdown itself when master is dead" in { val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper - //master is dead + // Master is dead master.send(keeper, MasterStopped) - //keeper should tell the listener that master is stopped before shutting down itself + // Keeper should tell the listener that master is stopped before shutting down itself masterChangeListener.expectMsg(MasterStopped) masterChangeListener.expectTerminated(keeper) } @@ -100,18 +99,20 @@ class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndA val statusChangeSubscriber = TestProbe() val master = TestProbe() - //keeper will register to master by sending RegisterAppMaster - val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref))) + // MasterConnectionKeeper register itself to master by sending RegisterAppMaster + val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, + master.ref, statusChangeSubscriber.ref))) - //master doesn't reply to keeper, + // Master doesn't reply to keeper, statusChangeSubscriber.watch(keeper) - //timeout, keeper notify listener, and then make suicide - statusChangeSubscriber.expectMsg(60 seconds, MasterStopped) - statusChangeSubscriber.expectTerminated(keeper, 60 seconds) + // Timeout, keeper notify listener, and then make suicide + statusChangeSubscriber.expectMsg(60.seconds, MasterStopped) + statusChangeSubscriber.expectTerminated(keeper, 60.seconds) } } object MasterConnectionKeeperSpec { - case class ConnectionKeeperTestEnv(master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe) + case class ConnectionKeeperTestEnv( + master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala index f08e368..7544f9a 100644 --- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala @@ -30,7 +30,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers { "opt2" -> CLIOption[Any]("", required = true)) } - val result = parser.parse(Array("-flag" , "-opt1", "1","-opt2", "2", "arg1", "arg2")) + val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2")) assert(result.getBoolean("flag")) assert(result.getInt("opt1") == 1) assert(result.getString("opt1") == "1") @@ -47,7 +47,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers { "opt1" -> CLIOption[Any]("", required = true)) } - val result = parser.parse(Array("-opt1", "1","xx.MainClass", "-opt2", "2")) + val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2")) assert(result.getInt("opt1") == 1) assert(result.remainArgs.length == 3) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala index 8ff2ea1..2a8dba1 100644 --- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,27 @@ package io.gearpump.cluster.master +import scala.util.Success + import akka.actor._ import akka.testkit.TestProbe -import io.gearpump.WorkerId +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor} +import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor} +import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import io.gearpump.cluster.{TestUtil, MasterHarness} -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.scheduler.{ResourceRequest, Resource, ResourceAllocation} +import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{MasterHarness, TestUtil} import io.gearpump.util.ActorSystemBooter._ -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import scala.util.Success -class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { +class AppMasterLauncherSpec extends FlatSpec with Matchers + with BeforeAndAfterEach with MasterHarness { - override def config = TestUtil.DEFAULT_CONFIG + override def config: Config = TestUtil.DEFAULT_CONFIG val appId = 1 val executorId = 2 @@ -45,7 +48,7 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa var watcher: TestProbe = null var appMasterLauncher: ActorRef = null - override def beforeEach() = { + override def beforeEach(): Unit = { startActorSystem() master = createMockMaster() client = TestProbe()(getActorSystem) @@ -55,12 +58,13 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa TestUtil.dummyApp, None, "username", master.ref, Some(client.ref))) watcher watch appMasterLauncher master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) - val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) + val resource = ResourceAllocated( + Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) master.reply(resource) worker.expectMsgType[LaunchExecutor] } - override def afterEach() = { + override def afterEach(): Unit = { shutdownActorSystem() } @@ -79,7 +83,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa worker.reply(ExecutorLaunchRejected("")) master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) - val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) + val resource = ResourceAllocated( + Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) master.reply(resource) worker.expectMsgType[LaunchExecutor] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala index c1d6144..99cfc37 100644 --- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala @@ -18,11 +18,11 @@ package io.gearpump.cluster.master +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import io.gearpump.cluster.appmaster.ApplicationState -import io.gearpump.cluster.{AppJar, AppDescription} -import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec} -class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach { +class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach { "ApplicationState" should "check equal with respect to only appId and attemptId" in { val stateA = ApplicationState(0, "application0", 0, null, null, null, "A") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala index 690d8ad..b007120 100644 --- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala @@ -18,8 +18,7 @@ package io.gearpump.cluster.master -//TODO class MasterProxySpec { - // master proxy will retry to find the master + // Master proxy retries multiple times to find the master } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala index 972f9d5..e4122da 100644 --- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala @@ -19,5 +19,4 @@ package io.gearpump.cluster.master class MasterSpec { - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala index 1602d10..3b3265f 100644 --- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala +++ b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala @@ -18,13 +18,14 @@ package io.gearpump.metrics -import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter} import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.scalatest.{FlatSpec, Matchers} import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} -class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{ +import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter} + +class MetricsSpec extends FlatSpec with Matchers with MockitoSugar { "Counter" should "handle sampleRate == 1" in { @@ -83,7 +84,6 @@ class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{ verify(mockBaseHistogram, times(1)).update(6L) } - "Meter" should "handle sampleRate == 1" in { val mockBaseMeter = mock[CodaHaleMeter] @@ -101,7 +101,7 @@ class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{ val mockBaseMeter = mock[CodaHaleMeter] - val meter = new Meter("m",mockBaseMeter, 2) + val meter = new Meter("m", mockBaseMeter, 2) meter.mark(1) meter.mark(3) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala index d5a3c18..9509d94 100644 --- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala +++ b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala @@ -18,11 +18,12 @@ package io.gearpump.partitioner -import io.gearpump.Message import org.scalatest.{FlatSpec, Matchers} -class PartitionerSpec extends FlatSpec with Matchers { - val NUM = 10 +import io.gearpump.Message + +class PartitionerSpec extends FlatSpec with Matchers { + val NUM = 10 "HashPartitioner" should "hash same key to same slots" in { val partitioner = new HashPartitioner http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala index c64cfa7..c38a555 100644 --- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala +++ b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,20 @@ package io.gearpump.security -import akka.actor.ActorSystem -import io.gearpump.cluster.TestUtil -import io.gearpump.security.Authenticator.AuthenticationResult -import org.scalatest.{Matchers, FlatSpec} - import scala.concurrent.Await import scala.concurrent.duration._ -class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers { +import akka.actor.ActorSystem +import org.scalatest.{FlatSpec, Matchers} + +import io.gearpump.cluster.TestUtil + +class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers { it should "authenticate correctly" in { val config = TestUtil.UI_CONFIG implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config) implicit val ec = system.dispatcher - val timeout = 30 seconds + val timeout = 30.seconds val authenticator = new ConfigFileBasedAuthenticator(config) val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout) @@ -48,7 +48,7 @@ class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers { assert(failedGuest == Authenticator.UnAuthenticated) assert(failedAdmin == Authenticator.UnAuthenticated) - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala index 69abb80..4a3963b 100644 --- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala +++ b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,7 @@ package io.gearpump.security -import org.scalatest.{Matchers, FlatSpec} +import org.scalatest.{FlatSpec, Matchers} class PasswordUtilSpec extends FlatSpec with Matchers { @@ -28,11 +28,10 @@ class PasswordUtilSpec extends FlatSpec with Matchers { val digest1 = PasswordUtil.hash(password) val digest2 = PasswordUtil.hash(password) - // we will use different salt each time, thus - // creating different hash. + // Uses different salt each time, thus creating different hash. assert(digest1 != digest2) - // both are valid hash. + // Both are valid hash. assert(PasswordUtil.verify(password, digest1)) assert(PasswordUtil.verify(password, digest2)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala index c18f4d5..3ed6ffa 100644 --- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala +++ b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala @@ -18,16 +18,19 @@ package io.gearpump.serializer +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorSystem, ExtendedActorSystem} -import io.gearpump.esotericsoftware.kryo.io.{Input, Output} -import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} import com.typesafe.config.{ConfigFactory, ConfigValueFactory} -import io.gearpump.cluster.TestUtil -import io.gearpump.serializer.SerializerSpec._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import scala.collection.JavaConverters._ +import io.gearpump.cluster.TestUtil +import io.gearpump.esotericsoftware.kryo.io.{Input, Output} +import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer} +import io.gearpump.serializer.SerializerSpec._ class SerializerSpec extends FlatSpec with Matchers with MockitoSugar { val config = ConfigFactory.empty.withValue("gearpump.serializers", @@ -56,8 +59,8 @@ class SerializerSpec extends FlatSpec with Matchers with MockitoSugar { val anotherA = serializer.deserialize(bytes) assert(anotherA.isInstanceOf[ClassA]) - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala index 8582f29..71b4218 100644 --- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala +++ b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.transport import java.io.{DataInput, DataOutput} import io.gearpump.transport.MockTransportSerializer.NettyMessage import io.gearpump.transport.netty.ITransportMessageSerializer -import org.jboss.netty.buffer.ChannelBuffer -class MockTransportSerializer extends ITransportMessageSerializer{ +class MockTransportSerializer extends ITransportMessageSerializer { override def getLength(obj: scala.Any): Int = 4 override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = { @@ -38,6 +38,6 @@ class MockTransportSerializer extends ITransportMessageSerializer{ } } -object MockTransportSerializer{ - case class NettyMessage(num : Int) +object MockTransportSerializer { + case class NettyMessage(num: Int) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/NettySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala index a81e21d..6caf357 100644 --- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala +++ b/core/src/test/scala/io/gearpump/transport/NettySpec.scala @@ -19,18 +19,19 @@ package io.gearpump.transport import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor.{ActorRef, ActorSystem} import akka.testkit.TestProbe +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil import io.gearpump.transport.MockTransportSerializer.NettyMessage import io.gearpump.transport.netty.{Context, TaskMessage} -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FlatSpec, Matchers} import io.gearpump.util.Util -import scala.concurrent.duration._ - class NettySpec extends FlatSpec with Matchers with MockitoSugar { "Netty Transport" should "send and receive message correctly " in { @@ -39,7 +40,7 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar { val context = new Context(system, conf) val serverActor = TestProbe()(system) - val port = Util.findFreePort + val port = Util.findFreePort() import system.dispatcher system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) { @@ -52,10 +53,10 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar { val data = NettyMessage(0) val msg = new TaskMessage(0, 1, 2, data) client ! msg - serverActor.expectMsg(15 seconds, data) - - context.close - system.shutdown() - system.awaitTermination() + serverActor.expectMsg(15.seconds, data) + + context.close() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala index d5068fb..13530ed 100644 --- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala +++ b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala @@ -18,22 +18,24 @@ package io.gearpump.util +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.testkit.TestProbe -import io.gearpump.cluster.TestUtil -import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem} -import io.gearpump.cluster.TestUtil -import io.gearpump.util.ActorSystemBooter._ -import io.gearpump.util.ActorSystemBooterSpec._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} +import io.gearpump.cluster.TestUtil +import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _} +import io.gearpump.util.ActorSystemBooterSpec._ + class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { "ActorSystemBooter" should "report its address back" in { val boot = bootSystem() boot.prob.expectMsgType[RegisterActorSystem] - boot.shutdown + boot.shutdown() } "ActorSystemBooter" should "terminate itself when parent actor dies" in { @@ -44,9 +46,9 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { boot.prob.reply(ActorSystemRegistered(boot.prob.ref)) boot.prob.reply(BindLifeCycle(dummy)) boot.host.stop(dummy) - val terminated = retry(5)(boot.bootedSystem.isTerminated) + val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted) assert(terminated) - boot.shutdown + boot.shutdown() } "ActorSystemBooter" should "create new actor" in { @@ -59,10 +61,10 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero")) boot.prob.expectMsgType[ActorCreated] - boot.shutdown + boot.shutdown() } - private def bootSystem() : Boot = { + private def bootSystem(): Boot = { val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG) val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG) @@ -75,16 +77,16 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { Boot(system, receiver, bootSystem) } - case class Boot(host : ActorSystem, prob : TestProbe, bootedSystem : ActorSystem) { - def shutdown = { - host.shutdown() - bootedSystem.shutdown() - host.awaitTermination() - bootedSystem.awaitTermination() + case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) { + def shutdown(): Unit = { + host.terminate() + bootedSystem.terminate() + Await.result(host.whenTerminated, Duration.Inf) + Await.result(bootedSystem.whenTerminated, Duration.Inf) } } - def retry(seconds: Int)(fn: => Boolean) : Boolean = { + def retry(seconds: Int)(fn: => Boolean): Boolean = { val result = fn if (result) { result @@ -97,19 +99,19 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar { object ActorSystemBooterSpec { class Dummy extends Actor { - def receive : Receive = { + def receive: Receive = { case _ => } } class AcceptZeroArguments extends Actor { - def receive : Receive = { + def receive: Receive = { case _ => } } - class AcceptThreeArguments(a : Int, b : Int, c : Int) extends Actor { - def receive : Receive = { + class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor { + def receive: Receive = { case _ => } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala index ec72810..6ab5a2f 100644 --- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala +++ b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,23 @@ package io.gearpump.util +import org.scalatest.FlatSpec + import io.gearpump.transport.HostPort -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FlatSpec} -class ActorUtilSpec extends FlatSpec { - "masterActorPath" should "construct the ActorPath from HostPort" in { - import Constants.MASTER +class ActorUtilSpec extends FlatSpec { + "masterActorPath" should "construct the ActorPath from HostPort" in { + import io.gearpump.util.Constants.MASTER - val host = "127.0.0.1" - val port = 3000 - val master = HostPort("127.0.0.1", 3000) - val masterPath = ActorUtil.getMasterActorPath(master) - assert(masterPath.address.port == Some(port)) - assert(masterPath.address.system == MASTER) - assert(masterPath.address.host == Some(host)) - assert(masterPath.address.protocol == "akka.tcp") - assert(masterPath.toStringWithoutAddress == s"/user/$MASTER") - } + val host = "127.0.0.1" + val port = 3000 + val master = HostPort("127.0.0.1", 3000) + val masterPath = ActorUtil.getMasterActorPath(master) + assert(masterPath.address.port == Some(port)) + assert(masterPath.address.system == MASTER) + assert(masterPath.address.host == Some(host)) + assert(masterPath.address.protocol == "akka.tcp") + assert(masterPath.toStringWithoutAddress == s"/user/$MASTER") + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala index 07e8121..0c798f3 100644 --- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala +++ b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala @@ -19,16 +19,20 @@ package io.gearpump.util import java.io.File +import scala.concurrent.Await +import scala.concurrent.duration.Duration import akka.actor.ActorSystem -import io.gearpump.cluster.{ClusterConfigSource, ClusterConfig, UserConfig} import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { +import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig} + +class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { "Typesafe Cluster Configs" should "follow the override rules" in { - val conf = """ + val conf = + """ gearpump { gear = "gearpump" } @@ -40,7 +44,7 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { conf = "worker" } conf = "base" - """ + """ val file = File.createTempFile("test", ".conf") FileUtils.write(file, conf) @@ -68,8 +72,7 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { implicit val system = ActorSystem("forSerialization") - - val map = Map[String,String]("key1"->"1", "key2"->"value2") + val map = Map[String, String]("key1" -> "1", "key2" -> "value2") val user = new UserConfig(map) .withLong("key3", 2L) @@ -86,11 +89,11 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar { val data = new ConfigsSpec.Data(3) assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get) - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } -object ConfigsSpec{ +object ConfigsSpec { case class Data(value: Int) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala index 1c954d5..30e42c7 100644 --- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala +++ b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala @@ -16,12 +16,13 @@ * limitations under the License. */ - package io.gearpump.util import java.io.File import java.util + import org.scalatest.FlatSpec + import io.gearpump.google.common.io.Files class FileUtilsSpec extends FlatSpec { @@ -43,7 +44,7 @@ class FileUtilsSpec extends FlatSpec { val file = File.createTempFile("fileutilspec", ".test") val bytes = TXT.toCharArray.map(_.toByte) FileUtils.writeByteArrayToFile(file, bytes) - util.Arrays.equals(bytes,FileUtils.readFileToByteArray(file)) + util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file)) file.delete() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/GraphSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala index 0028446..20eab6d 100644 --- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala +++ b/core/src/test/scala/io/gearpump/util/GraphSpec.scala @@ -20,9 +20,9 @@ package io.gearpump.util import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks -import org.scalatest.{PropSpec, Matchers} +import org.scalatest.{Matchers, PropSpec} -import io.gearpump.util.Graph.{Path, Node} +import io.gearpump.util.Graph.{Node, Path} class GraphSpec extends PropSpec with PropertyChecks with Matchers { @@ -33,7 +33,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { property("Graph with no edges should be built correctly") { val vertexSet = Set("A", "B", "C") - val graph = Graph(vertexSet.toSeq.map(Node):_ *) + val graph = Graph(vertexSet.toSeq.map(Node): _*) graph.vertices.toSet shouldBe vertexSet } @@ -107,7 +107,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { val levelMap = graph.vertexHierarchyLevelMap() - //check whether the rule holds: : if vertex A -> B, then level(A) < level(B) + // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B) levelMap("A") < levelMap("B") levelMap("A") < levelMap("C") levelMap("B") < levelMap("C") @@ -178,8 +178,8 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { assert(graph.hasCycle()) } - property("topologicalOrderIterator " + - "and topologicalOrderWithCirclesIterator method should return equal order of graph with no circle") { + property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " + + "return equal order of graph with no circle") { val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5) val topoNoCircles = graph.topologicalOrderIterator val topoWithCircles = graph.topologicalOrderWithCirclesIterator @@ -188,9 +188,10 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { } property("Topological sort of graph with circles should work properly") { - val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10) + val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, + 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10) val topoWithCircles = graph.topologicalOrderWithCirclesIterator - val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6 ,5, 7, 10, 9) + val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9) assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala index 213bd96..ef362ff 100644 --- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala +++ b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.util +import scala.concurrent.duration._ + import akka.actor._ import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe} -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.TestUtil import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import org.slf4j.Logger -import scala.concurrent.duration._ + +import io.gearpump.cluster.TestUtil class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { @@ -40,7 +42,7 @@ class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with I val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref)) val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor] testActor.sendMsgToIgnore() - mockActor.expectMsg(30 seconds, MessageTimeOut) + mockActor.expectMsg(30.seconds, MessageTimeOut) } } } @@ -60,7 +62,7 @@ class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler { def sendMsgToIgnore(): Unit = { sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut()) } - + private def sendMsgTimeOut(): Unit = { mock ! MessageTimeOut } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/UtilSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala index 8d614fe..b5bde04 100644 --- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala +++ b/core/src/test/scala/io/gearpump/util/UtilSpec.scala @@ -18,17 +18,18 @@ package io.gearpump.util -import io.gearpump.transport.HostPort -import io.gearpump.util.Util._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} +import io.gearpump.transport.HostPort +import io.gearpump.util.Util._ + class UtilSpec extends FlatSpec with Matchers with MockitoSugar { it should "work" in { - assert(findFreePort.isSuccess) + assert(findFreePort().isSuccess) - assert(randInt != randInt) + assert(randInt() != randInt()) val hosts = parseHostList("host1:1,host2:2") assert(hosts(1) == HostPort("host2", 2)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService ---------------------------------------------------------------------- diff --git a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService index f0e0c5c..d226af9 100644 --- a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService +++ b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService @@ -1,2 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + io.gearpump.jarstore.local.LocalJarStoreService io.gearpump.jarstore.dfs.DFSJarStoreService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala index ac942ed..420f1b6 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,21 +18,34 @@ package io.gearpump.cluster import akka.actor.ActorRef -import io.gearpump.WorkerId + import io.gearpump.cluster.master.Master.MasterInfo import io.gearpump.cluster.scheduler.Resource +import io.gearpump.cluster.worker.WorkerId /** * Cluster Bootup Flow */ object WorkerToMaster { + + /** When an worker is started, it sends RegisterNewWorker */ case object RegisterNewWorker + + /** When worker lose connection with master, it tries to register itself again with old Id. */ case class RegisterWorker(workerId: WorkerId) + + /** Worker is responsible to broadcast its current status to master */ case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) } object MasterToWorker { - case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo) - case class UpdateResourceFailed(reason : String = null, ex: Throwable = null) + + /** Master confirm the reception of RegisterNewWorker or RegisterWorker */ + case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo) + + /** Worker have not received reply from master */ + case class UpdateResourceFailed(reason: String = null, ex: Throwable = null) + + /** Master is synced with worker on resource slots managed by current worker */ case object UpdateResourceSucceed } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala index 6be52b2..53da645 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,19 @@ package io.gearpump.cluster.embedded +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorRef, ActorSystem, Props} -import com.typesafe.config.{ConfigValueFactory, Config} +import com.typesafe.config.{Config, ConfigValueFactory} + import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.master.{Master => MasterActor} import io.gearpump.cluster.worker.{Worker => WorkerActor} -import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil} -import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, GEARPUMP_CLUSTER_MASTERS} -import scala.collection.JavaConverters._ - +import io.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} +import io.gearpump.util.{LogUtil, Util} /** * Create a in-process cluster with single worker @@ -41,8 +44,8 @@ class EmbeddedCluster(inputConfig: Config) { private val LOG = LogUtil.getLogger(getClass) - def start: Unit = { - val port = Util.findFreePort.get + def start(): Unit = { + val port = Util.findFreePort().get val akkaConf = getConfig(inputConfig, port) _config = akkaConf val system = ActorSystem(MASTER, akkaConf) @@ -64,10 +67,13 @@ class EmbeddedCluster(inputConfig: Config) { private def getConfig(inputConfig: Config, port: Int): Config = { val config = inputConfig. withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). - withValue(GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). - withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, ConfigValueFactory.fromAnyRef(true)). + withValue(GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). + withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, + ConfigValueFactory.fromAnyRef(true)). withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)). - withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) + withValue("akka.actor.provider", + ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) config } @@ -75,14 +81,14 @@ class EmbeddedCluster(inputConfig: Config) { ClientContext(_config, _system, _master) } - def stop: Unit = { + def stop(): Unit = { _system.stop(_master) - _system.shutdown() - _system.awaitTermination() + _system.terminate() + Await.result(_system.whenTerminated, Duration.Inf) } } -object EmbeddedCluster{ +object EmbeddedCluster { def apply(): EmbeddedCluster = { new EmbeddedCluster(ClusterConfig.master()) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala index 90f653c..68f778e 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,13 +20,13 @@ package io.gearpump.cluster.main import java.io.File import java.net.{URL, URLClassLoader} import java.util.jar.JarFile +import scala.util.Try -import io.gearpump.util.{Util, Constants} -import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} import org.slf4j.Logger -import scala.util.Try +import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} +/** Tool to submit an application jar to cluster */ object AppSubmitter extends AkkaApp with ArgumentsParser { val LOG: Logger = LogUtil.getLogger(getClass) @@ -35,60 +35,66 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { override val description = "Submit an application to Master by providing a jar" override val options: Array[(String, CLIOption[Any])] = Array( - "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")), + "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, + defaultValue = Some("")), "jar" -> CLIOption("<application>.jar", required = true), - "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)), - "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)), + "executors" -> CLIOption[Int]("number of executor to launch", required = false, + defaultValue = Some(1)), + "verbose" -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false)), // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None)) + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - if (null == config) { - return - } + if (null != config) { - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole - } + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole() + } - val jar = config.getString("jar") + val jar = config.getString("jar") - // Set jar path to be submitted to cluster - System.setProperty(Constants.GEARPUMP_APP_JAR, jar) - System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) + // Set jar path to be submitted to cluster + System.setProperty(Constants.GEARPUMP_APP_JAR, jar) + System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) - val namePrefix = config.getString("namePrefix") - if (namePrefix.nonEmpty) { - if (!Util.validApplicationName(namePrefix)) { - throw new Exception(s"$namePrefix is not a valid prefix for an application name") + val namePrefix = config.getString("namePrefix") + if (namePrefix.nonEmpty) { + if (!Util.validApplicationName(namePrefix)) { + throw new Exception(s"$namePrefix is not a valid prefix for an application name") + } + System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) } - System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) - } - val jarFile = new java.io.File(jar) + val jarFile = new java.io.File(jar) - //start main class - if (!jarFile.exists()) { - throw new Exception(s"jar $jar does not exist") - } + // Start main class + if (!jarFile.exists()) { + throw new Exception(s"jar $jar does not exist") + } - val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + jarFile.getAbsolutePath)), - Thread.currentThread().getContextClassLoader()) - val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) + val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + + jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader()) + val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) - //set the context classloader as ActorSystem will use context classloader in precedence. - Thread.currentThread().setContextClassLoader(classLoader) - val clazz = classLoader.loadClass(main) - val mainMethod = clazz.getMethod("main", classOf[Array[String]]) - mainMethod.invoke(null, arguments) + // Set to context classloader. ActorSystem pick context classloader in preference + Thread.currentThread().setContextClassLoader(classLoader) + val clazz = classLoader.loadClass(main) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, arguments) + } } - private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader): (String, Array[String]) = { - val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.getValue("Main-Class")).getOrElse("") + private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader) + : (String, Array[String]) = { + val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes. + getValue("Main-Class")).getOrElse("") + if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) { (remainArgs(0), remainArgs.drop(1)) } else if (mainInManifest.nonEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala index 31b7492..4423727 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.cluster.main -import io.gearpump.util.{Constants, LogUtil} import org.slf4j.Logger -object Gear { +import io.gearpump.util.{Constants, LogUtil} + +object Gear { val OPTION_CONFIG = "conf" @@ -29,12 +30,14 @@ object Gear { val commands = Map("app" -> AppSubmitter, "kill" -> Kill, "info" -> Info, "replay" -> Replay, "main" -> MainRunner) - def usage: Unit = { + def usage(): Unit = { val keys = commands.keys.toList.sorted + // scalastyle:off println Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println } - def executeCommand(command : String, commandArgs : Array[String]) = { + private def executeCommand(command: String, commandArgs: Array[String]) = { commands.get(command).map(_.main(commandArgs)) if (!commands.contains(command)) { val allArgs = (command +: commandArgs.toList).toArray @@ -42,15 +45,15 @@ object Gear { } } - def main(inputArgs: Array[String]) = { + def main(inputArgs: Array[String]): Unit = { val (configFile, args) = extractConfig(inputArgs) if (configFile != null) { - // set custom config file... + // Sets custom config file... System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile) } if (args.length == 0) { - usage + usage() } else { val command = args(0) val commandArgs = args.drop(1) @@ -62,7 +65,7 @@ object Gear { var index = 0 var result = List.empty[String] - var configFile:String = null + var configFile: String = null while (index < inputArgs.length) { val item = inputArgs(index) if (item == s"-$OPTION_CONFIG") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala index 878bcbf..4922690 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,13 +17,13 @@ */ package io.gearpump.cluster.main +import org.slf4j.Logger + import io.gearpump.cluster.MasterToAppMaster.AppMastersData import io.gearpump.cluster.client.ClientContext import io.gearpump.util.{AkkaApp, LogUtil} -import org.slf4j.Logger - -import scala.util.Try +/** Tool to query master info */ object Info extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -31,11 +31,12 @@ object Info extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None)) - + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) override val description = "Query the Application list" + // scalastyle:off println def main(akkaConf: Config, args: Array[String]): Unit = { val client = ClientContext(akkaConf) @@ -48,4 +49,5 @@ object Info extends AkkaApp with ArgumentsParser { } client.close() } + // scalastyle:on println } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala index 195809c..3ce781f 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,12 @@ package io.gearpump.cluster.main -import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, LogUtil} import org.slf4j.Logger -import scala.util.Try +import io.gearpump.cluster.client.ClientContext +import io.gearpump.util.{AkkaApp, LogUtil} +/** Tool to kill an App */ object Kill extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -32,20 +32,19 @@ object Kill extends AkkaApp with ArgumentsParser { "appid" -> CLIOption("<application id>", required = true), // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None)) + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) override val description = "Kill an application with application Id" def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - if (null == config) { - return + if (null != config) { + val client = ClientContext(akkaConf) + LOG.info("Client ") + client.shutdown(config.getInt("appid")) + client.close() } - - val client = ClientContext(akkaConf) - LOG.info("Client ") - client.shutdown(config.getInt("appid")) - client.close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala index d6b2479..d5681df 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package io.gearpump.cluster.main +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger + import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.master.{Master => MasterActor} import io.gearpump.cluster.worker.{Worker => WorkerActor} import io.gearpump.util.Constants._ import io.gearpump.util.LogUtil.ProcessType -import io.gearpump.util.{AkkaApp, ActorUtil, Constants, LogUtil, Util} -import org.slf4j.Logger - -import scala.collection.JavaConversions._ +import io.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} object Local extends AkkaApp with ArgumentsParser { override def akkaConfig: Config = ClusterConfig.master() @@ -37,7 +40,8 @@ object Local extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), - "workernum"-> CLIOption[Int]("<how many workers to start>", required = false, defaultValue = Some(2))) + "workernum" -> CLIOption[Int]("<how many workers to start>", required = false, + defaultValue = Some(2))) override val description = "Start a local cluster" @@ -49,23 +53,23 @@ object Local extends AkkaApp with ArgumentsParser { } val config = parse(args) - if (null == config) { - return + if (null != config) { + local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf) } - local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf) } - def local(workerCount : Int, sameProcess : Boolean, akkaConf: Config) : Unit = { + def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = { if (sameProcess) { LOG.info("Starting local in same process") System.setProperty("LOCAL", "true") } - val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList) + val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) + .asScala.flatMap(Util.parseHostList) val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) - if(masters.size != 1 && masters.head.host != local) { - LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match with ${Constants.GEARPUMP_HOSTNAME}") - + if (masters.size != 1 && masters.head.host != local) { + LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " + + s"with ${Constants.GEARPUMP_HOSTNAME}") } else { val hostPort = masters.head @@ -80,7 +84,7 @@ object Local extends AkkaApp with ArgumentsParser { system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) } - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala index ea235ed..923a646 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,19 @@ package io.gearpump.cluster.main -import io.gearpump.util.{LogUtil, AkkaApp} -import io.gearpump.util.{AkkaApp, LogUtil} import org.slf4j.Logger -import scala.util.Try +import io.gearpump.util.{AkkaApp, LogUtil} +/** Tool to run any main class by providing a jar */ object MainRunner extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) override val options: Array[(String, CLIOption[Any])] = Array( // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None)) + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) def main(akkaConf: Config, args: Array[String]): Unit = { val mainClazz = args(0)
