Fix #1943: allow users to configure how many executors an application uses
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/47d867d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/47d867d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/47d867d2 Branch: refs/heads/master Commit: 47d867d276dfa9adf3508e030bd7f3258291727b Parents: 313b6c4 Author: huafengw <[email protected]> Authored: Tue Feb 2 18:01:14 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:23:24 2016 +0800 ---------------------------------------------------------------------- conf/gear.conf | 5 +- core/src/main/resources/geardefault.conf | 9 +- .../appmaster/ExecutorSystemScheduler.scala | 12 +-- .../gearpump/cluster/client/ClientContext.scala | 43 ++++---- .../gearpump/cluster/scheduler/Resource.scala | 6 +- .../main/scala/io/gearpump/util/Constants.scala | 2 +- core/src/test/resources/test.conf | 2 + .../io/gearpump/cluster/main/AppSubmitter.scala | 2 + .../cluster/scheduler/PriorityScheduler.scala | 103 +++++++++++-------- .../scheduler/PrioritySchedulerSpec.scala | 44 +++++++- docs/commandline.md | 2 +- .../experiments/storm/main/GearpumpNimbus.scala | 3 +- .../checklist/ConnectorKafkaSpec.scala | 4 +- .../checklist/DynamicDagSpec.scala | 2 +- .../integrationtest/checklist/ExampleSpec.scala | 13 +-- .../checklist/RestServiceSpec.scala | 26 ++--- .../minicluster/CommandLineClient.scala | 4 +- .../minicluster/RestClient.scala | 4 +- services/dashboard/services/restapi.js | 12 +-- .../dashboard/views/apps/submit/submit.html | 4 + services/dashboard/views/apps/submit/submit.js | 3 +- .../io/gearpump/services/MasterService.scala | 10 +- .../streaming/appmaster/TaskSchedulerImpl.scala | 6 +- .../streaming/appmaster/JarSchedulerSpec.scala | 4 +- .../streaming/appmaster/TaskSchedulerSpec.scala | 4 +- 25 files changed, 192 insertions(+), 137 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/conf/gear.conf ---------------------------------------------------------------------- diff --git a/conf/gear.conf b/conf/gear.conf index 84694a8..1ab4bb6 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -62,11 +62,14 @@ gearpump { ### When the resource cannot be allocated in the timeout, then ### the appmaster will shutdown itself. resource-allocation-timeout-seconds = 120 - + ## ## Executor share same process of worker worker.executor-share-same-jvm-as-worker = false + ## Number of executors to launch when starting an application + application.executor-num = 1 + ########################### ### Change the dispather for tasks ### If you don't know what this is about, don't change it http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index 16bd8c0..17626a3 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -31,14 +31,8 @@ gearpump { ## The installation folder of gearpump home = "" - - - - serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool" - - ## How many slots each worker contains worker.slots = 1000 @@ -46,6 +40,9 @@ gearpump { ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support. 
worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" + ## Number of executors to launch when starting an application + application.executor-num = 1 + ## To enable worker use cgroup to make resource isolation, ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" ## http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala index 0b35db9..6af68eb 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala @@ -70,20 +70,14 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, def resourceAllocationMessageHandler: Receive = { case ResourceAllocatedForSession(allocations, session) => - if (isSessionAlive(session)) { - val groupedResource = allocations.groupBy(_.worker).mapValues { - _.reduce((resourceA, resourceB) => - resourceA.copy(resource = (resourceA.resource + resourceB.resource))) - }.toArray - - groupedResource.map((workerAndResources) => { - val ResourceAllocation(resource, worker, workerId) = workerAndResources._2 + allocations.foreach { resourceAllocation => + val ResourceAllocation(resource, worker, workerId) = resourceAllocation val launcher = context.actorOf(executorSystemLauncher(appId, session)) launcher ! 
LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource) currentSystemId = currentSystemId + 1 - }) + } } case ResourceAllocationTimeOut(session) => if (isSessionAlive(session)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala index 536d8c1..3a2868c 100644 --- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala @@ -22,23 +22,21 @@ import java.util.concurrent.TimeUnit import akka.actor.{ActorRef, ActorSystem} import akka.util.Timeout -import com.typesafe.config.{ConfigFactory, Config} -import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersData} +import com.typesafe.config.{ConfigValueFactory, Config} +import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge} import io.gearpump.cluster.MasterToClient.ReplayApplicationResult import io.gearpump.cluster._ import io.gearpump.cluster.master.MasterProxy -import io.gearpump.jarstore.{FilePath, JarStoreService} +import io.gearpump.jarstore.JarStoreService import io.gearpump.util.Constants._ import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util} import org.slf4j.Logger import scala.collection.JavaConversions._ -import scala.concurrent.Await +import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration -import scala.concurrent.Future import scala.util.Try - /** * ClientContext is a user facing util to submit/manage an application. */ @@ -75,33 +73,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { * "gearpump.app.jar" if defined. 
Otherwise, will assume the jar is on * the target runtime classpath, and will not send it. */ - def submit(app : Application) : Int = { + def submit(app: Application): Int = { submit(app, System.getProperty(GEARPUMP_APP_JAR)) } + def submit(app: Application, jar: String): Int = { + submit(app, jar, getExecutorNum()) + } - def submit(app : Application, jar: String) : Int = { - import app.{name, appMaster, userConfig} + def submit(app: Application, jar: String, executorNum: Int): Int = { + val client = getMasterClient + val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) val submissionConfig = getSubmissionConfig(config) - val appDescription = AppDescription(name, appMaster.getName, userConfig, submissionConfig) - submit(appDescription, jar) + .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum)) + val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) + val appJar = Option(jar).map(loadFile) + client.submitApplication(appDescription, appJar) } - import scala.collection.JavaConverters._ - private def getSubmissionConfig(config: Config): Config = { - ClusterConfig.filterOutDefaultConfig(config) + private def getExecutorNum(): Int = { + Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1) } - private def submit(app : AppDescription, jarPath: String) : Int = { - val client = getMasterClient - val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) - val updatedApp = AppDescription(appName, app.appMaster, app.userConfig, app.clusterConfig) - if (jarPath == null) { - client.submitApplication(updatedApp, None) - } else { - val appJar = loadFile(jarPath) - client.submitApplication(updatedApp, Option(appJar)) - } + private def getSubmissionConfig(config: Config): Config = { + ClusterConfig.filterOutDefaultConfig(config) } def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala index 574c8bd..94c7532 100644 --- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala +++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala @@ -26,8 +26,12 @@ case class Resource(slots : Int) { def >(other : Resource): Boolean = slots > other.slots + def >=(other : Resource): Boolean = !(this < other) + def <(other : Resource): Boolean = slots < other.slots + def <=(other : Resource): Boolean = !(this > other) + def equals(other : Resource): Boolean = slots == other.slots def isEmpty: Boolean = slots == 0 @@ -47,7 +51,7 @@ object Relaxation extends Enumeration{ import Relaxation._ import Priority._ -case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY) +case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1) case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala index 084d99b..ecf09e4 100644 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ b/core/src/main/scala/io/gearpump/util/Constants.scala @@ -162,5 +162,5 @@ object Constants { val PREFER_IPV4 = "java.net.preferIPv4Stack" - + val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num" } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/test/resources/test.conf ---------------------------------------------------------------------- diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf index 3deb28b..324e8bd 100644 --- a/core/src/test/resources/test.conf +++ b/core/src/test/resources/test.conf @@ -12,6 +12,8 @@ gearpump { ### the appmaster will shutdown itself. resource-allocation-timeout-seconds = 10 + application.executor-num = 1 + worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" cluster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala index acb6247..90f653c 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala @@ -37,6 +37,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")), "jar" -> CLIOption("<application>.jar", required = true), + "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)), "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)), // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. 
@@ -58,6 +59,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { // Set jar path to be submitted to cluster System.setProperty(Constants.GEARPUMP_APP_JAR, jar) + System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) val namePrefix = config.getString("namePrefix") if (namePrefix.nonEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala index c3121d8..10e9dcb 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala @@ -26,14 +26,13 @@ import io.gearpump.cluster.scheduler.Scheduler.PendingRequest import scala.collection.mutable -class PriorityScheduler extends Scheduler{ - +class PriorityScheduler extends Scheduler { private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering) def requestOrdering = new Ordering[PendingRequest] { override def compare(x: PendingRequest, y: PendingRequest) = { var res = x.request.priority.id - y.request.priority.id - if(res == 0) + if (res == 0) res = y.timeStamp.compareTo(x.timeStamp) res } @@ -45,23 +44,30 @@ class PriorityScheduler extends Scheduler{ var scheduleLater = Array.empty[PendingRequest] val resourcesSnapShot = resources.clone() var allocated = Resource.empty - val totalResource = resourcesSnapShot.foldLeft(Resource.empty){ (totalResource, workerWithResource) => - val (_, (_, resource)) = workerWithResource - totalResource + resource - } + val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum) - while(resourceRequests.nonEmpty && (allocated < totalResource)) { + while 
(resourceRequests.nonEmpty && (allocated < totalResource)) { val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue() request.relaxation match { case ANY => - val newAllocated = allocateFairly(resourcesSnapShot, PendingRequest(appId, appMaster, request, timeStamp)) + val allocations = allocateFairly(resourcesSnapShot, request) + val newAllocated = Resource(allocations.map(_.resource.slots).sum) + if (allocations.nonEmpty) { + appMaster ! ResourceAllocated(allocations.toArray) + } + if (newAllocated < request.resource) { + val remainingRequest = request.resource - newAllocated + val remainingExecutors = request.executorNum - allocations.length + val newResourceRequest = request.copy(resource = remainingRequest, executorNum = remainingExecutors) + scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp) + } allocated = allocated + newAllocated case ONEWORKER => - val availableResource = resourcesSnapShot.find{params => + val availableResource = resourcesSnapShot.find { params => val (_, (_, resource)) = params resource > request.resource } - if(availableResource.nonEmpty){ + if (availableResource.nonEmpty) { val (workerId, (worker, resource)) = availableResource.get allocated = allocated + request.resource appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, workerId))) @@ -70,13 +76,12 @@ class PriorityScheduler extends Scheduler{ scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp) } case SPECIFICWORKER => - if (resourcesSnapShot.contains(request.workerId)) { - val (worker, availableResource) = resourcesSnapShot.get(request.workerId).get - if (availableResource > request.resource) { - appMaster ! 
ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId))) - allocated = allocated + request.resource - resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource)) - } + val workerAndResource = resourcesSnapShot.get(request.workerId) + if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) { + val (worker, availableResource) = workerAndResource.get + appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId))) + allocated = allocated + request.resource + resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource)) } else { scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp) } @@ -88,38 +93,48 @@ class PriorityScheduler extends Scheduler{ def resourceRequestHandler: Receive = { case RequestResource(appId, request) => - LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}") + LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}," + + s" executor number: ${request.executorNum}") val appMaster = sender() resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, System.currentTimeMillis())) allocateResource() } - private def allocateFairly(resources : mutable.HashMap[Int, (ActorRef, Resource)], pendindRequest : PendingRequest): Resource ={ - val length = resources.size - val flattenResource = resources.toArray.zipWithIndex.flatMap((workerWithIndex) => { - val ((workerId, (worker, resource)), index) = workerWithIndex - 0.until(resource.slots).map((seq) => ((workerId, worker), seq * length + index)) - }).sortBy(_._2).map(_._1) - val PendingRequest(appId, appMaster, request, timeStamp) = pendindRequest - val total = Resource(flattenResource.size) - - val newAllocated = Resource.min(total, request.resource) - val singleAllocation = 
flattenResource.take(newAllocated.slots) - .groupBy((actor) => actor).mapValues(_.length).toArray.map((params) => { - val ((workerId, worker), slots) = params - resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots))) - ResourceAllocation(Resource(slots), worker, workerId) - }) - pendindRequest.appMaster ! ResourceAllocated(singleAllocation) - if (pendindRequest.request.resource > newAllocated) { - resourceRequests.enqueue( - PendingRequest(appId, appMaster, - ResourceRequest(request.resource - newAllocated, request.workerId, request.priority), timeStamp)) - } - newAllocated - } - override def doneApplication(appId: Int): Unit = { resourceRequests = resourceRequests.filter(_.appId != appId) } + + private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = { + val workerNum = resources.size + var allocations = List.empty[ResourceAllocation] + var totalAvailable = Resource(resources.values.map(_._2.slots).sum) + var remainingRequest = request.resource + var remainingExecutors = Math.min(request.executorNum, request.resource.slots) + + while (remainingExecutors > 0 && !totalAvailable.isEmpty) { + val exeutorNum = Math.min(workerNum, remainingExecutors) + val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors) + + val flattenResource = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse).take(exeutorNum).zipWithIndex.flatMap { workerWithIndex => + val ((workerId, (worker, resource)), index) = workerWithIndex + 0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index)) + }.sortBy(_._2).map(_._1) + + if (flattenResource.length < toRequest.slots) { + //Can not safisfy the user's requirements + totalAvailable = Resource.empty + } else { + flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length). 
+ toArray.foreach { params => + val ((workerId, worker), slots) = params + resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots))) + allocations :+= ResourceAllocation(Resource(slots), worker, workerId) + } + totalAvailable -= toRequest + remainingRequest -= toRequest + remainingExecutors -= exeutorNum + } + } + allocations + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala index ccfd453..0a7e1f8 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -19,7 +19,6 @@ package io.gearpump.cluster.scheduler import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import io.gearpump.cluster.master.Master.MasterInfo import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} @@ -84,12 +83,11 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))) - mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(10), mockWorker1.ref, workerId1)))) 
scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref) scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) - mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker2.ref, workerId2)))) + mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))) } } @@ -124,7 +122,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))) - val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY) + val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2) scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))) @@ -136,4 +134,42 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))) } } + + "The PriorityScheduler" should { + "handle the resource request with different executor number" in { + val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) + scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref) + scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), 
mockWorker2.ref) + + //By default, the request requires only one executor + val request2 = ResourceRequest(Resource(20)) + scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) + val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations2.allocations.length == 1) + assert(allocations2.allocations.head.resource == Resource(20)) + + val request3 = ResourceRequest(Resource(24), executorNum = 3) + scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) + val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations3.allocations.length == 3) + assert(allocations3.allocations.forall(_.resource == Resource(8))) + + //The total available resource can not satisfy the requirements with executor number + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) + scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) + val request4 = ResourceRequest(Resource(60), executorNum = 3) + scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) + val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations4.allocations.length == 2) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + + //When new resources are available, the remaining request will be satisfied + scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref) + val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] + assert(allocations5.allocations.length == 1) + assert(allocations4.allocations.forall(_.resource == Resource(20))) + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/docs/commandline.md ---------------------------------------------------------------------- diff --git a/docs/commandline.md b/docs/commandline.md index 2c9334c..7e264f9 100644 --- a/docs/commandline.md +++ 
b/docs/commandline.md @@ -18,7 +18,7 @@ If you use Maven you can have a look [here](https://maven.apache.org/plugins/mav You can use the command `gear` under the bin directory to submit, query and terminate an application: ```bash -gear app [-namePrefix <application name prefix>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ... +gear app [-namePrefix <application name prefix>] [-executors <number of executors to launch>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ... ``` ### List all running applications http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala index e9973e5..c16ab41 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -139,13 +139,14 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe implicit val system = clientContext.system val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) val stormConfig = gearpumpStormTopology.getStormConfig + val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1) val processorGraph = GraphBuilder.build(gearpumpStormTopology) val config = UserConfig.empty .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) val app = StreamApplication(name, processorGraph, config) LOG.info(s"jar file uploaded to $uploadedJarLocation") - val appId = clientContext.submit(app, uploadedJarLocation) + val appId 
= clientContext.submit(app, uploadedJarLocation, workerNum) applications += name -> appId topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation) LOG.info(s"Storm Application $appId submitted") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala index 8e294ea..59c956b 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala @@ -62,7 +62,7 @@ class ConnectorKafkaSpec extends TestSpecBase { "-sourceTopic", sourceTopic, "-sinkTopic", sinkTopic).mkString(" ") val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(kafkaJar, args) + val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) success shouldBe true // verify @@ -90,7 +90,7 @@ class ConnectorKafkaSpec extends TestSpecBase { "-sinkTopic", sinkTopic, "-source", sourcePartitionNum).mkString(" ") val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(kafkaJar, args) + val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) success shouldBe true // verify #1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala index 5a60274..44f98aa 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -121,7 +121,7 @@ class DynamicDagSpec extends TestSpecBase { private def expectSolJarSubmittedWithAppId(): Int = { val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(solJar) + val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, solName) Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala index ac18807..2772bc3 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -32,7 +32,7 @@ class ExampleSpec extends TestSpecBase { val mainClass = "io.gearpump.examples.distributedshell.DistributedShell" val clientClass = "io.gearpump.examples.distributedshell.DistributedShellClient" val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(distShellJar, mainClass) + val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass) success shouldBe true expectAppIsRunning(appId, "DistributedShell") val args = Array( @@ -43,7 +43,8 @@ class ExampleSpec extends TestSpecBase { val expectedHostNames = 
cluster.getWorkerHosts.map(Docker.execAndCaptureOutput(_, "hostname")) def verify(): Boolean = { - val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, args.mkString(" ")).split("\n"). + val workerNum = cluster.getWorkerHosts.length + val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, workerNum, args.mkString(" ")).split("\n"). filterNot(line => line.startsWith("[INFO]") || line.isEmpty) expectedHostNames.forall(result.contains) } @@ -59,7 +60,7 @@ class ExampleSpec extends TestSpecBase { "can submit immediately after killing a former one" in { // setup val formerAppId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(wordCountJar) + val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true expectAppIsRunning(formerAppId, wordCountName) Util.retryUntil(restClient.queryStreamingAppDetail(formerAppId).clock > 0) @@ -67,7 +68,7 @@ class ExampleSpec extends TestSpecBase { // exercise val appId = formerAppId + 1 - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) } @@ -97,7 +98,7 @@ class ExampleSpec extends TestSpecBase { "can obtain application clock and the clock will keep changing" in { // setup val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(jar) + val success = restClient.submitApp(jar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, appName) @@ -110,7 +111,7 @@ class ExampleSpec extends TestSpecBase { "can change the parallelism and description of a processor" in { // setup val appId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(jar) + val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true 
expectAppIsRunning(appId, appName) val formerProcessors = restClient.queryStreamingAppDetail(appId).processors http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala index cf2ca03..2830390 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -43,7 +43,7 @@ class RestServiceSpec extends TestSpecBase { "retrieve 1 application after the first application submission" in { // exercise val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) restClient.listRunningApps().length shouldEqual 1 @@ -54,7 +54,7 @@ class RestServiceSpec extends TestSpecBase { "find a running application after submission" in { // exercise val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) } @@ -62,18 +62,18 @@ class RestServiceSpec extends TestSpecBase { "reject a repeated submission request while the application is running" in { // setup val appId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(wordCountJar) + val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true 
expectAppIsRunning(appId, wordCountName) // exercise - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe false } "reject an invalid submission (the jar file path is incorrect)" in { // exercise - val success = restClient.submitApp(wordCountJar + ".missing") + val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length) success shouldBe false } @@ -84,7 +84,7 @@ class RestServiceSpec extends TestSpecBase { val appId = restClient.getNextAvailableAppId() // exercise - val success = restClient.submitApp(wordCountJar, s"-split $splitNum -sum $sumNum") + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $splitNum -sum $sumNum") success shouldBe true expectAppIsRunning(appId, wordCountName) val processors = restClient.queryStreamingAppDetail(appId).processors @@ -98,7 +98,7 @@ class RestServiceSpec extends TestSpecBase { "can obtain application metrics and the metrics will keep changing" in { // setup val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) @@ -122,7 +122,7 @@ class RestServiceSpec extends TestSpecBase { "can obtain application corresponding executors' metrics and the metrics will keep changing" in { // setup val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) @@ -148,7 +148,7 @@ class RestServiceSpec extends TestSpecBase { "a running application should be killed" in { // setup val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar) + val success = 
restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, wordCountName) @@ -159,7 +159,7 @@ class RestServiceSpec extends TestSpecBase { "should fail when attempting to kill a stopped application" in { // setup val appId = restClient.getNextAvailableAppId() - val submissionSucess = restClient.submitApp(wordCountJar) + val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) submissionSucess shouldBe true expectAppIsRunning(appId, wordCountName) killAppAndVerify(appId) @@ -291,7 +291,7 @@ class RestServiceSpec extends TestSpecBase { val appId = restClient.getNextAvailableAppId() // exercise - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true restClient.queryExecutorBrief(appId).foreach { executor => val executorId = executor.executorId @@ -308,7 +308,7 @@ class RestServiceSpec extends TestSpecBase { val appId = restClient.getNextAvailableAppId() // exercise - val success = restClient.submitApp(wordCountJar) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true val actual = restClient.queryAppMasterConfig(appId) actual.hasPath("gearpump") shouldBe true @@ -322,7 +322,7 @@ class RestServiceSpec extends TestSpecBase { val originSplitNum = 4 val originSumNum = 3 val originAppId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, s"-split $originSplitNum -sum $originSumNum") + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $originSplitNum -sum $originSumNum") success shouldBe true expectAppIsRunning(originAppId, wordCountName) val originAppDetail = restClient.queryStreamingAppDetail(originAppId) 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala index d923a6d..506fc67 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -49,8 +49,8 @@ class CommandLineClient(host: String) { "" } - def submitAppAndCaptureOutput(jar: String, args: String = ""): String = { - execAndCaptureOutput(s"gear app -verbose true -jar $jar $args") + def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = { + execAndCaptureOutput(s"gear app -verbose true -jar $jar -executors $executorNum $args") } def submitApp(jar: String, args: String = ""): Int = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index 2b3003c..2657bb3 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -87,10 +87,10 @@ class RestClient(host: String, port: Int) { listApps().length + 1 } - def submitApp(jar: String, args: String = "", config: String = ""): Boolean = try { + def 
submitApp(jar: String, executorNum: Int, args: String = "", config: String = ""): Boolean = try { var endpoint = "master/submitapp" if (args.length > 0) { - endpoint += "?args=" + Util.encodeUriComponent(args) + endpoint += s"?executorNum=${executorNum}&args=" + Util.encodeUriComponent(args) } var options = Seq(s"jar=@$jar") if (config.length > 0) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/services/restapi.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js index 386d319..d52d89d 100644 --- a/services/dashboard/services/restapi.js +++ b/services/dashboard/services/restapi.js @@ -118,19 +118,19 @@ angular.module('dashboard') }, /** Submit an user defined application with user configuration */ - submitUserApp: function(files, formFormNames, args, onComplete) { + submitUserApp: function(files, formFormNames, executorNum, args, onComplete) { return self._submitApp(restapiV1Root + 'master/submitapp', - files, formFormNames, args, onComplete); + files, formFormNames, executorNum, args, onComplete); }, /** Submit a Storm application */ - submitStormApp: function(files, formFormNames, args, onComplete) { + submitStormApp: function(files, formFormNames, executorNum, args, onComplete) { return self._submitApp(restapiV1Root + 'master/submitstormapp', - files, formFormNames, args, onComplete); + files, formFormNames, executorNum, args, onComplete); }, - _submitApp: function(url, files, formFormNames, args, onComplete) { - var params = args ? 
'?args=' + encodeURIComponent(args) : ''; + _submitApp: function(url, files, formFormNames, executorNum, args, onComplete) { + var params = '?executorNum=' + executorNum + '&args=' + encodeURIComponent(args); var upload = Upload.upload({ url: url + params, method: 'POST', http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.html ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/submit/submit.html b/services/dashboard/views/apps/submit/submit.html index 140fe17..bc4af98 100644 --- a/services/dashboard/views/apps/submit/submit.html +++ b/services/dashboard/views/apps/submit/submit.html @@ -41,6 +41,10 @@ ng-model="conf" accept-pattern="{{confFileSuffix}}"></form-control> <!-- input 3 --> <form-control + type="integer" min="1" label="Executor Number" ng-hide="isStormApp" + ng-model="executorNum"></form-control> + <!-- input 4 --> + <form-control type="text" label="Arguments" help="Application specific launch arguments (optional). E.g. 
WordCount can use "-split 2 -sum 1"" ng-model="launchArgs"></form-control> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/submit/submit.js b/services/dashboard/views/apps/submit/submit.js index 4ca1895..5e967f5 100644 --- a/services/dashboard/views/apps/submit/submit.js +++ b/services/dashboard/views/apps/submit/submit.js @@ -10,6 +10,7 @@ angular.module('dashboard') $scope.dialogTitle = 'Submit Gearpump Application'; $scope.confFileSuffix = '.conf'; + $scope.executorNum = 1; var submitFn = restapi.submitUserApp; if ($scope.isStormApp) { $scope.dialogTitle = 'Submit Storm Application'; @@ -29,7 +30,7 @@ angular.module('dashboard') fileFormNames.push('conf'); } $scope.uploading = true; - submitFn(files, fileFormNames, $scope.launchArgs, function(response) { + submitFn(files, fileFormNames, $scope.executorNum, $scope.launchArgs, function(response) { $scope.shouldNoticeSubmitFailed = !response.success; $scope.uploading = false; if (response.success) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index 839c0ae..dd0e719 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -109,12 +109,12 @@ class MasterService(val master: ActorRef, } ~ path("submitapp") { post { - parameters("args" ? "") { args: String => + parameters('executorNum.as[Int] ? 1, 'args ? 
"") { (executorNum, args) => uploadFile { fileMap => val jar = fileMap.get("jar").map(_.file) val userConf = fileMap.get("conf").map(_.file) onComplete(Future( - MasterService.submitGearApp(jar, args, systemConfig, userConf) + MasterService.submitGearApp(jar, executorNum, args, systemConfig, userConf) )) { case Success(success) => val response = MasterService.AppSubmissionResult(success) @@ -128,7 +128,7 @@ class MasterService(val master: ActorRef, } ~ path("submitstormapp") { post { - parameters("args" ? "") { args: String => + parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) => uploadFile { fileMap => val jar = fileMap.get("jar").map(_.file) val stormConf = fileMap.get("conf").map(_.file) @@ -201,10 +201,10 @@ object MasterService { /** * Submit Native Application. */ - def submitGearApp(jar: Option[File], args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = { + def submitGearApp(jar: Option[File], executorNum: Int, args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = { submitAndDeleteTempFiles( "io.gearpump.cluster.main.AppSubmitter", - argsArray = spaceSeparatedArgumentsToArray(args), + argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args), fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get), classPath = getUserApplicationClassPath, systemConfig, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala index 6d589f5..9510531 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala +++ 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala @@ -24,7 +24,7 @@ import io.gearpump.streaming.DAG import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality} import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus} import io.gearpump.streaming.task.TaskId -import io.gearpump.util.LogUtil +import io.gearpump.util.{Constants, LogUtil} import org.slf4j.Logger /** @@ -80,7 +80,7 @@ object TaskScheduler { } class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends TaskScheduler { - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) + private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER) private var tasks = List.empty[TaskStatus] @@ -121,7 +121,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T workersResourceRequest.map {workerIdAndResource => val (workerId, resource) = workerIdAndResource if (workerId == WORKER_NO_PREFERENCE) { - ResourceRequest(resource) + ResourceRequest(resource, executorNum = executorNum) } else { ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala index bc3f265..d9ebb37 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -20,7 +20,7 @@ package io.gearpump.streaming.appmaster import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import io.gearpump.streaming.{ProcessorDescription, DAG} -import io.gearpump.cluster.AppJar +import 
io.gearpump.cluster.{TestUtil, AppJar} import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} import io.gearpump.jarstore.FilePath import io.gearpump.partitioner.{HashPartitioner, Partitioner} @@ -47,7 +47,7 @@ class JarSchedulerSpec extends WordSpec with Matchers { "schedule tasks depends on app jar" in { val system = ActorSystem("JarSchedulerSpec") implicit val dispatcher = system.dispatcher - val manager = new JarScheduler(0, "APP", ConfigFactory.empty(), system) + val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system) manager.setDag(dag, Future{0L}) val requests = Array(ResourceRequest(Resource(2))) val result = Await.result(manager.getRequestDetails(), 15 seconds) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index 78caf62..aeef61d 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -23,7 +23,7 @@ import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId} import io.gearpump.Message import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} -import io.gearpump.cluster.{ClusterConfig, UserConfig} +import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner} import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} @@ -41,7 +41,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val dag = DAG(Graph(task1 ~ 
Partitioner[HashPartitioner] ~> task2)) - val config = ClusterConfig.default() + val config = TestUtil.DEFAULT_CONFIG "TaskScheduler" should { "schedule tasks on different workers properly according user's configuration" in {
