- Add s2jobs subproject.
- Migrate the bulk loader to Spark 2.3.0.
- Add test cases for the bulk loader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/db7f0191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/db7f0191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/db7f0191

Branch: refs/heads/master
Commit: db7f01914073b8f56f64c243d797ee538c1ffc51
Parents: aeaff3f
Author: DO YUNG YOON <[email protected]>
Authored: Tue Mar 6 15:01:50 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Mar 6 16:02:41 2018 +0900

----------------------------------------------------------------------
 CHANGES                                         |   1 +
 build.sbt                                       |  11 +-
 loader/build.sbt                                |   5 +-
 loader/loader.py                                |  85 +-
 .../s2graph/loader/spark/HBaseContext.scala     |  12 +-
 .../loader/spark/HBaseRDDFunctions.scala        |   3 +-
 .../loader/subscriber/GraphSubscriber.scala     |  10 +
 .../loader/subscriber/TransferToHFile.scala     | 278 ++++--
 .../loader/subscriber/TransferToHFileTest.scala | 232 +++++
 project/Common.scala                            |   1 +
 .../s2graph/core/GraphElementBuilder.scala      |   2 +-
 .../org/apache/s2graph/core/JSONParser.scala    |   3 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |   4 +-
 .../apache/s2graph/core/storage/SKeyValue.scala |   4 +
 .../s2graph/core/storage/StorageSerDe.scala     |   2 +-
 .../hbase/AsynchbaseStorageManagement.scala     |  16 +-
 s2jobs/build.sbt                                |  57 ++
 s2jobs/loader.py                                | 153 ++++
 .../hbase/mapreduce/GraphHFileOutputFormat.java | 169 ++++
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  31 +
 .../s2jobs/loader/GraphFileGenerator.scala      |  42 +
 .../s2jobs/loader/GraphFileOptions.scala        | 138 +++
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 222 +++++
 .../s2jobs/loader/HFileMRGenerator.scala        | 161 ++++
 .../s2jobs/loader/RawFileGenerator.scala        |  51 ++
 .../s2jobs/spark/BulkLoadPartitioner.scala      |  56 ++
 .../s2jobs/spark/FamilyHFileWriteOptions.scala  |  35 +
 .../s2graph/s2jobs/spark/HBaseContext.scala     | 850 +++++++++++++++++++
 .../s2jobs/spark/HBaseDStreamFunctions.scala    | 158 ++++
 .../s2jobs/spark/HBaseRDDFunctions.scala        | 206 +++++
 .../s2graph/s2jobs/spark/JavaHBaseContext.scala | 342 ++++++++
 .../s2jobs/spark/KeyFamilyQualifier.scala       |  46 +
 .../s2graph/s2jobs/S2GraphHelperTest.scala      |  24 +
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 256 ++++++
 s2rest_play/conf/application.conf               |   2 +-
 35 files changed, 3546 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b54aaeb..1f72a38 100644
--- a/CHANGES
+++ b/CHANGES
@@ -39,6 +39,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-168] - Fix args order mismatch when use addServiceColumnProp
     * [S2GRAPH-176] - Fix compile error on LabelMeta
     * [S2GRAPH-179] - Add defaultValue on ColumnMeta
+    * [S2GRAPH-178] - Fix null pointer error on BulkLoader
 
 ** Improvement
     * [S2GRAPH-72] - Support Apache TinkerPop and Gremlin

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index cdabfbf..af6fbd7 100755
--- a/build.sbt
+++ b/build.sbt
@@ -56,8 +56,8 @@ lazy val s2core = project.settings(commonSettings: _*)
 
 lazy val spark = project.settings(commonSettings: _*)
 
-lazy val loader = project.dependsOn(s2core, spark)
-  .settings(commonSettings: _*)
+//lazy val loader = project.dependsOn(s2core, spark)
+//  .settings(commonSettings: _*)
 
 lazy val s2counter_core = project.dependsOn(s2core)
   .settings(commonSettings: _*)
@@ -65,12 +65,15 @@ lazy val s2counter_core = project.dependsOn(s2core)
 lazy val s2counter_loader = project.dependsOn(s2counter_core, spark)
   .settings(commonSettings: _*)
 
+lazy val s2jobs = project.dependsOn(s2core)
+  .settings(commonSettings: _*)
+
 lazy val s2graph_gremlin = project.dependsOn(s2core)
   .settings(commonSettings: _*)
 
 lazy val root = (project in file("."))
   .aggregate(s2core, s2rest_play)
-  .dependsOn(s2rest_play, s2rest_netty, loader, s2counter_loader, s2graphql) 
// this enables packaging on the root project
+  .dependsOn(s2rest_play, s2rest_netty, s2jobs, s2counter_loader, s2graphql) 
// this enables packaging on the root project
   .settings(commonSettings: _*)
 
 lazy val runRatTask = inputKey[Unit]("Runs Apache rat on S2Graph")
@@ -88,7 +91,7 @@ publishSigned := {
   (publishSigned in s2rest_netty).value
   (publishSigned in s2core).value
   (publishSigned in spark).value
-  (publishSigned in loader).value
+  (publishSigned in s2jobs).value
   (publishSigned in s2counter_core).value
   (publishSigned in s2counter_loader).value
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/build.sbt
----------------------------------------------------------------------
diff --git a/loader/build.sbt b/loader/build.sbt
index ac7d948..a93713a 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -29,14 +29,15 @@ projectDependencies := Seq(
 
 libraryDependencies ++= Seq(
   "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of 
guava to avoid incompatibility
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided" 
exclude("javax.servlet", "*"),
   "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
   "org.apache.httpcomponents" % "fluent-hc" % "4.2.5",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
-  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion
+  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0"
 )
 
 crossScalaVersions := Seq("2.10.6")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/loader.py
----------------------------------------------------------------------
diff --git a/loader/loader.py b/loader/loader.py
index 1d4dc32..634f2be 100644
--- a/loader/loader.py
+++ b/loader/loader.py
@@ -16,50 +16,76 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os, sys, urllib2, urllib
+import os, sys
+#, urllib2, urllib
 
 def cleanup(args): 
        cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"]
-       print cmd
+       print(cmd)
        ret = os.system(cmd)
-       print cmd, "return", ret
+       print(cmd, "return", ret)
        return ret
 
 def hfile(args):
-       cmd = """spark-submit --class "subscriber.TransferToHFile" \
+       cmd = """spark-submit --class 
"org.apache.s2graph.loader.subscriber.TransferToHFile" \
 --name "TransferToHFile@shon" \
 --conf "spark.task.maxFailures=20" \
 --master yarn-cluster \
---num-executors %s --driver-memory 1g --executor-memory 2g --executor-cores 1 
%s \
-%s /tmp/%s %s %s %s %s %s %s""" % (args["num_executors"], JAR, args["input"], 
args["htable_name"], args["hbase_zk"], args["htable_name"], args["db_url"], 
args["max_file_per_region"], args["label_mapping"], args["auto_create_edge"])
-       print cmd
+--num-executors %s \
+--driver-memory 1g \
+--executor-memory 2g \
+--executor-cores 1 \
+%s \
+--input %s \
+--tmpPath /tmp/%s \
+--zkQuorum %s \
+--table %s \
+--dbUrl %s \
+--dbUser %s \
+--dbPassword %s \
+--maxHFilePerRegionServer %s \
+--labelMapping %s \
+--autoEdgeCreate %s""" % (args["num_executors"],
+                                                  JAR,
+                                                  args["input"],
+                                                  args["htable_name"],
+                                                  args["hbase_zk"],
+                                                  args["htable_name"],
+                                                  args["db_url"],
+                                                  args["db_user"],
+                                                  args["db_password"],
+                                                  args["max_file_per_region"],
+                                                  args["label_mapping"],
+                                                  args["auto_create_edge"])
+       print(cmd)
        ret = os.system(cmd)
-       print cmd, "return", ret
+       print(cmd, "return", ret)
        return ret
 
 def distcp(args): 
        cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" 
% (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], 
args["htable_name"])
-       print cmd
+       print(cmd)
        ret = os.system(cmd)
-       print cmd, "return", ret
+       print(cmd, "return", ret)
        return ret
 
 def chmod(args):
        cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop 
fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"])
-       print cmd
+       print(cmd)
        ret = os.system(cmd)
-       print cmd, "return", ret
+       print(cmd, "return", ret)
        return ret
 
 def load(args):
-       cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s 
/tmp/%s %s" % (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, 
args["htable_name"], args["htable_name"])
-       print cmd
+       cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s 
/tmp/%s %s" % \
+                 (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], 
LOADER_CLASS, args["htable_name"], args["htable_name"])
+       print(cmd)
        ret = os.system(cmd)
-       print cmd, "return", ret
+       print(cmd, "return", ret)
        return ret
 
 def send(msg):
-       print msg
+       print(msg)
 
 def run(args):
        cleanup(args)
@@ -69,15 +95,15 @@ def run(args):
        if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret)
        else: send("[Success]: loader build hfile")
        
-       ret = distcp(args)
-
-       if ret != 0: return send("[Failed]: loader distcp failed %s" % ret)
-       else: send("[Success]: loader distcp")
-
-       ret = chmod(args)
-       
-       if ret != 0: return send("[Failed]: loader chmod failed %s" % ret)
-       else: send("[Success]: loader chmod")
+       # ret = distcp(args)
+    #
+       # if ret != 0: return send("[Failed]: loader distcp failed %s" % ret)
+       # else: send("[Success]: loader distcp")
+    #
+       # ret = chmod(args)
+       #
+       # if ret != 0: return send("[Failed]: loader chmod failed %s" % ret)
+       # else: send("[Success]: loader chmod")
 
        ret = load(args)
 
@@ -86,23 +112,24 @@ def run(args):
 
 
 LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles"
-JAR="loader/target/scala-2.10/s2loader-assembly-0.11.0-SNAPSHOT.jar"
-
+JAR="loader/target/scala-2.11/s2loader-assembly-0.2.1-SNAPSHOT.jar"
 
 args = {
 "HADOOP_CONF_DIR": "hdfs_conf_gasan", 
 "HBASE_CONF_DIR": "hbase_conf_gasan", 
 "htable_name": "test", 
-"hbase_namenode": "hdfs://nameservice:8020",
+"hbase_namenode": "hdfs://localhost:8020",
 "hbase_zk": "localhost",
 "db_url": "jdbc:mysql://localhost:3306/graph_dev",
+"db_user": "sa",
+"db_password": "sa",
 "max_file_per_region": 1,
 "label_mapping": "none",
 "auto_create_edge": "false",
 "-m": 1, 
 "-bandwidth": 10,
 "num_executors": 2,
-"input": "/user/test.txt"
+"input": "/tmp/test.txt"
 }
 
 run(args)
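
The script above shells out to spark-submit and the hbase CLI; the same flow can be driven from Scala with the helpers added in TransferToHFile below. A minimal sketch, assuming a running SparkContext `sc`, parsed `options: GraphFileOptions`, and `s2Config` built from them (it mirrors the "test loader script." case in TransferToHFileTest further down):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  import org.apache.hadoop.util.ToolRunner

  // 1. bulk-format lines -> HBase KeyValues -> HFiles under options.tmpPath
  val input = sc.textFile(options.input)
  val kvs   = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
  TransferToHFile.generateHFile(sc, s2Config, kvs, options)

  // 2. complete the bulk load, i.e. what load() does via the hbase CLI
  val hbaseConfig = HBaseConfiguration.create()
  ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), Array(options.tmpPath, options.tableName))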

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala 
b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
index 1f68dc2..e177196 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
@@ -591,6 +591,7 @@ class HBaseContext(@transient private val sc: SparkContext,
    *
    * @param rdd                            The RDD we are bulk loading from
    * @param tableName                      The HBase table we are loading into
+   * @param startKeys
    * @param flatMap                        A flapMap function that will make 
every
    *                                       row in the RDD
    *                                       into N cells for the bulk load
@@ -603,17 +604,16 @@ class HBaseContext(@transient private val sc: 
SparkContext,
    */
   def bulkLoad[T](rdd:RDD[T],
                   tableName: TableName,
+                  startKeys: Array[Array[Byte]],
                   flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                   stagingDir:String,
-                  familyHFileWriteOptionsMap:
-                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  familyHFileWriteOptionsMap: util.Map[Array[Byte], 
FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], 
FamilyHFileWriteOptions],
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
-    val conn = ConnectionFactory.createConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
+//    val conn = ConnectionFactory.createConnection(config)
+//    val regionLocator = conn.getRegionLocator(tableName)
+//    val startKeys = regionLocator.getStartKeys
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = 
Compression.getCompressionAlgorithmByName(defaultCompressionStr)
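
With this change bulkLoad no longer opens a Connection to discover region boundaries; callers pass startKeys explicitly, so HFiles can be produced without a live cluster. A minimal caller sketch, assuming `sc`, `hbaseConf`, `kvs: RDD[KeyValue]`, `tableName`, `stagingDir`, `familyOptionsMap`, and a precomputed `startKeys` array (see TransferToHFile below for how they are obtained):

  import org.apache.hadoop.hbase.{CellUtil, KeyValue, TableName}
  import org.apache.s2graph.loader.spark.{HBaseContext, KeyFamilyQualifier}

  val hbaseSc = new HBaseContext(sc, hbaseConf)

  // each KeyValue becomes one (row/family/qualifier, value) cell for the HFile writer
  def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
    Iterator(k -> CellUtil.cloneValue(kv))
  }

  hbaseSc.bulkLoad(kvs, TableName.valueOf(tableName), startKeys, flatMap, stagingDir, familyOptionsMap)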

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala 
b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
index b818a3c..54949a8 100644
--- 
a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
+++ 
b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
@@ -192,6 +192,7 @@ object HBaseRDDFunctions
      */
     def hbaseBulkLoad(hc: HBaseContext,
                          tableName: TableName,
+                         startKeys: Array[Array[Byte]],
                          flatMap: (T) => Iterator[(KeyFamilyQualifier, 
Array[Byte])],
                          stagingDir:String,
                          familyHFileWriteOptionsMap:
@@ -199,7 +200,7 @@ object HBaseRDDFunctions
                          new util.HashMap[Array[Byte], 
FamilyHFileWriteOptions](),
                          compactionExclude: Boolean = false,
                          maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit 
= {
-      hc.bulkLoad(rdd, tableName,
+      hc.bulkLoad(rdd, tableName, startKeys,
         flatMap, stagingDir, familyHFileWriteOptionsMap,
         compactionExclude, maxSize)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
 
b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 6ecb070..90e8bbb 100644
--- 
a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ 
b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -76,6 +76,16 @@ object GraphSubscriberHelper extends WithKafka {
     }
   }
 
+  def apply(_config: Config): Unit = {
+    config = _config
+    if (g == null) {
+      val ec = ExecutionContext.Implicits.global
+      g = new S2Graph(config)(ec)
+      management = new Management(g)
+      builder = g.elementBuilder
+    }
+  }
+
   def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: 
String): Unit = {
     config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), 
toOption(kafkaBrokerList))
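
The new apply(Config) overload initializes the shared S2Graph, Management, and element builder directly from a typesafe Config, so executors no longer need the phase/dbUrl/zkQuorum string form. A small sketch, assuming `options` is the GraphFileOptions case class introduced in TransferToHFile below:

  import org.apache.s2graph.core.Management

  val s2Config = Management.toConfig(options.toConfigParams)
  GraphSubscriberHelper.apply(s2Config)

  // after initialization the shared handles are ready to use
  val graph      = GraphSubscriberHelper.g
  val management = GraphSubscriberHelper.management
  val builder    = GraphSubscriberHelper.builder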
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
 
b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 6aaf6fd..bfb5a96 100644
--- 
a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ 
b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -19,44 +19,115 @@
 
 package org.apache.s2graph.loader.subscriber
 
-import org.apache.hadoop.hbase.client.Put
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client.ConnectionFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
-import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, 
LabelWithDirection}
-import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, 
FamilyHFileWriteOptions}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, 
KeyFamilyQualifier}
 import org.apache.s2graph.spark.spark.SparkApp
-import org.apache.spark.{SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.hbase.async.{PutRequest}
+import org.hbase.async.PutRequest
 import play.api.libs.json.Json
+
 import scala.collection.JavaConversions._
 
 
 object TransferToHFile extends SparkApp {
 
-  val usages =
-    s"""
-       |create HFiles for hbase table on zkQuorum specified.
-       |note that hbase table is created already and pre-splitted properly.
-       |
-       |params:
-       |0. input: hdfs path for tsv file(bulk format).
-       |1. output: hdfs path for storing HFiles.
-       |2. zkQuorum: running hbase cluster zkQuorum.
-       |3. tableName: table name for this bulk upload.
-       |4. dbUrl: db url for parsing to graph element.
-     """.stripMargin
+  var options:GraphFileOptions = _
+
+  case class GraphFileOptions(input: String = "",
+                              tmpPath: String = s"/tmp/bulkload",
+                              zkQuorum: String = "",
+                              tableName: String = "",
+                              dbUrl: String = "",
+                              dbUser: String = "",
+                              dbPassword: String = "",
+                              maxHFilePerRegionServer: Int = 1,
+                              numRegions: Int = 3,
+                              labelMapping: Map[String, String] = 
Map.empty[String, String],
+                              autoEdgeCreate: Boolean = false,
+                              buildDegree: Boolean = false,
+                              incrementalLoad: Boolean = false,
+                              compressionAlgorithm: String = "NONE") {
+    def toConfigParams = {
+      Map(
+        "hbase.zookeeper.quorum" -> zkQuorum,
+        "db.default.url" -> dbUrl,
+        "db.default.user" -> dbUser,
+        "db.default.password" -> dbPassword
+      )
+    }
+  }
+
+  val parser = new scopt.OptionParser[GraphFileOptions]("run") {
+
+    opt[String]('i', "input").required().action( (x, c) =>
+      c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
+
+    opt[String]('m', "tmpPath").required().action( (x, c) =>
+      c.copy(tmpPath = x) ).text("temp hdfs path for storing HFiles")
+
+    opt[String]('z', "zkQuorum").required().action( (x, c) =>
+      c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
+
+    opt[String]('t', "table").required().action( (x, c) =>
+      c.copy(tableName = x) ).text("table name for this bulk upload.")
+
+    opt[String]('c', "dbUrl").required().action( (x, c) =>
+      c.copy(dbUrl = x)).text("jdbc connection url.")
+
+    opt[String]('u', "dbUser").required().action( (x, c) =>
+      c.copy(dbUser = x)).text("database user name.")
+
+    opt[String]('p', "dbPassword").required().action( (x, c) =>
+      c.copy(dbPassword = x)).text("database password.")
+
+    opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per 
RegionServer."
+    )
+
+    opt[Int]('n', "numRegions").action ( (x, c) =>
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
+    )
+
+    opt[String]('l', "labelMapping").action( (x, c) =>
+      c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change 
the label from source (originalLabel:newLabel)")
+
+    opt[Boolean]('d', "buildDegree").action( (x, c) =>
+      c.copy(buildDegree = x)).text("generate degree values")
+
+    opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
+      c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+
+    opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
+      c.copy(incrementalLoad = x)).text("whether incremental bulkload which 
append data on existing table or not."
+    )
+  }
 
   //TODO: Process AtomicIncrementRequest too.
   /** build key values */
   case class DegreeKey(vertexIdStr: String, labelName: String, direction: 
String)
 
+  private def toLabelMapping(lableMapping: String): Map[String, String] = {
+    (for {
+      token <- lableMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+
   private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = 
true): List[PutRequest] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
     buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
@@ -83,14 +154,17 @@ object TransferToHFile extends SparkApp {
       output <- List(degreeKey -> 1L) ++ extra
     } yield output
   }
+
   def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
     val kvs = 
GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, 
kv.value, kv.timestamp) }
   }
+
   def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
     val kvs = 
GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, 
kv.value, kv.timestamp) }
   }
+
   def buildDegreePutRequests(vertexId: String, labelName: String, direction: 
String, degreeVal: Long): List[PutRequest] = {
     val label = Label.findByName(labelName).getOrElse(throw new 
RuntimeException(s"$labelName is not found in DB."))
     val dir = GraphUtil.directions(direction)
@@ -101,7 +175,7 @@ object TransferToHFile extends SparkApp {
 
     val ts = System.currentTimeMillis()
     val propsWithTs = Map(LabelMeta.timestamp -> 
InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, 
dir, propsWithTs=propsWithTs)
+    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, 
dir, propsWithTs = propsWithTs)
 
     edge.edgesWithIndex.flatMap { indexEdge =>
       
GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map
 { kv =>
@@ -115,13 +189,13 @@ object TransferToHFile extends SparkApp {
       (key, value) <- degreeKeyVals
       putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, 
key.direction, value)
     } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, 
p.value)
-        kv
-      }
+      val p = putRequest
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, 
p.value)
+      kv
+    }
     kvs.toIterator
   }
-  
+
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], 
autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvList = new java.util.ArrayList[KeyValue]
     for (s <- strs) {
@@ -143,71 +217,141 @@ object TransferToHFile extends SparkApp {
             val kv = new KeyValue(p.key(), p.family(), p.qualifier, 
p.timestamp, p.value)
             kvList.add(kv)
           }
-        } 
+        }
       }
     }
     kvList.iterator()
   }
-  
-
 
-  override def run() = {
-    val input = args(0)
-    val tmpPath = args(1)
-    val zkQuorum = args(2)
-    val tableName = args(3)
-    val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) 
GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
-    val conf = sparkConf(s"$input: TransferToHFile")
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "24")
+  def generateKeyValues(sc: SparkContext,
+                        s2Config: Config,
+                        input: RDD[String],
+                        graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    val kvs = input.mapPartitions { iter =>
+      GraphSubscriberHelper.apply(s2Config)
 
-    val sc = new SparkContext(conf)
-
-    GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, 
List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
-
-    /* set up hbase init */
-    val hbaseConf = HBaseConfiguration.create()
-    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
-    hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
-
-
-    val rdd = sc.textFile(input)
-
-
-    val kvs = rdd.mapPartitions { iter =>
-      val phase = System.getProperty("phase")
-      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-      toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
+      toKeyValues(iter.toSeq, graphFileOptions.labelMapping, 
graphFileOptions.autoEdgeCreate)
     }
 
-    val merged = if (!buildDegree) kvs
+    if (!graphFileOptions.buildDegree) kvs
     else {
-      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { 
(agg, current) =>
+      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, 
graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
         agg + current
       }.mapPartitions { iter =>
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
+        GraphSubscriberHelper.apply(s2Config)
+
         toKeyValues(iter.toSeq)
       }
     }
+  }
+
+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    options: GraphFileOptions): Unit = {
+    val hbaseConfig = toHBaseConfig(options)
+    val startKeys =
+      if (options.incrementalLoad) {
+        // need hbase connection to existing table to figure out the ranges of 
regions.
+        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+      } else {
+        // otherwise we do not need to initialize Connection to hbase cluster.
+        // only numRegions determine region's pre-split.
+        getStartKeys(numRegions = options.numRegions)
+      }
+
+    val hbaseSc = new HBaseContext(sc, hbaseConfig)
 
-    val hbaseSc = new HBaseContext(sc, hbaseConf)
     def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
       val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), 
CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
       val v = CellUtil.cloneValue(kv)
       Seq((k -> v)).toIterator
     }
-    val familyOptions = new 
FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
+
+    val compressionAlgorithmClass = 
Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
       BloomType.ROW.name().toUpperCase, 32768, 
DataBlockEncoding.FAST_DIFF.name().toUpperCase)
     val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, 
"v".getBytes("UTF-8") -> familyOptions)
 
-    hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, 
familyOptionsMap)
+
+    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, 
flatMap, options.tmpPath, familyOptionsMap)
+  }
+
+  def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): 
Array[Array[Byte]] = {
+    val conn = ConnectionFactory.createConnection(hbaseConfig)
+    val regionLocator = conn.getRegionLocator(tableName)
+    regionLocator.getStartKeys
+  }
+
+  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
+    val hbaseConf = HBaseConfiguration.create()
+
+    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
+
+    hbaseConf
+  }
+
+  def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
+    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
+    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
+    if (numRegions < 3) {
+      throw new IllegalArgumentException("Must create at least three regions")
+    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
+      throw new IllegalArgumentException("Start key must be smaller than end 
key")
+    }
+    val empty = new Array[Byte](0)
+
+    if (numRegions == 3) {
+      Array(empty, startKey, endKey)
+    } else {
+      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, 
numRegions - 3)
+      if (splitKeys == null || splitKeys.length != numRegions - 1) {
+        throw new IllegalArgumentException("Unable to split key range into 
enough regions")
+      }
+      Array(empty) ++ splitKeys.toSeq
+    }
+  }
+
+  override def run() = {
+    parser.parse(args, GraphFileOptions()) match {
+      case Some(o) => options = o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException("failed to parse options...")
+    }
+
+    println(s">>> Options: ${options}")
+    val s2Config = Management.toConfig(options.toConfigParams)
+
+    val conf = sparkConf(s"TransferToHFile")
+
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "24")
+
+    val sc = new SparkContext(conf)
+    val rdd = sc.textFile(options.input)
+
+    GraphSubscriberHelper.apply(s2Config)
+
+    val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd, options)
+    generateHFile(sc, s2Config, merged, options)
+//    /* set up hbase init */
+//    val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
+//
+//    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = 
{
+//      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), 
CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+//      val v = CellUtil.cloneValue(kv)
+//      Seq((k -> v)).toIterator
+//    }
+//
+//    val familyOptions = new 
FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
+//      BloomType.ROW.name().toUpperCase, 32768, 
DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+//    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, 
"v".getBytes("UTF-8") -> familyOptions)
+//
+//    val startKeys = getStartKeys(numRegions = options.numRegions)
+//    hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), 
startKeys, flatMap, options.tmpPath, familyOptionsMap)
   }
 
 }
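
The positional arguments of the old run() are replaced by named flags parsed with scopt into GraphFileOptions. An illustrative invocation (values are examples, not defaults):

  val sampleArgs = Array(
    "--input", "/tmp/test.txt",
    "--tmpPath", "/tmp/s2graph_bulkload",
    "--zkQuorum", "localhost",
    "--table", "s2graph",
    "--dbUrl", "jdbc:h2:file:./var/metastore;MODE=MYSQL",
    "--dbUser", "sa",
    "--dbPassword", "sa",
    "--numRegions", "3",
    "--labelMapping", "oldLabel:newLabel",
    "--autoEdgeCreate", "true")

  val opts = TransferToHFile.parser.parse(sampleArgs, TransferToHFile.GraphFileOptions())
    .getOrElse(throw new IllegalArgumentException("failed to parse options..."))
  // opts.labelMapping == Map("oldLabel" -> "newLabel"), opts.autoEdgeCreate == true

For incremental loads (--incrementalLoad true) the start keys are read from the existing table via getTableStartKeys; otherwise --numRegions alone determines the pre-split through getStartKeys.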

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git 
a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
new file mode 100644
index 0000000..6918ce4
--- /dev/null
+++ 
b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.loader.subscriber
+
+import java.io.PrintWriter
+import java.util
+
+import com.typesafe.config.ConfigFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.util.ToolRunner
+import org.apache.s2graph.core.{Management, PostProcess}
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.storage.CanSKeyValue
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.loader.subscriber.TransferToHFile.options
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import play.api.libs.json.Json
+
+import scala.util.Try
+
+class TransferToHFileTest extends FunSuite with Matchers with 
BeforeAndAfterAll {
+
+  import TransferToHFile._
+  import scala.collection.JavaConverters._
+
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  private var sc: SparkContext = _
+
+  /* TransferHFile parameters */
+  val options = GraphFileOptions(
+    input = "/tmp/test.txt",
+    tmpPath = "/tmp/s2graph",
+    zkQuorum = "localhost",
+    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUser = "sa",
+    dbPassword = "sa",
+    tableName = "s2graph",
+    maxHFilePerRegionServer = 1,
+    numRegions = 3,
+    compressionAlgorithm = "NONE",
+    buildDegree = false,
+    autoEdgeCreate = false)
+
+  val s2Config = Management.toConfig(options.toConfigParams)
+
+  val tableName = options.tableName
+  val schemaVersion = HBaseType.DEFAULT_VERSION
+  val compressionAlgorithm: String = options.compressionAlgorithm
+
+  override def beforeAll(): Unit = {
+    // initialize spark context.
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+
+    GraphSubscriberHelper.apply(s2Config)
+  }
+
+  override def afterAll(): Unit = {
+    GraphSubscriberHelper.g.shutdown()
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+  private def writeToFile(fileName: String)(lines: Seq[String]): Unit = {
+    val writer = new PrintWriter(fileName)
+    lines.foreach(line => writer.write(line + "\n"))
+    writer.close
+  }
+
+  private def initTestEdgeSchema(): Label = {
+    import scala.collection.JavaConverters._
+    /* initialize model for test */
+    val management = GraphSubscriberHelper.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = 
"localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, 
compressionAlgorithm = "gz")
+
+    val serviceColumn = management.createServiceColumn(service.serviceName, 
"user", "string", Nil)
+
+    Try {
+      management.createLabel("friends", serviceColumn, serviceColumn, 
isDirected = true,
+        serviceName = service.serviceName, indices = new 
java.util.ArrayList[Index],
+        props = Seq(Prop("since", "0", "long"), Prop("score", "0", 
"integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
+        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = 
compressionAlgorithm, options = "")
+    }
+
+    Label.findByName("friends").getOrElse(throw new 
IllegalArgumentException("friends label is not initialized."))
+  }
+
+  private def initTestVertexSchema(): ServiceColumn = {
+    import scala.collection.JavaConverters._
+    /* initialize model for test */
+    val management = GraphSubscriberHelper.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = 
"localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, 
compressionAlgorithm = "gz")
+
+    management.createServiceColumn(service.serviceName, "imei", "string",
+      Seq(
+        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "total_active_days", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "active_months", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "fua", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_often_province", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_often_city", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_often_days", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "location_last_province", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_last_city", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "fimei_legality", defaultValue = "-1", dataType = 
"integer")
+      ))
+  }
+
+  test("test generateKeyValues edge only.") {
+    import scala.collection.JavaConverters._
+    import org.apache.s2graph.core.storage.CanSKeyValue._
+
+    val label = initTestEdgeSchema()
+    /* end of initialize model */
+
+    val bulkEdgeString = 
"1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val input = sc.parallelize(Seq(bulkEdgeString))
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+
+    val ls = kvs.map(kv => 
CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+
+    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
+
+    //    val snapshotEdgeOpt = 
serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.head), 
None)
+    //    val indexEdgeOpt = 
serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.last), 
None)
+
+    val bulkEdge = 
GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkEdgeString, 
options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), 
None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    bulkEdge shouldBe(indexEdge)
+  }
+
+
+  test("test generateKeyValues vertex only.") {
+    val serviceColumn = initTestVertexSchema()
+    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = 
GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkVertexString, 
options.labelMapping).get
+
+    val input = sc.parallelize(Seq(bulkVertexString))
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+
+    val ls = kvs.map(kv => 
CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+
+    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
+
+
+
+    val vertex = 
serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, 
None).get
+
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
+
+    bulkVertex shouldBe(vertex)
+  }
+
+  test("test generateHFile vertex only.") {
+    val serviceColumn = initTestVertexSchema()
+
+    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val input = sc.parallelize(Seq(bulkVertexString))
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+    TransferToHFile.generateHFile(sc, s2Config, kvs, options)
+  }
+
+  test("test loader script.") {
+    val serviceColumn = initTestVertexSchema()
+
+    val bulkVertexString = 
"20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertexLs = Seq(bulkVertexString)
+    writeToFile(options.input)(bulkVertexLs)
+
+    val input = sc.parallelize(bulkVertexLs)
+    GraphSubscriberHelper.apply(s2Config)
+    val graph = GraphSubscriberHelper.g
+    val vertex = graph.elementBuilder.toVertex(bulkVertexString).get
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+    TransferToHFile.generateHFile(sc, s2Config, kvs, options)
+
+    val hfileArgs = Array(options.tmpPath, options.tableName)
+    val hbaseConfig = HBaseConfiguration.create()
+
+    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+
+    val vertexId = 
graph.elementBuilder.newVertexId("s2graph")("imei")("800188448586078")
+    val vertexOpt = graph.getVertex(vertexId)
+
+    vertexOpt.isDefined shouldBe(true)
+    vertexOpt.get shouldBe (vertex)
+  }
+}
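
The tests feed the loader tab-separated bulk-format lines. For reference, the edge line used above decomposes into timestamp, operation, element type, source, target, label, and a JSON property blob (a hedged breakdown; the field names here are descriptive only):

  val bulkEdgeString = Seq(
    "1416236400000",                          // timestamp
    "insert",                                 // operation
    "edge",                                   // element type
    "a",                                      // source vertex id
    "b",                                      // target vertex id
    "friends",                                // label
    """{"since":1316236400000,"score":10}"""  // properties as JSON
  ).mkString("\t")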

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index d3b8d93..e2323a2 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -21,6 +21,7 @@ import sbt._
 
 object Common {
   val sparkVersion = "1.4.1"
+  val spark2Version = "2.3.0"
   val playVersion = "2.5.9"
   val specs2Version = "3.8.5"
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index c8c25b3..0478413 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -52,7 +52,7 @@ class GraphElementBuilder(graph: S2GraphLike) {
     element
   } recover {
     case e: Exception =>
-      logger.error(s"[toElement]: $e", e)
+      logger.error(s"[toElement]: $s", e)
       None
   } get
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index 9de3d9d..b92e47b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -344,8 +344,9 @@ object JSONParser {
   def fromJsonToProperties(jsObject: JsObject): Map[String, Any] = {
     val kvs = for {
       (k, v) <- jsObject.fieldSet
+      anyVal <- jsValueToAny(v)
     } yield {
-        k -> jsValueToString(v)
+        k -> anyVal
       }
     kvs.toMap
   }
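
fromJsonToProperties previously coerced every value to a string via jsValueToString; with jsValueToAny the native JSON types flow through. A hedged illustration (the exact numeric representation depends on jsValueToAny):

  import org.apache.s2graph.core.JSONParser
  import play.api.libs.json.Json

  val props = JSONParser.fromJsonToProperties(Json.obj("score" -> 10, "fua" -> "M5+Note"))
  // before this change: Map("score" -> "10", "fua" -> "M5+Note")   -- everything stringified
  // after this change : "score" keeps a numeric value instead of the string "10"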

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index 819c378..95c3c7b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -121,11 +121,13 @@ case class ServiceColumn(id: Option[Int],
   lazy val toJson = Json.obj("serviceName" -> service.serviceName, 
"columnName" -> columnName, "columnType" -> columnType)
 
   def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] 
= {
-    for {
+    val ret = for {
       (k, v) <- props
       labelMeta <- metasInvMap.get(k)
       innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
     } yield labelMeta -> innerVal
+
+    ret
   }
 
   def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 775afda..57adc8a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -65,6 +65,10 @@ object CanSKeyValue {
     SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), 
kv.value(), kv.timestamp())
   }
 
+  implicit val hbaseKeyValue = instance[org.apache.hadoop.hbase.KeyValue] { kv 
=>
+    SKeyValue(Array.empty[Byte], kv.getRow, kv.getFamily, kv.getQualifier, 
kv.getValue, kv.getTimestamp)
+  }
+
   // For asyncbase KeyValues
   implicit val sKeyValue = instance[SKeyValue](identity)
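
The new hbaseKeyValue instance converts org.apache.hadoop.hbase.KeyValue (what the HFile path emits) into SKeyValue (what the s2core deserializers consume). A small sketch mirroring its use in TransferToHFileTest, assuming `kvs` is an RDD[KeyValue] returned by generateKeyValues:

  import org.apache.s2graph.core.storage.CanSKeyValue

  val sKeyValues = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
  // sKeyValues can now be fed to serDe.indexEdgeDeserializer / vertexDeserializer for assertions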
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
index 32d640c..78da629 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
@@ -70,7 +70,7 @@ trait StorageSerDe {
     **/
   def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge]
 
-  def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable
+  def indexEdgeDeserializer(schemaVer: String): Deserializable[S2EdgeLike]
 
   def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
index 0fb3173..8475ba6 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
@@ -56,6 +56,14 @@ object AsynchbaseStorageManagement {
   val DefaultCreateTableOptions = Map(
     "hbase.zookeeper.quorum" -> "localhost"
   )
+
+  def getStartKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount))
+  }
+
+  def getEndKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+  }
 }
 
 class AsynchbaseStorageManagement(val config: Config, val clients: 
Seq[HBaseClient]) extends StorageManagement {
@@ -271,12 +279,4 @@ class AsynchbaseStorageManagement(val config: Config, val 
clients: Seq[HBaseClie
       conn.getAdmin
     }
   }
-
-  private def getStartKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount))
-  }
-
-  private def getEndKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
-  }
 }
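
Promoting these helpers to the companion object lets TransferToHFile compute pre-split boundaries without touching HBase. A worked example for regionCount = 4 (plain integer arithmetic on the formulas above):

  import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement

  // Int.MaxValue = 2147483647
  val start = AsynchbaseStorageManagement.getStartKey(4) // Bytes.toBytes(2147483647 / 4)  = Bytes.toBytes(536870911)
  val end   = AsynchbaseStorageManagement.getEndKey(4)   // Bytes.toBytes(536870911 * 3)   = Bytes.toBytes(1610612733)
  // TransferToHFile.getStartKeys(numRegions = 4) prepends an empty key and splits [start, end]
  // into the remaining boundaries, yielding 4 region start keys.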

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
new file mode 100644
index 0000000..b915238
--- /dev/null
+++ b/s2jobs/build.sbt
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import Common._
+
+name := "s2jobs"
+
+scalacOptions ++= Seq("-deprecation")
+
+projectDependencies := Seq(
+  (projectID in "s2core").value exclude("org.mortbay.jetty", "j*") 
exclude("javax.xml.stream", "s*") exclude("javax.servlet", "s*") 
exclude("javax.servlet", "j*")
+)
+
+libraryDependencies ++= Seq(
+  "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of 
guava to avoid incompatibility
+  "org.apache.spark" %% "spark-core" % spark2Version,
+  "org.apache.spark" %% "spark-streaming" % spark2Version % "provided",
+  "org.apache.spark" %% "spark-hive" % spark2Version % "provided",
+  "org.specs2" %% "specs2-core" % specs2Version % "test",
+  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
+  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0"
+)
+
+crossScalaVersions := Seq("2.10.6")
+
+mergeStrategy in assembly := {
+  case PathList("META-INF", ps @ _*) => MergeStrategy.discard
+  case _ => MergeStrategy.first
+}
+
+excludedJars in assembly := {
+  val cp = (fullClasspath in assembly).value
+  cp filter {_.data.getName == "guava-16.0.1.jar"}
+}
+
+test in assembly := {}
+
+parallelExecution in Test := false
+
+mainClass in (Compile) := None

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/loader.py
----------------------------------------------------------------------
diff --git a/s2jobs/loader.py b/s2jobs/loader.py
new file mode 100644
index 0000000..d3a9e67
--- /dev/null
+++ b/s2jobs/loader.py
@@ -0,0 +1,153 @@
+#!/usr/bin/python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os, sys
+#, urllib2, urllib
+
+def cleanup(args):
+       cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"]
+       print(cmd)
+       ret = os.system(cmd)
+       print(cmd, "return", ret)
+       return ret
+
+def hfile(args):
+       print(args)
+       cmd = """HADOOP_CONF_DIR=%s spark-submit --class 
"org.apache.s2graph.s2jobs.loader.GraphFileGenerator" \
+--name "GraphFileGenerator@shon" \
+--conf "spark.task.maxFailures=20" \
+--conf "spark.executor.extraClassPath=%s" \
+--conf "spark.driver.extraClassPath=%s" \
+--jars %s \
+--master local[2] \
+--num-executors %s \
+--driver-memory 1g \
+--executor-memory 2g \
+--executor-cores 1 \
+%s \
+--input %s \
+--tempDir %s \
+--output /tmp/%s \
+--zkQuorum %s \
+--table %s \
+--dbUrl '%s' \
+--dbUser %s \
+--dbPassword %s \
+--dbDriver %s \
+--maxHFilePerRegionServer %s \
+--labelMapping %s \
+--autoEdgeCreate %s""" % (args["HADOOP_CONF_DIR"],
+                                                 MYSQL_JAR,
+                                                 MYSQL_JAR,
+                                                 MYSQL_JAR,
+                                                 args["num_executors"],
+                                                 JAR,
+                                                 args["input"],
+                                                 args["tempDir"],
+                                                 args["htable_name"],
+                                                 args["hbase_zk"],
+                                                 args["htable_name"],
+                                                 args["db_url"],
+                                                 args["db_user"],
+                                                 args["db_password"],
+                                                 args["db_driver"],
+                                                 args["max_file_per_region"],
+                                                 args["label_mapping"],
+                                                 args["auto_create_edge"])
+       print(cmd)
+       ret = os.system(cmd)
+       print(cmd, "return", ret)
+       return ret
+
+def distcp(args):
+       cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" 
% (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], 
args["htable_name"])
+       print(cmd)
+       ret = os.system(cmd)
+       print(cmd, "return", ret)
+       return ret
+
+def chmod(args):
+       cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"])
+       print(cmd)
+       ret = os.system(cmd)
+       print(cmd, "return", ret)
+       return ret
+
+def load(args):
+       cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % \
+                 (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"])
+       print(cmd)
+       ret = os.system(cmd)
+       print(cmd, "return", ret)
+       return ret
+
+def send(msg):
+       print(msg)
+
+def run(args):
+       cleanup(args)
+       send("[Start]: bulk loader")
+       ret = hfile(args)
+
+       if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret)
+       else: send("[Success]: loader build hfile")
+
+       # ret = distcp(args)
+       #
+       # if ret != 0: return send("[Failed]: loader distcp failed %s" % ret)
+       # else: send("[Success]: loader distcp")
+       #
+       # ret = chmod(args)
+       #
+       # if ret != 0: return send("[Failed]: loader chmod failed %s" % ret)
+       # else: send("[Success]: loader chmod")
+
+       ret = load(args)
+
+       if ret != 0: return send("[Failed]: loader complete bulkload failed %s" % ret)
+       else: send("[Success]: loader complete bulkload")
+
+
+LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles"
+JAR="s2jobs/target/scala-2.11/s2jobs-assembly-0.2.1-SNAPSHOT.jar"
+MYSQL_JAR="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar"
+MYSQL_CLASSPATH="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar"
+DB_DRIVER="com.mysql.jdbc.Driver"
+DB_URL="jdbc:mysql://localhost:3306/graph_dev"
+# DB_URL="jdbc:h2:file:./var/metastore;MODE=MYSQL"
+args = {
+       "HADOOP_CONF_DIR": "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop",
+       "HBASE_CONF_DIR": "/usr/local/opt/hbase/libexec/conf",
+       "htable_name": "test",
+       "hbase_namenode": "hdfs://localhost:8020",
+       "hbase_zk": "localhost",
+       "db_driver": DB_DRIVER,
+       "db_url": DB_URL,
+       "db_user": "graph",
+       "db_password": "graph",
+       "max_file_per_region": 1,
+       "label_mapping": "none",
+       "auto_create_edge": "false",
+       "-m": 1,
+       "-bandwidth": 10,
+       "num_executors": 2,
+       "input": "/tmp/imei-20.txt",
+       "tempDir": "/tmp/bulkload_tmp"
+}
+
+run(args)
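
For reference, the hfile() step above only shells out to spark-submit with
GraphFileGenerator and the long flags built from the args dict. Below is a minimal
sketch of how those same flags are parsed on the Scala side; every value is the local
sample default from the args dict above and is hypothetical for a real deployment.

    // Sketch only: mirrors the flags loader.py passes to spark-submit.
    val cliArgs = Array(
      "--input", "/tmp/imei-20.txt",
      "--tempDir", "/tmp/bulkload_tmp",
      "--output", "/tmp/test",
      "--zkQuorum", "localhost",
      "--table", "test",
      "--dbUrl", "jdbc:mysql://localhost:3306/graph_dev",
      "--dbUser", "graph",
      "--dbPassword", "graph",
      "--dbDriver", "com.mysql.jdbc.Driver",
      "--maxHFilePerRegionServer", "1",
      "--labelMapping", "none",
      "--autoEdgeCreate", "false")
    // Same scopt parser that GraphFileGenerator.main uses (added below in this commit).
    val options = GraphFileOptions.toOption(cliArgs)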

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java b/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java
new file mode 100644
index 0000000..4d6599f
--- /dev/null
+++ b/s2jobs/src/main/java/org/apache/hadoop/hbase/mapreduce/GraphHFileOutputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+public class GraphHFileOutputFormat extends HFileOutputFormat2 {
+    private static final String COMPRESSION_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.compression";
+    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+            "hbase.hfileoutputformat.families.bloomtype";
+    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.blocksize";
+    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+
+    // This constant is public since the client can modify this when setting
+    // up their conf object and thus refer to this symbol.
+    // It is present for backwards compatibility reasons. Use it only to
+    // override the auto-detection of datablock encoding.
+    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+            "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+    static Log LOG = LogFactory.getLog(GraphHFileOutputFormat.class);
+
+    public static void configureIncrementalLoad(Job job, List<ImmutableBytesWritable> startKeys,
+                                                List<String> familyNames, Compression.Algorithm compression, BloomType bloomType,
+                                                int blockSize, DataBlockEncoding dataBlockEncoding) throws IOException {
+
+        Configuration conf = job.getConfiguration();
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(KeyValue.class);
+        job.setOutputFormatClass(HFileOutputFormat2.class);
+
+        // Based on the configured map output class, set the correct reducer to properly
+        // sort the incoming values.
+        // TODO it would be nice to pick one or the other of these formats.
+        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(KeyValueSortReducer.class);
+        } else if (Put.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(PutSortReducer.class);
+        } else if (Text.class.equals(job.getMapOutputValueClass())) {
+            job.setReducerClass(TextSortReducer.class);
+        } else {
+            LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+        }
+
+        conf.setStrings("io.serializations", conf.get("io.serializations"),
+                MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                KeyValueSerialization.class.getName());
+
+        job.setNumReduceTasks(startKeys.size());
+
+        configurePartitioner(job, startKeys);
+        // Set compression algorithms based on column families
+        configureCompression(familyNames, compression, conf);
+        configureBloomType(familyNames, bloomType, conf);
+        configureBlockSize(familyNames, blockSize, conf);
+        configureDataBlockEncoding(familyNames, dataBlockEncoding, conf);
+
+        TableMapReduceUtil.addDependencyJars(job);
+        TableMapReduceUtil.initCredentials(job);
+        LOG.info("Incremental table output configured.");
+    }
+
+    static void configureCompression(List<String> familyNames, Compression.Algorithm compression,
+                                     Configuration conf) throws IOException {
+        StringBuilder compressionConfigValue = new StringBuilder();
+        int i = 0;
+        for (String familyName : familyNames) {
+            if (i++ > 0) {
+                compressionConfigValue.append('&');
+            }
+            compressionConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+            compressionConfigValue.append('=');
+            compressionConfigValue.append(URLEncoder.encode(compression.getName(), "UTF-8"));
+        }
+        // Get rid of the last ampersand
+        conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+    }
+
+    static void configureBloomType(List<String> familyNames, BloomType bloomType, Configuration conf)
+            throws IOException {
+        StringBuilder bloomTypeConfigValue = new StringBuilder();
+        int i = 0;
+        for (String familyName : familyNames) {
+            if (i++ > 0) {
+                bloomTypeConfigValue.append('&');
+            }
+            bloomTypeConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+            bloomTypeConfigValue.append('=');
+            String bloomTypeStr = bloomType.toString();
+            if (bloomTypeStr == null) {
+                bloomTypeStr = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+            }
+            bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
+        }
+        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
+    }
+
+    static void configureBlockSize(List<String> familyNames, int blockSize, Configuration conf)
+            throws IOException {
+        StringBuilder blockSizeConfigValue = new StringBuilder();
+        int i = 0;
+        for (String familyName : familyNames) {
+            if (i++ > 0) {
+                blockSizeConfigValue.append('&');
+            }
+            blockSizeConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+            blockSizeConfigValue.append('=');
+            blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(blockSize), "UTF-8"));
+        }
+        // Get rid of the last ampersand
+        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+    }
+
+    static void configureDataBlockEncoding(List<String> familyNames, DataBlockEncoding encoding,
+                                           Configuration conf) throws IOException {
+        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+        int i = 0;
+        for (String familyName : familyNames) {
+            if (i++ > 0) {
+                dataBlockEncodingConfigValue.append('&');
+            }
+            dataBlockEncodingConfigValue.append(URLEncoder.encode(familyName, "UTF-8"));
+            dataBlockEncodingConfigValue.append('=');
+            if (encoding == null) {
+                encoding = DataBlockEncoding.NONE;
+            }
+            dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
+        }
+        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
+    }
+
+}
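
The point of this subclass is that configureIncrementalLoad accepts the region start keys
and column families as explicit arguments instead of deriving them from an existing
HTable, so HFiles can be prepared before the target table exists. A minimal sketch of
calling it follows; the job, start keys, and per-family settings are illustrative, not
taken from this commit.

    // Sketch: configure an HFile-writing MR job with explicit start keys and the
    // "e"/"v" column families S2Graph uses; a single empty start key means one region.
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.io.compress.Compression
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
    import org.apache.hadoop.hbase.mapreduce.GraphHFileOutputFormat
    import org.apache.hadoop.hbase.regionserver.BloomType
    import org.apache.hadoop.mapreduce.Job
    import scala.collection.JavaConverters._

    val job = Job.getInstance()
    val startKeys = Seq(new ImmutableBytesWritable(Array.emptyByteArray)).asJava
    GraphHFileOutputFormat.configureIncrementalLoad(
      job, startKeys, Seq("e", "v").asJava,
      Compression.Algorithm.NONE, BloomType.ROW, 32768, DataBlockEncoding.FAST_DIFF)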

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
new file mode 100644
index 0000000..ef76608
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+
+import scala.concurrent.ExecutionContext
+
+object S2GraphHelper {
+  def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+    new S2Graph(config)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
new file mode 100644
index 0000000..51476c1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import org.apache.s2graph.core._
+import org.apache.spark.{SparkConf, SparkContext}
+
+object GraphFileGenerator {
+  def main(args: Array[String]): Unit = {
+    val options = GraphFileOptions.toOption(args)
+
+    val s2Config = Management.toConfig(options.toConfigParams)
+
+    val conf = new SparkConf()
+    conf.setAppName(this.getClass.getSimpleName)
+    val sc = new SparkContext(conf)
+
+    val input = sc.textFile(options.input)
+    options.method match {
+      case "MR" => HFileMRGenerator.generate(sc, s2Config, input, options)
+      case "SPARK" => HFileGenerator.generate(sc, s2Config, input, options)
+      case _ => throw new IllegalArgumentException("only supported type is MR/SPARK.")
+    }
+  }
+}
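
GraphFileGenerator is the spark-submit entry point: it parses the CLI flags, turns them
into an S2Graph config, and dispatches on --method. A rough sketch of the same driver
flow, assuming options was parsed with GraphFileOptions.toOption and using the
HFileGenerator added further down:

    // Sketch of the SPARK-method path; options is an assumed, already-parsed GraphFileOptions.
    import com.typesafe.config.Config
    import org.apache.s2graph.core.Management
    import org.apache.spark.{SparkConf, SparkContext}

    val s2Config: Config = Management.toConfig(options.toConfigParams)
    val sc = new SparkContext(new SparkConf().setAppName("GraphFileGenerator"))
    val bulkEdges = sc.textFile(options.input)  // TSV rows in the s2graph bulk format
    HFileGenerator.generate(sc, s2Config, bulkEdges, options)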

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
new file mode 100644
index 0000000..3e3ffb9
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+object GraphFileOptions {
+  val parser = new scopt.OptionParser[GraphFileOptions]("run") {
+
+    opt[String]('i', "input").required().action( (x, c) =>
+      c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
+
+    opt[String]('t', "tempDir").required().action( (x, c) =>
+      c.copy(tempDir = x) ).text("temp hdfs path for staging HFiles")
+
+    opt[String]('o', "output").required().action( (x, c) =>
+      c.copy(output = x) ).text("output hdfs path for storing HFiles")
+
+    opt[String]('z', "zkQuorum").required().action( (x, c) =>
+      c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
+
+    opt[String]('t', "table").required().action( (x, c) =>
+      c.copy(tableName = x) ).text("table name for this bulk upload.")
+
+    opt[String]('c', "dbUrl").required().action( (x, c) =>
+      c.copy(dbUrl = x)).text("jdbc connection url.")
+
+    opt[String]('u', "dbUser").required().action( (x, c) =>
+      c.copy(dbUser = x)).text("database user name.")
+
+    opt[String]('p', "dbPassword").required().action( (x, c) =>
+      c.copy(dbPassword = x)).text("database password.")
+
+    opt[String]('r', "dbDriver").action( (x, c) =>
+      c.copy(dbDriver = x)).text("jdbc driver class.")
+
+    opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
+    )
+
+    opt[Int]('n', "numRegions").action ( (x, c) =>
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
+    )
+
+    opt[String]('l', "labelMapping").action( (x, c) =>
+      c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
+
+    opt[Boolean]('d', "buildDegree").action( (x, c) =>
+      c.copy(buildDegree = x)).text("generate degree values")
+
+    opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
+      c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+
+    opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
+      c.copy(incrementalLoad = x)).text("whether to append data to an existing table via incremental bulkload."
+    )
+    opt[String]('m', "method").action( (x, c) =>
+      c.copy(method = x)).text("run method. currently MR/SPARK(default) supported."
+    )
+  }
+
+  def toOption(args: Array[String]): GraphFileOptions = {
+    parser.parse(args, GraphFileOptions()) match {
+      case Some(o) => o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException("failed to parse options...")
+    }
+  }
+
+  private def toLabelMapping(labelMapping: String): Map[String, String] = {
+    (for {
+      token <- labelMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+}
+/**
+  * Option case class for the bulk loader (GraphFileGenerator).
+  * @param input
+  * @param output
+  * @param zkQuorum
+  * @param tableName
+  * @param dbUrl
+  * @param dbUser
+  * @param dbPassword
+  * @param maxHFilePerRegionServer
+  * @param numRegions
+  * @param labelMapping
+  * @param autoEdgeCreate
+  * @param buildDegree
+  * @param incrementalLoad
+  * @param compressionAlgorithm
+  */
+case class GraphFileOptions(input: String = "",
+                            tempDir: String = "",
+                            output: String = s"/tmp/bulkload",
+                            zkQuorum: String = "",
+                            tableName: String = "",
+                            dbUrl: String = "",
+                            dbUser: String = "",
+                            dbPassword: String = "",
+                            dbDriver: String = "org.h2.Driver",
+                            maxHFilePerRegionServer: Int = 1,
+                            numRegions: Int = 3,
+                            labelMapping: Map[String, String] = Map.empty[String, String],
+                            autoEdgeCreate: Boolean = false,
+                            buildDegree: Boolean = false,
+                            incrementalLoad: Boolean = false,
+                            compressionAlgorithm: String = "NONE",
+                            method: String = "SPARK") {
+  def toConfigParams = {
+    Map(
+      "hbase.zookeeper.quorum" -> zkQuorum,
+      "db.default.url" -> dbUrl,
+      "db.default.user" -> dbUser,
+      "db.default.password" -> dbPassword,
+      "db.default.driver" -> dbDriver
+    )
+  }
+}
\ No newline at end of file
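
toConfigParams is what ties these CLI flags to the S2Graph metastore configuration. A
small sketch of the resulting map, using the H2 URL that appears commented out in
loader.py; the input path and user/password are illustrative.

    // Sketch: only the five connection settings are forwarded into the S2Graph config.
    val opts = GraphFileOptions(
      input = "/tmp/edges.txt", tempDir = "/tmp/bulkload_tmp", output = "/tmp/test",
      zkQuorum = "localhost", tableName = "test",
      dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", dbUser = "sa", dbPassword = "sa")
    opts.toConfigParams
    // => Map("hbase.zookeeper.quorum" -> "localhost",
    //        "db.default.url"      -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
    //        "db.default.user"     -> "sa",
    //        "db.default.password" -> "sa",
    //        "db.default.driver"   -> "org.h2.Driver")  // dbDriver default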

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
new file mode 100644
index 0000000..9c3de2a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.spark._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.hbase.async.PutRequest
+import play.api.libs.json.Json
+
+object HFileGenerator extends RawFileGenerator {
+
+  import scala.collection.JavaConverters._
+
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
+    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+
+    buildPutRequests(s2, edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
+      e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(s2, indexEdge) }
+    }
+  }
+
+  def buildPutRequests(s2: S2Graph, snapshotEdge: SnapshotEdge): List[PutRequest] = {
+    val kvs = s2.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+
+  def buildPutRequests(s2: S2Graph, indexEdge: IndexEdge): List[PutRequest] = {
+    val kvs = s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
+    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
+  }
+
+  def buildDegreePutRequests(s2: S2Graph, vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
+    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
+
+    edge.edgesWithIndex.flatMap { indexEdge =>
+      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
+        new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
+      }
+    }
+  }
+
+  def toKeyValues(s2: S2Graph, degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = {
+    val kvs = for {
+      (key, value) <- degreeKeyVals
+      putRequest <- buildDegreePutRequests(s2, key.vertexIdStr, key.labelName, key.direction, value)
+    } yield {
+      val p = putRequest
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+      kv
+    }
+    kvs.toIterator
+  }
+
+  def toKeyValues(s2: S2Graph, strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
+    val kvList = new java.util.ArrayList[KeyValue]
+    for (s <- strs) {
+      val elementList = s2.elementBuilder.toGraphElement(s, labelMapping).toSeq
+      for (element <- elementList) {
+        if (element.isInstanceOf[S2Edge]) {
+          val edge = element.asInstanceOf[S2Edge]
+          val putRequestList = insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        } else if (element.isInstanceOf[S2Vertex]) {
+          val vertex = element.asInstanceOf[S2Vertex]
+          val putRequestList = s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv =>
+            new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)
+          }
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        }
+      }
+    }
+    kvList.iterator().asScala
+  }
+
+
+  def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = {
+    val conn = ConnectionFactory.createConnection(hbaseConfig)
+    val regionLocator = conn.getRegionLocator(tableName)
+    regionLocator.getStartKeys
+  }
+
+  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
+    val hbaseConf = HBaseConfiguration.create()
+
+    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
+
+    hbaseConf
+  }
+
+  def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
+    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
+    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
+    if (numRegions < 3) {
+      throw new IllegalArgumentException("Must create at least three regions")
+    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
+      throw new IllegalArgumentException("Start key must be smaller than end key")
+    }
+    val empty = new Array[Byte](0)
+
+    if (numRegions == 3) {
+      Array(empty, startKey, endKey)
+    } else {
+      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3)
+      if (splitKeys == null || splitKeys.length != numRegions - 1) {
+        throw new IllegalArgumentException("Unable to split key range into enough regions")
+      }
+      Array(empty) ++ splitKeys.toSeq
+    }
+  }
+
+  def transfer(sc: SparkContext,
+               s2Config: Config,
+               input: RDD[String],
+               graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    val kvs = input.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(s2Config)
+
+      val s = toKeyValues(s2, iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
+      s
+    }
+
+    if (!graphFileOptions.buildDegree) kvs
+    else {
+      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
+        agg + current
+      }.mapPartitions { iter =>
+        val s2 = S2GraphHelper.initS2Graph(s2Config)
+
+        toKeyValues(s2, iter.toSeq)
+      }
+    }
+  }
+
+  def generateHFile(sc: SparkContext,
+                    s2Config: Config,
+                    kvs: RDD[KeyValue],
+                    options: GraphFileOptions): Unit = {
+    val hbaseConfig = toHBaseConfig(options)
+    val startKeys =
+      if (options.incrementalLoad) {
+        // need hbase connection to existing table to figure out the ranges of regions.
+        getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
+      } else {
+        // otherwise we do not need to initialize Connection to hbase cluster.
+        // only numRegions determine region's pre-split.
+        getStartKeys(numRegions = options.numRegions)
+      }
+
+    val hbaseSc = new HBaseContext(sc, hbaseConfig)
+
+    def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
+      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
+      val v = CellUtil.cloneValue(kv)
+      Seq((k -> v)).toIterator
+    }
+
+    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
+    val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
+      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
+
+
+    hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.output, familyOptionsMap.asJava)
+  }
+
+  override def generate(sc: SparkContext,
+                        config: Config,
+                        rdd: RDD[String],
+                        _options: GraphFileOptions): Unit = {
+    val kvs = transfer(sc, config, rdd, _options)
+    generateHFile(sc, config, kvs, _options)
+  }
+}
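
Once generate() has written the HFiles under options.output, the remaining step is the
one loader.py runs through the hbase CLI: completing the bulk load with
LoadIncrementalHFiles. A hedged sketch of the programmatic equivalent, assuming the
HBase 1.x client API and an already-parsed options value; table and paths come from
those options, not from this commit.

    // Sketch: programmatic equivalent of loader.py's load() step.
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

    val hbaseConf = HFileGenerator.toHBaseConfig(options)
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = TableName.valueOf(options.tableName)
    new LoadIncrementalHFiles(hbaseConf).doBulkLoad(
      new Path(options.output), conn.getAdmin, conn.getTable(table), conn.getRegionLocator(table))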
