Update README.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0c9f74fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0c9f74fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0c9f74fd

Branch: refs/heads/master
Commit: 0c9f74fda42aa7b319cc3b11c2f504d5d799191e
Parents: f431288
Author: DO YUNG YOON <[email protected]>
Authored: Fri Apr 6 19:08:16 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Apr 6 20:25:45 2018 +0900

----------------------------------------------------------------------
 s2jobs/README.md | 237 +++++++++++++++++++++++++++++---------------------
 1 file changed, 139 insertions(+), 98 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0c9f74fd/s2jobs/README.md
----------------------------------------------------------------------
diff --git a/s2jobs/README.md b/s2jobs/README.md
index e201ad6..f79abb7 100644
--- a/s2jobs/README.md
+++ b/s2jobs/README.md
@@ -1,84 +1,127 @@
-## S2Jobs
-S2Jobs is a collection of spark programs that connect S2Graph `WAL` to other systems.
+
+# S2Jobs
+S2Jobs is a collection of Spark programs which can be used to support `online analytical processing (OLAP)` on S2Graph.
-## Background
+There are currently two ways to run `OLAP` on S2Graph.
-By default, S2Graph publish all incoming data as `WAL` to Apache Kafka for users who want to subscribe `WAL`.
-There are many use cases of this `WAL`, but let's just start with simple example, such as **finding out the number of new edges created per minute(OLAP query).**
+----------
+
+
+## 1. HBase Snapshots
+
+HBase provides excellent support for creating table [snapshots](http://hbase.apache.org/0.94/book/ops.snapshots.html).
-One possible way is run full table scan on HBase using API, then group by each edge's `createdAt` property value, then count number of edges per each `createdAt` bucket, in this case minute.
+S2Jobs provides the `S2GraphSource` class, which can create a Spark `DataFrame` from the `S2Edge/S2Vertex` data stored in an HBase snapshot.
-Running full table scan on HBase through RegionServer on same cluster that is serving lots of concurrent OLTP requests is prohibit, arguably.
+Instead of providing graph algorithms such as `PageRank` by itself, S2Graph lets users connect the graph stored in S2Graph to their favorite analytics platform, for example [**`Apache Spark`**](https://spark.apache.org/).
-Instead one can subscribe `WAL` from kafka, and sink `WAL` into HDFS, which usually separate hadoop cluster from the cluster which run HBase region server for OLTP requests.
+Once processing is finished, S2Jobs provides `S2GraphSink` to write the analyzed data back into S2Graph.
-Once `WAL` is available in separate cluster as file, by default the Spark DataFrame, answering above question becomes very easy with spark sql.
+
+
+
+This architecture may seem complicated at first glance, but note that it has significant advantages in performance and stability for the `OLTP` cluster, especially compared to using the HBase client API `Scan`.
+
+Here is the resulting `DataFrame` schema for `S2Vertex` and `S2Edge`.
 ```
-select MINUTE(timestamp), count(1)
-from wal
-where operation = 'insert'
-and timestamp between (${start_ts}, ${end_ts})
+S2Vertex
+root
+ |-- timestamp: long (nullable = false)
+ |-- operation: string (nullable = false)
+ |-- elem: string (nullable = false)
+ |-- id: string (nullable = false)
+ |-- service: string (nullable = false)
+ |-- column: string (nullable = false)
+ |-- props: string (nullable = false)
+
+S2Edge
+root
+ |-- timestamp: long (nullable = false)
+ |-- operation: string (nullable = false)
+ |-- elem: string (nullable = false)
+ |-- from: string (nullable = false)
+ |-- to: string (nullable = false)
+ |-- label: string (nullable = false)
+ |-- props: string (nullable = false)
+ |-- direction: string (nullable = true)
 ```
-Above approach works, but there is usually few minutes of lag. If user want to reduce this lag, then it is also possible to subscribe `WAL` from kafka then ingest data into analytics platform such as Druid.
+To run a graph algorithm, transform the above `DataFrame` into [GraphFrames](https://graphframes.github.io/index.html), then run the functionality provided by `GraphFrames`.
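To make that hand-off concrete, here is a minimal sketch (not taken from this commit) of running PageRank over the two `DataFrame`s with GraphFrames. The parquet paths are hypothetical stand-ins for however the `S2GraphSource` output was persisted, and the `graphframes` package is assumed to be on the classpath; the only hard requirement is that GraphFrames expects an `id` column on vertices and `src`/`dst` columns on edges, so `S2Edge`'s `from`/`to` columns are renamed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.graphframes.GraphFrame

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("s2graph-graphframes-sketch").getOrCreate()

    // Hypothetical parquet dumps of the S2Vertex / S2Edge DataFrames shown above.
    val vertices: DataFrame = spark.read.parquet("/tmp/s2vertex")

    // GraphFrames expects `src`/`dst` columns, so rename S2Edge's `from`/`to`.
    val edges: DataFrame = spark.read.parquet("/tmp/s2edge")
      .withColumnRenamed("from", "src")
      .withColumnRenamed("to", "dst")

    val graph = GraphFrame(vertices, edges)

    // Any GraphFrames algorithm can run from here; PageRank is just an example.
    val ranks = graph.pageRank.resetProbability(0.15).maxIter(10).run()
    ranks.vertices.select("id", "pagerank").show(20, truncate = false)

    spark.stop()
  }
}
```

The same `GraphFrame` can feed connected components, label propagation, or motif queries without any further conversion.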
-S2Jobs intentionaly provide only interfaces and very basic implementation for connecting `WAL` to other system. It is up to users what system they would use for `WAL` and S2Jobs want the community to contribute this as they leverage S2Graph `WAL`.
+Lastly, `S2GraphSource` and `S2GraphSink` expose two interfaces, `GraphElementReadable` and `GraphElementWritable`, for users who want to serialize/deserialize a custom graph from/to S2Graph.
-## Basic Architecture
+For example, one can simply implement an `RDFTsvFormatReader` to convert each triple in an RDF file to `S2Edge/S2Vertex`, then use it in `S2GraphSource`'s `toDF` method to create a `DataFrame` from RDF.
-One simple example data flow would look like following.
+This comes in very handy when there are many different data sources with different formats to migrate into S2Graph.
-<img width="1222" alt="screen shot 2018-03-29 at 3 04 21 pm" src="https://user-images.githubusercontent.com/1264825/38072702-84ef93dc-3362-11e8-9f47-db41f50467f0.png">
-Most of spark program available on S2jobs follow following abstraction.
+## 2. `WAL` log on Kafka
-### Task
-`Process class` ? `Task trait` ? `TaskConf`?
+By default, S2Graph publishes all incoming data into Kafka, and users can subscribe to it for **incremental processing**.
-### Current Supported Task
+S2Jobs provides programs that process this `stream` for incremental processing, using [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), which provides a great way to express a streaming computation the same way as a batch computation.
-### Source
+A `Job` in S2Jobs abstracts one Spark job, and a `Job` consists of multiple `Task`s. Think of a `Job` as a very simple `workflow`; `Source`, `Process`, and `Sink` are the subclasses that implement the `Task` interface.
-- kakfa : built-in
-- file : built-in
-- hive : built-in
+----------
+### 2.1. Job Description
-### Process
+**Tasks** and the **workflow** can be described in a **Job** description, and dependencies between tasks are defined by the names of the tasks specified in the `inputs` field.
-- sql : process spark sql
-- custom : implement if necessary
+>Note that this work was influenced by [airstream of Airbnb](https://www.slideshare.net/databricks/building-data-product-based-on-apache-spark-at-airbnb-with-jingwei-lu-and-liyin-tang).
-### Sink
+#### Json Spec
-- kafka : built-in
-
-- file : built-in
-
-- es : elasticsearch-spark
-
-- **s2graph** : added
-
-  - Use the mutateElement function of the S2graph object.
-  - S2graph related setting is required.
-  - put the config file in the classpath or specify it in the job description options.
-
-  ```
-  ex)
-  "type": "s2graph",
-  "options": {
-    "hbase.zookeeper.quorum": "",
-    "db.default.driver": "",
-    "db.default.url": ""
-  }
-
-  ```
+```js
+{
+  "name": "JOB_NAME",
+  "source": [
+    {
+      "name": "TASK_NAME",
+      "inputs": [],
+      "type": "SOURCE_TYPE",
+      "options": {
+        "KEY" : "VALUE"
+      }
+    }
+  ],
+  "process": [
+    {
+      "name": "TASK_NAME",
+      "inputs": ["INPUT_TASK_NAME"],
+      "type": "PROCESS_TYPE",
+      "options": {
+        "KEY" : "VALUE"
+      }
+    }
+  ],
+  "sink": [
+    {
+      "name": "TASK_NAME",
+      "inputs": ["INPUT_TASK_NAME"],
+      "type": "SINK_TYPE",
+      "options": {
+        "KEY" : "VALUE"
+      }
+    }
+  ]
+}
+
+```
+----------
+
+### 2.2. Currently supported `Task`s
-#### Data Schema for Kafka
+#### Source
+
+- KafkaSource: Built-in from Spark.
+
+##### Data Schema for Kafka
 
 When using Kafka as the data source, the consumer needs to parse it and later interpret it, because Kafka itself has no schema.
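The schema examples themselves are unchanged and therefore elided by the next hunk. As a rough sketch only (the broker address, topic name, and field list are placeholders, not taken from this commit, and the Spark Kafka connector is assumed to be on the classpath), parsing the Kafka `value` against an explicit struct type in Structured Streaming looks like this; the struct type is built from a JSON string, matching the note below that a schema can be given as a string representing the struct type in JSON format.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DataType, StructType}

object WalParseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("wal-kafka-parse-sketch").getOrCreate()

    // Placeholder schema, given as a JSON string representing the struct type.
    val schemaJson =
      """{"type":"struct","fields":[
        |  {"name":"timestamp","type":"long","nullable":false,"metadata":{}},
        |  {"name":"operation","type":"string","nullable":false,"metadata":{}},
        |  {"name":"elem","type":"string","nullable":false,"metadata":{}},
        |  {"name":"props","type":"string","nullable":true,"metadata":{}}
        |]}""".stripMargin
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

    // Kafka records carry no schema, so cast the value to a string and parse it explicitly.
    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "wal")                          // placeholder topic
      .load()
      .select(from_json(col("value").cast("string"), schema).as("wal"))
      .select("wal.*")

    parsed.printSchema()
  }
}
```

Attaching a sink to the parsed stream is then a separate step, which is what the job descriptions in the examples below wire together declaratively.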
@@ -156,61 +199,57 @@ You can create a schema by giving a string representing the struct type as JSON
 ```
+- FileSource: Built-in from Spark.
+- HiveSource: Built-in from Spark.
+- S2GraphSource
+  - Reads an HBase snapshot, then creates a DataFrame. See HBase Snapshots in this document.
+  - Example options for `S2GraphSource` are as follows (refer to the examples for details).
+
+```js
+{
+  "type": "s2graph",
+  "options": {
+    "hbase.zookeeper.quorum": "localhost",
+    "db.default.driver": "com.mysql.jdbc.Driver",
+    "db.default.url": "jdbc:mysql://localhost:3306/graph_dev",
+    "db.default.user": "graph",
+    "db.default.password": "graph",
+    "hbase.rootdir": "/hbase",
+    "restore.path": "/tmp/restore_hbase",
+    "hbase.table.names": "movielens-snapshot"
+  }
+}
+```
-----------
-### Job Description
+#### Process
+- SqlProcess : processes Spark SQL
+- custom : implement if necessary
-**Tasks** and **workflow** can be described in **job** description, and dependencies between tasks are defined by the name of the task specified in the inputs field
+#### Sink
->Note that this works was influenced by [airstream of Airbnb](https://www.slideshare.net/databricks/building-data-product-based-on-apache-spark-at-airbnb-with-jingwei-lu-and-liyin-tang).
+- KafkaSink : built-in from Spark.
+- FileSink : built-in from Spark.
+- HiveSink : built-in from Spark.
+- ESSink : elasticsearch-spark
+- **S2GraphSink**
+  - writeBatchBulkload: builds `HFile`s directly, then loads them using `LoadIncrementalHFiles` from HBase.
+  - writeBatchWithMutate: uses the `mutateElement` function of the S2Graph object.
-#### Json Spec
-```
-{
-  "name": "JOB_NAME",
-  "source": [
-    {
-      "name": "TASK_NAME",
-      "inputs": [],
-      "type": "SOURCE_TYPE",
-      "options": {
-        "KEY" : "VALUE"
-      }
-    }
-  ],
-  "process": [
-    {
-      "name": "TASK_NAME",
-      "inputs": ["INPUT_TASK_NAME"],
-      "type": "PROCESS_TYPE",
-      "options": {
-        "KEY" : "VALUE"
-      }
-    }
-  ],
-  "sink": [
-    {
-      "name": "TASK_NAME",
-      "inputs": ["INPUT_TASK_NAME"],
-      "type": "SINK_TYPE",
-      "options": {
-        "KEY" : "VALUE"
-      }
-    }
-  ]
-}
-
-```
----------
-### Sample job
+The very basic pipeline can be illustrated in the following figure.
-#### 1. wallog trasnform (kafka to kafka)
+
+
+
+# Job Examples
+
+## 1. `WAL` log transform (Kafka to Kafka)
 ```
 {
@@ -255,7 +294,7 @@ You can create a schema by giving a string representing the struct type as JSON
 ```
 
-#### 2. wallog transform (hdfs to hdfs)
+## 2. `WAL` log transform (HDFS to HDFS)
 
 ```
 {
@@ -300,7 +339,7 @@ You can create a schema by giving a string representing the struct type as JSON
----------
-### Launch Job
+## Launch Job
 
 When submitting a Spark job with the assembly jar, use these parameters together with the job description file path. (Currently only the file type is supported.)
@@ -315,4 +354,6 @@ get config from file
 Command: db [options]
 get config from db
   -i, --jobId <jobId>      configuration file
-```
\ No newline at end of file
+```
+
+
