GEARPUMP-2, Define REST API to submit job jar
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/7d42d4cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/7d42d4cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/7d42d4cd Branch: refs/heads/master Commit: 7d42d4cd7035bce8ba8afed0bf9a5355d03828b4 Parents: 21d5921 Author: Sean Zhong <[email protected]> Authored: Tue Mar 29 21:40:16 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:24:00 2016 +0800 ---------------------------------------------------------------------- .../scala/io/gearpump/util/FileDirective.scala | 52 +++++++++--- .../scala/io/gearpump/util/FileServer.scala | 16 +++- docs/dev-rest-api.md | 88 +++++++++++++++++++- .../minicluster/RestClient.scala | 11 ++- services/dashboard/services/restapi.js | 15 ++-- .../dashboard/views/apps/submit/submit.html | 3 +- services/dashboard/views/apps/submit/submit.js | 2 +- .../io/gearpump/services/AppMasterService.scala | 5 +- .../io/gearpump/services/MasterService.scala | 78 ++++++++++------- 9 files changed, 213 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/daemon/src/main/scala/io/gearpump/util/FileDirective.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala index f4f82fb..c39d27e 100644 --- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala +++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala @@ -26,6 +26,7 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.stream.Materializer import akka.stream.scaladsl.FileIO +import akka.util.ByteString import scala.concurrent.{ExecutionContext, Future} @@ -52,12 +53,31 @@ object FileDirective { */ case class FileInfo(originFileName: String, file: File, length: Long) + class Form(val fields: Map[Name, FormField]) { + def getFile(fieldName: String): Option[FileInfo] = { + fields.get(fieldName).flatMap { + case Left(file) => Option(file) + case Right(_) => None + } + } + + def getValue(fieldName: String): Option[String] = { + fields.get(fieldName).flatMap { + case Left(_) => None + case Right(value) => Option(value) + } + } + } + + type FormField = Either[FileInfo, String] + + /** * directive to uploadFile, it store the uploaded files * to temporary directory, and return a Map from form field name * to FileInfo. */ - def uploadFile: Directive1[Map[Name, FileInfo]] = { + def uploadFile: Directive1[Form] = { uploadFileTo(null) } @@ -67,16 +87,14 @@ object FileDirective { * @param rootDirectory directory to store the files. * @return */ - def uploadFileTo(rootDirectory: File): Directive1[Map[Name, FileInfo]] = { - Directive[Tuple1[Map[Name, FileInfo]]] { inner => + def uploadFileTo(rootDirectory: File): Directive1[Form] = { + Directive[Tuple1[Form]] { inner => extractMaterializer {implicit mat => extractExecutionContext {implicit ec => uploadFileImpl(rootDirectory)(mat, ec) { filesFuture => ctx => { filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx)) } - - } } } @@ -94,10 +112,10 @@ object FileDirective { complete(responseEntity) } - private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Map[Name, FileInfo]]] = { - Directive[Tuple1[Future[Map[Name, FileInfo]]]] { inner => + private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Form]] = { + Directive[Tuple1[Future[Form]]] { inner => entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) => - val fileNameMap = formdata.parts.mapAsync(1) { p => + val form = formdata.parts.mapAsync(1) { p => if (p.filename.isDefined) { //reserve the suffix @@ -105,15 +123,23 @@ object FileDirective { val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) written.map(written => if (written.count > 0) { - Map(p.name -> FileInfo(p.filename.get, targetPath, written.count)) + Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count))) } else { - Map.empty[Name, FileInfo] + Map.empty[Name, FormField] }) } else { - Future(Map.empty[Name, FileInfo]) + val valueFuture = p.entity.dataBytes.runFold(ByteString.empty){(total, input) => + total ++ input + } + valueFuture.map{value => + Map(p.name -> Right(value.utf8String)) + } } - }.runFold(Map.empty[Name, FileInfo])((set, value) => set ++ value) - inner(Tuple1(fileNameMap)) + }.runFold(new Form(Map.empty[Name, FormField])){(set, value) => + new Form(set.fields ++ value) + } + + inner(Tuple1(form)) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/daemon/src/main/scala/io/gearpump/util/FileServer.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala index 361c01d..4be3f2f 100644 --- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala +++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala @@ -52,8 +52,20 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory val route: Route = { path("upload") { - uploadFileTo(rootDirectory) { fileMap => - complete(fileMap.head._2.file.getName) + uploadFileTo(rootDirectory) { form => + val fileName = form.fields.headOption.flatMap{pair => + val (_, fileInfo) = pair + fileInfo match { + case Left(file) => Option(file.file).map(_.getName) + case Right(_) => None + } + } + + if (fileName.isDefined) { + complete(fileName.get) + } else { + failWith(new Exception("File not found in the uploaded form")) + } } } ~ path("download") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/docs/dev-rest-api.md ---------------------------------------------------------------------- diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md index 36eeb0b..e4b8793 100644 --- a/docs/dev-rest-api.md +++ b/docs/dev-rest-api.md @@ -304,6 +304,92 @@ Sample Response: } ``` +### POST api/v1.0/master/submitapp +Submit a streaming job jar to Gearpump cluster. It functions like command line +``` +gear app -jar xx.jar -conf yy.conf -executors 1 <command line arguments> +``` + +Required MIME type: "multipart/form-data" + +Required post form fields: + +1. field name "jar", job jar file. + +Optional post form fields: + +1. "configfile", configuration file, in UTF8 format. +2. "configstring", text body of configuration file, in UTF8 format. +3. "executorcount", The count of JVM process to start across the cluster for this application job +4. "args", command line arguments for this job jar. + +Example html: + +```bash +<form id="submitapp" action="http://127.0.0.1:8090/api/v1.0/master/submitapp" +method="POST" enctype="multipart/form-data"> + +Job Jar (*.jar) [Required]: <br/> +<input type="file" name="jar"/> <br/> <br/> + +Config file (*.conf) [Optional]: <br/> +<input type="file" name="configfile"/> <br/> <br/> + +Config String, Config File in string format. [Optional]: <br/> +<input type="text" name="configstring" value="a.b.c.d=1"/> <br/><br/> + +Executor count (integer, how many process to start for this streaming job) [Optional]: <br/> +<input type="text" name="executorcount" value="1"/> <br/><br/> + +Application arguments (String) [Optional]: <br/> +<input type="text" name="args" value=""/> <br/><br/> + +<input type="submit" value="Submit"/> + +</table> + +</form> +``` + +### POST api/v1.0/master/submitstormapp +Submit a storm jar to Gearpump cluster. It functions like command line +``` +storm app -jar xx.jar -conf yy.yaml <command line arguments> +``` + +Required MIME type: "multipart/form-data" + +Required post form fields: + +1. field name "jar", job jar file. + +Optional post form fields: + +1. "configfile", .yaml configuration file, in UTF8 format. +2. "args", command line arguments for this job jar. + +Example html: + +```bash +<form id="submitstormapp" action="http://127.0.0.1:8090/api/v1.0/master/submitstormapp" +method="POST" enctype="multipart/form-data"> + +Job Jar (*.jar) [Required]: <br/> +<input type="file" name="jar"/> <br/> <br/> + +Config file (*.yaml) [Optional]: <br/> +<input type="file" name="configfile"/> <br/> <br/> + +Application arguments (String) [Optional]: <br/> +<input type="text" name="args" value=""/> <br/><br/> + +<input type="submit" value="Submit"/> + +</table> + +</form> +``` + ## Worker service ### GET api/v1.0/worker/<workerId> @@ -952,4 +1038,4 @@ Sample Response: ], "jvmName": "21304@lisa" } -``` \ No newline at end of file +``` http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index 2657bb3..f11ff9c 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -89,13 +89,18 @@ class RestClient(host: String, port: Int) { def submitApp(jar: String, executorNum: Int, args: String = "", config: String = ""): Boolean = try { var endpoint = "master/submitapp" - if (args.length > 0) { - endpoint += s"?executorNum=${executorNum}&args=" + Util.encodeUriComponent(args) - } + var options = Seq(s"jar=@$jar") if (config.length > 0) { options :+= s"conf=@$config" } + + options :+= s"executorcount=$executorNum" + + if (args != null && !args.isEmpty) { + options :+= "args=\"" + args + "\"" + } + val resp = callApi(endpoint, options.map("-F " + _).mkString(" ")) val result = decodeAs[AppSubmissionResult](resp) assert(result.success) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/services/restapi.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js index d52d89d..f9cdceb 100644 --- a/services/dashboard/services/restapi.js +++ b/services/dashboard/services/restapi.js @@ -118,9 +118,9 @@ angular.module('dashboard') }, /** Submit an user defined application with user configuration */ - submitUserApp: function(files, formFormNames, executorNum, args, onComplete) { + submitUserApp: function(files, fileFieldNames, executorNum, args, onComplete) { return self._submitApp(restapiV1Root + 'master/submitapp', - files, formFormNames, executorNum, args, onComplete); + files, fileFieldNames, executorNum, args, onComplete); }, /** Submit a Storm application */ @@ -129,13 +129,16 @@ angular.module('dashboard') files, formFormNames, executorNum, args, onComplete); }, - _submitApp: function(url, files, formFormNames, executorNum, args, onComplete) { - var params = '?executorNum=' + executorNum + '&args=' + encodeURIComponent(args); + _submitApp: function(url, files, fileFieldNames, executorNum, args, onComplete) { var upload = Upload.upload({ - url: url + params, + url: url, method: 'POST', file: files, - fileFormDataName: formFormNames + fileFormDataName: fileFieldNames, + fields: { + "executorcount": executorNum, + "args": args + } }); upload.then(function(response) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/views/apps/submit/submit.html ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/submit/submit.html b/services/dashboard/views/apps/submit/submit.html index bc4af98..001a75a 100644 --- a/services/dashboard/views/apps/submit/submit.html +++ b/services/dashboard/views/apps/submit/submit.html @@ -41,7 +41,8 @@ ng-model="conf" accept-pattern="{{confFileSuffix}}"></form-control> <!-- input 3 --> <form-control - type="integer" min="1" label="Executor Number" ng-hide="isStormApp" + type="integer" min="1" label="Executor Count" ng-hide="isStormApp" + help="How many JVM processes to start for this job in the whole cluster. E.g. set it as 12 will start 12 executor processes spanning across the cluster" ng-model="executorNum"></form-control> <!-- input 4 --> <form-control http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/views/apps/submit/submit.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/submit/submit.js b/services/dashboard/views/apps/submit/submit.js index 5e967f5..23c806f 100644 --- a/services/dashboard/views/apps/submit/submit.js +++ b/services/dashboard/views/apps/submit/submit.js @@ -27,7 +27,7 @@ angular.module('dashboard') var fileFormNames = ['jar']; if ($scope.conf) { files.push($scope.conf); - fileFormNames.push('conf'); + fileFormNames.push('configfile'); } $scope.uploading = true; submitFn(files, fileFormNames, $scope.executorNum, $scope.launchArgs, function(response) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala index 50b2ede..98ce0b1 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala @@ -67,8 +67,9 @@ class AppMasterService(val master: ActorRef, val msg = java.net.URLDecoder.decode(args) val dagOperation = read[DAGOperation](msg) (post & entity(as[Multipart.FormData])) { _ => - uploadFile { fileMap => - val jar = fileMap.get("jar").map(_.file) + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + if (jar.nonEmpty) { dagOperation match { case replace: ReplaceProcessor => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index dd0e719..3e2b9ca 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -20,6 +20,9 @@ package io.gearpump.services import java.io.{File, IOException} +import java.nio.file.Files +import java.nio.charset.StandardCharsets.UTF_8 +import java.nio.file.StandardOpenOption.{WRITE, APPEND} import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ @@ -109,38 +112,41 @@ class MasterService(val master: ActorRef, } ~ path("submitapp") { post { - parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) => - uploadFile { fileMap => - val jar = fileMap.get("jar").map(_.file) - val userConf = fileMap.get("conf").map(_.file) - onComplete(Future( - MasterService.submitGearApp(jar, executorNum, args, systemConfig, userConf) - )) { - case Success(success) => - val response = MasterService.AppSubmissionResult(success) - complete(write(response)) - case Failure(ex) => - failWith(ex) - } + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + val configFile = form.getFile("configfile").map(_.file) + val configString = form.getValue("configstring").getOrElse("") + val executorCount = form.getValue("executorcount").getOrElse("1").toInt + val args = form.getValue("args").getOrElse("") + + val mergedConfigFile = mergeConfig(configFile, configString) + + onComplete(Future( + MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile) + )) { + case Success(success) => + val response = MasterService.AppSubmissionResult(success) + complete(write(response)) + case Failure(ex) => + failWith(ex) } } } } ~ path("submitstormapp") { post { - parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) => - uploadFile { fileMap => - val jar = fileMap.get("jar").map(_.file) - val stormConf = fileMap.get("conf").map(_.file) - onComplete(Future( - MasterService.submitStormApp(jar, stormConf, args, systemConfig) - )) { - case Success(success) => - val response = MasterService.AppSubmissionResult(success) - complete(write(response)) - case Failure(ex) => - failWith(ex) - } + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + val configFile = form.getFile("configfile").map(_.file) + val args = form.getValue("args").getOrElse("") + onComplete(Future( + MasterService.submitStormApp(jar, configFile, args, systemConfig) + )) { + case Success(success) => + val response = MasterService.AppSubmissionResult(success) + complete(write(response)) + case Failure(ex) => + failWith(ex) } } } @@ -171,8 +177,8 @@ class MasterService(val master: ActorRef, } } ~ path("uploadjar") { - uploadFile { fileMap => - val jar = fileMap.get("jar").map(_.file) + uploadFile { form => + val jar = form.getFile("jar").map(_.file) if (jar.isEmpty) { complete(write( MasterService.Status(success = false, reason = "Jar file not found"))) @@ -188,6 +194,22 @@ class MasterService(val master: ActorRef, } } } + + private def mergeConfig(configFile: Option[File], configString: String): Option[File] = { + if (configString == null || configString.isEmpty) { + configFile + } else { + configFile match { + case Some(file) => + Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), APPEND) + Some(file) + case None => + val file = File.createTempFile("\"userfile_configstring_", ".conf") + Files.write(file.toPath, configString.getBytes(UTF_8), WRITE) + Some(file) + } + } + } } object MasterService {
