add test case for TransferToHFile.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cd41b8f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cd41b8f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cd41b8f1

Branch: refs/heads/master
Commit: cd41b8f147394e226298f4e8efa696a3eed0976d
Parents: 4e3fd9c
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 27 16:35:37 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 27 16:56:18 2018 +0900

----------------------------------------------------------------------
 loader/build.sbt                                |   5 +-
 loader/loader.py                                |  30 ++-
 .../loader/subscriber/GraphSubscriber.scala     |  10 +
 .../loader/subscriber/TransferToHFile.scala     | 204 +++++++++++++------
 .../loader/subscriber/TransferToHFileTest.scala | 123 +++++++++++
 .../apache/s2graph/core/storage/SKeyValue.scala |   4 +
 .../s2graph/core/storage/StorageSerDe.scala     |   2 +-
 7 files changed, 306 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/build.sbt
----------------------------------------------------------------------
diff --git a/loader/build.sbt b/loader/build.sbt
index ac7d948..a93713a 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -29,14 +29,15 @@ projectDependencies := Seq(
 
 libraryDependencies ++= Seq(
   "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of 
guava to avoid incompatibility
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided" 
exclude("javax.servlet", "*"),
   "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
   "org.apache.httpcomponents" % "fluent-hc" % "4.2.5",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
-  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion
+  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0"
 )
 
 crossScalaVersions := Seq("2.10.6")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/loader.py
----------------------------------------------------------------------
diff --git a/loader/loader.py b/loader/loader.py
index 1d4dc32..58447ff 100644
--- a/loader/loader.py
+++ b/loader/loader.py
@@ -30,8 +30,32 @@ def hfile(args):
 --name "TransferToHFile@shon" \
 --conf "spark.task.maxFailures=20" \
 --master yarn-cluster \
---num-executors %s --driver-memory 1g --executor-memory 2g --executor-cores 1 %s \
-%s /tmp/%s %s %s %s %s %s %s""" % (args["num_executors"], JAR, args["input"], args["htable_name"], args["hbase_zk"], args["htable_name"], args["db_url"], args["max_file_per_region"], args["label_mapping"], args["auto_create_edge"])
+--num-executors %s \
+--driver-memory 1g \
+--executor-memory 2g \
+--executor-cores 1 \
+%s \
+--input %s \
+--tmpPath /tmp/%s \
+--zkQuorum %s \
+--table %s \
+--dbUrl %s \
+--dbUser %s \
+--dbPassword %s \
+--maxHFilePerRegionServer %s \
+--labelMapping %s \
+--autoEdgeCreate %s""" % (args["num_executors"],
+                                                  JAR,
+                                                  args["input"],
+                                                  args["htable_name"],
+                                                  args["hbase_zk"],
+                                                  args["htable_name"],
+                                                  args["db_url"],
+                                                  args["db_user"],
+                                                  args["db_password"],
+                                                  args["max_file_per_region"],
+                                                  args["label_mapping"],
+                                                  args["auto_create_edge"])
        print cmd
        ret = os.system(cmd)
        print cmd, "return", ret
@@ -96,6 +120,8 @@ args = {
 "hbase_namenode": "hdfs://nameservice:8020",
 "hbase_zk": "localhost",
 "db_url": "jdbc:mysql://localhost:3306/graph_dev",
+"db_user": "graph",
+"db_password": "graph",
 "max_file_per_region": 1,
 "label_mapping": "none",
 "auto_create_edge": "false",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 6ecb070..90e8bbb 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -76,6 +76,16 @@ object GraphSubscriberHelper extends WithKafka {
     }
   }
 
+  def apply(_config: Config): Unit = {
+    config = _config
+    if (g == null) {
+      val ec = ExecutionContext.Implicits.global
+      g = new S2Graph(config)(ec)
+      management = new Management(g)
+      builder = g.elementBuilder
+    }
+  }
+
  def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = {
    config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))
 

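The new Config-based apply(...) lets callers bootstrap the helper straight from a typesafe Config instead of the phase/dbUrl/zkQuorum string triple. A minimal sketch of how it might be wired up, using only the property keys that toConfigParams in TransferToHFile produces; the literal values below are placeholders, not part of this commit:

  // Sketch only: bootstrapping GraphSubscriberHelper from a typesafe Config.
  // The property values are placeholders.
  import org.apache.s2graph.core.Management
  import org.apache.s2graph.loader.subscriber.GraphSubscriberHelper

  val s2Config = Management.toConfig(Map(
    "hbase.zookeeper.quorum" -> "localhost",
    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
    "db.default.user" -> "sa",
    "db.default.password" -> "sa"
  ))

  GraphSubscriberHelper.apply(s2Config)   // initializes g, management and builder once per JVM
  val storage = GraphSubscriberHelper.g.defaultStorage
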
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 4eb9898..0d72b9c 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -19,44 +19,103 @@
 
 package org.apache.s2graph.loader.subscriber
 
-import org.apache.hadoop.hbase.client.Put
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
-import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, LabelWithDirection}
-import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, FamilyHFileWriteOptions}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.s2graph.spark.spark.SparkApp
-import org.apache.spark.{SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.hbase.async.{PutRequest}
+import org.hbase.async.PutRequest
 import play.api.libs.json.Json
+
 import scala.collection.JavaConversions._
 
 
 object TransferToHFile extends SparkApp {
 
-  val usages =
-    s"""
-       |create HFiles for hbase table on zkQuorum specified.
-       |note that hbase table is created already and pre-splitted properly.
-       |
-       |params:
-       |0. input: hdfs path for tsv file(bulk format).
-       |1. output: hdfs path for storing HFiles.
-       |2. zkQuorum: running hbase cluster zkQuorum.
-       |3. tableName: table name for this bulk upload.
-       |4. dbUrl: db url for parsing to graph element.
-     """.stripMargin
+  var options:GraphFileOptions = _
+
+  case class GraphFileOptions(input: String = "",
+                              tmpPath: String = "",
+                              zkQuorum: String = "",
+                              tableName: String = "",
+                              dbUrl: String = "",
+                              dbUser: String = "",
+                              dbPassword: String = "",
+                              maxHFilePerRegionServer: Int = 1,
+                              labelMapping: Map[String, String] = Map.empty[String, String],
+                              autoEdgeCreate: Boolean = false,
+                              buildDegree: Boolean = false,
+                              compressionAlgorithm: String = "") {
+    def toConfigParams = {
+      Map(
+        "hbase.zookeeper.quorum" -> zkQuorum,
+        "db.default.url" -> dbUrl,
+        "db.default.user" -> dbUser,
+        "db.default.password" -> dbPassword
+      )
+    }
+  }
+
+  val parser = new scopt.OptionParser[GraphFileOptions]("run") {
+
+    opt[String]('i', "input").required().action( (x, c) =>
+      c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
+
+    opt[String]('m', "tmpPath").required().action( (x, c) =>
+      c.copy(tmpPath = x) ).text("temp hdfs path for storing HFiles")
+
+    opt[String]('z', "zkQuorum").required().action( (x, c) =>
+      c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
+
+    opt[String]('t', "table").required().action( (x, c) =>
+      c.copy(tableName = x) ).text("table name for this bulk upload.")
+
+    opt[String]('c', "dbUrl").required().action( (x, c) =>
+      c.copy(dbUrl = x)).text("jdbc connection url.")
+
+    opt[String]('u', "dbUser").required().action( (x, c) =>
+      c.copy(dbUser = x)).text("database user name.")
+
+    opt[String]('p', "dbPassword").required().action( (x, c) =>
+      c.copy(dbPassword = x)).text("database password.")
+
+    opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
+    )
+
+    opt[String]('l', "labelMapping").action( (x, c) =>
+      c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
+
+    opt[Boolean]('d', "buildDegree").action( (x, c) =>
+      c.copy(buildDegree = x)).text("generate degree values")
+
+    opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
+      c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+  }
 
   //TODO: Process AtomicIncrementRequest too.
   /** build key values */
  case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
 
+  private def toLabelMapping(lableMapping: String): Map[String, String] = {
+    (for {
+      token <- lableMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+
  private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
     buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
@@ -83,14 +142,17 @@ object TransferToHFile extends SparkApp {
       output <- List(degreeKey -> 1L) ++ extra
     } yield output
   }
+
   def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
    val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
  }
+
  def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
    val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
   }
+
  def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
     val dir = GraphUtil.directions(direction)
@@ -101,7 +163,7 @@ object TransferToHFile extends SparkApp {
 
     val ts = System.currentTimeMillis()
    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
 
    edge.edgesWithIndex.flatMap { indexEdge =>
      GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
@@ -115,13 +177,13 @@ object TransferToHFile extends SparkApp {
       (key, value) <- degreeKeyVals
      putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value)
     } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
-        kv
-      }
+      val p = putRequest
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+      kv
+    }
     kvs.toIterator
   }
-  
+
  def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvList = new java.util.ArrayList[KeyValue]
     for (s <- strs) {
@@ -143,73 +205,81 @@ object TransferToHFile extends SparkApp {
            val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
             kvList.add(kv)
           }
-        } 
+        }
       }
     }
     kvList.iterator()
   }
-  
 
+  def generateKeyValues(sc: SparkContext,
+                        s2Config: Config,
+                        input: RDD[String],
+                        graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    val kvs = input.mapPartitions { iter =>
+      GraphSubscriberHelper.apply(s2Config)
 
-  override def run() = {
-    val input = args(0)
-    val tmpPath = args(1)
-    val zkQuorum = args(2)
-    val tableName = args(3)
-    val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
-    val conf = sparkConf(s"$input: TransferToHFile")
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "24")
+      toKeyValues(iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
+    }
 
-    val sc = new SparkContext(conf)
+    if (!graphFileOptions.buildDegree) kvs
+    else {
+      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
+        agg + current
+      }.mapPartitions { iter =>
+        GraphSubscriberHelper.apply(s2Config)
 
-    val phase = System.getProperty("phase")
-    GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-    GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
+        toKeyValues(iter.toSeq)
+      }
+    }
+  }
 
-    /* set up hbase init */
+  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
     val hbaseConf = HBaseConfiguration.create()
-    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
-    hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
 
+    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
 
-    val rdd = sc.textFile(input)
-
+    hbaseConf
+  }
 
-    val kvs = rdd.mapPartitions { iter =>
-      val phase = System.getProperty("phase")
-      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-      toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
+  override def run() = {
+    parser.parse(args, GraphFileOptions()) match {
+      case Some(o) => options = o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException("failed to parse options...")
     }
 
-    val merged = if (!buildDegree) kvs
-    else {
-      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-        agg + current
-      }.mapPartitions { iter =>
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-        toKeyValues(iter.toSeq)
-      }
-    }
+    println(s">>> Options: ${options}")
+    val s2Config = Management.toConfig(options.toConfigParams)
+
+    val conf = sparkConf(s"TransferToHFile")
+
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "24")
+
+    val sc = new SparkContext(conf)
+    val rdd = sc.textFile(options.input)
+
+    GraphSubscriberHelper.apply(s2Config)
+
+    val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd, options)
+
+    /* set up hbase init */
+    val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
 
-    val hbaseSc = new HBaseContext(sc, hbaseConf)
     def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
      val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
       val v = CellUtil.cloneValue(kv)
       Seq((k -> v)).toIterator
     }
+
    val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase,
      BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
    val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
 
-    hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap)
+    hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), flatMap, options.tmpPath, familyOptionsMap)
   }
 
 }

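Besides the spark-submit path driven by the new scopt flags, the refactor exposes generateKeyValues so the HFile key/value generation can be driven programmatically, which is what the new test does. A rough sketch under that assumption; all literal values below are placeholders:

  // Sketch: driving the option-based pipeline from code; values are placeholders.
  import org.apache.s2graph.core.Management
  import org.apache.s2graph.loader.subscriber.{GraphSubscriberHelper, TransferToHFile}
  import org.apache.spark.{SparkConf, SparkContext}

  val opts = TransferToHFile.GraphFileOptions(
    input = "/user/hadoop/edges.tsv",
    tmpPath = "/tmp/s2graph",
    zkQuorum = "localhost",
    tableName = "s2graph",
    dbUrl = "jdbc:mysql://localhost:3306/graph_dev",
    dbUser = "graph",
    dbPassword = "graph")

  val s2Config = Management.toConfig(opts.toConfigParams)
  val sc = new SparkContext(new SparkConf().setAppName("TransferToHFile").setMaster("local[2]"))

  GraphSubscriberHelper.apply(s2Config)
  val kvs = TransferToHFile.generateKeyValues(sc, s2Config, sc.textFile(opts.input), opts)
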
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
new file mode 100644
index 0000000..36ee530
--- /dev/null
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.loader.subscriber
+
+import java.util
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.CanSKeyValue
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import scala.util.Try
+
+class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  import TransferToHFile._
+
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  private var sc: SparkContext = _
+
+  /* TransferHFile parameters */
+  val options = GraphFileOptions(
+    zkQuorum = "localhost",
+    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUser = "sa",
+    dbPassword = "sa",
+    tableName = "s2graph",
+    maxHFilePerRegionServer = 1,
+    compressionAlgorithm = "gz",
+    buildDegree = false,
+    autoEdgeCreate = false)
+
+  val s2Config = Management.toConfig(options.toConfigParams)
+
+  val tableName = options.tableName
+  val schemaVersion = HBaseType.DEFAULT_VERSION
+  val compressionAlgorithm: String = options.compressionAlgorithm
+
+  override def beforeAll(): Unit = {
+    // initialize spark context.
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+
+    GraphSubscriberHelper.apply(s2Config)
+  }
+
+  override def afterAll(): Unit = {
+    GraphSubscriberHelper.g.shutdown()
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+
+  test("test TransferToHFile Local.") {
+    import scala.collection.JavaConverters._
+    import org.apache.s2graph.core.storage.CanSKeyValue._
+
+    /* initialize model for test */
+    val management = GraphSubscriberHelper.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+
+    val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", new util.ArrayList[Prop]())
+
+    Try {
+      management.createLabel("friends", serviceColumn, serviceColumn, 
isDirected = true,
+        serviceName = service.serviceName, indices = new 
java.util.ArrayList[Index],
+        props = Seq(Prop("since", "0", "long"), Prop("score", "0", 
"integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
+        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = 
compressionAlgorithm, options = "")
+    }
+
+    val label = Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
+    /* end of initialize model */
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val input = sc.parallelize(Seq(bulkEdgeString))
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+
+    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+
+    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
+
+    //    val snapshotEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.head), None)
+    //    val indexEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.last), None)
+
+    val bulkEdge = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    bulkEdge shouldBe(indexEdge)
+  }
+}

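The bulkEdgeString fixture above follows the TSV bulk-load format: timestamp, operation, element type, source vertex id, target vertex id, label name, and a JSON props object, separated by tabs. A small sketch for building additional fixtures in the same shape; the helper name is illustrative, not part of the commit:

  // Sketch: composing a bulk-format edge line like bulkEdgeString above.
  def bulkEdgeLine(ts: Long, op: String, from: String, to: String,
                   label: String, propsJson: String): String =
    Seq(ts.toString, op, "edge", from, to, label, propsJson).mkString("\t")

  val line = bulkEdgeLine(1416236400000L, "insert", "a", "b",
    "friends", """{"since":1316236400000,"score":10}""")
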
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 775afda..57adc8a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -65,6 +65,10 @@ object CanSKeyValue {
    SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
   }
 
+  implicit val hbaseKeyValue = instance[org.apache.hadoop.hbase.KeyValue] { kv =>
+    SKeyValue(Array.empty[Byte], kv.getRow, kv.getFamily, kv.getQualifier, kv.getValue, kv.getTimestamp)
+  }
+
   // For asyncbase KeyValues
   implicit val sKeyValue = instance[SKeyValue](identity)
 

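The added CanSKeyValue instance adapts the HBase client KeyValue into SKeyValue, which is what lets the test compare the bulk-loader output against the core deserializers. A hedged sketch of the conversion; the KeyValue contents are placeholders:

  // Sketch: adapting an org.apache.hadoop.hbase.KeyValue into an SKeyValue.
  import org.apache.hadoop.hbase.KeyValue
  import org.apache.s2graph.core.storage.CanSKeyValue

  val hbaseKv = new KeyValue("row".getBytes("UTF-8"), "e".getBytes("UTF-8"),
    "qual".getBytes("UTF-8"), System.currentTimeMillis(), "value".getBytes("UTF-8"))

  val sKv = CanSKeyValue.hbaseKeyValue.toSKeyValue(hbaseKv)
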
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
index 32d640c..78da629 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
@@ -70,7 +70,7 @@ trait StorageSerDe {
     **/
   def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge]
 
-  def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable
+  def indexEdgeDeserializer(schemaVer: String): Deserializable[S2EdgeLike]
 
   def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike]
 

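Widening indexEdgeDeserializer to Deserializable[S2EdgeLike] is what lets the test decode the generated key values back into edges. A minimal sketch mirroring that usage, assuming the graph has been initialized through GraphSubscriberHelper and that S2EdgeLike lives in org.apache.s2graph.core:

  // Sketch: decode bulk-generated SKeyValues back into edges, as the test does.
  import org.apache.s2graph.core.S2EdgeLike
  import org.apache.s2graph.core.mysqls.Label
  import org.apache.s2graph.core.storage.SKeyValue
  import org.apache.s2graph.loader.subscriber.GraphSubscriberHelper

  def decodeIndexEdges(label: Label, kvs: Seq[SKeyValue]): Seq[S2EdgeLike] = {
    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
    kvs.flatMap { kv =>
      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
    }
  }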