GEARPUMP-2, Define REST API to submit job jar

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/7d42d4cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/7d42d4cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/7d42d4cd

Branch: refs/heads/master
Commit: 7d42d4cd7035bce8ba8afed0bf9a5355d03828b4
Parents: 21d5921
Author: Sean Zhong <[email protected]>
Authored: Tue Mar 29 21:40:16 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Apr 26 14:24:00 2016 +0800

----------------------------------------------------------------------
 .../scala/io/gearpump/util/FileDirective.scala  | 52 +++++++++---
 .../scala/io/gearpump/util/FileServer.scala     | 16 +++-
 docs/dev-rest-api.md                            | 88 +++++++++++++++++++-
 .../minicluster/RestClient.scala                | 11 ++-
 services/dashboard/services/restapi.js          | 15 ++--
 .../dashboard/views/apps/submit/submit.html     |  3 +-
 services/dashboard/views/apps/submit/submit.js  |  2 +-
 .../io/gearpump/services/AppMasterService.scala |  5 +-
 .../io/gearpump/services/MasterService.scala    | 78 ++++++++++-------
 9 files changed, 213 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
index f4f82fb..c39d27e 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
@@ -26,6 +26,7 @@ import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import akka.stream.Materializer
 import akka.stream.scaladsl.FileIO
+import akka.util.ByteString
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -52,12 +53,31 @@ object FileDirective {
    */
   case class FileInfo(originFileName: String, file: File, length: Long)
 
+  class Form(val fields: Map[Name, FormField]) {
+    def getFile(fieldName: String): Option[FileInfo] = {
+      fields.get(fieldName).flatMap {
+        case Left(file) => Option(file)
+        case Right(_) => None
+      }
+    }
+
+    def getValue(fieldName: String): Option[String] = {
+      fields.get(fieldName).flatMap {
+        case Left(_) => None
+        case Right(value) => Option(value)
+      }
+    }
+  }
+
+  type FormField = Either[FileInfo, String]
+
+
   /**
    * directive to uploadFile, it store the uploaded files
    * to temporary directory, and return a Map from form field name
    * to FileInfo.
    */
-  def uploadFile: Directive1[Map[Name, FileInfo]] = {
+  def uploadFile: Directive1[Form] = {
     uploadFileTo(null)
   }
 
@@ -67,16 +87,14 @@ object FileDirective {
    * @param rootDirectory directory to store the files.
    * @return
    */
-  def uploadFileTo(rootDirectory: File): Directive1[Map[Name, FileInfo]] = {
-    Directive[Tuple1[Map[Name, FileInfo]]] { inner =>
+  def uploadFileTo(rootDirectory: File): Directive1[Form] = {
+    Directive[Tuple1[Form]] { inner =>
       extractMaterializer {implicit mat =>
         extractExecutionContext {implicit ec =>
           uploadFileImpl(rootDirectory)(mat, ec) { filesFuture =>
             ctx => {
               filesFuture.map(map => inner(Tuple1(map))).flatMap(route => 
route(ctx))
             }
-
-
           }
         }
       }
@@ -94,10 +112,10 @@ object FileDirective {
     complete(responseEntity)
   }
 
-  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, 
ec: ExecutionContext): Directive1[Future[Map[Name, FileInfo]]] = {
-    Directive[Tuple1[Future[Map[Name, FileInfo]]]] { inner =>
+  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, 
ec: ExecutionContext): Directive1[Future[Form]] = {
+    Directive[Tuple1[Future[Form]]] { inner =>
       entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
-        val fileNameMap = formdata.parts.mapAsync(1) { p =>
+        val form = formdata.parts.mapAsync(1) { p =>
           if (p.filename.isDefined) {
 
             //reserve the suffix
@@ -105,15 +123,23 @@ object FileDirective {
             val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
             written.map(written =>
               if (written.count > 0) {
-                Map(p.name -> FileInfo(p.filename.get, targetPath, 
written.count))
+                Map(p.name -> Left(FileInfo(p.filename.get, targetPath, 
written.count)))
               } else {
-                Map.empty[Name, FileInfo]
+                Map.empty[Name, FormField]
               })
           } else {
-            Future(Map.empty[Name, FileInfo])
+            val valueFuture = 
p.entity.dataBytes.runFold(ByteString.empty){(total, input) =>
+              total ++ input
+            }
+            valueFuture.map{value =>
+              Map(p.name -> Right(value.utf8String))
+            }
           }
-        }.runFold(Map.empty[Name, FileInfo])((set, value) => set ++ value)
-        inner(Tuple1(fileNameMap))
+        }.runFold(new Form(Map.empty[Name, FormField])){(set, value) =>
+          new Form(set.fields ++ value)
+        }
+
+        inner(Tuple1(form))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
index 361c01d..4be3f2f 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
@@ -52,8 +52,20 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory
 
   val route: Route = {
     path("upload") {
-      uploadFileTo(rootDirectory) { fileMap =>
-        complete(fileMap.head._2.file.getName)
+      uploadFileTo(rootDirectory) { form =>
+        val fileName = form.fields.headOption.flatMap{pair =>
+          val (_, fileInfo) = pair
+          fileInfo match {
+            case Left(file) => Option(file.file).map(_.getName)
+            case Right(_) => None
+          }
+        }
+
+        if (fileName.isDefined) {
+          complete(fileName.get)
+        } else {
+          failWith(new Exception("File not found in the uploaded form"))
+        }
       }
     } ~
       path("download") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/docs/dev-rest-api.md
----------------------------------------------------------------------
diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md
index 36eeb0b..e4b8793 100644
--- a/docs/dev-rest-api.md
+++ b/docs/dev-rest-api.md
@@ -304,6 +304,92 @@ Sample Response:
 }
 ```
 
+### POST api/v1.0/master/submitapp
+Submit a streaming job jar to Gearpump cluster. It functions like command line
+```
+gear app -jar xx.jar -conf yy.conf -executors 1 <command line arguments>
+```
+
+Required MIME type: "multipart/form-data"
+
+Required post form fields:
+
+1. field name "jar", job jar file.
+
+Optional post form fields:
+
+1. "configfile", configuration file, in UTF8 format.
+2. "configstring", text body of configuration file, in UTF8 format.
+3. "executorcount", The count of JVM process to start across the cluster for this application job
+4. "args", command line arguments for this job jar.
+
+Example html:
+
+```bash
+<form id="submitapp" action="http://127.0.0.1:8090/api/v1.0/master/submitapp"
+method="POST" enctype="multipart/form-data">
+ 
+Job Jar (*.jar) [Required]:  <br/>
+<input type="file" name="jar"/> <br/> <br/>
+ 
+Config file (*.conf) [Optional]:  <br/>
+<input type="file" name="configfile"/> <br/>  <br/>
+ 
+Config String, Config File in string format. [Optional]: <br/>
+<input type="text" name="configstring" value="a.b.c.d=1"/> <br/><br/>
+ 
+Executor count (integer, how many process to start for this streaming job) [Optional]: <br/>
+<input type="text" name="executorcount" value="1"/> <br/><br/>
+ 
+Application arguments (String) [Optional]: <br/>
+<input type="text" name="args" value=""/> <br/><br/>
+ 
+<input type="submit" value="Submit"/>
+ 
+</table>
+ 
+</form>
+```
+
+### POST api/v1.0/master/submitstormapp
+Submit a storm jar to Gearpump cluster. It functions like command line
+```
+storm app -jar xx.jar -conf yy.yaml <command line arguments>
+```
+
+Required MIME type: "multipart/form-data"
+
+Required post form fields:
+
+1. field name "jar", job jar file.
+
+Optional post form fields:
+
+1. "configfile", .yaml configuration file, in UTF8 format.
+2. "args", command line arguments for this job jar.
+
+Example html:
+
+```bash
+<form id="submitstormapp" action="http://127.0.0.1:8090/api/v1.0/master/submitstormapp"
+method="POST" enctype="multipart/form-data">
+ 
+Job Jar (*.jar) [Required]:  <br/>
+<input type="file" name="jar"/> <br/> <br/>
+ 
+Config file (*.yaml) [Optional]:  <br/>
+<input type="file" name="configfile"/> <br/>  <br/>
+
+Application arguments (String) [Optional]: <br/>
+<input type="text" name="args" value=""/> <br/><br/>
+ 
+<input type="submit" value="Submit"/>
+ 
+</table>
+ 
+</form>
+```
+
 ## Worker service
 
 ### GET api/v1.0/worker/&lt;workerId&gt;
@@ -952,4 +1038,4 @@ Sample Response:
   ],
   "jvmName": "21304@lisa"
 }
-```
\ No newline at end of file
+```

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index 2657bb3..f11ff9c 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -89,13 +89,18 @@ class RestClient(host: String, port: Int) {
 
   def submitApp(jar: String, executorNum: Int, args: String = "", config: 
String = ""): Boolean = try {
     var endpoint = "master/submitapp"
-    if (args.length > 0) {
-      endpoint += s"?executorNum=${executorNum}&args=" + 
Util.encodeUriComponent(args)
-    }
+
     var options = Seq(s"jar=@$jar")
     if (config.length > 0) {
       options :+= s"conf=@$config"
     }
+
+    options :+= s"executorcount=$executorNum"
+
+    if (args != null && !args.isEmpty) {
+      options :+= "args=\"" + args + "\""
+    }
+
     val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
     val result = decodeAs[AppSubmissionResult](resp)
     assert(result.success)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/services/restapi.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js
index d52d89d..f9cdceb 100644
--- a/services/dashboard/services/restapi.js
+++ b/services/dashboard/services/restapi.js
@@ -118,9 +118,9 @@ angular.module('dashboard')
         },
 
         /** Submit an user defined application with user configuration */
-        submitUserApp: function(files, formFormNames, executorNum, args, 
onComplete) {
+        submitUserApp: function(files, fileFieldNames, executorNum, args, 
onComplete) {
           return self._submitApp(restapiV1Root + 'master/submitapp',
-            files, formFormNames, executorNum, args, onComplete);
+            files, fileFieldNames, executorNum, args, onComplete);
         },
 
         /** Submit a Storm application */
@@ -129,13 +129,16 @@ angular.module('dashboard')
             files, formFormNames, executorNum, args, onComplete);
         },
 
-        _submitApp: function(url, files, formFormNames, executorNum, args, 
onComplete) {
-          var params = '?executorNum=' + executorNum + '&args=' + 
encodeURIComponent(args);
+        _submitApp: function(url, files, fileFieldNames, executorNum, args, 
onComplete) {
           var upload = Upload.upload({
-            url: url + params,
+            url: url,
             method: 'POST',
             file: files,
-            fileFormDataName: formFormNames
+            fileFormDataName: fileFieldNames,
+            fields: {
+              "executorcount": executorNum,
+              "args": args
+            }
           });
 
           upload.then(function(response) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/views/apps/submit/submit.html
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.html b/services/dashboard/views/apps/submit/submit.html
index bc4af98..001a75a 100644
--- a/services/dashboard/views/apps/submit/submit.html
+++ b/services/dashboard/views/apps/submit/submit.html
@@ -41,7 +41,8 @@
               ng-model="conf" 
accept-pattern="{{confFileSuffix}}"></form-control>
             <!-- input 3 -->
             <form-control
-              type="integer" min="1" label="Executor Number" 
ng-hide="isStormApp"
+              type="integer" min="1" label="Executor Count" 
ng-hide="isStormApp"
+              help="How many JVM processes to start for this job in the whole 
cluster. E.g. set it as 12 will start 12 executor processes spanning across the 
cluster"
               ng-model="executorNum"></form-control>
             <!-- input 4 -->
             <form-control

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/dashboard/views/apps/submit/submit.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.js b/services/dashboard/views/apps/submit/submit.js
index 5e967f5..23c806f 100644
--- a/services/dashboard/views/apps/submit/submit.js
+++ b/services/dashboard/views/apps/submit/submit.js
@@ -27,7 +27,7 @@ angular.module('dashboard')
         var fileFormNames = ['jar'];
         if ($scope.conf) {
           files.push($scope.conf);
-          fileFormNames.push('conf');
+          fileFormNames.push('configfile');
         }
         $scope.uploading = true;
         submitFn(files, fileFormNames, $scope.executorNum, $scope.launchArgs, 
function(response) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
index 50b2ede..98ce0b1 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
@@ -67,8 +67,9 @@ class AppMasterService(val master: ActorRef,
         val msg = java.net.URLDecoder.decode(args)
         val dagOperation = read[DAGOperation](msg)
         (post & entity(as[Multipart.FormData])) { _ =>
-        uploadFile { fileMap =>
-          val jar = fileMap.get("jar").map(_.file)
+        uploadFile { form =>
+          val jar = form.getFile("jar").map(_.file)
+
           if (jar.nonEmpty) {
             dagOperation match {
               case replace: ReplaceProcessor =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/7d42d4cd/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index dd0e719..3e2b9ca 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -20,6 +20,9 @@
 package io.gearpump.services
 
 import java.io.{File, IOException}
+import java.nio.file.Files
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.StandardOpenOption.{WRITE, APPEND}
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
@@ -109,38 +112,41 @@ class MasterService(val master: ActorRef,
     } ~
     path("submitapp") {
       post {
-        parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) 
=>
-          uploadFile { fileMap =>
-            val jar = fileMap.get("jar").map(_.file)
-            val userConf = fileMap.get("conf").map(_.file)
-            onComplete(Future(
-              MasterService.submitGearApp(jar, executorNum, args, 
systemConfig, userConf)
-            )) {
-              case Success(success) =>
-                val response = MasterService.AppSubmissionResult(success)
-                complete(write(response))
-              case Failure(ex) =>
-                failWith(ex)
-            }
+        uploadFile { form =>
+          val jar = form.getFile("jar").map(_.file)
+          val configFile = form.getFile("configfile").map(_.file)
+          val configString = form.getValue("configstring").getOrElse("")
+          val executorCount = 
form.getValue("executorcount").getOrElse("1").toInt
+          val args = form.getValue("args").getOrElse("")
+
+          val mergedConfigFile = mergeConfig(configFile, configString)
+
+          onComplete(Future(
+            MasterService.submitGearApp(jar, executorCount, args, 
systemConfig, mergedConfigFile)
+          )) {
+            case Success(success) =>
+              val response = MasterService.AppSubmissionResult(success)
+              complete(write(response))
+            case Failure(ex) =>
+              failWith(ex)
           }
         }
       }
     } ~
     path("submitstormapp") {
       post {
-        parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) 
=>
-          uploadFile { fileMap =>
-            val jar = fileMap.get("jar").map(_.file)
-            val stormConf = fileMap.get("conf").map(_.file)
-            onComplete(Future(
-              MasterService.submitStormApp(jar, stormConf, args, systemConfig)
-            )) {
-              case Success(success) =>
-                val response = MasterService.AppSubmissionResult(success)
-                complete(write(response))
-              case Failure(ex) =>
-                failWith(ex)
-            }
+        uploadFile { form =>
+          val jar = form.getFile("jar").map(_.file)
+          val configFile = form.getFile("configfile").map(_.file)
+          val args = form.getValue("args").getOrElse("")
+          onComplete(Future(
+            MasterService.submitStormApp(jar, configFile, args, systemConfig)
+          )) {
+            case Success(success) =>
+              val response = MasterService.AppSubmissionResult(success)
+              complete(write(response))
+            case Failure(ex) =>
+              failWith(ex)
           }
         }
       }
@@ -171,8 +177,8 @@ class MasterService(val master: ActorRef,
       }
     } ~
     path("uploadjar") {
-      uploadFile { fileMap =>
-        val jar = fileMap.get("jar").map(_.file)
+      uploadFile { form =>
+        val jar = form.getFile("jar").map(_.file)
         if (jar.isEmpty) {
           complete(write(
             MasterService.Status(success = false, reason = "Jar file not 
found")))
@@ -188,6 +194,22 @@ class MasterService(val master: ActorRef,
       }
     }
   }
+
+  private def mergeConfig(configFile: Option[File], configString: String): 
Option[File] = {
+    if (configString == null || configString.isEmpty) {
+      configFile
+    } else {
+      configFile match {
+        case Some(file) =>
+          Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), 
APPEND)
+          Some(file)
+        case None =>
+          val file = File.createTempFile("\"userfile_configstring_", ".conf")
+          Files.write(file.toPath, configString.getBytes(UTF_8), WRITE)
+          Some(file)
+      }
+    }
+  }
 }
 
 object MasterService {

Reply via email to