http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala deleted file mode 100644 index 6eeba58..0000000 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import scala.concurrent.Future -import scala.util.Success - -import com.typesafe.config.Config -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class DistributedShellSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - override def config: Config = TestUtil.DEFAULT_CONFIG - - property("DistributedShell should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - - val masterReceiver = createMockMaster() - - Future { - DistributedShell.main(masterConfig, requiredArgs) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala deleted file mode 100644 index d59981b..0000000 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} - -import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator} - -class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter { - "ShellCommandResultAggregator" should { - "aggregate ShellCommandResult" in { - val executorId1 = 1 - val executorId2 = 2 - val responseBuilder = new ShellCommandResultAggregator - val response1 = ShellCommandResult(executorId1, "task1") - val response2 = ShellCommandResult(executorId2, "task2") - val result = responseBuilder.aggregate(response1).aggregate(response2).toString() - val expected = s"Execute results from executor $executorId1 : \ntask1\n" + - s"Execute results from executor $executorId2 : \ntask2\n" - assert(result == expected) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala deleted file mode 100644 index b301973..0000000 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.sys.process._ -import scala.util.{Failure, Success, Try} - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.cluster.appmaster.WorkerInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig} -import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult} - -class ShellExecutorSpec extends WordSpec with Matchers { - - "ShellExecutor" should { - "execute the shell command and return the result" in { - val executorId = 1 - val workerId = WorkerId(2, 0L) - val appId = 0 - val appName = "app" - val resource = Resource(1) - implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG) - val mockMaster = TestProbe()(system) - val worker = TestProbe() - val workerInfo = WorkerInfo(workerId, worker.ref) - val executorContext = ExecutorContext(executorId, workerInfo, appId, appName, - mockMaster.ref, resource) - val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext, - UserConfig.empty)) - - val process = Try(s"ls /".!!) - val result = process match { - case Success(msg) => msg - case Failure(ex) => ex.getMessage - } - executor.tell(ShellCommand("ls /"), mockMaster.ref) - assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals( - ShellCommandResult(executorId, result))) - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala new file mode 100644 index 0000000..e22abaf --- /dev/null +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.ActorSystem +import akka.testkit.{TestActorRef, TestProbe} +import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} +import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} +import org.apache.gearpump.cluster._ +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem +import org.apache.gearpump.util.ActorUtil + +class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter { + implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG) + val mockMaster = TestProbe()(system) + val mockWorker1 = TestProbe()(system) + val masterProxy = mockMaster.ref + val appId = 0 + val userName = "test" + val masterExecutorId = 0 + val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) + val resource = Resource(1) + val appJar = None + val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty) + + "DistributedShell AppMaster" should { + "launch one ShellTask on each worker" in { + val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) + val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, + masterProxy, appMasterInfo) + TestActorRef[DistShellAppMaster]( + AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) + mockMaster.expectMsgType[RegisterAppMaster] + mockMaster.reply(AppMasterRegistered(appId)) + // The DistributedShell AppMaster asks for worker list from Master. + mockMaster.expectMsg(GetAllWorkers) + mockMaster.reply(WorkerList(workerList)) + // After worker list is ready, DistributedShell AppMaster requests resource on each worker + workerList.foreach { workerId => + mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, + relaxation = Relaxation.SPECIFICWORKER))) + } + mockMaster.reply(ResourceAllocated( + Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L))))) + mockWorker1.expectMsgClass(classOf[LaunchExecutor]) + mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) + } + } + + after { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala new file mode 100644 index 0000000..7cfd07a --- /dev/null +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import scala.concurrent.Future +import scala.util.{Success, Try} + +import akka.testkit.TestProbe +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.ResolveAppId +import org.apache.gearpump.cluster.MasterToClient.ResolveAppIdResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand +import org.apache.gearpump.util.LogUtil + +class DistributedShellClientSpec + extends PropSpec with Matchers with BeforeAndAfter with MasterHarness { + + private val LOG = LogUtil.getLogger(getClass) + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("DistributedShellClient should succeed to submit application with required arguments") { + val command = "ls /" + val requiredArgs = Array("-appid", "0", "-command", command) + val masterReceiver = createMockMaster() + + assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure, + "missing required arguments, print usage") + + Future { + DistributedShellClient.main(masterConfig, requiredArgs) + } + + masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0)) + val mockAppMaster = TestProbe()(getActorSystem) + masterReceiver.reply(ResolveAppIdResult(Success(mockAppMaster.ref))) + LOG.info(s"Reply back ResolveAppIdResult, current actorRef: ${mockAppMaster.ref.path.toString}") + mockAppMaster.expectMsg(PROCESS_BOOT_TIME, ShellCommand(command)) + mockAppMaster.reply("result") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala new file mode 100644 index 0000000..51b5ec3 --- /dev/null +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class DistributedShellSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + override def config: Config = TestUtil.DEFAULT_CONFIG + + property("DistributedShell should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + + val masterReceiver = createMockMaster() + + Future { + DistributedShell.main(masterConfig, requiredArgs) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala new file mode 100644 index 0000000..11350a6 --- /dev/null +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} + +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator} + +class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter { + "ShellCommandResultAggregator" should { + "aggregate ShellCommandResult" in { + val executorId1 = 1 + val executorId2 = 2 + val responseBuilder = new ShellCommandResultAggregator + val response1 = ShellCommandResult(executorId1, "task1") + val response2 = ShellCommandResult(executorId2, "task2") + val result = responseBuilder.aggregate(response1).aggregate(response2).toString() + val expected = s"Execute results from executor $executorId1 : \ntask1\n" + + s"Execute results from executor $executorId2 : \ntask2\n" + assert(result == expected) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala new file mode 100644 index 0000000..e7a3a21 --- /dev/null +++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.sys.process._ +import scala.util.{Failure, Success, Try} + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.cluster.appmaster.WorkerInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig} +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult} + +class ShellExecutorSpec extends WordSpec with Matchers { + + "ShellExecutor" should { + "execute the shell command and return the result" in { + val executorId = 1 + val workerId = WorkerId(2, 0L) + val appId = 0 + val appName = "app" + val resource = Resource(1) + implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG) + val mockMaster = TestProbe()(system) + val worker = TestProbe() + val workerInfo = WorkerInfo(workerId, worker.ref) + val executorContext = ExecutorContext(executorId, workerInfo, appId, appName, + mockMaster.ref, resource) + val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext, + UserConfig.empty)) + + val process = Try(s"ls /".!!) + val result = process match { + case Success(msg) => msg + case Failure(ex) => ex.getMessage + } + executor.tell(ShellCommand("ls /"), mockMaster.ref) + assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals( + ShellCommandResult(executorId, result))) + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/README.md ---------------------------------------------------------------------- diff --git a/examples/distributeservice/README.md b/examples/distributeservice/README.md index 82b3726..65d41f6 100644 --- a/examples/distributeservice/README.md +++ b/examples/distributeservice/README.md @@ -6,12 +6,12 @@ In order to run the example: 2. Start the AppMaster:<br> ```bash - target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar io.gearpump.distributeservice.DistributeService + target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar org.apache.gearpump.distributeservice.DistributeService ``` 3. Distribute the service:<br> ```bash target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar - io.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path} + org.apache.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path} -script ${Script_Path} -serviceName ${Service_Name} -target ${Target_Path} -Dkey1=value1 -Dkey2=value2 ```<br> This command will distribute the service zip file(variable ```file```) to the target path(variable ```target```), then copy the script to http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala deleted file mode 100644 index a220dc6..0000000 --- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.distributeservice - -import java.io.File -import scala.concurrent.Future - -import akka.actor.{Deploy, Props} -import akka.pattern.{ask, pipe} -import akka.remote.RemoteScope -import com.typesafe.config.Config -import org.slf4j.Logger - -import io.gearpump.cluster.ClientToMaster.ShutdownApplication -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout} -import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext} -import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService} -import io.gearpump.util._ - -class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription) - extends ApplicationMaster { - import appContext._ - import context.dispatcher - implicit val timeout = Constants.FUTURE_TIMEOUT - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - private var currentExecutorId = 0 - private var fileServerPort = -1 - - val rootDirectory = new File("/") - val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) - val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0)) - - override def preStart(): Unit = { - LOG.info(s"Distribute Service AppMaster started") - ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self) - } - - (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self - - override def receive: Receive = { - case ExecutorSystemStarted(executorSystem, _) => - import executorSystem.{address, resource => executorResource, worker} - val executorContext = ExecutorContext(currentExecutorId, worker, - appId, app.name, self, executorResource) - // start executor - val executor = context.actorOf(Props(classOf[DistServiceExecutor], - executorContext, app.userConfig).withDeploy( - Deploy(scope = RemoteScope(address))), currentExecutorId.toString) - executorSystem.bindLifeCycleWith(executor) - currentExecutorId += 1 - case StartExecutorSystemTimeout => - LOG.error(s"Failed to allocate resource in time") - masterProxy ! ShutdownApplication(appId) - context.stop(self) - case FileServer.Port(port) => - this.fileServerPort = port - case GetFileContainer => - val name = Math.abs(new java.util.Random().nextLong()).toString - sender ! new FileContainer(s"http://$host:$fileServerPort/$name") - case installService: InstallService => - context.children.foreach(_ ! installService) - } - - private def getExecutorJvmConfig: ExecutorSystemJvmConfig = { - val config: Config = app.clusterConfig - val jvmSetting = Util.resolveJvmSetting( - config.withFallback(context.system.settings.config)).executor - ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, - appJar, username, config) - } -} - -object DistServiceAppMaster { - case object GetFileContainer - - case class FileContainer(url: String) - - case class InstallService( - url: String, - zipFileName: String, - targetPath: String, - script: Array[Byte], - serviceName: String, - serviceSettings: Map[String, Any]) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala deleted file mode 100644 index 4a2a876..0000000 --- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.distributeservice - -import java.io.{File, FileWriter} -import java.net.InetAddress -import scala.collection.JavaConverters._ -import scala.io.Source -import scala.sys.process._ -import scala.util.{Failure, Success, Try} - -import akka.actor.Actor -import org.apache.commons.io.FileUtils -import org.apache.commons.lang.text.StrSubstitutor -import org.slf4j.Logger - -import io.gearpump.cluster.{ExecutorContext, UserConfig} -import io.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService -import io.gearpump.util.{ActorUtil, LogUtil} - -class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor { - import executorContext._ - private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) - - override def receive: Receive = { - case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) => - LOG.info(s"Executor $executorId receive command to install " + - s"service $serviceName to $targetPath") - unzipFile(url, zipFileName, targetPath) - installService(scriptData, serviceName, serviceSettings) - } - - private def unzipFile(url: String, zipFileName: String, targetPath: String) = { - val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName) - val dir = new File(targetPath) - if (dir.exists()) { - FileUtils.forceDelete(dir) - } - val bytes = FileServer.newClient.get(url).get - FileUtils.writeByteArrayToFile(zipFile, bytes) - val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!) - result match { - case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath") - case Failure(ex) => throw ex - } - } - - private def installService( - scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = { - val tempFile = File.createTempFile("gearpump", serviceName) - FileUtils.writeByteArrayToFile(tempFile, scriptData) - val script = new File("/etc/init.d", serviceName) - writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings) - val result = Try(s"chkconfig --add $serviceName".!!) - result match { - case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!") - case Failure(ex) => throw ex - } - } - - private def getEnvSettings: Map[String, Any] = { - Map("workerId" -> worker, - "localhost" -> ActorUtil.getSystemAddress(context.system).host.get, - "hostname" -> InetAddress.getLocalHost.getHostName) - } - - private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = { - val writer = new FileWriter(target) - val sub = new StrSubstitutor(envs.asJava) - sub.setEnableSubstitutionInVariables(true) - Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n")) - writer.close() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala deleted file mode 100644 index 522dc5e..0000000 --- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.distributeservice - -import org.slf4j.Logger - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.cluster.{Application, UserConfig} -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Demo app to remotely deploy and start system service on machines in the cluster */ -object DistributeService extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array.empty - - override def main(akkaConf: Config, args: Array[String]): Unit = { - LOG.info(s"Distribute Service submitting application...") - val context = ClientContext(akkaConf) - val appId = context.submit(Application[DistServiceAppMaster]("DistributedService", - UserConfig.empty)) - context.close() - LOG.info(s"Distribute Service Application started with appId $appId !") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala deleted file mode 100644 index 0d85001..0000000 --- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.distributeservice - -import java.io.File -import scala.concurrent.Future -import scala.util.{Failure, Success} - -import akka.pattern.ask -import org.apache.commons.io.FileUtils - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService} -import io.gearpump.util.{AkkaApp, Constants} - -/** Client to submit the service jar */ -object DistributeServiceClient extends AkkaApp with ArgumentsParser { - implicit val timeout = Constants.FUTURE_TIMEOUT - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true), - "file" -> CLIOption[String]("<service zip file path>", required = true), - "script" -> CLIOption[String]( - "<file path of service script that will be installed to /etc/init.d>", required = true), - "serviceName" -> CLIOption[String]("<service name>", required = true), - "target" -> CLIOption[String]("<target path on each machine>", required = true) - ) - - override def help(): Unit = { - super.help() - // scalastyle:off println - Console.err.println(s"-D<name>=<value> set a property to the service") - // scalastyle:on println - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(filterCustomOptions(args)) - val context = ClientContext(akkaConf) - implicit val system = context.system - implicit val dispatcher = system.dispatcher - val appid = config.getInt("appid") - val zipFile = new File(config.getString("file")) - val script = new File(config.getString("script")) - val serviceName = config.getString("serviceName") - val appMaster = context.resolveAppID(appid) - (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container => - val bytes = FileUtils.readFileToByteArray(zipFile) - val result = FileServer.newClient.save(container.url, bytes) - result match { - case Success(_) => - appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"), - FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args)) - context.close() - case Failure(ex) => throw ex - } - } - } - - private def filterCustomOptions(args: Array[String]): Array[String] = { - args.filter(!_.startsWith("-D")) - } - - private def parseServiceConfig(args: Array[String]): Map[String, Any] = { - val result = Map.empty[String, Any] - args.foldLeft(result) { (result, argument) => - if (argument.startsWith("-D") && argument.contains("=")) { - val fixedKV = argument.substring(2).split("=") - result + (fixedKV(0) -> fixedKV(1)) - } else { - result - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala deleted file mode 100644 index ed0b24d..0000000 --- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.distributeservice - -import java.io.File -import scala.util.{Failure, Success, Try} - -import akka.actor.{Actor, Stash} -import akka.io.IO -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod} -import spray.can.Http -import spray.http.HttpMethods._ -import spray.http._ - -import io.gearpump.util.{FileUtils, LogUtil} - -/** - * - * Should not use this to server too big files(more than 100MB), otherwise OOM may happen. - * - * port: set port to 0 if you want to bind to random port - */ -class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash { - private val LOG = LogUtil.getLogger(getClass) - - implicit val system = context.system - - override def preStart(): Unit = { - // Creates http server - IO(Http) ! Http.Bind(self, host, port) - } - - override def postStop(): Unit = { - // Stop the server - IO(Http) ! Http.Unbind - } - - override def receive: Receive = { - case Http.Bound(address) => - LOG.info(s"FileServer bound on port: ${address.getPort}") - context.become(listen(address.getPort)) - unstashAll() - case _ => - stash() - } - - def listen(port: Int): Receive = { - case FileServer.GetPort => { - sender ! FileServer.Port(port) - } - case Http.Connected(remote, _) => - sender ! Http.Register(self) - - // Fetches files from remote uri - case HttpRequest(GET, uri, _, _, _) => - val child = uri.path.toString() - val payload = Try { - val source = new File(rootDir, child) - FileUtils.readFileToByteArray(source) - } - payload match { - case Success(data) => - sender ! HttpResponse(entity = HttpEntity(data)) - case Failure(ex) => - LOG.error("failed to get file " + ex.getMessage) - sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage) - } - // Save file to remote uri - case post@HttpRequest(POST, uri, _, _, _) => - val child = uri.path.toString() - - val status = Try { - val target = new File(rootDir, child) - val payload = post.entity.data.toByteArray - FileUtils.writeByteArrayToFile(target, payload) - "OK" - } - status match { - case Success(message) => sender ! HttpResponse(entity = message) - case Failure(ex) => - LOG.error("save file failed " + ex.getMessage) - sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage) - } - } -} - -object FileServer { - object GetPort - case class Port(port: Int) - - def newClient: Client = new Client - - class Client { - val client = new HttpClient() - - def save(uri: String, data: Array[Byte]): Try[Int] = { - Try { - val post = new PostMethod(uri) - val entity = new ByteArrayRequestEntity(data) - post.setRequestEntity(entity) - client.executeMethod(post) - } - } - - def get(uri: String): Try[Array[Byte]] = { - val get = new GetMethod(uri) - val status = Try { - client.executeMethod(get) - } - - val data = status.flatMap { code => - if (code == 200) { - Success(get.getResponseBody()) - } else { - Failure(new Exception(s"We cannot get the data, the status code is $code")) - } - } - data - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala new file mode 100644 index 0000000..ca0ab49 --- /dev/null +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.distributeservice + +import java.io.File +import scala.concurrent.Future + +import akka.actor.{Deploy, Props} +import akka.pattern.{ask, pipe} +import akka.remote.RemoteScope +import com.typesafe.config.Config +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout} +import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext} +import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService} +import org.apache.gearpump.util._ + +class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription) + extends ApplicationMaster { + import appContext._ + import context.dispatcher + implicit val timeout = Constants.FUTURE_TIMEOUT + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + private var currentExecutorId = 0 + private var fileServerPort = -1 + + val rootDirectory = new File("/") + val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) + val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0)) + + override def preStart(): Unit = { + LOG.info(s"Distribute Service AppMaster started") + ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self) + } + + (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self + + override def receive: Receive = { + case ExecutorSystemStarted(executorSystem, _) => + import executorSystem.{address, resource => executorResource, worker} + val executorContext = ExecutorContext(currentExecutorId, worker, + appId, app.name, self, executorResource) + // start executor + val executor = context.actorOf(Props(classOf[DistServiceExecutor], + executorContext, app.userConfig).withDeploy( + Deploy(scope = RemoteScope(address))), currentExecutorId.toString) + executorSystem.bindLifeCycleWith(executor) + currentExecutorId += 1 + case StartExecutorSystemTimeout => + LOG.error(s"Failed to allocate resource in time") + masterProxy ! ShutdownApplication(appId) + context.stop(self) + case FileServer.Port(port) => + this.fileServerPort = port + case GetFileContainer => + val name = Math.abs(new java.util.Random().nextLong()).toString + sender ! new FileContainer(s"http://$host:$fileServerPort/$name") + case installService: InstallService => + context.children.foreach(_ ! installService) + } + + private def getExecutorJvmConfig: ExecutorSystemJvmConfig = { + val config: Config = app.clusterConfig + val jvmSetting = Util.resolveJvmSetting( + config.withFallback(context.system.settings.config)).executor + ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, + appJar, username, config) + } +} + +object DistServiceAppMaster { + case object GetFileContainer + + case class FileContainer(url: String) + + case class InstallService( + url: String, + zipFileName: String, + targetPath: String, + script: Array[Byte], + serviceName: String, + serviceSettings: Map[String, Any]) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala new file mode 100644 index 0000000..248156f --- /dev/null +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.distributeservice + +import java.io.{File, FileWriter} +import java.net.InetAddress +import scala.collection.JavaConverters._ +import scala.io.Source +import scala.sys.process._ +import scala.util.{Failure, Success, Try} + +import akka.actor.Actor +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.text.StrSubstitutor +import org.slf4j.Logger + +import org.apache.gearpump.cluster.{ExecutorContext, UserConfig} +import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService +import org.apache.gearpump.util.{ActorUtil, LogUtil} + +class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor { + import executorContext._ + private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) + + override def receive: Receive = { + case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) => + LOG.info(s"Executor $executorId receive command to install " + + s"service $serviceName to $targetPath") + unzipFile(url, zipFileName, targetPath) + installService(scriptData, serviceName, serviceSettings) + } + + private def unzipFile(url: String, zipFileName: String, targetPath: String) = { + val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName) + val dir = new File(targetPath) + if (dir.exists()) { + FileUtils.forceDelete(dir) + } + val bytes = FileServer.newClient.get(url).get + FileUtils.writeByteArrayToFile(zipFile, bytes) + val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!) + result match { + case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath") + case Failure(ex) => throw ex + } + } + + private def installService( + scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = { + val tempFile = File.createTempFile("gearpump", serviceName) + FileUtils.writeByteArrayToFile(tempFile, scriptData) + val script = new File("/etc/init.d", serviceName) + writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings) + val result = Try(s"chkconfig --add $serviceName".!!) + result match { + case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!") + case Failure(ex) => throw ex + } + } + + private def getEnvSettings: Map[String, Any] = { + Map("workerId" -> worker, + "localhost" -> ActorUtil.getSystemAddress(context.system).host.get, + "hostname" -> InetAddress.getLocalHost.getHostName) + } + + private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = { + val writer = new FileWriter(target) + val sub = new StrSubstitutor(envs.asJava) + sub.setEnableSubstitutionInVariables(true) + Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n")) + writer.close() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala new file mode 100644 index 0000000..df7a517 --- /dev/null +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.distributeservice + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.cluster.{Application, UserConfig} +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Demo app to remotely deploy and start system service on machines in the cluster */ +object DistributeService extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + LOG.info(s"Distribute Service submitting application...") + val context = ClientContext(akkaConf) + val appId = context.submit(Application[DistServiceAppMaster]("DistributedService", + UserConfig.empty)) + context.close() + LOG.info(s"Distribute Service Application started with appId $appId !") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala new file mode 100644 index 0000000..b2c8f11 --- /dev/null +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.distributeservice + +import java.io.File +import scala.concurrent.Future +import scala.util.{Failure, Success} + +import akka.pattern.ask +import org.apache.commons.io.FileUtils + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService} +import org.apache.gearpump.util.{AkkaApp, Constants} + +/** Client to submit the service jar */ +object DistributeServiceClient extends AkkaApp with ArgumentsParser { + implicit val timeout = Constants.FUTURE_TIMEOUT + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true), + "file" -> CLIOption[String]("<service zip file path>", required = true), + "script" -> CLIOption[String]( + "<file path of service script that will be installed to /etc/init.d>", required = true), + "serviceName" -> CLIOption[String]("<service name>", required = true), + "target" -> CLIOption[String]("<target path on each machine>", required = true) + ) + + override def help(): Unit = { + super.help() + // scalastyle:off println + Console.err.println(s"-D<name>=<value> set a property to the service") + // scalastyle:on println + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(filterCustomOptions(args)) + val context = ClientContext(akkaConf) + implicit val system = context.system + implicit val dispatcher = system.dispatcher + val appid = config.getInt("appid") + val zipFile = new File(config.getString("file")) + val script = new File(config.getString("script")) + val serviceName = config.getString("serviceName") + val appMaster = context.resolveAppID(appid) + (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container => + val bytes = FileUtils.readFileToByteArray(zipFile) + val result = FileServer.newClient.save(container.url, bytes) + result match { + case Success(_) => + appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"), + FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args)) + context.close() + case Failure(ex) => throw ex + } + } + } + + private def filterCustomOptions(args: Array[String]): Array[String] = { + args.filter(!_.startsWith("-D")) + } + + private def parseServiceConfig(args: Array[String]): Map[String, Any] = { + val result = Map.empty[String, Any] + args.foldLeft(result) { (result, argument) => + if (argument.startsWith("-D") && argument.contains("=")) { + val fixedKV = argument.substring(2).split("=") + result + (fixedKV(0) -> fixedKV(1)) + } else { + result + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala new file mode 100644 index 0000000..4cd71de --- /dev/null +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.distributeservice + +import java.io.File +import scala.util.{Failure, Success, Try} + +import akka.actor.{Actor, Stash} +import akka.io.IO +import org.apache.commons.httpclient.HttpClient +import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod} +import spray.can.Http +import spray.http.HttpMethods._ +import spray.http._ + +import org.apache.gearpump.util.{FileUtils, LogUtil} + +/** + * + * Should not use this to server too big files(more than 100MB), otherwise OOM may happen. + * + * port: set port to 0 if you want to bind to random port + */ +class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash { + private val LOG = LogUtil.getLogger(getClass) + + implicit val system = context.system + + override def preStart(): Unit = { + // Creates http server + IO(Http) ! Http.Bind(self, host, port) + } + + override def postStop(): Unit = { + // Stop the server + IO(Http) ! Http.Unbind + } + + override def receive: Receive = { + case Http.Bound(address) => + LOG.info(s"FileServer bound on port: ${address.getPort}") + context.become(listen(address.getPort)) + unstashAll() + case _ => + stash() + } + + def listen(port: Int): Receive = { + case FileServer.GetPort => { + sender ! FileServer.Port(port) + } + case Http.Connected(remote, _) => + sender ! Http.Register(self) + + // Fetches files from remote uri + case HttpRequest(GET, uri, _, _, _) => + val child = uri.path.toString() + val payload = Try { + val source = new File(rootDir, child) + FileUtils.readFileToByteArray(source) + } + payload match { + case Success(data) => + sender ! HttpResponse(entity = HttpEntity(data)) + case Failure(ex) => + LOG.error("failed to get file " + ex.getMessage) + sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage) + } + // Save file to remote uri + case post@HttpRequest(POST, uri, _, _, _) => + val child = uri.path.toString() + + val status = Try { + val target = new File(rootDir, child) + val payload = post.entity.data.toByteArray + FileUtils.writeByteArrayToFile(target, payload) + "OK" + } + status match { + case Success(message) => sender ! HttpResponse(entity = message) + case Failure(ex) => + LOG.error("save file failed " + ex.getMessage) + sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage) + } + } +} + +object FileServer { + object GetPort + case class Port(port: Int) + + def newClient: Client = new Client + + class Client { + val client = new HttpClient() + + def save(uri: String, data: Array[Byte]): Try[Int] = { + Try { + val post = new PostMethod(uri) + val entity = new ByteArrayRequestEntity(data) + post.setRequestEntity(entity) + client.executeMethod(post) + } + } + + def get(uri: String): Try[Array[Byte]] = { + val get = new GetMethod(uri) + val status = Try { + client.executeMethod(get) + } + + val data = status.flatMap { code => + if (code == 200) { + Success(get.getResponseBody()) + } else { + Failure(new Exception(s"We cannot get the data, the status code is $code")) + } + } + data + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala deleted file mode 100644 index 5bafef1..0000000 --- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.distributeservice - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.testkit.{TestActorRef, TestProbe} -import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} - -import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} -import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} -import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig} -import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer} -import io.gearpump.util.ActorSystemBooter.RegisterActorSystem -import io.gearpump.util.ActorUtil - -class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter { - implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG) - val mockMaster = TestProbe()(system) - val mockWorker1 = TestProbe()(system) - val client = TestProbe()(system) - val masterProxy = mockMaster.ref - val appId = 0 - val userName = "test" - val masterExecutorId = 0 - val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) - val resource = Resource(1) - val appJar = None - val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, - UserConfig.empty) - - "DistService AppMaster" should { - "responsable for service distributing" in { - val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref) - val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, - appMasterInfo) - TestActorRef[DistServiceAppMaster]( - AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) - val registerAppMaster = mockMaster.receiveOne(15.seconds) - assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) - - val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster - mockMaster.reply(AppMasterRegistered(appId)) - // The DistributedShell AppMaster will ask for worker list - mockMaster.expectMsg(GetAllWorkers) - mockMaster.reply(WorkerList(workerList)) - // After worker list is ready, DistributedShell AppMaster will request resouce on each worker - workerList.foreach { workerId => - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, - relaxation = Relaxation.SPECIFICWORKER))) - } - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, - WorkerId(1, 0L))))) - mockWorker1.expectMsgClass(classOf[LaunchExecutor]) - mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) - - appMaster.tell(GetFileContainer, client.ref) - client.expectMsgClass(15.seconds, classOf[FileContainer]) - } - } - - after { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala new file mode 100644 index 0000000..7516138 --- /dev/null +++ b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.distributeservice + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.testkit.{TestActorRef, TestProbe} +import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} +import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig} +import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer} +import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem +import org.apache.gearpump.util.ActorUtil + +class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter { + implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG) + val mockMaster = TestProbe()(system) + val mockWorker1 = TestProbe()(system) + val client = TestProbe()(system) + val masterProxy = mockMaster.ref + val appId = 0 + val userName = "test" + val masterExecutorId = 0 + val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) + val resource = Resource(1) + val appJar = None + val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, + UserConfig.empty) + + "DistService AppMaster" should { + "responsable for service distributing" in { + val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref) + val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, + appMasterInfo) + TestActorRef[DistServiceAppMaster]( + AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) + val registerAppMaster = mockMaster.receiveOne(15.seconds) + assert(registerAppMaster.isInstanceOf[RegisterAppMaster]) + + val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster + mockMaster.reply(AppMasterRegistered(appId)) + // The DistributedShell AppMaster will ask for worker list + mockMaster.expectMsg(GetAllWorkers) + mockMaster.reply(WorkerList(workerList)) + // After worker list is ready, DistributedShell AppMaster will request resouce on each worker + workerList.foreach { workerId => + mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, + relaxation = Relaxation.SPECIFICWORKER))) + } + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, + WorkerId(1, 0L))))) + mockWorker1.expectMsgClass(classOf[LaunchExecutor]) + mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) + + appMaster.tell(GetFileContainer, client.ref) + client.expectMsgClass(15.seconds, classOf[FileContainer]) + } + } + + after { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/README.md ---------------------------------------------------------------------- diff --git a/examples/pagerank/README.md b/examples/pagerank/README.md index 20d9032..7fe9220 100644 --- a/examples/pagerank/README.md +++ b/examples/pagerank/README.md @@ -2,7 +2,7 @@ After compile, ```scala -bin\gear io.gearpump.experiments.pagerank.example.PageRankExample +bin\gear org.apache.gearpump.experiments.pagerank.example.PageRankExample ``` ### Syntax http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/resources/geardefault.conf b/examples/pagerank/src/main/resources/geardefault.conf index d2a4e16..67275f9 100644 --- a/examples/pagerank/src/main/resources/geardefault.conf +++ b/examples/pagerank/src/main/resources/geardefault.conf @@ -1,7 +1,7 @@ gearpump { serializers { - "io.gearpump.experiments.pagerank.PageRankController$Tick" = "" - "io.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = "" - "io.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = "" + "org.apache.gearpump.experiments.pagerank.PageRankController$Tick" = "" + "org.apache.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = "" + "org.apache.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = "" } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala deleted file mode 100644 index 2e37091..0000000 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.pagerank - -import akka.actor.ActorSystem - -import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig} -import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph -import io.gearpump.util.Graph.Node - -/** - * - * A simple and naive pagerank implementation. - * - * @param name name of the application - * @param iteration max iteration count - * @param delta decide the accuracy when the page rank example stops. - * @param dag the page rank graph - */ -class PageRankApplication[T]( - override val name: String, iteration: Int, delta: Double, dag: Graph[T, _]) - extends Application { - - override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster] - override def userConfig(implicit system: ActorSystem): UserConfig = { - - // Map node with taskId - var taskId = 0 - val pageRankDag = dag.mapVertex { node => - val updatedNode = NodeWithTaskId(taskId, node) - taskId += 1 - updatedNode - } - - val taskCount = taskId - - val userConfig = UserConfig.empty.withValue(PageRankApplication.DAG, pageRankDag). - withInt(PageRankApplication.ITERATION, iteration). - withInt(PageRankApplication.COUNT, taskCount). - withDouble(PageRankApplication.DELTA, delta) - - val controller = Processor[PageRankController](1) - val pageRankWorker = Processor[PageRankWorker](taskCount) - val partitioner = new HashPartitioner - - val app = StreamApplication(name, Graph(controller ~ partitioner ~> pageRankWorker), userConfig) - app.userConfig - } -} - -object PageRankApplication { - val DAG = "PageRank.DAG" - val ITERATION = "PageRank.Iteration" - val COUNT = "PageRank.COUNT" - val DELTA = "PageRank.DELTA" - val REPORTER = "PageRank.Reporter" - case class NodeWithTaskId[T](taskId: Int, node: T) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala deleted file mode 100644 index 0fb689d..0000000 --- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.pagerank - -import akka.actor.Actor.Receive - -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.pagerank.PageRankController.Tick -import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight -import io.gearpump.streaming.task._ - -class PageRankController(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - - val taskCount = conf.getInt(PageRankApplication.COUNT).get - val iterationMax = conf.getInt(PageRankApplication.ITERATION).get - val delta = conf.getDouble(PageRankApplication.DELTA).get - - val tasks = (0 until taskCount).toList.map(TaskId(1, _)) - - var tick: Int = 0 - var receivedWeightForCurrentTick = 0 - - var weights = Map.empty[TaskId, Double] - var deltas = Map.empty[TaskId, Double] - - override def onStart(startTime: StartTime): Unit = { - output(Tick(tick), tasks: _*) - } - - private def output(msg: AnyRef, tasks: TaskId*): Unit = { - taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*) - } - - override def receiveUnManagedMessage: Receive = { - case LatestWeight(taskId, weight, replyTick) => - if (this.tick == replyTick) { - - deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0)) - weights += taskId -> weight - receivedWeightForCurrentTick += 1 - if (receivedWeightForCurrentTick == taskCount) { - this.tick += 1 - receivedWeightForCurrentTick = 0 - if (continueIteration) { - LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas") - output(Tick(tick), tasks: _*) - } else { - LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas") - } - } - } - } - - private def continueIteration: Boolean = { - (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) => - deltaExceed || value > delta - } - } -} - -object PageRankController { - case class Tick(iteration: Int) -}
