Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b21db657e -> b8f990504


merge mailing list and PR144 to README.md


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f4312882
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f4312882
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f4312882

Branch: refs/heads/master
Commit: f43128827080e6bdd0eb93b42ba978a4793961c8
Parents: 245335a
Author: DO YUNG YOON <[email protected]>
Authored: Fri Mar 30 11:54:19 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Mar 30 11:54:19 2018 +0900

----------------------------------------------------------------------
 s2jobs/README.md | 318 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 318 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f4312882/s2jobs/README.md
----------------------------------------------------------------------
diff --git a/s2jobs/README.md b/s2jobs/README.md
new file mode 100644
index 0000000..e201ad6
--- /dev/null
+++ b/s2jobs/README.md
@@ -0,0 +1,318 @@
+
+## S2Jobs
+
+S2Jobs is a collection of Spark programs that connect the S2Graph `WAL` to other systems.
+
+
+## Background
+
+By default, S2Graph publishes all incoming data as a `WAL` (write-ahead log) to Apache Kafka for users who want to subscribe to it.
+
+There are many use cases for this `WAL`, but let's start with a simple example: **finding the number of new edges created per minute (an OLAP query).**
+
+One possible way is to run a full table scan on HBase using its API, group by each edge's `createdAt` property value, and then count the number of edges in each `createdAt` bucket, in this case a minute.
+
+Running a full table scan on HBase through the RegionServers of the same cluster that is serving many concurrent OLTP requests is, arguably, prohibitive.
+
+Instead, one can subscribe to the `WAL` from Kafka and sink it into HDFS, usually on a Hadoop cluster separate from the one running the HBase RegionServers for OLTP requests.
+
+Once the `WAL` is available in the separate cluster as files that can be loaded into a Spark DataFrame, answering the question above becomes very easy with Spark SQL.
+
+```
+select         MINUTE(timestamp), count(1)
+from           wal
+where          operation = 'insert'
+and            timestamp between ${start_ts} and ${end_ts}
+group by       MINUTE(timestamp)
+```
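+
+For instance, a minimal Spark (Scala) sketch along these lines, assuming the `WAL` has been sunk to HDFS as Parquet under a hypothetical `/wal` path and that `timestamp` holds epoch milliseconds:
+
+```
+// Minimal sketch (not part of S2Jobs itself): load the WAL files that were
+// sunk to HDFS and answer the question above with Spark SQL.
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder().appName("wal-olap-example").getOrCreate()
+spark.read.parquet("/wal").createOrReplaceTempView("wal")  // hypothetical path
+
+val (startTs, endTs) = (1522368000000L, 1522371600000L)    // hypothetical range
+spark.sql(
+  s"""SELECT MINUTE(CAST(timestamp / 1000 AS TIMESTAMP)) AS minute,
+     |       COUNT(1) AS new_edges
+     |FROM   wal
+     |WHERE  operation = 'insert' AND timestamp BETWEEN $startTs AND $endTs
+     |GROUP  BY MINUTE(CAST(timestamp / 1000 AS TIMESTAMP))""".stripMargin
+).show()
+```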
+
+The above approach works, but there are usually a few minutes of lag. If users want to reduce this lag, it is also possible to subscribe to the `WAL` from Kafka and ingest the data into an analytics platform such as Druid.
+
+S2Jobs intentionally provides only interfaces and a very basic implementation for connecting the `WAL` to other systems. It is up to users which systems they use with the `WAL`, and S2Jobs hopes the community will contribute these connectors as they leverage the S2Graph `WAL`.
+
+## Basic Architecture
+
+One simple example data flow would look like the following.
+
+<img width="1222" alt="screen shot 2018-03-29 at 3 04 21 pm" src="https://user-images.githubusercontent.com/1264825/38072702-84ef93dc-3362-11e8-9f47-db41f50467f0.png">
+
+Most of the Spark programs available in S2Jobs follow the abstraction below.
+
+### Task
+A job is composed of tasks. Each task is described by a `TaskConf` (its name, type, inputs, and options, matching the JSON spec below) and implemented through the `Task` trait; sources, processes (the `Process` class), and sinks are all tasks. A rough, illustrative sketch is shown below.
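+
+The sketch below is purely illustrative: it defines its own minimal `TaskConf` and `Task` types that mirror the JSON job spec further down (name / type / inputs / options); the actual s2jobs class signatures may differ.
+
+```
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+// Hypothetical, simplified shapes; not the authoritative s2jobs API.
+case class TaskConf(name: String, `type`: String,
+                    inputs: Seq[String], options: Map[String, String])
+
+trait Task { def conf: TaskConf }
+
+// A process-style task: it receives the DataFrames produced by the tasks
+// named in `inputs` and returns a new DataFrame for downstream tasks.
+class SqlProcess(val conf: TaskConf) extends Task {
+  def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    // Register every input under its task name so the SQL can refer to it.
+    inputMap.foreach { case (name, df) => df.createOrReplaceTempView(name) }
+    ss.sql(conf.options("sql"))
+  }
+}
+```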
+
+### Currently Supported Tasks
+
+### Source
+
+-   kafka : built-in
+-   file : built-in
+-   hive : built-in
+
+### Process
+
+-   sql : runs a Spark SQL statement over its inputs
+-   custom : implement your own if necessary
+
+### Sink
+
+-   kafka : built-in
+    
+-   file : built-in
+    
+-   es : elasticsearch-spark
+    
+-   **s2graph** : writes mutations into S2Graph
+    
+    -   Uses the `mutateElement` function of the S2Graph object.
+    -   S2Graph-related settings are required.
+    -   Put the config file on the classpath or specify the settings in the job description options.
+    
+    ```
+    ex)
+        "type": "s2graph",
+        "options": {
+          "hbase.zookeeper.quorum": "",
+          "db.default.driver": "",
+          "db.default.url": ""
+        }
+    
+    ```
+
+#### Data Schema for Kafka
+
+When using Kafka as a data source, the consumer needs to parse and later interpret the data itself, because Kafka has no schema.
+
+When reading data from Kafka with Structured Streaming, the DataFrame has the following schema.
+
+```
+Column           Type
+key              binary
+value            binary
+topic            string
+partition        int
+offset           long
+timestamp        long
+timestampType    int
+
+```
+
+In the case of the JSON format, the data schema can be supplied in the config.  
+You can create a schema by giving a string that represents the struct type as JSON, as shown below.
+
+```
+{
+  "type": "struct",
+  "fields": [
+    {
+      "name": "timestamp",
+      "type": "long",
+      "nullable": false,
+      "metadata": {}
+    },
+    {
+      "name": "operation",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "elem",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "from",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "to",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "label",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "service",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    },
+    {
+      "name": "props",
+      "type": "string",
+      "nullable": true,
+      "metadata": {}
+    }
+  ]
+}
+
+```
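+
+As an illustration, here is a hedged Spark (Scala) sketch of how such a schema string can be turned into a Spark schema and applied to the binary Kafka `value` column. The topic and bootstrap servers are taken from the sample job below, the field list is shortened, and this is not the s2jobs implementation itself:
+
+```
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, from_json}
+import org.apache.spark.sql.types.{DataType, StructType}
+
+val spark = SparkSession.builder().appName("wal-kafka-example").getOrCreate()
+
+// Shortened version of the struct-type JSON above, for illustration only.
+val schemaJson =
+  """{"type":"struct","fields":[
+    |{"name":"timestamp","type":"long","nullable":false,"metadata":{}},
+    |{"name":"operation","type":"string","nullable":true,"metadata":{}},
+    |{"name":"label","type":"string","nullable":true,"metadata":{}}]}""".stripMargin
+val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+
+val kafkaDf = spark.readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "localhost:9092")
+  .option("subscribe", "s2graphInJson")
+  .load()
+
+// Kafka delivers `value` as binary: cast it to string, then parse with the schema.
+val walDf = kafkaDf
+  .selectExpr("CAST(value AS STRING) AS json")
+  .select(from_json(col("json"), schema).as("wal"))
+  .select("wal.*")
+```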
+
+
+----------
+
+### Job Description
+
+**Tasks** and the **workflow** are described in a **job** description, and dependencies between tasks are defined by the names of the tasks given in the `inputs` field.
+
+>Note that this work was influenced by [Airstream of Airbnb](https://www.slideshare.net/databricks/building-data-product-based-on-apache-spark-at-airbnb-with-jingwei-lu-and-liyin-tang).
+
+#### Json Spec
+
+```
+{
+    "name": "JOB_NAME",
+    "source": [
+        {
+            "name": "TASK_NAME",
+            "inputs": [],
+            "type": "SOURCE_TYPE",
+            "options": {
+                "KEY" : "VALUE"
+            }
+        }
+    ],
+    "process": [
+        {
+            "name": "TASK_NAME",
+            "inputs": ["INPUT_TASK_NAME"],
+            "type": "PROCESS_TYPE",
+            "options": {
+                "KEY" : "VALUE"
+            }
+        }
+    ],
+    "sink": [
+        {
+            "name": "TASK_NAME",
+            "inputs": ["INPUT_TASK_NAME"],
+            "type": "SINK_TYPE",
+            "options": {
+                "KEY" : "VALUE"
+            }
+        }
+    ]
+}
+
+```
+
+
+----------
+
+
+### Sample job
+
+#### 1. WAL log transform (Kafka to Kafka)
+
+```
+{
+    "name": "kafkaJob",
+    "source": [
+        {
+            "name": "wal",
+            "inputs": [],
+            "type": "kafka",
+            "options": {
+                "kafka.bootstrap.servers" : "localhost:9092",
+                "subscribe": "s2graphInJson",
+                "maxOffsetsPerTrigger": "10000",
+                "format": "json",
+                "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}},{\"name\":\"operation\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"elem\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"from\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"to\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"label\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"service\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"props\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+            }
+        }
+    ],
+    "process": [
+        {
+            "name": "transform",
+            "inputs": ["wal"],
+            "type": "sql",
+            "options": {
+                "sql": "SELECT timestamp, `from` as userId, to as itemId, label as action FROM wal WHERE label = 'user_action'"
+            }
+        }
+    ],
+    "sink": [
+        {
+            "name": "kafka_sink",
+            "inputs": ["transform"],
+            "type": "kafka",
+            "options": {
+                "kafka.bootstrap.servers" : "localhost:9092",
+                "topic": "s2graphTransform",
+                "format": "json"
+            }
+        }
+    ]
+}
+
+```
+
+#### 2. WAL log transform (HDFS to HDFS)
+
+```
+{
+    "name": "hdfsJob",
+    "source": [
+        {
+            "name": "wal",
+            "inputs": [],
+            "type": "file",
+            "options": {
+                "paths": "/wal",
+                "format": "parquet"
+            }
+        }
+    ],
+    "process": [
+        {
+            "name": "transform",
+            "inputs": ["wal"],
+            "type": "sql",
+            "options": {
+                "sql": "SELECT timestamp, `from` as userId, to as itemId, label as action FROM wal WHERE label = 'user_action'"
+            }
+        }
+    ],
+    "sink": [
+        {
+            "name": "hdfs_sink",
+            "inputs": ["transform"],
+            "type": "file",
+            "options": {
+                "path": "/wal_transform",
+                "format": "json"
+            }
+        }
+    ]
+}
+
+```
+
+
+----------
+
+
+### Launch Job
+
+When submitting a Spark job with the assembly jar, use these parameters together with the job description file path.  
+(Currently, only the `file` type is supported.)
+
+```
+// main class : org.apache.s2graph.s2jobs.JobLauncher
+Usage: run [file|db] [options]
+  -n, --name <value>      job display name
+Command: file [options]
+get config from file
+  -f, --confFile <file>   configuration file
+Command: db [options]
+get config from db
+  -i, --jobId <jobId>     job id
+```
\ No newline at end of file
