[ 
https://issues.apache.org/jira/browse/GEARPUMP-86?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang updated GEARPUMP-86:
-------------------------------
    Fix Version/s:     (was: 0.8.1)

> Application manifest definition and usage
> -----------------------------------------
>
>                 Key: GEARPUMP-86
>                 URL: https://issues.apache.org/jira/browse/GEARPUMP-86
>             Project: Apache Gearpump
>          Issue Type: Bug
>          Components: examples, restapi
>    Affects Versions: 0.8.0
>            Reporter: Kam Kasravi
>
> h1. GearPump Application and Library Manifest
> h2. Requirements
> This design addresses a number of issues related to application jar 
> submission and to DAG creation and manipulation in the UI. The goals are to 
> make it easier, or possible at all, to:
>  
> {quote}
> # Submit applications and have them run with zero configuration. Everything 
> required for them to run, including environment-specific configuration 
> properties, is in the jar. Submission may be browser based or command based.
> # Decrease the likelihood of DAG failure after submission due to 
> misconfiguration, etc.
> # Manipulate or create a DAG easily in a browser without requiring manual 
> input of Tasks.
> # Query a task repository for specific tasks or for a listing of task types 
> that can be imported / exposed within a DAG Editor.
> # Enable an extensibility mechanism where libraries of tasks may be defined, 
> categorized and made available to tools like DAG builders.
> # Provide a way to reduce jar sizes and possibly share jar dependencies 
> across tasks.
> # Allow categorization of tasks as appropriate for types of flow: source, 
> sink, bidirectional, multicast, etc. This information may be used by DAG 
> editors to embellish task icons or filter task selection, etc.
> # Allow a task to be optionally typed with related message types for input 
> and output. This may allow programmatic generation of flows or even 
> programmatic generation of message types.
> # Allow task grouping, which would enable sub-graph imports and labeling. A 
> DAG editor should allow the user to define and package a subgraph. With 
> dynamic DAGs, a user will want to attach a subgraph to a running DAG, which 
> raises the question of how that subgraph is developed, defined and packaged. 
> For example, a data scientist can develop a new model (subgraph) that reads 
> from a Kafka source and performs scoring. When packaging the model, we need 
> to leave the Kafka source processor out of the package. Otherwise the model 
> will not run correctly, because it would read from the training Kafka 
> instead of the production Kafka.
> # Allow a task to be distributed and run with a minimal set of dependencies.
> {quote}
> h2. Use Cases
> Currently GearPump applications can be submitted as a jar via the browser. To 
> the browser this jar is opaque. Validation of the application must be done 
> server side, and error details may be needlessly complex, misleading or 
> uninformative to the end user. Additionally, surfacing DAG information prior 
> to submitting the application for possible editing is hopelessly convoluted 
> and would involve: 
> * submitting the jar
> * querying the server to retrieve application details
> * killing the application
>
> Given that the jar may be quite large, having the user wait possibly minutes 
> for an upload just to expose the DAG is impractical and limits functionality 
> required to move forward with critical features like DAG creation and 
> editing. Instead, we require a mechanism that surfaces application 
> information such as its DAG within seconds of selecting the jar on the 
> user's machine.
>
> We also want the ability to minimize the size of GearPump applications, even 
> when these applications have a massive set of dependencies. Upcoming use 
> cases may require dependencies that are impractical to resolve using jar 
> inclusion or fat jars. Additionally, the current application jar does not 
> provide information that would allow individual tasks to be distributed with 
> only the jars they need. In other words, a 100M application jar defining 5 
> Tasks must be distributed in full to each GearPump worker, even if one of the 
> tasks does nothing but sum 2 fields and send the result downstream.
> h2. Design
> h3. Manifest details
> h4. Manifest structure
> Central to this design is a manifest specification that describes a GearPump 
> jar. The manifest is a JSON file that can be bundled with all GearPump jars. 
> It will either be generated programmatically or built manually. Programmatic 
> generation will be handled by a simple tool or sbt plugin that builds this 
> file as part of application packaging. Similar to Node.js's 
> _[package.json|https://docs.npmjs.com/files/package.json]_ file, this file 
> will provide metadata describing a GearPump application or library, 
> including:
> > - manifest version
> > - manifest type
> > - application main
> > - application name
> > - application version
> > - application configuration
> > - processors
> > - graph
> > - dependencies
> > - repos
> > - user name
> > - keywords
> h4. Manifest name
> Within the jar file, a top-level entry named gearpump.json will indicate 
> that this jar is a GearPump-related jar. This file will hold the contents 
> noted above and will be validated against the JSON schema included in this 
> document.
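>
> On the JVM side, the lookup described above can be sketched with the 
> standard java.util.zip classes. This is a minimal sketch and not part of 
> GearPump: the ManifestLookup object and its readManifest helper are 
> hypothetical names, and only the gearpump.json entry name comes from this 
> design.
> {code}
> import java.io.ByteArrayInputStream
> import java.util.zip.ZipInputStream
> import scala.io.Source
>
> // Hypothetical helper, not an existing GearPump API.
> object ManifestLookup {
>   // Top-level entry name proposed in this design (no directory prefix).
>   val ManifestEntry = "gearpump.json"
>
>   /** Returns the manifest text if the jar has a top-level gearpump.json entry. */
>   def readManifest(jarBytes: Array[Byte]): Option[String] = {
>     val zin = new ZipInputStream(new ByteArrayInputStream(jarBytes))
>     try {
>       Iterator.continually(zin.getNextEntry)
>         .takeWhile(_ != null)
>         .find(_.getName == ManifestEntry)
>         // The stream is now positioned at the matching entry; read it fully.
>         .map(_ => Source.fromInputStream(zin, "UTF-8").mkString)
>     } finally zin.close()
>   }
> }
> {code}
> A jar for which readManifest returns None would be handled as an ordinary 
> jar. An entry such as lib/gearpump.json does not count, because only the 
> top-level name is matched.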
> h3. Entries and definitions
> The JSON schema describing the above entries will provide relevant typing 
> that may be structural or categorical. For example, the manifest type entry 
> above would be (APPLICATION|LIBRARY). It's anticipated that additional 
> information not specified here will be required, for example a user's 
> organization or a user's role capability. There could be other security 
> attributes as well. Any update to the JSON schema that adds new information 
> will bump the manifest version to reflect the schema change. The manifest 
> version will adhere to the [semantic versioning 
> specification|http://semver.org/]. It's also anticipated that this manifest 
> will enable DAG creation and related tooling, for example by letting a DAG 
> creation tool expose the possible types of tasks within general categories 
> (source, sink, bidirectional, multicast, etc.).
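>
> A tool that reads manifests has to decide which manifest versions it 
> accepts. The sketch below is one possible reading of the versioning rule 
> above, under assumptions that are not part of this design: ManifestVersion 
> and canRead are hypothetical names, only plain MAJOR.MINOR.PATCH strings are 
> parsed (no pre-release or build metadata), and a reader is assumed to accept 
> manifests with the same major version and a minor version no newer than its 
> own.
> {code}
> // Hypothetical type, not an existing GearPump class.
> final case class ManifestVersion(major: Int, minor: Int, patch: Int) {
>   // Assumed policy: same major version, and the manifest's minor version
>   // must not be newer than the reader's. Patch level is ignored.
>   def canRead(manifest: ManifestVersion): Boolean =
>     manifest.major == major && manifest.minor <= minor
> }
>
> object ManifestVersion {
>   // Up to 9 digits per component so that toInt cannot overflow.
>   private val Pattern = """(\d{1,9})\.(\d{1,9})\.(\d{1,9})""".r
>
>   /** Parses "MAJOR.MINOR.PATCH"; returns None for anything else. */
>   def parse(s: String): Option[ManifestVersion] = s match {
>     case Pattern(ma, mi, pa) => Some(ManifestVersion(ma.toInt, mi.toInt, pa.toInt))
>     case _ => None
>   }
> }
> {code}
> Under this assumed policy, a tool built for manifest version 1.2.0 would 
> accept a 1.1.5 manifest and reject a 2.0.0 manifest.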
> h3. Dynamic creation of applications
> One impact of providing a manifest is that the application main entry noted 
> above in the manifest structure becomes optional. Once a manifest is 
> generated as part of the build, we could safely remove all application mains, 
> such as those included under the gearpump examples directory. We *will* 
> require the jar submission REST endpoint to do the equivalent of:
>  - parse the JSON file
>  - create a UserConfig instance based on the application configuration values
>  - create Processor\[Task\] instances using the processors information
>  - connect the Processor\[Task\] instances based on the graph information and 
> create a Graph instance
>  - create a StreamApplication instance using the application name, 
> application config and Graph instance
>  - submit the StreamApplication instance to a ClientContext.
>
> In other words, we require the REST endpoint receiving the jar file to 
> *programmatically* do something similar to what the WordCount application 
> below does, based on the application manifest.
> {code}
> object WordCount extends AkkaApp with ArgumentsParser {
>   private val LOG: Logger = LogUtil.getLogger(getClass)
>   val RUN_FOR_EVER = -1
>   override val options: Array[(String, CLIOption[Any])] = Array(
>     "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
>     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
>   )
>   def application(config: ParseResult) : StreamApplication = {
>     val splitNum = config.getInt("split")
>     val sumNum = config.getInt("sum")
>     val split = Processor[Split](splitNum)
>     val sum = Processor[Sum](sumNum)
>     val partitioner = new HashPartitioner
>     val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
>     app
>   }
>   override def main(akkaConf: Config, args: Array[String]): Unit = {
>     val config = parse(args)
>     val context = ClientContext(akkaConf)
>     val appId = context.submit(application(config))
>     context.close()
>   }
> }
> {code}
> Note that something like this already exists in GearPump, associated with 
> the REST endpoint */api/v1.0/submitapp*. This endpoint accepts a POST with a 
> JSON structure that is reified into the Scala case class shown below:
> {code}
> case class SubmitApplicationRequest (
>     appName: String,
>     processors: Map[ProcessorId, ProcessorDescription],
>     dag: Graph[Int, String],
>     userconfig: UserConfig = UserConfig.empty)
> {code}
> This is used to submit the application to the master. The manifest is merely 
> a way of storing the SubmitApplicationRequest with the jar as a JSON file. 
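>
> Before the endpoint turns a manifest into Processor instances and a Graph, 
> it could check that the processors and dag entries agree with each other, 
> which speaks to the goal of reducing DAG failures caused by 
> misconfiguration. The following is a minimal sketch under stated 
> assumptions: ProcessorEntry, Manifest and ManifestValidation are 
> hypothetical stand-ins rather than GearPump classes, and each edge is 
> assumed to be a (from, to) pair of processor ids.
> {code}
> // Hypothetical stand-ins for the manifest entries; not GearPump types.
> final case class ProcessorEntry(id: Int, taskClass: String, parallelism: Int)
> final case class Manifest(
>     applicationName: String,
>     processors: Seq[ProcessorEntry],
>     vertexList: Seq[Int],
>     edgeList: Seq[(Int, Int)]) // assumed shape: (from, to) processor ids
>
> object ManifestValidation {
>   /** Returns readable problems; an empty result means the wiring is consistent. */
>   def validate(m: Manifest): Seq[String] = {
>     val ids = m.processors.map(_.id)
>     val known = ids.toSet
>     // Ids that occur more than once in the processors list.
>     val duplicateIds = ids.diff(ids.distinct).distinct
>       .map(id => s"duplicate processor id $id")
>     val badParallelism = m.processors.filter(_.parallelism < 1)
>       .map(p => s"processor ${p.id} has parallelism ${p.parallelism}")
>     val unknownVertices = m.vertexList.filterNot(known)
>       .map(v => s"vertex $v has no processor entry")
>     val danglingEdges = m.edgeList.collect {
>       case (from, to) if !known(from) || !known(to) =>
>         s"edge $from -> $to references an unknown processor"
>     }
>     duplicateIds ++ badParallelism ++ unknownVertices ++ danglingEdges
>   }
> }
> {code}
> Because the check needs only the manifest, the same rules could be applied 
> by client-side tooling before a large jar is uploaded.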
> h3. UI DAG Editor functionality
> h4. Use case: Creating a new DAG
> DAG editors that are browser based will likely address a number of use cases. 
> One key use case is the ability to select a local application jar and create 
> or edit a DAG based on its contents.
> h5. Parsing jar files
> An application jar may be parsed within the browser, prior to upload, by 
> using [JsZip|https://stuk.github.io/jszip/]. JsZip can be used within any 
> A-grade browser or from the command line, allowing tooling to be created in 
> either environment. JsZip is also quite fast: in a Chrome browser on Mac OS X 
> it took 2 seconds to return all entries of a 61M jar. JsZip has a nice set 
> of features:
> * JsZip can read local jars that are selected using the native file dialog.
> * JsZip can read remote jars, allowing Tasks to be imported from external 
> repos other than the origin server.
> * JsZip can create jar files, opening up the possibility of saving the 
> results of building an application locally to the user's machine.
>
> An example of parsing a jar file that has been dropped on an HTML drop zone 
> can be found [here|http://onehungrymind.com/zip-parsing-jszip-angular/].
> h6. Security risks
> Parsing a jar that the user has selected through the browser's native file 
> dialog is safe for the following reasons:
> # The jar is already resident on the user's computer.
> # Reading the jar entries with JsZip typically uses the browser's 
> ArrayBuffer or Uint8Array. Both are intended for binary data, so there is 
> no increased risk of buffer overflows.
>
> Parsing a jar file from a remote URL is a different case: such jars should 
> only come from sanctioned repos.
> h3. Manifest Definition
> h4. JSON schema (incomplete)
> see [generator|http://jsonschema.net/#/]
> {code}
> {
>   "$schema": "http://json-schema.org/draft-04/schema#",
>   "id": "http://jsonschema.net",
>   "type": "object",
>   "properties": {
>     "manifestVersion": {
>       "id": "http://jsonschema.net/manifestVersion",
>       "type": "string"
>     },
>     "manifestType": {
>       "enum": ["APPLICATION", "LIBRARY"]
>     },
>     "applicationMain": {
>       "id": "http://jsonschema.net/applicationMain",
>       "type": "string"
>     },
>     "applicationName": {
>       "id": "http://jsonschema.net/applicationName",
>       "type": "string"
>     },
>     "applicationConf": {
>       "id": "http://jsonschema.net/processors/11/1/applicationConf",
>       "type": "object",
>       "properties": {
>         "_config": {
>           "id": "http://jsonschema.net/applicationConf/_config",
>           "type": "object",
>           "properties": {}
>         }
>       }
>     },
>     "processors": {
>       "id": "http://jsonschema.net/processors",
>       "type": "array",
>       "items": {
>         "id": "http://jsonschema.net/processors/11",
>         "type": "array",
>         "items": {
>           "id": "http://jsonschema.net/processors/11/1",
>           "type": "object",
>           "properties": {
>             "id": {
>               "id": "http://jsonschema.net/processors/11/1/id",
>               "type": "integer"
>             },
>             "taskClass": {
>               "id": "http://jsonschema.net/processors/11/1/taskClass",
>               "type": "string"
>             },
>             "parallelism": {
>               "id": "http://jsonschema.net/processors/11/1/parallelism",
>               "type": "integer"
>             },
>             "description": {
>               "id": "http://jsonschema.net/processors/11/1/description",
>               "type": "string"
>             },
>             "taskConf": {
>               "id": "http://jsonschema.net/processors/11/1/taskConf",
>               "type": "object",
>               "properties": {
>                 "_config": {
>                   "id": "http://jsonschema.net/processors/11/1/taskConf/_config",
>                   "type": "object",
>                   "properties": {}
>                 }
>               }
>             },
>             "life": {
>               "id": "http://jsonschema.net/processors/11/1/life",
>               "type": "object",
>               "properties": {
>                 "birth": {
>                   "id": "http://jsonschema.net/processors/11/1/life/birth",
>                   "type": "string"
>                 },
>                 "death": {
>                   "id": "http://jsonschema.net/processors/11/1/life/death",
>                   "type": "string"
>                 }
>               }
>             },
>             "executors": {
>               "id": "http://jsonschema.net/processors/11/1/executors",
>               "type": "array",
>               "items": {
>                 "id": "http://jsonschema.net/processors/11/1/executors/0",
>                 "type": "integer"
>               }
>             },
>             "taskCount": {
>               "id": "http://jsonschema.net/processors/11/1/taskCount",
>               "type": "array",
>               "items": {
>                 "id": "http://jsonschema.net/processors/11/1/taskCount/0",
>                 "type": "array",
>                 "items": {
>                   "id": "http://jsonschema.net/processors/11/1/taskCount/0/1",
>                   "type": "object",
>                   "properties": {
>                     "count": {
>                       "id": "http://jsonschema.net/processors/11/1/taskCount/0/1/count",
>                       "type": "integer"
>                     }
>                   }
>                 }
>               }
>             }
>           }
>         }
>       }
>     },
>     "dag": {
>       "id": "http://jsonschema.net/dag",
>       "type": "object",
>       "properties": {
>         "vertexList": {
>           "id": "http://jsonschema.net/dag/vertexList",
>           "type": "array",
>           "items": {
>             "id": "http://jsonschema.net/dag/vertexList/11",
>             "type": "integer"
>           }
>         },
>         "edgeList": {
>           "id": "http://jsonschema.net/dag/edgeList",
>           "type": "array",
>           "items": {
>             "id": "http://jsonschema.net/dag/edgeList/14",
>             "type": "array",
>             "items": {
>               "id": "http://jsonschema.net/dag/edgeList/14/2",
>               "type": "integer"
>             }
>           }
>         }
>       }
>     },
>     "user": {
>       "id": "http://jsonschema.net/user",
>       "type": "string"
>     }
>   },
>   "required": [
>     "manifestVersion",
>     "manifestType",
>     "applicationName",
>     "processors",
>     "dag",
>     "user"
>   ]
> }
> {code}
> h3. TODO
> * describe each schema entry
> * provide sample output
> * scrub schema and validate
> * detail how the DAG editor needs to generate or update the gearpump.json 
> file.
> * describe how dependencies are included and how this can reduce jar size
> h3. ISSUES
> * how to attach to an existing DAG using this manifest - maybe define an 
> optional applicationId
> * the scope here is quite large and may need to be broken into a number of 
> design tasks or reference them
> h4. Relevance
> >    - #1450 
> >    - #1437
> h4. Related
> * application specific types like TAP/ATK and types of tasks within an 
> application category
> * akka-streams RunnableGraph which has no concept of serde and cannot be 
> persisted or distributed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
