Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 5b22148c6 -> 3332f6bc1


Abstract read/write of user-provided classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a52adab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a52adab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a52adab0

Branch: refs/heads/master
Commit: a52adab0ad01c0d2e60b484cc07ca112a68e98dc
Parents: ddfd10d
Author: DO YUNG YOON <[email protected]>
Authored: Fri Mar 16 15:47:28 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Mar 16 15:51:59 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/S2Graph.scala |   2 +
 .../s2graph/core/S2VertexPropertyHelper.scala   |  19 ++
 .../apache/s2graph/core/storage/SKeyValue.scala |   5 +-
 .../hbase/AsynchbaseStorageManagement.scala     |   2 +-
 .../org/apache/s2graph/s2jobs/DegreeKey.scala   |  52 ++++
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  50 ++++
 .../s2jobs/loader/GraphFileGenerator.scala      |   8 +-
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 126 +--------
 .../s2jobs/loader/HFileMRGenerator.scala        |  34 +--
 .../loader/LocalBulkLoaderTransformer.scala     |  61 ++++
 .../s2jobs/loader/RawFileGenerator.scala        |  28 +-
 .../loader/SparkBulkLoaderTransformer.scala     |  76 +++++
 .../s2jobs/serde/GraphElementReadable.scala     |  26 ++
 .../s2jobs/serde/GraphElementWritable.scala     |  26 ++
 .../s2graph/s2jobs/serde/Transformer.scala      |  50 ++++
 .../serde/reader/TsvBulkFormatReader.scala      |  29 ++
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  33 +++
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   | 145 ++++++++++
 .../s2jobs/dump/GraphFileDumperTest.scala       |  97 +++++++
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 277 +++++++------------
 20 files changed, 801 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index c3d3887..09f9c7c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -176,6 +176,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override val config = _config.withFallback(S2Graph.DefaultConfig)
 
+  val storageBackend = Try { config.getString("s2graph.storage.backend") }.getOrElse("hbase")
+
   Model.apply(config)
   Model.loadCache()
 

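For reference, a minimal sketch of supplying the new key through the usual typesafe Config (hypothetical override; the remaining settings come from S2Graph.DefaultConfig, and the Try above falls back to "hbase" when the key is absent):

    import scala.concurrent.ExecutionContext.Implicits.global
    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.S2Graph

    // "hbase" is also the default when the key is missing.
    val config = ConfigFactory.parseString("s2graph.storage.backend = hbase")
    val graph = new S2Graph(config.withFallback(S2Graph.DefaultConfig))
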
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
index bed69ef..bdb3c00 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.mysqls.ColumnMeta

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 57adc8a..20ca5e6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -25,8 +25,9 @@ import org.hbase.async.KeyValue
 
 
 object SKeyValue {
-  val EdgeCf = "e".getBytes("UTF-8")
-  val VertexCf = "v".getBytes("UTF-8")
+  val SnapshotEdgeCf = "s".getBytes(StandardCharsets.UTF_8)
+  val EdgeCf = "e".getBytes(StandardCharsets.UTF_8)
+  val VertexCf = "v".getBytes(StandardCharsets.UTF_8)
   val Put = 1
   val Delete = 2
   val Increment = 3

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
index 8475ba6..f504c75 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
@@ -250,7 +250,7 @@ class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClie
     conn.getAdmin
   }
 
-  private def withAdmin(config: Config)(op: Admin => Unit): Unit = {
+  def withAdmin(config: Config)(op: Admin => Unit): Unit = {
     val admin = getAdmin(config)
     try {
       op(admin)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
new file mode 100644
index 0000000..f1efac3
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs
+
+import org.apache.s2graph.core.{GraphElement, S2Edge, S2Graph, S2Vertex}
+import org.apache.s2graph.core.storage.SKeyValue
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+
+
+object DegreeKey {
+  def fromGraphElement(s2: S2Graph,
+                       element: GraphElement,
+                       labelMapping: Map[String, String] = Map.empty): Option[DegreeKey] = {
+    element match {
+      case v: S2Vertex => None
+      case e: S2Edge =>
+        val newLabel = labelMapping.getOrElse(e.innerLabel.label, e.innerLabel.label)
+        val degreeKey = DegreeKey(e.srcVertex.innerIdVal.toString, newLabel, e.getDirection())
+        Option(degreeKey)
+      case _ => None
+    }
+  }
+
+  def toSKeyValue(s2: S2Graph,
+                  degreeKey: DegreeKey,
+                  count: Long): Seq[SKeyValue] = {
+    S2GraphHelper.buildDegreePutRequests(s2, degreeKey.vertexIdStr, degreeKey.labelName, degreeKey.direction, count)
+  }
+
+  def toKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[HKeyValue] = {
+    toSKeyValue(s2, degreeKey, count).map(skv => new HKeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value))
+  }
+}
+
+case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
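
For context, a minimal sketch of how these helpers compose (assumes an initialized S2Graph `s2` and TSV bulk-format `lines: Seq[String]`): count one per (source vertex, label, direction) for every parsed edge, then serialize each aggregate into HBase KeyValues.

    val elements = lines.flatMap(s2.elementBuilder.toGraphElement(_))
    val degrees = elements
      .flatMap(DegreeKey.fromGraphElement(s2, _).map(_ -> 1L))
      .groupBy(_._1).mapValues(_.map(_._2).sum)
    val kvs = degrees.toSeq.flatMap { case (key, count) =>
      DegreeKey.toKeyValue(s2, key, count)
    }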

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index ef76608..3f80e8f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -21,6 +21,10 @@ package org.apache.s2graph.s2jobs
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.storage.SKeyValue
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import play.api.libs.json.Json
 
 import scala.concurrent.ExecutionContext
 
@@ -28,4 +32,50 @@ object S2GraphHelper {
   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
     new S2Graph(config)
   }
+
+  def buildDegreePutRequests(s2: S2Graph,
+                             vertexId: String,
+                             labelName: String,
+                             direction: String,
+                             degreeVal: Long): Seq[SKeyValue] = {
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
+    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
+
+    edge.edgesWithIndex.flatMap { indexEdge =>
+      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+    }
+  }
+
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
+    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+
+    val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
+    val indexEdgeKeyValues = relEdges.flatMap { edge =>
+      edge.edgesWithIndex.flatMap { indexEdge =>
+        s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+      }
+    }
+
+    snapshotEdgeKeyValues ++ indexEdgeKeyValues
+  }
+
+  def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = {
+    if (element.isInstanceOf[S2Edge]) {
+      val edge = element.asInstanceOf[S2Edge]
+      insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
+    } else if (element.isInstanceOf[S2Vertex]) {
+      val vertex = element.asInstanceOf[S2Vertex]
+      s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+    } else {
+      Nil
+    }
+  }
 }
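
A short usage sketch for toSKeyValues (assumes an initialized S2Graph `s2` and a single TSV bulk-format `line: String`):

    val kvs: Seq[SKeyValue] = s2.elementBuilder.toGraphElement(line).toSeq
      .flatMap(element => S2GraphHelper.toSKeyValues(s2, element))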

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
index 79eca36..5f1b940 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,11 @@ object GraphFileGenerator {
     conf.setAppName(this.getClass.getSimpleName)
     val sc = new SparkContext(conf)
 
+
     val input = sc.textFile(options.input)
+
     options.method match {
-      case "MR" => HFileMRGenerator.generate(sc, s2Config, input, options)
+      //      case "MR" => HFileMRGenerator.generate(sc, s2Config, input, 
options)
       case "SPARK" => HFileGenerator.generate(sc, s2Config, input, options)
       case _ => throw new IllegalArgumentException("only supported type is 
MR/SPARK.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index acd3886..b4ac51f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,102 +28,19 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.spark._
+import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.hbase.async.PutRequest
-import play.api.libs.json.Json
 
-object HFileGenerator extends RawFileGenerator {
+object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
   import scala.collection.JavaConverters._
 
-  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
-    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
-
-    buildPutRequests(s2, edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
-      e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(s2, indexEdge) }
-    }
-  }
-
-  def buildPutRequests(s2: S2Graph, snapshotEdge: SnapshotEdge): List[PutRequest] = {
-    val kvs = s2.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
-    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
-  }
-
-  def buildPutRequests(s2: S2Graph, indexEdge: IndexEdge): List[PutRequest] = {
-    val kvs = s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
-    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
-  }
-
-  def buildDegreePutRequests(s2: S2Graph, vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
-    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
-    val dir = GraphUtil.directions(direction)
-    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
-      throw new RuntimeException(s"$vertexId can not be converted into innerval")
-    }
-    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
-
-    val ts = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
-
-    edge.edgesWithIndex.flatMap { indexEdge =>
-      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
-        new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
-      }
-    }
-  }
-
-  def toKeyValues(s2: S2Graph, degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = {
-    val kvs = for {
-      (key, value) <- degreeKeyVals
-      putRequest <- buildDegreePutRequests(s2, key.vertexIdStr, key.labelName, key.direction, value)
-    } yield {
-      val p = putRequest
-      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
-      kv
-    }
-    kvs.toIterator
-  }
-
-  def toKeyValues(s2: S2Graph, strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
-    val kvList = new java.util.ArrayList[KeyValue]
-    for (s <- strs) {
-      val elementList = s2.elementBuilder.toGraphElement(s, labelMapping).toSeq
-      for (element <- elementList) {
-        if (element.isInstanceOf[S2Edge]) {
-          val edge = element.asInstanceOf[S2Edge]
-          val putRequestList = insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
-          for (p <- putRequestList) {
-            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
-            kvList.add(kv)
-          }
-        } else if (element.isInstanceOf[S2Vertex]) {
-          val vertex = element.asInstanceOf[S2Vertex]
-          val putRequestList = s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv =>
-            new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)
-          }
-          for (p <- putRequestList) {
-            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
-            kvList.add(kv)
-          }
-        }
-      }
-    }
-    kvList.iterator().asScala
-  }
-
-
   def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = {
     val conn = ConnectionFactory.createConnection(hbaseConfig)
     val regionLocator = conn.getRegionLocator(tableName)
+
     regionLocator.getStartKeys
   }
 
@@ -132,7 +49,6 @@ object HFileGenerator extends RawFileGenerator {
 
     hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
-//    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
 
     hbaseConf
   }
@@ -158,34 +74,12 @@ object HFileGenerator extends RawFileGenerator {
     }
   }
 
-  def transfer(sc: SparkContext,
-               s2Config: Config,
-               input: RDD[String],
-               graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
-    val kvs = input.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(s2Config)
-
-      val s = toKeyValues(s2, iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
-      s
-    }
-
-    if (!graphFileOptions.buildDegree) kvs
-    else {
-      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
-        agg + current
-      }.mapPartitions { iter =>
-        val s2 = S2GraphHelper.initS2Graph(s2Config)
-
-        toKeyValues(s2, iter.toSeq)
-      }
-    }
-  }
-
   def generateHFile(sc: SparkContext,
                     s2Config: Config,
                     kvs: RDD[KeyValue],
                     options: GraphFileOptions): Unit = {
     val hbaseConfig = toHBaseConfig(options)
+
     val startKeys =
       if (options.incrementalLoad) {
        // need hbase connection to existing table to figure out the ranges of regions.
@@ -207,6 +101,7 @@ object HFileGenerator extends RawFileGenerator {
    val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
    val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
+
    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
 
 
@@ -217,7 +112,10 @@ object HFileGenerator extends RawFileGenerator {
                         config: Config,
                         rdd: RDD[String],
                         _options: GraphFileOptions): Unit = {
-    val kvs = transfer(sc, config, rdd, _options)
-    generateHFile(sc, config, kvs, _options)
+    val transformer = new SparkBulkLoaderTransformer(config, _options)
+    val kvs = transformer.transform(rdd).flatMap(kvs => kvs)
+
+    HFileGenerator.generateHFile(sc, config, kvs, _options)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index ee4c338..3502bee 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,13 +34,10 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
-import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
-object HFileMRGenerator extends RawFileGenerator {
+object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
   val DefaultBlockSize = 32768
   val DefaultConfig = Map(
     "yarn.app.mapreduce.am.resource.mb" -> 4096,
@@ -69,25 +66,7 @@ object HFileMRGenerator extends RawFileGenerator {
   }
 
   def getStartKeys(numRegions: Int): Seq[ImmutableBytesWritable] = {
-    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
-    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
-    if (numRegions < 3) {
-      throw new IllegalArgumentException("Must create at least three regions")
-    }
-    else if (Bytes.compareTo(startKey, endKey) >= 0) {
-      throw new IllegalArgumentException("Start key must be smaller than end 
key")
-    }
-    val empty = new Array[Byte](0)
-    val results = if (numRegions == 3) {
-      Seq(empty, startKey, endKey)
-    } else {
-      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, 
numRegions - 3)
-      if (splitKeys == null || splitKeys.length != numRegions - 1) {
-        throw new IllegalArgumentException("Unable to split key range into 
enough regions")
-      }
-      Seq(empty) ++ splitKeys.toSeq
-    }
-    results.map(new ImmutableBytesWritable(_))
+    HFileGenerator.getStartKeys(numRegions).map(new ImmutableBytesWritable(_))
   }
 
   def sortKeyValues(hbaseConf: Configuration,
@@ -124,8 +103,9 @@ object HFileMRGenerator extends RawFileGenerator {
   def transfer(sc: SparkContext,
                s2Config: Config,
                input: RDD[String],
-               graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
-    HFileGenerator.transfer(sc, s2Config, input, graphFileOptions)
+               options: GraphFileOptions): RDD[KeyValue] = {
+    val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+    transformer.transform(input).flatMap(kvs => kvs)
   }
 
   override def generate(sc: SparkContext,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..7d405a6
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.KeyValue
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.Transformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+
+class LocalBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+  override val reader = new TsvBulkFormatReader
+  override val writer = new KeyValueWriter
+
+  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
+
+  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
+
+  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
+    val degrees = elements.flatMap { element =>
+      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+    }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+    degrees.toSeq.map { case (degreeKey, count) =>
+      DegreeKey.toKeyValue(s2, degreeKey, count)
+    }
+  }
+
+  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+
+    kvs ++ degrees
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
index 1613f20..ef1aaf6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
@@ -20,32 +20,12 @@
 package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
-import org.apache.s2graph.core.GraphUtil
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
-case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
-
-trait RawFileGenerator {
+trait RawFileGenerator[S, T] {
   def generate(sc: SparkContext,
-               config:Config,
-               rdd: RDD[String],
-               _options:GraphFileOptions)
-
-  def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
-    for {
-      msg <- msgs
-      tokens = GraphUtil.split(msg)
-      if tokens(2) == "e" || tokens(2) == "edge"
-      tempDirection = if (tokens.length == 7) "out" else tokens(7)
-      direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
-      reverseDirection = if (direction == "out") "in" else "out"
-      convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
-      (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
-      degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
-      degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection)
-      extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
-      output <- List(degreeKey -> 1L) ++ extra
-    } yield output
-  }
+               config: Config,
+               rdd: RDD[S],
+               _options: GraphFileOptions): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..cd991e1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.serde.Transformer
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+  val reader = new TsvBulkFormatReader
+
+  val writer = new KeyValueWriter
+
+  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap { line =>
+      reader.read(s2)(line)
+    }
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    else kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
new file mode 100644
index 0000000..0544a84
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+
+trait GraphElementReadable[S] extends Serializable {
+  def read(graph: S2Graph)(data: S): Option[GraphElement]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
new file mode 100644
index 0000000..ae082d8
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+
+trait GraphElementWritable[T] extends Serializable {
+  def write(s2: S2Graph)(element: GraphElement): T
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
new file mode 100644
index 0000000..3902c63
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+
+/**
+  * Define serialize/deserialize.
+  * Source -> GraphElement
+  * GraphElement -> Target
+  *
+  * @tparam S : Source class. ex) String, RDF.Statement, ...
+  * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ...
+  * @tparam M : Container type. ex) RDD, Seq, List, ...
+  */
+trait Transformer[S, T, M[_]] extends Serializable {
+  val config: Config
+  val options: GraphFileOptions
+
+  val reader: GraphElementReadable[S]
+
+  val writer: GraphElementWritable[T]
+
+  def read(input: M[S]): M[GraphElement]
+
+  def write(elements: M[GraphElement]): M[T]
+
+  def buildDegrees(elements: M[GraphElement]): M[T]
+
+  def transform(input: M[S]): M[T]
+}
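
For reference, both containers shipped in this commit implement the trait; a minimal usage sketch (assumes `s2Config`, `options`, a SparkContext `sc`, and `lines: Seq[String]` in the TSV bulk format already exist):

    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.s2graph.s2jobs.loader.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer}

    // M = Seq: runs in-process.
    val localKvs = new LocalBulkLoaderTransformer(s2Config, options).transform(lines)

    // M = RDD: the same pipeline distributed over Spark partitions.
    val sparkKvs = new SparkBulkLoaderTransformer(s2Config, options).transform(sc.parallelize(lines))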

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
new file mode 100644
index 0000000..5465517
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+
+class TsvBulkFormatReader extends GraphElementReadable[String] {
+  override def read(graph: S2Graph)(data: String): Option[GraphElement] = {
+    graph.elementBuilder.toGraphElement(data)
+  }
+}
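
The reader delegates to elementBuilder.toGraphElement, which parses S2Graph's TSV bulk format (timestamp, operation, element type, then element-specific fields and a JSON property object). A sketch with an edge line borrowed from the test fixture below (`s2` is assumed to be an initialized S2Graph):

    val line = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
    val element = new TsvBulkFormatReader().read(s2)(line) // Option[GraphElement]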

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
new file mode 100644
index 0000000..02034af
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.hadoop.hbase.KeyValue
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
+  override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
+    S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+      new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
new file mode 100644
index 0000000..78000d4
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs
+
+import java.io.{File, PrintWriter}
+
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.{Management, S2Graph}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import scala.util.Try
+
+class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  protected var sc: SparkContext = _
+  protected val options = GraphFileOptions(
+    input = "/tmp/test.txt",
+    tempDir = "/tmp/bulkload_tmp",
+    output = "/tmp/s2graph_bulkload",
+    zkQuorum = "localhost",
+    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUser = "sa",
+    dbPassword = "sa",
+    dbDriver = "org.h2.Driver",
+    tableName = "s2graph",
+    maxHFilePerRegionServer = 1,
+    numRegions = 3,
+    compressionAlgorithm = "NONE",
+    buildDegree = false,
+    autoEdgeCreate = false)
+
+  protected val s2Config = Management.toConfig(options.toConfigParams)
+
+  protected val tableName = options.tableName
+  protected val schemaVersion = HBaseType.DEFAULT_VERSION
+  protected val compressionAlgorithm: String = options.compressionAlgorithm
+  protected var s2: S2Graph = _
+
+  private val testLines = Seq(
+    "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+  )
+
+  override def beforeAll(): Unit = {
+    // initialize spark context.
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+
+    s2 = S2GraphHelper.initS2Graph(s2Config)
+    initTestDataFile
+  }
+
+  override def afterAll(): Unit = {
+    if (sc != null) sc.stop()
+    if (s2 != null) s2.shutdown()
+  }
+
+  def initTestDataFile: Unit = {
+    deleteRecursively(new File(options.input))
+    writeToFile(options.input)(testLines)
+  }
+
+  def initTestEdgeSchema(s2: S2Graph, tableName: String,
+                         schemaVersion: String = HBaseType.DEFAULT_VERSION,
+                         compressionAlgorithm: String = "none"): Label = {
+    import scala.collection.JavaConverters._
+    /* initialize model for test */
+    val management = s2.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+
+    val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil)
+
+    Try {
+      management.createLabel("friends", serviceColumn, serviceColumn, 
isDirected = true,
+        serviceName = service.serviceName, indices = new 
java.util.ArrayList[Index],
+        props = Seq(Prop("since", "0", "long"), Prop("score", "0", 
"integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
+        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = 
compressionAlgorithm, options = "")
+    }
+
+    Label.findByName("friends").getOrElse(throw new 
IllegalArgumentException("friends label is not initialized."))
+  }
+
+  def initTestVertexSchema(s2: S2Graph): ServiceColumn = {
+    import scala.collection.JavaConverters._
+    /* initialize model for test */
+    val management = s2.management
+
+    val service = management.createService(serviceName = "device_profile", 
cluster = "localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, 
compressionAlgorithm = "gz")
+
+    management.createServiceColumn(service.serviceName, "imei", "string",
+      Seq(
+        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
+        Prop(name = "total_active_days", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
+        Prop(name = "active_months", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "fua", defaultValue = "''", dataType = "string"),
+        Prop(name = "location_often_province", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_often_city", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_often_days", defaultValue = "-1", dataType = 
"integer"),
+        Prop(name = "location_last_province", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "location_last_city", defaultValue = "''", dataType = 
"string"),
+        Prop(name = "fimei_legality", defaultValue = "-1", dataType = 
"integer")
+      ))
+  }
+
+  def writeToFile(fileName: String)(lines: Seq[String]): Unit = {
+    val writer = new PrintWriter(fileName)
+    lines.foreach(line => writer.write(line + "\n"))
+    writer.close
+  }
+
+  def deleteRecursively(file: File): Unit = {
+    if (file.isDirectory) file.listFiles.foreach(deleteRecursively)
+    if (file.exists && !file.delete) throw new Exception(s"Unable to delete 
${file.getAbsolutePath}")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
new file mode 100644
index 0000000..81566f9
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//package org.apache.s2graph.s2jobs.dump
+//
+//import org.apache.s2graph.core._
+//import org.apache.s2graph.core.types.HBaseType
+//import org.apache.s2graph.s2jobs.S2GraphHelper
+//import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+//import play.api.libs.json.Json
+//
+//class GraphFileDumperTest extends FunSuite with Matchers with BeforeAndAfterAll {
+//  private val master = "local[2]"
+//  private val appName = "example-spark"
+//
+//  private var sc: SparkContext = _
+//  val options = GraphFileOptions(
+//    input = "/tmp/imei-20.txt",
+//    tempDir = "/tmp/bulkload_tmp",
+//    output = "/tmp/s2graph_bulkload",
+//    zkQuorum = "localhost",
+//    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+//    dbUser = "sa",
+//    dbPassword = "sa",
+//    dbDriver = "org.h2.Driver",
+//    tableName = "s2graph",
+//    maxHFilePerRegionServer = 1,
+//    numRegions = 3,
+//    compressionAlgorithm = "NONE",
+//    buildDegree = false,
+//    autoEdgeCreate = false)
+//
+//  val s2Config = Management.toConfig(options.toConfigParams)
+//
+//  val tableName = options.tableName
+//  val schemaVersion = HBaseType.DEFAULT_VERSION
+//  val compressionAlgorithm: String = options.compressionAlgorithm
+//  var s2: S2Graph = _
+//
+//  override def beforeAll(): Unit = {
+//    // initialize spark context.
+//    val conf = new SparkConf()
+//      .setMaster(master)
+//      .setAppName(appName)
+//
+//    sc = new SparkContext(conf)
+//
+//    s2 = S2GraphHelper.initS2Graph(s2Config)
+//  }
+//
+//  override def afterAll(): Unit = {
+//    if (sc != null) sc.stop()
+//    if (s2 != null) s2.shutdown()
+//  }
+//
+//  test("test dump.") {
+//    implicit val graph = s2
+//    val snapshotPath = "/usr/local/var/hbase"
+//    val restorePath = "/tmp/hbase_restore"
+//    val snapshotTableNames = Seq("s2graph-snapshot")
+//
+//    val cellLs = HFileDumper.toKeyValue(sc, snapshotPath, restorePath,
+//      snapshotTableNames, columnFamily = "v")
+//
+//    val kvsLs = cellLs.map(CanGraphElement.cellsToSKeyValues).collect()
+//
+//    val elements = kvsLs.flatMap { kvs =>
+//      CanGraphElement.sKeyValueToGraphElement(s2)(kvs)
+//    }
+//
+//    elements.foreach { element =>
+//      val v = element.asInstanceOf[S2VertexLike]
+//      val json = Json.prettyPrint(PostProcess.s2VertexToJson(v).get)
+//
+//      println(json)
+//    }
+//
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 32c5a25..3fbbd88 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,145 +19,75 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import java.io.{File, PrintWriter}
-import java.util
-
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{Management, PostProcess, S2Graph, S2VertexLike}
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.storage.CanSKeyValue
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
+import org.apache.s2graph.s2jobs.BaseSparkTest
 import play.api.libs.json.Json
 
 import scala.io.Source
-import scala.util.Try
-
-object GraphFileGeneratorTest {
-  def initTestEdgeSchema(s2: S2Graph, tableName: String,
-                         schemaVersion: String = HBaseType.DEFAULT_VERSION,
-                         compressionAlgorithm: String = "none"): Label = {
-    import scala.collection.JavaConverters._
-    /* initialize model for test */
-    val management = s2.management
-
-    val service = management.createService(serviceName = "s2graph", cluster = 
"localhost",
-      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, 
compressionAlgorithm = "gz")
-
-    val serviceColumn = management.createServiceColumn(service.serviceName, 
"user", "string", Nil)
-
-    Try {
-      management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true,
-        serviceName = service.serviceName, indices = new java.util.ArrayList[Index],
-        props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
-        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "")
-    }
 
-    Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
+class GraphFileGeneratorTest extends BaseSparkTest {
+  import scala.concurrent.ExecutionContext.Implicits.global
+  import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+
+  def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = {
+    transformerMode match {
+      case "spark" =>
+        val input = sc.parallelize(edges)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+        val kvs = transformer.transform(input)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.collect().toList
+
+      case "local" =>
+        val input = edges
+        val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+        val kvs = transformer.transform(input)
+        kvs.flatMap { kvs =>
+          kvs.map { kv =>
+            CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)
+          }
+        }.toList
+    }
   }
+  test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+    /* end of initialize model */
 
-  def initTestVertexSchema(s2: S2Graph): ServiceColumn = {
-    import scala.collection.JavaConverters._
-    /* initialize model for test */
-    val management = s2.management
-
-    val service = management.createService(serviceName = "device_profile", cluster = "localhost",
-      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
-
-    management.createServiceColumn(service.serviceName, "imei", "string",
-      Seq(
-        Prop(name = "first_time", defaultValue = "''", dataType = "string"),
-        Prop(name = "last_time", defaultValue = "''", dataType = "string"),
-        Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "active_months", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "fua", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_province", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_city", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"),
-        Prop(name = "location_last_province", defaultValue = "''", dataType = "string"),
-        Prop(name = "location_last_city", defaultValue = "''", dataType = "string"),
-        Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer")
-      ))
-  }
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
-  def writeToFile(fileName: String)(lines: Seq[String]): Unit = {
-    val writer = new PrintWriter(fileName)
-    lines.foreach(line => writer.write(line + "\n"))
-    writer.close
-  }
+    val transformerMode = "spark"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
-  def deleteRecursively(file: File): Unit = {
-    if (file.isDirectory) file.listFiles.foreach(deleteRecursively)
-    if (file.exists && !file.delete) throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
-  }
-}
+    val serDe = s2.defaultStorage.serDe
 
-class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterAll {
-  import GraphFileGeneratorTest._
-  import scala.collection.JavaConverters._
-
-  private val master = "local[2]"
-  private val appName = "example-spark"
-
-  private var sc: SparkContext = _
-  val options = GraphFileOptions(
-    input = "/tmp/imei-20.txt",
-    tempDir = "/tmp/bulkload_tmp",
-    output = "/tmp/s2graph_bulkload",
-    zkQuorum = "localhost",
-    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
-    dbUser = "sa",
-    dbPassword = "sa",
-    dbDriver = "org.h2.Driver",
-    tableName = "s2graph",
-    maxHFilePerRegionServer = 1,
-    numRegions = 3,
-    compressionAlgorithm = "NONE",
-    buildDegree = false,
-    autoEdgeCreate = false)
-
-  val s2Config = Management.toConfig(options.toConfigParams)
-
-  val tableName = options.tableName
-  val schemaVersion = HBaseType.DEFAULT_VERSION
-  val compressionAlgorithm: String = options.compressionAlgorithm
-  var s2: S2Graph = _
-
-  override def beforeAll(): Unit = {
-    // initialize spark context.
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
-
-    s2 = S2GraphHelper.initS2Graph(s2Config)
-  }
+    val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
 
-  override def afterAll(): Unit = {
-    if (sc != null) sc.stop()
-    if (s2 != null) s2.shutdown()
-  }
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
 
-  test("test generateKeyValues edge only.") {
-    import scala.collection.JavaConverters._
-    import org.apache.s2graph.core.storage.CanSKeyValue._
+    println(indexEdge)
+    println(bulkEdge)
 
+    bulkEdge shouldBe (indexEdge)
+  }
+  test("test generateKeyValues edge only. LocalBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     /* end of initialize model */
 
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-    val input = sc.parallelize(Seq(bulkEdgeString))
-
-    val kvs = HFileGenerator.transfer(sc, s2Config, input, options)
 
-    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString))
 
     val serDe = s2.defaultStorage.serDe
 
@@ -172,20 +102,16 @@ class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterA
     println(indexEdge)
     println(bulkEdge)
 
-    bulkEdge shouldBe(indexEdge)
+    bulkEdge shouldBe (indexEdge)
   }
 
-
-  test("test generateKeyValues vertex only.") {
+  test("test generateKeyValues vertex only. SparkBulkLoaderTransformer") {
     val serviceColumn = initTestVertexSchema(s2)
     val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
     val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
 
-    val input = sc.parallelize(Seq(bulkVertexString))
-
-    val kvs = HFileGenerator.transfer(sc, s2Config, input, options)
-
-    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+    val transformerMode = "spark"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
 
     val serDe = s2.defaultStorage.serDe
 
@@ -198,59 +124,62 @@ class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterA
     bulkVertex shouldBe(vertex)
   }
 
-  test("test generateHFile vertex only.") {
-    val serviceColumn = initTestVertexSchema(s2)
-//    val lines = Source.fromFile("/tmp/imei-20.txt").getLines().toSeq
-    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
-    val lines = Seq(bulkVertexString)
-    val input = sc.parallelize(lines)
-
-    val kvs = HFileGenerator.transfer(sc, s2Config, input, options)
-    println(kvs.count())
-  }
-
-  // this test case expect options.input already exist with valid bulk load format.
-  test("bulk load and fetch vertex: spark mode") {
+  test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
     val serviceColumn = initTestVertexSchema(s2)
+    val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}"
+    val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get
 
-    deleteRecursively(new File(options.tempDir))
-    deleteRecursively(new File(options.output))
-
-    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-    val input = sc.parallelize(bulkVertexLs)
-
-    HFileGenerator.generate(sc, s2Config, input, options)
+    val transformerMode = "local"
+    val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString))
 
-    val hfileArgs = Array(options.output, options.tableName)
-    val hbaseConfig = HBaseConfiguration.create()
+    val serDe = s2.defaultStorage.serDe
 
-    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+    val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get
 
-    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-    val json = PostProcess.verticesToJson(s2Vertices)
+    PostProcess.s2VertexToJson(vertex).foreach { jsValue =>
+      println(Json.prettyPrint(jsValue))
+    }
 
-    println(Json.prettyPrint(json))
+    bulkVertex shouldBe(vertex)
   }
 
-  // this test case expect options.input already exist with valid bulk load format.
-  test("bulk load and fetch vertex: mr mode") {
-    val serviceColumn = initTestVertexSchema(s2)
-
-    deleteRecursively(new File(options.tempDir))
-    deleteRecursively(new File(options.output))
-
-    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-    val input = sc.parallelize(bulkVertexLs)
-
-    HFileMRGenerator.generate(sc, s2Config, input, options)
-
-    val hfileArgs = Array(options.output, options.tableName)
-    val hbaseConfig = HBaseConfiguration.create()
-
-    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-    val json = PostProcess.verticesToJson(s2Vertices)
-
-    println(Json.prettyPrint(json))
-  }
+//  this test case expects options.input to already exist in valid bulk load format.
+//  test("bulk load and fetch vertex: spark mode") {
+//    import scala.collection.JavaConverters._
+//    val serviceColumn = initTestVertexSchema(s2)
+//
+//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//    val input = sc.parallelize(bulkVertexLs)
+//
+//    HFileGenerator.generate(sc, s2Config, input, options)
+//
+//    val hfileArgs = Array(options.output, options.tableName)
+//    val hbaseConfig = HBaseConfiguration.create()
+//
+//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+//
+//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//    val json = PostProcess.verticesToJson(s2Vertices)
+//
+//    println(Json.prettyPrint(json))
+//  }
+
+//  this test case expects options.input to already exist in valid bulk load format.
+//  test("bulk load and fetch vertex: mr mode") {
+//    val serviceColumn = initTestVertexSchema(s2)
+//
+//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+//    val input = sc.parallelize(bulkVertexLs)
+//
+//    HFileMRGenerator.generate(sc, s2Config, input, options)
+//
+//    val hfileArgs = Array(options.output, options.tableName)
+//    val hbaseConfig = HBaseConfiguration.create()
+//
+//    val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+//    val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+//    val json = PostProcess.verticesToJson(s2Vertices)
+//
+//    println(Json.prettyPrint(json))
+//  }
 }
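
Taken together, the rewritten tests reduce to one round-trip pattern: pick the transformer that matches the execution context, turn raw TSV bulk-format lines into storage key-values, then run those key-values back through the storage serDe and compare against the element parsed directly from the line. A condensed sketch of that pattern, assuming the BaseSparkTest fixtures (s2, s2Config, options, sc) and the "friends" label initialized as in the tests above:

    import org.apache.s2graph.core.mysqls.Label
    import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}

    val line = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"

    // Local mode: Seq[String] in, per-element key-value collections out.
    val localKvs: List[SKeyValue] =
      new LocalBulkLoaderTransformer(s2Config, options)
        .transform(Seq(line))
        .flatMap(_.map(CanSKeyValue.hbaseKeyValue.toSKeyValue)).toList

    // Spark mode: the same call shape over an RDD.
    val sparkKvs: List[SKeyValue] =
      new SparkBulkLoaderTransformer(s2Config, options)
        .transform(sc.parallelize(Seq(line)))
        .flatMap(_.map(CanSKeyValue.hbaseKeyValue.toSKeyValue))
        .collect().toList

    // Either list deserializes back to the edge parsed straight from the raw line.
    val label = Label.findByName("friends").get
    val decoded = localKvs.flatMap { kv =>
      s2.defaultStorage.serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
    }
    assert(decoded.head == s2.elementBuilder.toGraphElement(line, options.labelMapping).get)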
