http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala new file mode 100644 index 0000000..e82dff3 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.scheduler + +import org.apache.gearpump.cluster.worker.WorkerId + +import scala.concurrent.duration._ + +import akka.actor.{ActorSystem, Props} +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource +import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} +import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished + +class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender + with WordSpecLike with Matchers with BeforeAndAfterAll{ + + def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) + val appId = 0 + val workerId1: WorkerId = WorkerId(1, 0L) + val workerId2: WorkerId = WorkerId(2, 0L) + val mockAppMaster = TestProbe() + val mockWorker1 = TestProbe() + val mockWorker2 = TestProbe() + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "The scheduler" should { + "update resource only when the worker is registered" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)) + expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " + + s"registered into master")) + } + + "drop application's resource requests when the application is removed" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + mockAppMaster.expectNoMsg(5.seconds) + } + } + + def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = { + left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId)) + } + + "The resource request with higher priority" should { + "be handled first" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY) + val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The resource request which delivered earlier" should { + "be handled first if the priorities are the same" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY) + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different relaxation" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER) + val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER) + + scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + + var expect = ResourceAllocated( + Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + val request3 = ResourceRequest( + Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + + expect = ResourceAllocated(Array( + ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), + ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + + // We have to manually update the resource on each worker + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + + expect = ResourceAllocated( + Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))) + mockAppMaster.expectMsgPF(5.seconds) { + case request: ResourceAllocated if sameElement(request, expect) => Unit + } + } + } + + "The PriorityScheduler" should { + "handle the resource request with different executor number" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) + + // By default, the request requires only one executor + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations2.allocations.length == 1) + assert(allocations2.allocations.head.resource == Resource(20)) + + val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations3.allocations.length == 3) + assert(allocations3.allocations.forall(_.resource == Resource(8))) + + // The total available resource can not satisfy the requirements with executor number + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations4.allocations.length == 2) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + + // When new resources are available, the remaining request will be satisfied + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) + val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations5.allocations.length == 1) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala new file mode 100644 index 0000000..bf25057 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.worker + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.{ActorSystem, PoisonPill, Props} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest._ + +import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} +import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} + +class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { + override def config: Config = TestUtil.DEFAULT_CONFIG + + val appId = 1 + val workerId: WorkerId = WorkerId(1, 0L) + val executorId = 1 + var masterProxy: TestProbe = null + var mockMaster: TestProbe = null + var client: TestProbe = null + val workerSlots = 50 + + override def beforeEach(): Unit = { + startActorSystem() + mockMaster = TestProbe()(getActorSystem) + masterProxy = TestProbe()(getActorSystem) + client = TestProbe()(getActorSystem) + } + + override def afterEach(): Unit = { + shutdownActorSystem() + } + + "The new started worker" should { + "kill itself if no response from Master after registering" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + mockMaster.expectTerminated(worker, 60.seconds) + } + } + + "Worker" should { + "init its resource from the gearpump config" in { + val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots"). + withFallback(TestUtil.DEFAULT_CONFIG) + val workerSystem = ActorSystem("WorkerSystem", config) + val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref)) + mockMaster watch worker + mockMaster.expectMsg(RegisterNewWorker) + + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots))) + + worker.tell( + UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref) + mockMaster.expectTerminated(worker, 5.seconds) + workerSystem.terminate() + Await.result(workerSystem.whenTerminated, Duration.Inf) + } + } + + "Worker" should { + "update its remaining resource when launching and shutting down executors" in { + val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref)) + masterProxy.expectMsg(RegisterNewWorker) + + worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + + val executorName = ActorUtil.actorNameForExecutor(appId, executorId) + // This is an actor path which the ActorSystemBooter will report back to, + // not needed in this test + val reportBack = "dummy" + val executionContext = ExecutorJVMConfig(Array.empty[String], + getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), + classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, + username = "user") + + // Test LaunchExecutor + worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), + mockMaster.ref) + mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine")) + + worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95))) + + worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98))) + + // Test terminationWatch + worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref) + mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100))) + client.expectMsg(ShutdownExecutorSucceed(1, 1)) + + worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref) + client.expectMsg(ShutdownExecutorFailed( + s"Can not find executor ${executorId + 1} for app $appId")) + + mockMaster.ref ! PoisonPill + masterProxy.expectMsg(RegisterWorker(workerId)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala new file mode 100644 index 0000000..b33c801 --- /dev/null +++ b/daemon/src/test/scala/org/apache/gearpump/util/FileServerSpec.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util + +import java.io.File +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.ActorSystem +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import org.apache.gearpump.cluster.TestUtil +import io.gearpump.google.common.io.Files +import org.apache.gearpump.jarstore.FilePath +import org.apache.gearpump.util.FileServer._ + +class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { + + implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS) + val host = "localhost" + private val LOG = LogUtil.getLogger(getClass) + + var system: ActorSystem = null + + override def afterAll { + if (null != system) { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + + override def beforeAll { + val config = TestUtil.DEFAULT_CONFIG + system = ActorSystem("FileServerSpec", config) + } + + private def save(client: Client, data: Array[Byte]): FilePath = { + val file = File.createTempFile("fileserverspec", "test") + FileUtils.writeByteArrayToFile(file, data) + val future = client.upload(file) + import scala.concurrent.duration._ + val path = Await.result(future, 30.seconds) + file.delete() + path + } + + private def get(client: Client, remote: FilePath): Array[Byte] = { + val file = File.createTempFile("fileserverspec", "test") + val future = client.download(remote, file) + import scala.concurrent.duration._ + val data = Await.result(future, 10.seconds) + + val bytes = FileUtils.readFileToByteArray(file) + file.delete() + bytes + } + + "The file server" should { + "serve the data previously stored" in { + + val rootDir = Files.createTempDir() + + val server = new FileServer(system, host, 0, rootDir) + val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) + + LOG.info("start test web server on port " + port) + + val sizes = List(1, 100, 1000000, 50000000) + val client = new Client(system, host, port.port) + + sizes.foreach { size => + val bytes = randomBytes(size) + val url = s"http://$host:${port.port}/$size" + val remote = save(client, bytes) + val fetchedBytes = get(client, remote) + assert(fetchedBytes sameElements bytes, s"fetch data is coruppted, $url, $rootDir") + } + server.stop + rootDir.delete() + } + } + + "The file server" should { + "handle missed file" in { + + val rootDir = Files.createTempDir() + + val server = new FileServer(system, host, 0, rootDir) + val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS)) + + val client = new Client(system, host, port.port) + val fetchedBytes = get(client, FilePath("noexist")) + assert(fetchedBytes.length == 0) + rootDir.delete() + } + } + + private def randomBytes(size: Int): Array[Byte] = { + val bytes = new Array[Byte](size) + (new java.util.Random()).nextBytes(bytes) + bytes + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-configuration.md ---------------------------------------------------------------------- diff --git a/docs/deployment-configuration.md b/docs/deployment-configuration.md index 70b5500..b651fed 100644 --- a/docs/deployment-configuration.md +++ b/docs/deployment-configuration.md @@ -76,7 +76,7 @@ This is the default configuration for `gear.conf`. | gearpump.executor.vmargs | "-server -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost" | JVM arguments for executor | | gearpump.executor.extraClasspath | "" | JVM default class path for executor | | gearpump.jarstore.rootpath | "jarstore/" | Define where the submitted jar file will be stored. This path follows the hadoop path schema. For HDFS, use `hdfs://host:port/path/`; if you want to store on master nodes, then use local directory. `jarstore.rootpath = "jarstore/"` will point to relative directory where master is started. `jarstore.rootpath = "/jarstore/"` will point to absolute directory on master server | -| gearpump.scheduling.scheduler-class |"io.gearpump.cluster.scheduler.PriorityScheduler" | Class to schedule the applications. | +| gearpump.scheduling.scheduler-class |"org.apache.gearpump.cluster.scheduler.PriorityScheduler" | Class to schedule the applications. | | gearpump.services.host | "127.0.0.1" | dashboard UI host address | | gearpump.services.port | 8090 | dashboard UI host port | | gearpump.netty.buffer-size | 5242880 | netty connection buffer size | @@ -88,4 +88,4 @@ This is the default configuration for `gear.conf`. | gearpump.netty.dispatcher | "gearpump.shared-thread-pool-dispatcher" | default dispatcher for netty client and server | | gearpump.shared-thread-pool-dispatcher | default Dispatcher with "fork-join-executor" | default shared thread pool dispatcher | | gearpump.single-thread-dispatcher | PinnedDispatcher | default single thread dispatcher | -| serialization-framework | "io.gearpump.serializer.FastKryoSerializationFramework" | Gearpump has built-in serialization framework using Kryo. Users are allowed to use a different serialization framework, like Protobuf. See `io.gearpump.serializer.FastKryoSerializationFramework` to find how a custom serialization framework can be defined | +| serialization-framework | "org.apache.gearpump.serializer.FastKryoSerializationFramework" | Gearpump has built-in serialization framework using Kryo. Users are allowed to use a different serialization framework, like Protobuf. See `org.apache.gearpump.serializer.FastKryoSerializationFramework` to find how a custom serialization framework can be defined | http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-resource-isolation.md ---------------------------------------------------------------------- diff --git a/docs/deployment-resource-isolation.md b/docs/deployment-resource-isolation.md index 0cb3764..955b8dd 100644 --- a/docs/deployment-resource-isolation.md +++ b/docs/deployment-resource-isolation.md @@ -83,7 +83,7 @@ The following steps are supposed to be executed by root user. 2. Enter into Gearpump's home folder, edit gear.conf under folder ```${GEARPUMP_HOME}/conf/``` ``` - gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" + gearpump.worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.CGroupProcessLauncher" gearpump.cgroup.root = "gearpump" ``` http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/deployment-ui-authentication.md ---------------------------------------------------------------------- diff --git a/docs/deployment-ui-authentication.md b/docs/deployment-ui-authentication.md index 872a857..614dc15 100644 --- a/docs/deployment-ui-authentication.md +++ b/docs/deployment-ui-authentication.md @@ -35,7 +35,7 @@ UI server admin can also choose to enable **auxiliary** OAuth2 authentication ch Gearpump provides a built-in ConfigFileBasedAuthenticator which verify user name and password against password hashcode stored in config files. - However, developer can choose to extends the ```io.gearpump.security.Authenticator``` to provide a custom + However, developer can choose to extends the ```org.apache.gearpump.security.Authenticator``` to provide a custom User-Password based authenticator, to support LDAP, Kerberos, and Database-based authentication... ### ConfigFileBasedAuthenticator: built-in User-Password Authenticator @@ -59,7 +59,7 @@ Suppose we want to add user jerry as an administrator, here are the steps: to generate the digest: ``` - bin/gear io.gearpump.security.PasswordUtil -password ilovegearpump + bin/gear org.apache.gearpump.security.PasswordUtil -password ilovegearpump ``` It will generate a digest value like this: @@ -110,8 +110,8 @@ If developer choose to define his/her own User-Password based authenticator, it modify configuration option: ``` -## Replace "io.gearpump.security.CustomAuthenticator" with your real authenticator class. -gearpump.ui-security.authenticator = "io.gearpump.security.CustomAuthenticator" +## Replace "org.apache.gearpump.security.CustomAuthenticator" with your real authenticator class. +gearpump.ui-security.authenticator = "org.apache.gearpump.security.CustomAuthenticator" ``` Make sure CustomAuthenticator extends interface: @@ -357,7 +357,7 @@ You can follow the Google OAuth2 example code to define a custom OAuth2Authentic ``` ## name of this authenticator "socialnetworkx" { - "class" = "io.gearpump.services.security.oauth2.impl.SocialNetworkXAuthenticator" + "class" = "org.apache.gearpump.services.security.oauth2.impl.SocialNetworkXAuthenticator" ## Please make sure this URL matches the name "callback" = "http://127.0.0.1:8090/login/oauth2/socialnetworkx/callback" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-connectors.md ---------------------------------------------------------------------- diff --git a/docs/dev-connectors.md b/docs/dev-connectors.md index 08ff023..2060c93 100644 --- a/docs/dev-connectors.md +++ b/docs/dev-connectors.md @@ -9,7 +9,7 @@ title: Gearpump Connectors ### DataSource `DataSource` is the concept in Gearpump that without input and will output messages. So, basically, `DataSource` is the start point of a streaming processing flow. -As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once message delivery and exactly-once message delivery, for some data sources, we will need a `io.gearpump.streaming.transaction.api.OffsetStorageFactory` to store the offset (progress) of current `DataSource`. So that, when a replay is needed, Gearpump can guide `DataSource` to replay from certain offset. +As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once message delivery and exactly-once message delivery, for some data sources, we will need a `org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory` to store the offset (progress) of current `DataSource`. So that, when a replay is needed, Gearpump can guide `DataSource` to replay from certain offset. Currently Gearpump `DataSource` only support infinite stream. Finite stream support will be added in a near future release. @@ -143,7 +143,7 @@ To implement your own `DataSource`, you need to implement two things: 2. a helper class to easy the usage in a DSL ### Implement your own `DataSource` -You need to implement a class derived from `io.gearpump.streaming.transaction.api.TimeReplayableSource`. +You need to implement a class derived from `org.apache.gearpump.streaming.transaction.api.TimeReplayableSource`. ### Implement DSL helper (Optional) If you would like to have a DSL at hand you may start with this customized stream; it is better if you can implement your own DSL helper. @@ -174,7 +174,7 @@ To implement your own `DataSink`, you need to implement two things: 2. a helper class to make it easy use in DSL ### Implement your own `DataSink` -You need to implement a class derived from `io.gearpump.streaming.sink.DataSink`. +You need to implement a class derived from `org.apache.gearpump.streaming.sink.DataSink`. ### Implement DSL helper (Optional) If you would like to have a DSL at hand you may start with this customized stream; it is better if you can implement your own DSL helper. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-custom-serializer.md ---------------------------------------------------------------------- diff --git a/docs/dev-custom-serializer.md b/docs/dev-custom-serializer.md index 5be0544..8e6ba46 100644 --- a/docs/dev-custom-serializer.md +++ b/docs/dev-custom-serializer.md @@ -15,10 +15,10 @@ To register a class, you need to change the configuration file gear.conf(or appl gearpump { serializers { ## We will use default FieldSerializer to serialize this class type - "io.gearpump.UserMessage" = "" + "org.apache.gearpump.UserMessage" = "" ## we will use custom serializer to serialize this class type - "io.gearpump.UserMessage2" = "io.gearpump.UserMessageSerializer" + "org.apache.gearpump.UserMessage2" = "org.apache.gearpump.UserMessageSerializer" } } ``` @@ -27,7 +27,7 @@ gearpump { When you decide that you want to define a custom serializer, you can do this in two ways. -Please note that Gearpump shaded the original Kryo dependency. The package name ```com.esotericsoftware``` was relocated to ```io.gearpump.esotericsoftware```. So in the following customization, you should import corresponding shaded classes, the example code will show that part. +Please note that Gearpump shaded the original Kryo dependency. The package name ```com.esotericsoftware``` was relocated to ```org.apache.gearpump.esotericsoftware```. So in the following customization, you should import corresponding shaded classes, the example code will show that part. In general you should use the shaded version of a library whenever possible in order to avoid binary incompatibilities, eg don't use: @@ -38,7 +38,7 @@ In general you should use the shaded version of a library whenever possible in o but rather ```scala - import io.gearpump.google.common.io.Files + import org.apache.gearpump.google.common.io.Files ``` ##### System Level Serializer @@ -48,10 +48,10 @@ If the serializer is widely used, you can define a global serializer which is av ###### Step1: you first need to develop a java library which contains the custom serializer class. here is an example: ```scala -package io.gearpump +package org.apache.gearpump -import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer} -import io.gearpump.esotericsoftware.kryo.io.{Input, Output} +import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output} class UserMessage(longField: Long, intField: Int) @@ -78,7 +78,7 @@ Distribute the jar file to lib/ folder of every Gearpump installation in the clu ``` gearpump { serializers { - "io.gearpump.UserMessage" = "io.gearpump.UserMessageSerializer" + "org.apache.gearpump.UserMessage" = "org.apache.gearpump.UserMessageSerializer" } } ``` @@ -93,10 +93,10 @@ If all you want is to define an application level serializer, which is only visi You should include the Serializer class in your application jar. Here is an example to define a custom serializer: ```scala -package io.gearpump +package org.apache.gearpump -import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer} -import io.gearpump.esotericsoftware.kryo.io.{Input, Output} +import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer} +import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output} class UserMessage(longField: Long, intField: Int) @@ -120,7 +120,7 @@ class UserMessageSerializer extends Serializer[UserMessage] { ### content of application.conf gearpump { serializers { - "io.gearpump.UserMessage" = "io.gearpump.UserMessageSerializer" + "org.apache.gearpump.UserMessage" = "org.apache.gearpump.UserMessageSerializer" } } ``` @@ -136,7 +136,7 @@ There are other serialization framework besides Kryo, like Protobuf. If user don basically, user need to define in gear.conf(or application.conf for single application's scope) file like this: ```bash -gearpump.serialization-framework = "io.gearpump.serializer.CustomSerializationFramework" +gearpump.serialization-framework = "org.apache.gearpump.serializer.CustomSerializationFramework" ``` Please find an example in gearpump storm module, search "StormSerializationFramework" in source code. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-non-streaming-example.md ---------------------------------------------------------------------- diff --git a/docs/dev-non-streaming-example.md b/docs/dev-non-streaming-example.md index 1aec2d1..8b859dc 100644 --- a/docs/dev-non-streaming-example.md +++ b/docs/dev-non-streaming-example.md @@ -3,7 +3,7 @@ layout: global title: Gearpump Non-Streaming Example --- -We'll use [Distributed Shell](https://github.com/gearpump/gearpump/tree/master/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell) as an example to illustrate how to do that. +We'll use [Distributed Shell](https://github.com/gearpump/gearpump/tree/master/examples/distributedshell/src/main/scala/org.apache.gearpump/examples/distributedshell) as an example to illustrate how to do that. What Distributed Shell do is that user send a shell command to the cluster and the command will the executed on each node, then the result will be return to user. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-rest-api.md ---------------------------------------------------------------------- diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md index 6243a89..161a09f 100644 --- a/docs/dev-rest-api.md +++ b/docs/dev-rest-api.md @@ -262,36 +262,36 @@ Sample Response: : [{ "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.used", "value": "59764272"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.used", "value": "59764272"} }, { "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:thread.daemon.count", "value": "18"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:thread.daemon.count", "value": "18"} }, { "time": "1450758725070", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.committed", "value": "210239488" } }, { "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.max", "value": "880017408"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.max", "value": "880017408"} }, { "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.max", "value": "997457920"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.max", "value": "997457920"} }, { "time": "1450758725070", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.committed", "value": "179830784" } }, { "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.used", "value": "89117352"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.used", "value": "89117352"} }, { "time": "1450758725070", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "master:thread.count", "value": "28"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:thread.count", "value": "28"} }] } ``` @@ -471,88 +471,88 @@ Sample Response: [{ "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.used", "value": "152931440" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.used", "value": "123139640" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.max", "value": "997457920" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.committed", "value": "179830784" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.committed", "value": "210239488" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.used", "value": "152931440" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.max", "value": "997457920" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.committed", "value": "179830784" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.committed", "value": "210239488" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.used", "value": "123139640" } @@ -654,7 +654,7 @@ Sample Response: 0, { "id": 0, - "taskClass": "io.gearpump.streaming.examples.wordcount.Split", + "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Split", "parallelism": 1, "description": "", "taskConf": { @@ -681,7 +681,7 @@ Sample Response: 1, { "id": 1, - "taskClass": "io.gearpump.streaming.examples.wordcount.Sum", + "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Sum", "parallelism": 1, "description": "", "taskConf": { @@ -723,7 +723,7 @@ Sample Response: "edgeList": [ [ 0, - "io.gearpump.partitioner.HashPartitioner", + "org.apache.gearpump.partitioner.HashPartitioner", 1 ] ] @@ -856,7 +856,7 @@ Sample Response: }, "netty-dispatcher" : "akka.actor.default-dispatcher", "scheduling" : { - "scheduler-class" : "io.gearpump.cluster.scheduler.PriorityScheduler" + "scheduler-class" : "org.apache.gearpump.cluster.scheduler.PriorityScheduler" }, "serializers" : { "[B" : "", @@ -868,11 +868,11 @@ Sample Response: "[Ljava.lang.String;" : "", "[S" : "", "[Z" : "", - "io.gearpump.Message" : "io.gearpump.streaming.MessageSerializer", - "io.gearpump.streaming.task.Ack" : "io.gearpump.streaming.AckSerializer", - "io.gearpump.streaming.task.AckRequest" : "io.gearpump.streaming.AckRequestSerializer", - "io.gearpump.streaming.task.LatencyProbe" : "io.gearpump.streaming.LatencyProbeSerializer", - "io.gearpump.streaming.task.TaskId" : "io.gearpump.streaming.TaskIdSerializer", + "org.apache.gearpump.Message" : "org.apache.gearpump.streaming.MessageSerializer", + "org.apache.gearpump.streaming.task.Ack" : "org.apache.gearpump.streaming.AckSerializer", + "org.apache.gearpump.streaming.task.AckRequest" : "org.apache.gearpump.streaming.AckRequestSerializer", + "org.apache.gearpump.streaming.task.LatencyProbe" : "org.apache.gearpump.streaming.LatencyProbeSerializer", + "org.apache.gearpump.streaming.task.TaskId" : "org.apache.gearpump.streaming.TaskIdSerializer", "scala.Tuple1" : "", "scala.Tuple2" : "", "scala.Tuple3" : "", @@ -910,7 +910,7 @@ aggregator points to a aggregator class, which will aggregate on the current met Example: ```bash -curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/metrics/app1?readLatest=true&aggregator=io.gearpump.streaming.metrics.ProcessorAggregator +curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/metrics/app1?readLatest=true&aggregator=org.apache.gearpump.streaming.metrics.ProcessorAggregator ``` Sample Response: @@ -924,88 +924,88 @@ Sample Response: [{ "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.used", "value": "152931440" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.used", "value": "123139640" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.max", "value": "997457920" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.committed", "value": "179830784" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.committed", "value": "210239488" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.total.used", "value": "152931440" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.max", "value": "997457920" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.committed", "value": "179830784" } }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.total.committed", "value": "210239488" } }, { "time": "1450759137860", - "value": {"$type": "io.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} }, { "time": "1450759137860", "value": { - "$type": "io.gearpump.metrics.Metrics.Gauge", + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.used", "value": "123139640" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/dev-write-1st-app.md ---------------------------------------------------------------------- diff --git a/docs/dev-write-1st-app.md b/docs/dev-write-1st-app.md index e768397..e125f3d 100644 --- a/docs/dev-write-1st-app.md +++ b/docs/dev-write-1st-app.md @@ -5,7 +5,7 @@ title: Write Your 1st Gearpump App description: Write Your 1st Gearpump App --- -We'll use [wordcount](https://github.com/gearpump/gearpump/blob/master/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/) as an example to illustrate how to write Gearpump applications. +We'll use [wordcount](https://github.com/gearpump/gearpump/blob/master/examples/streaming/wordcount/src/main/scala/org.apache.gearpump/streaming/examples/wordcount/) as an example to illustrate how to write Gearpump applications. ### Maven/Sbt Settings @@ -17,9 +17,9 @@ You can get your preferred IDE ready for Gearpump by following [this guide](dev- ### Decide which language and API to use for writing Gearpump supports two level APIs: -1. Low level API, which is more similar to Akka programming, operating on each event. The API document can be found at [Low Level API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.package). +1. Low level API, which is more similar to Akka programming, operating on each event. The API document can be found at [Low Level API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.package). -2. High level API (aka DSL), which is operating on streaming instead of individual event. The API document can be found at [DSL API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.dsl.package). +2. High level API (aka DSL), which is operating on streaming instead of individual event. The API document can be found at [DSL API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#org.apache.gearpump.streaming.dsl.package). And both APIs have their Java version and Scala version. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/docs/submit-your-1st-application.md ---------------------------------------------------------------------- diff --git a/docs/submit-your-1st-application.md b/docs/submit-your-1st-application.md index b72329d..3bdb2f8 100644 --- a/docs/submit-your-1st-application.md +++ b/docs/submit-your-1st-application.md @@ -26,7 +26,7 @@ Open another shell, ```bash ### To run WordCount example -bin/gear app -jar examples/wordcount-{{site.SCALA_BINARY_VERSION}}-{{site.GEARPUMP_VERSION}}-assembly.jar io.gearpump.streaming.examples.wordcount.WordCount +bin/gear app -jar examples/wordcount-{{site.SCALA_BINARY_VERSION}}-{{site.GEARPUMP_VERSION}}-assembly.jar org.apache.gearpump.streaming.examples.wordcount.WordCount ``` ### Step 2: Congratulations, you've submitted your first application. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/README.md ---------------------------------------------------------------------- diff --git a/examples/distributedshell/README.md b/examples/distributedshell/README.md index f21e87e..437960a 100644 --- a/examples/distributedshell/README.md +++ b/examples/distributedshell/README.md @@ -8,10 +8,10 @@ In order to run the example: 2. Start the AppMaster:<br> ```bash - target/pack/bin/gear app -jar target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar io.gearpump.examples.distributedshell.DistributedShell + target/pack/bin/gear app -jar target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar org.apache.gearpump.examples.distributedshell.DistributedShell ``` 3. Submit the shell command:<br> ```bash - target/pack/bin/gear app -verbose true -jar target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar io.gearpump.examples.distributedshell.DistributedShellClient -appid $APPID -command "ls /" + target/pack/bin/gear app -verbose true -jar target/pack/examples/gearpump-experiments-distributedshell_$VERSION.jar org.apache.gearpump.examples.distributedshell.DistributedShellClient -appid $APPID -command "ls /" ``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala deleted file mode 100644 index 9b417c0..0000000 --- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistShellAppMaster.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import scala.concurrent.Future - -import akka.actor.{Deploy, Props} -import akka.pattern.{ask, pipe} -import akka.remote.RemoteScope -import com.typesafe.config.Config -import org.slf4j.Logger - -import io.gearpump.cluster.ClientToMaster.ShutdownApplication -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout} -import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext} -import io.gearpump.examples.distributedshell.DistShellAppMaster._ -import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util} - -class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription) - extends ApplicationMaster { - - import appContext._ - import context.dispatcher - implicit val timeout = Constants.FUTURE_TIMEOUT - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - protected var currentExecutorId = 0 - - override def preStart(): Unit = { - LOG.info(s"Distributed Shell AppMaster started") - ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self) - } - - override def receive: Receive = { - case ExecutorSystemStarted(executorSystem, _) => - import executorSystem.{address, resource => executorResource, worker} - val executorContext = ExecutorContext(currentExecutorId, worker, appId, app.name, - self, executorResource) - // Start executor - val executor = context.actorOf(Props(classOf[ShellExecutor], executorContext, app.userConfig) - .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString) - executorSystem.bindLifeCycleWith(executor) - currentExecutorId += 1 - case StartExecutorSystemTimeout => - LOG.error(s"Failed to allocate resource in time") - masterProxy ! ShutdownApplication(appId) - context.stop(self) - case msg: ShellCommand => - Future.fold(context.children.map(_ ? msg))(new ShellCommandResultAggregator) { - (aggregator, response) => { - aggregator.aggregate(response.asInstanceOf[ShellCommandResult]) - } - }.map(_.toString()) pipeTo sender - } - - private def getExecutorJvmConfig: ExecutorSystemJvmConfig = { - val config: Config = app.clusterConfig - val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)) - .executor - ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, - appJar, username, config) - } -} - -object DistShellAppMaster { - case class ShellCommand(command: String) - - case class ShellCommandResult(executorId: Int, msg: Any) - - class ShellCommandResultAggregator { - val result: StringBuilder = new StringBuilder - - def aggregate(response: ShellCommandResult): ShellCommandResultAggregator = { - result.append(s"Execute results from executor ${response.executorId} : \n") - result.append(response.msg + "\n") - this - } - - override def toString(): String = result.toString() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala deleted file mode 100644 index b7b3eb0..0000000 --- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShell.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import org.slf4j.Logger - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.cluster.{Application, UserConfig} -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Demo application to distribute and execute shell command on all machines of the cluster */ -object DistributedShell extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array.empty - - override def main(akkaConf: Config, args: Array[String]): Unit = { - LOG.info(s"Distributed shell submitting application...") - val context = ClientContext(akkaConf) - val appId = context.submit(Application[DistShellAppMaster]("DistributedShell", - UserConfig.empty)) - context.close() - LOG.info(s"Distributed Shell Application started with appId $appId !") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala deleted file mode 100644 index cd6f943..0000000 --- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/DistributedShellClient.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.pattern.ask -import org.slf4j.{Logger, LoggerFactory} - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand -import io.gearpump.util.{AkkaApp, Constants} - -/** Client to DistributedShell to input "shell command" */ -object DistributedShellClient extends AkkaApp with ArgumentsParser { - implicit val timeout = Constants.FUTURE_TIMEOUT - private val LOG: Logger = LoggerFactory.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true), - "command" -> CLIOption[String]("<shell command>", required = true) - ) - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - implicit val system = context.system - implicit val dispatcher = system.dispatcher - val appid = config.getInt("appid") - val command = config.getString("command") - val appMaster = context.resolveAppID(appid) - LOG.info(s"Resolved appMaster $appid address $appMaster, sending command $command") - val future = (appMaster ? ShellCommand(command)).map { result => - LOG.info(s"Result: \n$result") - context.close() - } - Await.ready(future, Duration(60, TimeUnit.SECONDS)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala b/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala deleted file mode 100644 index 2d0fd06..0000000 --- a/examples/distributedshell/src/main/scala/io/gearpump/examples/distributedshell/ShellExecutor.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.examples.distributedshell - -import scala.sys.process._ -import scala.util.{Failure, Success, Try} - -import akka.actor.Actor -import org.slf4j.Logger - -import io.gearpump.cluster.{ExecutorContext, UserConfig} -import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult} -import io.gearpump.util.LogUtil - -/** Executor actor on remote machine */ -class ShellExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor { - import executorContext._ - private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) - - LOG.info(s"ShellExecutor started!") - - override def receive: Receive = { - case ShellCommand(command) => - val process = Try(s"$command".!!) - val result = process match { - case Success(msg) => msg - case Failure(ex) => ex.getMessage - } - sender ! ShellCommandResult(executorId, result) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala new file mode 100644 index 0000000..cd0b18b --- /dev/null +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMaster.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import scala.concurrent.Future + +import akka.actor.{Deploy, Props} +import akka.pattern.{ask, pipe} +import akka.remote.RemoteScope +import com.typesafe.config.Config +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout} +import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext} +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster._ +import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util} + +class DistShellAppMaster(appContext: AppMasterContext, app: AppDescription) + extends ApplicationMaster { + + import appContext._ + import context.dispatcher + implicit val timeout = Constants.FUTURE_TIMEOUT + private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + protected var currentExecutorId = 0 + + override def preStart(): Unit = { + LOG.info(s"Distributed Shell AppMaster started") + ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self) + } + + override def receive: Receive = { + case ExecutorSystemStarted(executorSystem, _) => + import executorSystem.{address, resource => executorResource, worker} + val executorContext = ExecutorContext(currentExecutorId, worker, appId, app.name, + self, executorResource) + // Start executor + val executor = context.actorOf(Props(classOf[ShellExecutor], executorContext, app.userConfig) + .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString) + executorSystem.bindLifeCycleWith(executor) + currentExecutorId += 1 + case StartExecutorSystemTimeout => + LOG.error(s"Failed to allocate resource in time") + masterProxy ! ShutdownApplication(appId) + context.stop(self) + case msg: ShellCommand => + Future.fold(context.children.map(_ ? msg))(new ShellCommandResultAggregator) { + (aggregator, response) => { + aggregator.aggregate(response.asInstanceOf[ShellCommandResult]) + } + }.map(_.toString()) pipeTo sender + } + + private def getExecutorJvmConfig: ExecutorSystemJvmConfig = { + val config: Config = app.clusterConfig + val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)) + .executor + ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, + appJar, username, config) + } +} + +object DistShellAppMaster { + case class ShellCommand(command: String) + + case class ShellCommandResult(executorId: Int, msg: Any) + + class ShellCommandResultAggregator { + val result: StringBuilder = new StringBuilder + + def aggregate(response: ShellCommandResult): ShellCommandResultAggregator = { + result.append(s"Execute results from executor ${response.executorId} : \n") + result.append(response.msg + "\n") + this + } + + override def toString(): String = result.toString() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala new file mode 100644 index 0000000..c4eec07 --- /dev/null +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.cluster.{Application, UserConfig} +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Demo application to distribute and execute shell command on all machines of the cluster */ +object DistributedShell extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + LOG.info(s"Distributed shell submitting application...") + val context = ClientContext(akkaConf) + val appId = context.submit(Application[DistShellAppMaster]("DistributedShell", + UserConfig.empty)) + context.close() + LOG.info(s"Distributed Shell Application started with appId $appId !") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala new file mode 100644 index 0000000..8a7efa5 --- /dev/null +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClient.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.examples.distributedshell + +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.pattern.ask +import org.slf4j.{Logger, LoggerFactory} + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand +import org.apache.gearpump.util.{AkkaApp, Constants} + +/** Client to DistributedShell to input "shell command" */ +object DistributedShellClient extends AkkaApp with ArgumentsParser { + implicit val timeout = Constants.FUTURE_TIMEOUT + private val LOG: Logger = LoggerFactory.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true), + "command" -> CLIOption[String]("<shell command>", required = true) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + implicit val system = context.system + implicit val dispatcher = system.dispatcher + val appid = config.getInt("appid") + val command = config.getString("command") + val appMaster = context.resolveAppID(appid) + LOG.info(s"Resolved appMaster $appid address $appMaster, sending command $command") + val future = (appMaster ? ShellCommand(command)).map { result => + LOG.info(s"Result: \n$result") + context.close() + } + Await.ready(future, Duration(60, TimeUnit.SECONDS)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala new file mode 100644 index 0000000..fc394d8 --- /dev/null +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/ShellExecutor.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.examples.distributedshell + +import scala.sys.process._ +import scala.util.{Failure, Success, Try} + +import akka.actor.Actor +import org.slf4j.Logger + +import org.apache.gearpump.cluster.{ExecutorContext, UserConfig} +import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult} +import org.apache.gearpump.util.LogUtil + +/** Executor actor on remote machine */ +class ShellExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor { + import executorContext._ + private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId) + + LOG.info(s"ShellExecutor started!") + + override def receive: Receive = { + case ShellCommand(command) => + val process = Try(s"$command".!!) + val result = process match { + case Success(msg) => msg + case Failure(ex) => ex.getMessage + } + sender ! ShellCommandResult(executorId, result) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala deleted file mode 100644 index 2d63734..0000000 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.testkit.{TestActorRef, TestProbe} -import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} - -import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} -import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} -import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.ActorSystemBooter.RegisterActorSystem -import io.gearpump.util.ActorUtil - -class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter { - implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG) - val mockMaster = TestProbe()(system) - val mockWorker1 = TestProbe()(system) - val masterProxy = mockMaster.ref - val appId = 0 - val userName = "test" - val masterExecutorId = 0 - val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) - val resource = Resource(1) - val appJar = None - val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty) - - "DistributedShell AppMaster" should { - "launch one ShellTask on each worker" in { - val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) - val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, - masterProxy, appMasterInfo) - TestActorRef[DistShellAppMaster]( - AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext)) - mockMaster.expectMsgType[RegisterAppMaster] - mockMaster.reply(AppMasterRegistered(appId)) - // The DistributedShell AppMaster asks for worker list from Master. - mockMaster.expectMsg(GetAllWorkers) - mockMaster.reply(WorkerList(workerList)) - // After worker list is ready, DistributedShell AppMaster requests resource on each worker - workerList.foreach { workerId => - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, - relaxation = Relaxation.SPECIFICWORKER))) - } - mockMaster.reply(ResourceAllocated( - Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L))))) - mockWorker1.expectMsgClass(classOf[LaunchExecutor]) - mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) - } - } - - after { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala deleted file mode 100644 index 973b3b3..0000000 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellClientSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.examples.distributedshell - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import akka.testkit.TestProbe -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.ResolveAppId -import io.gearpump.cluster.MasterToClient.ResolveAppIdResult -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand -import io.gearpump.util.LogUtil - -class DistributedShellClientSpec - extends PropSpec with Matchers with BeforeAndAfter with MasterHarness { - - private val LOG = LogUtil.getLogger(getClass) - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("DistributedShellClient should succeed to submit application with required arguments") { - val command = "ls /" - val requiredArgs = Array("-appid", "0", "-command", command) - val masterReceiver = createMockMaster() - - assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure, - "missing required arguments, print usage") - - Future { - DistributedShellClient.main(masterConfig, requiredArgs) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0)) - val mockAppMaster = TestProbe()(getActorSystem) - masterReceiver.reply(ResolveAppIdResult(Success(mockAppMaster.ref))) - LOG.info(s"Reply back ResolveAppIdResult, current actorRef: ${mockAppMaster.ref.path.toString}") - mockAppMaster.expectMsg(PROCESS_BOOT_TIME, ShellCommand(command)) - mockAppMaster.reply("result") - } -}
