http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/test/scala/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/subscriber/TransferToHFileTest.scala 
b/loader/src/test/scala/subscriber/TransferToHFileTest.scala
deleted file mode 100644
index 7b3f72f..0000000
--- a/loader/src/test/scala/subscriber/TransferToHFileTest.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-package subscriber
-
-import com.kakao.s2graph.core.Management
-import com.kakao.s2graph.core.types.HBaseType
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest._
-import subscriber.TransferToHFile._
-
-/**
-  * Created by Eric on 2015. 12. 2..
-  */
-class TransferToHFileTest  extends FlatSpec with BeforeAndAfterAll with 
Matchers {
-
-  private val master = "local[2]"
-  private val appName = "example-spark"
-
-  private var sc: SparkContext = _
-
-  val dataWithoutDir =
-    """
-      |1447686000000   insertBulk      e       a       b       friends_rel     
{}
-      |1447686000000   insertBulk      e       a       c       friends_rel     
{}
-      |1447686000000   insertBulk      e       a       d       friends_rel     
{}
-      |1447686000000   insertBulk      e       b       d       friends_rel     
{}
-      |1447686000000   insertBulk      e       b       e       friends_rel     
{}
-    """.stripMargin.trim
-
-  val dataWithDir =
-    """
-     |1447686000000    insertBulk      e       a       b       friends_rel     
{}      out
-     |1447686000000    insertBulk      e       b       a       friends_rel     
{}      in
-     |1447686000000    insertBulk      e       a       c       friends_rel     
{}      out
-     |1447686000000    insertBulk      e       c       a       friends_rel     
{}      in
-     |1447686000000    insertBulk      e       a       d       friends_rel     
{}      out
-     |1447686000000    insertBulk      e       d       a       friends_rel     
{}      in
-     |1447686000000    insertBulk      e       b       d       friends_rel     
{}      out
-     |1447686000000    insertBulk      e       d       b       friends_rel     
{}      in
-     |1447686000000    insertBulk      e       b       e       friends_rel     
{}      out
-     |1447686000000    insertBulk      e       e       b       friends_rel     
{}      in
-    """.stripMargin.trim
-
-  override def beforeAll(): Unit = {
-    println("### beforeAll")
-
-    GraphSubscriberHelper.apply("dev", "none", "none", "none")
-    // 1. create service
-    if(Management.findService("loader-test").isEmpty) {
-      println(">>> create service...")
-      Management.createService("loader-test", "localhost", "loader-test-dev", 
1, None, "gz")
-    }
-
-    // 2. create label
-    if(Management.findLabel("friends_rel").isEmpty) {
-      println(">>> create label...")
-      Management.createLabel(
-        "friends_rel",
-        "loader-test", "user_id", "string",
-        "loader-test", "user_id", "string",
-        true,
-        "loader-test",
-        Seq(),
-        Seq(),
-        "weak",
-        None, None,
-        HBaseType.DEFAULT_VERSION,
-        false,
-        Management.defaultCompressionAlgorithm
-      )
-    }
-
-    // create spark context
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll(): Unit = {
-    println("### afterALL")
-    if (sc != null) {
-      sc.stop()
-    }
-
-    Management.deleteLabel("friends_rel")
-  }
-
-  "buildDegreePutRequest" should "transform degree to PutRequest" in {
-    val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L)
-    putReqs.size should equal(1)
-  }
-
-  "toKeyValues" should "transform edges to KeyValues on edge format data 
without direction" in {
-    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
-
-    val kvs = rdd.mapPartitions { iter =>
-      GraphSubscriberHelper.apply("dev", "none", "none", "none")
-      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
-    }
-    kvs.foreach(println)
-    // edges * 2 (snapshot edges + indexed edges)
-    kvs.count() should equal(10)
-
-
-    val kvsAutoCreated = rdd.mapPartitions { iter =>
-      GraphSubscriberHelper.apply("dev", "none", "none", "none")
-      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true)
-    }
-
-    // edges * 3 (snapshot edges + indexed edges + reverse edges)
-    kvsAutoCreated.count() should equal(15)
-  }
-
-  "toKeyValues" should "transform edges to KeyValues on edge format data with 
direction" in {
-    val rdd = sc.parallelize(dataWithDir.split("\n"))
-
-    val kvs = rdd.mapPartitions { iter =>
-      GraphSubscriberHelper.apply("dev", "none", "none", "none")
-      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
-    }
-
-    // edges * 2 (snapshot edges + indexed edges)
-    kvs.count() should equal(20)
-  }
-
-  "buildDegrees" should "build degrees on edge format data without direction" 
in {
-    val rdd = sc.parallelize(dataWithoutDir.split("\n"))
-
-    // autoCreate = false
-    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
-      agg + current
-    }.collectAsMap()
-    degrees.size should equal(2)
-
-    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
-    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
-
-
-    // autoCreate = true
-    val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, 
Map.empty[String, String], true).reduceByKey { (agg, current) =>
-      agg + current
-    }.collectAsMap()
-    degreesAutoCreated.size should equal(6)
-
-    degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 
3L)
-    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 
2L)
-    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 
1L)
-    degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 
1L)
-    degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 
2L)
-    degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 
1L)
-  }
-
-  "buildDegrees" should "build degrees on edge format data with direction" in {
-    val rdd = sc.parallelize(dataWithDir.split("\n"))
-
-    val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], 
false).reduceByKey { (agg, current) =>
-      agg + current
-    }.collectAsMap()
-
-    degrees.size should equal(6)
-
-    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
-    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
-    degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
-    degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
-    degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
-    degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
deleted file mode 100644
index 8e6ad7d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala
+++ /dev/null
@@ -1,568 +0,0 @@
-package com.kakao.s2graph.core
-
-
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json.{JsNumber, Json}
-
-import scala.collection.JavaConversions._
-import scala.util.hashing.MurmurHash3
-
-
-case class SnapshotEdge(srcVertex: Vertex,
-                        tgtVertex: Vertex,
-                        labelWithDir: LabelWithDirection,
-                        op: Byte,
-                        version: Long,
-                        props: Map[Byte, InnerValLikeWithTs],
-                        pendingEdgeOpt: Option[Edge],
-                        statusCode: Byte = 0,
-                        lockTs: Option[Long]) extends JSONParser {
-
-  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new 
Exception("Timestamp is required.")
-
-  val label = Label.findById(labelWithDir.labelId)
-  val schemaVer = label.schemaVersion
-  lazy val propsWithoutTs = props.mapValues(_.innerVal)
-  val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong
-
-  def toEdge: Edge = {
-    val ts = props.get(LabelMeta.timeStampSeq).map(v => 
v.ts).getOrElse(version)
-    Edge(srcVertex, tgtVertex, labelWithDir, op,
-      version, props, pendingEdgeOpt = pendingEdgeOpt,
-      statusCode = statusCode, lockTs = lockTs)
-  }
-
-  def propsWithName = (for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq)
-    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-  } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
-
-  // only for debug
-  def toLogString() = {
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, 
label.label, propsWithName).mkString("\t")
-  }
-}
-
-case class IndexEdge(srcVertex: Vertex,
-                     tgtVertex: Vertex,
-                     labelWithDir: LabelWithDirection,
-                     op: Byte,
-                     version: Long,
-                     labelIndexSeq: Byte,
-                     props: Map[Byte, InnerValLike]) extends JSONParser {
-  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new 
Exception("Timestamp is required.")
-  //  assert(props.containsKey(LabelMeta.timeStampSeq))
-
-  val ts = props(LabelMeta.timeStampSeq).toString.toLong
-  val degreeEdge = props.contains(LabelMeta.degreeSeq)
-  lazy val label = Label.findById(labelWithDir.labelId)
-  val schemaVer = label.schemaVersion
-  lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, 
labelIndexSeq).get
-  lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
-    val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
-    meta.seq -> innerVal
-  }.toMap
-
-  lazy val labelIndexMetaSeqs = labelIndex.metaSeqs
-
-  /** TODO: make sure call of this class fill props as this assumes */
-  lazy val orders = for (k <- labelIndexMetaSeqs) yield {
-    props.get(k) match {
-      case None =>
-
-        /**
-          * TODO: agly hack
-          * now we double store target vertex.innerId/srcVertex.innerId for 
easy development. later fix this to only store id once
-          */
-        val v = k match {
-          case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer)
-          case LabelMeta.toSeq => tgtVertex.innerId
-          case LabelMeta.fromSeq => //srcVertex.innerId
-            // for now, it does not make sense to build index on 
srcVertex.innerId since all edges have same data.
-            throw new RuntimeException("_from on indexProps is not supported")
-          case _ => defaultIndexMetas(k)
-        }
-
-        k -> v
-      case Some(v) => k -> v
-    }
-  }
-
-  lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
-  lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k 
-> v
-
-  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, 
version) }
-
-  //TODO:
-  //  lazy val kvs = 
Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
-
-  lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
-
-  def propsWithName = for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq) if seq >= 0
-    jsValue <- innerValToJsValue(v, meta.dataType)
-  } yield meta.name -> jsValue
-
-
-  def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, 
propsWithTs)
-
-  // only for debug
-  def toLogString() = {
-    List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, 
tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
-  }
-}
-
-case class Edge(srcVertex: Vertex,
-                tgtVertex: Vertex,
-                labelWithDir: LabelWithDirection,
-                op: Byte = GraphUtil.defaultOpByte,
-                version: Long = System.currentTimeMillis(),
-                propsWithTs: Map[Byte, InnerValLikeWithTs],
-                parentEdges: Seq[EdgeWithScore] = Nil,
-                originalEdgeOpt: Option[Edge] = None,
-                pendingEdgeOpt: Option[Edge] = None,
-                statusCode: Byte = 0,
-                lockTs: Option[Long] = None) extends GraphElement with 
JSONParser {
-
-  if (!props.containsKey(LabelMeta.timeStampSeq)) throw new 
Exception("Timestamp is required.")
-  //  assert(propsWithTs.containsKey(LabelMeta.timeStampSeq))
-  val schemaVer = label.schemaVersion
-  val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-  def props = propsWithTs.mapValues(_.innerVal)
-
-  def relatedEdges = {
-    if (labelWithDir.isDirected) List(this, duplicateEdge)
-    else {
-      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-      val base = copy(labelWithDir = outDir)
-      List(base, base.reverseSrcTgtEdge)
-    }
-  }
-
-  //    def relatedEdges = List(this)
-
-  def srcForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def tgtForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
-
-  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
-
-  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
-
-  def label = Label.findById(labelWithDir.labelId)
-
-  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
-
-  override def serviceName = label.serviceName
-
-  override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
-
-  override def queuePartitionKey = Seq(srcVertex.innerId, 
tgtVertex.innerId).mkString("|")
-
-  override def isAsync = label.isAsync
-
-  def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
-
-  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-    case Some(_) => props
-    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, 
schemaVer))
-  }
-
-  def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0)
-
-  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, 
propsPlusTs)
-  }
-
-  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, 
propsPlusTsValid)
-  }
-
-  /** force direction as out on invertedEdge */
-  def toSnapshotEdge: SnapshotEdge = {
-    val (smaller, larger) = (srcForVertex, tgtForVertex)
-
-    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, 
GraphUtil.directions("out"))
-
-    val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version,
-      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, 
schemaVer), ts)) ++ propsWithTs,
-      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = 
lockTs)
-    ret
-  }
-
-  override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + 
tgtVertex.innerId)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case e: Edge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir
-    case _ => false
-  }
-
-  def propsWithName = for {
-    (seq, v) <- props
-    meta <- label.metaPropsMap.get(seq) if seq > 0
-    jsValue <- innerValToJsValue(v, meta.dataType)
-  } yield meta.name -> jsValue
-
-  def updateTgtVertex(id: InnerValLike) = {
-    val newId = TargetVertexId(tgtVertex.id.colId, id)
-    val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
-    Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs)
-  }
-
-  def rank(r: RankParam): Double =
-    if (r.keySeqAndWeights.size <= 0) 1.0f
-    else {
-      var sum: Double = 0
-
-      for ((seq, w) <- r.keySeqAndWeights) {
-        seq match {
-          case LabelMeta.countSeq => sum += 1
-          case _ => {
-            propsWithTs.get(seq) match {
-              case None => // do nothing
-              case Some(innerValWithTs) => {
-                val cost = try innerValWithTs.innerVal.toString.toDouble catch 
{
-                  case e: Exception =>
-                    logger.error("toInnerval failed in rank", e)
-                    1.0
-                }
-                sum += w * cost
-              }
-            }
-          }
-        }
-      }
-      sum
-    }
-
-  def toLogString: String = {
-    val ret =
-      if (propsWithName.nonEmpty)
-        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, 
tgtVertex.innerId, label.label, Json.toJson(propsWithName))
-      else
-        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, 
tgtVertex.innerId, label.label)
-
-    ret.mkString("\t")
-  }
-}
-
-case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
-                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
-                      newSnapshotEdge: Option[SnapshotEdge] = None) {
-
-  def toLogString: String = {
-    val l = (0 until 50).map(_ => "-").mkString("")
-    val deletes = s"deletes: ${edgesToDelete.map(e => 
e.toLogString).mkString("\n")}"
-    val inserts = s"inserts: ${edgesToInsert.map(e => 
e.toLogString).mkString("\n")}"
-    val updates = s"snapshot: ${newSnapshotEdge.map(e => 
e.toLogString).mkString("\n")}"
-
-    List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n")
-  }
-}
-
-object Edge extends JSONParser {
-  val incrementVersion = 1L
-  val minTsVal = 0L
-
-  /** now version information is required also **/
-  type State = Map[Byte, InnerValLikeWithTs]
-  type PropsPairWithTs = (State, State, Long, String)
-  type MergeState = PropsPairWithTs => (State, Boolean)
-  type UpdateFunc = (Option[Edge], Edge, MergeState)
-
-  def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean =
-    if (!props.containsKey(LabelMeta.lastDeletedAt)) false
-    else {
-      val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
-      val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
-
-      propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
-    }
-
-  def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, 
EdgeMutate) = {
-    //    assert(invertedEdge.isEmpty)
-    //    assert(requestEdge.op == GraphUtil.operations("delete"))
-
-    val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => 
relEdge.edgesWithIndexValid }
-    val edgeInverted = Option(requestEdge.toSnapshotEdge)
-
-    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted))
-  }
-
-  def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): 
(Edge, EdgeMutate) = {
-    //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
-    //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
-    val oldPropsWithTs =
-      if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else 
invertedEdge.get.propsWithTs
-
-    val funcs = requestEdges.map { edge =>
-      if (edge.op == GraphUtil.operations("insert")) {
-        edge.label.consistencyLevel match {
-          case "strong" => Edge.mergeUpsert _
-          case _ => Edge.mergeInsertBulk _
-        }
-      } else if (edge.op == GraphUtil.operations("insertBulk")) {
-        Edge.mergeInsertBulk _
-      } else if (edge.op == GraphUtil.operations("delete")) {
-        edge.label.consistencyLevel match {
-          case "strong" => Edge.mergeDelete _
-          case _ => throw new RuntimeException("not supported")
-        }
-      }
-      else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
-      else if (edge.op == GraphUtil.operations("increment")) 
Edge.mergeIncrement _
-      else throw new RuntimeException(s"not supported operation on edge: 
$edge")
-    }
-
-    val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
-    val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != 
_._1.ts).sortBy(_._1.ts)
-
-    if (requestWithFuncs.isEmpty) {
-      (requestEdges.head, EdgeMutate())
-    } else {
-      val requestEdge = requestWithFuncs.last._1
-      var prevPropsWithTs = oldPropsWithTs
-
-      for {
-        (requestEdge, func) <- requestWithFuncs
-      } {
-        val (_newPropsWithTs, _) = func(prevPropsWithTs, 
requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
-        prevPropsWithTs = _newPropsWithTs
-        //        
logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
-      }
-      val requestTs = requestEdge.ts
-      /** version should be monotoniously increasing so our RPC mutation 
should be applied safely */
-      val newVersion = invertedEdge.map(e => e.version + 
incrementVersion).getOrElse(requestTs)
-      val maxTs = prevPropsWithTs.map(_._2.ts).max
-      val newTs = if (maxTs > requestTs) maxTs else requestTs
-      val propsWithTs = prevPropsWithTs ++
-        Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), 
newTs))
-      val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, 
oldPropsWithTs, propsWithTs)
-
-      //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
-      //      logger.error(s"$propsWithTs")
-      (requestEdge, edgeMutate)
-    }
-  }
-
-  def buildMutation(snapshotEdgeOpt: Option[Edge],
-                    requestEdge: Edge,
-                    newVersion: Long,
-                    oldPropsWithTs: Map[Byte, InnerValLikeWithTs],
-                    newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate 
= {
-    if (oldPropsWithTs == newPropsWithTs) {
-      // all requests should be dropped. so empty mutation.
-      //      logger.error(s"Case 1")
-      EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = 
None)
-    } else {
-      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != 
LabelMeta.lastDeletedAt)
-      val newOp = snapshotEdgeOpt match {
-        case None => requestEdge.op
-        case Some(old) =>
-          val oldMaxTs = old.propsWithTs.map(_._2.ts).max
-          if (oldMaxTs > requestEdge.ts) old.op
-          else requestEdge.op
-      }
-
-      val newSnapshotEdgeOpt =
-        Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, 
version = newVersion).toSnapshotEdge)
-      // delete request must always update snapshot.
-      if (withOutDeletedAt == oldPropsWithTs && 
newPropsWithTs.containsKey(LabelMeta.lastDeletedAt)) {
-        // no mutation on indexEdges. only snapshotEdge should be updated to 
record lastDeletedAt.
-        //        logger.error(s"Case 2")
-        EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = 
newSnapshotEdgeOpt)
-      } else {
-        //        logger.error(s"Case 3")
-        val edgesToDelete = snapshotEdgeOpt match {
-          case Some(snapshotEdge) if snapshotEdge.op != 
GraphUtil.operations("delete") =>
-            snapshotEdge.copy(op = GraphUtil.defaultOpByte).
-              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
-          case _ => Nil
-        }
-
-        val edgesToInsert =
-          if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
-          else
-            requestEdge.copy(version = newVersion, propsWithTs = 
newPropsWithTs, op = GraphUtil.defaultOpByte).
-              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
-
-        EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = 
edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
-      }
-    }
-  }
-
-  def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => 
v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-          else {
-            shouldReplace = true
-            newValWithTs
-          }
-          Some(k -> v)
-
-        case None =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
-          else {
-            shouldReplace = true
-            None
-          }
-      }
-    }
-    val existInNew =
-      for {
-        (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && 
newValWithTs.ts > lastDeletedAt
-      } yield {
-        shouldReplace = true
-        Some(k -> newValWithTs)
-      }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => 
v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-          else {
-            shouldReplace = true
-            newValWithTs
-          }
-          Some(k -> v)
-        case None =>
-          // important: update need to merge previous valid values.
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          Some(k -> oldValWithTs)
-      }
-    }
-    val existInNew = for {
-      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && 
newValWithTs.ts > lastDeletedAt
-    } yield {
-      shouldReplace = true
-      Some(k -> newValWithTs)
-    }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => 
v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          if (k == LabelMeta.timeStampSeq) {
-            val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-            else {
-              shouldReplace = true
-              newValWithTs
-            }
-            Some(k -> v)
-          } else {
-            if (oldValWithTs.ts >= newValWithTs.ts) {
-              Some(k -> oldValWithTs)
-            } else {
-              assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= 
lastDeletedAt)
-              shouldReplace = true
-              // incr(t0), incr(t2), d(t1) => deleted
-              Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + 
newValWithTs.innerVal, oldValWithTs.ts))
-            }
-          }
-
-        case None =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          Some(k -> oldValWithTs)
-        //          if (oldValWithTs.ts >= lastDeletedAt) Some(k -> 
oldValWithTs) else None
-      }
-    }
-    val existInNew = for {
-      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && 
newValWithTs.ts > lastDeletedAt
-    } yield {
-      shouldReplace = true
-      Some(k -> newValWithTs)
-    }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
-      case Some(prevDeletedAt) =>
-        if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
-        else {
-          shouldReplace = true
-          requestTs
-        }
-      case None => {
-        shouldReplace = true
-        requestTs
-      }
-    }
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      if (k == LabelMeta.timeStampSeq) {
-        if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
-        else {
-          shouldReplace = true
-          Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
-        }
-      } else {
-        if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
-        else {
-          shouldReplace = true
-          None
-        }
-      }
-    }
-    val mustExistInNew = Map(LabelMeta.lastDeletedAt -> 
InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
-    ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
-  }
-
-  def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    val (_, propsWithTs, _, _) = propsPairWithTs
-    (propsWithTs, true)
-  }
-
-  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala
deleted file mode 100644
index a965e90..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.kakao.s2graph.core
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicLong
-
-import akka.actor._
-import akka.routing.{Broadcast, RoundRobinPool}
-import com.typesafe.config.Config
-import org.apache.kafka.clients.producer._
-
-import scala.concurrent.duration._
-
-/**
- * Created by shon on 7/16/15.
- */
-object ExceptionHandler {
-
-  var producer: Option[Producer[Key, Val]] = None
-  var properties: Option[Properties] = None
-  val numOfRoutees = 1
-  val actorSystem = ActorSystem("ExceptionHandler")
-  var routees: Option[ActorRef] = None
-  var shutdownTime = 1000 millis
-  var phase = "dev"
-  lazy val failTopic = s"mutateFailed_${phase}"
-
-  def apply(config: Config) = {
-    properties =
-      if (config.hasPath("kafka.metadata.broker.list")) 
Option(kafkaConfig(config))
-      else None
-    phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
-    producer = for {
-      props <- properties
-      p <- try {
-        Option(new KafkaProducer[Key, Val](props))
-      } catch {
-        case e: Throwable => None
-      }
-    } yield {
-        p
-      }
-    init()
-  }
-
-  def props(producer: Producer[Key, Val]) = 
Props(classOf[KafkaAggregatorActor], producer)
-
-  def init() = {
-    for {
-      p <- producer
-    } {
-      routees = 
Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p))))
-    }
-  }
-
-  def shutdown() = {
-    routees.map(_ ! Broadcast(PoisonPill))
-    Thread.sleep(shutdownTime.length)
-  }
-
-  def enqueues(msgs: Seq[KafkaMessage]) = {
-    msgs.foreach(enqueue)
-  }
-
-  def enqueue(msg: KafkaMessage) = {
-    routees.map(_ ! msg)
-  }
-
-
-  def kafkaConfig(config: Config) = {
-    val props = new Properties();
-
-    /** all default configuration for new producer */
-    val brokers =
-      if (config.hasPath("kafka.metadata.broker.list")) 
config.getString("kafka.metadata.broker.list")
-      else "localhost"
-    props.put("bootstrap.servers", brokers)
-    props.put("acks", "1")
-    props.put("buffer.memory", "33554432")
-    props.put("compression.type", "snappy")
-    props.put("retries", "0")
-    props.put("batch.size", "16384")
-    props.put("linger.ms", "0")
-    props.put("max.request.size", "1048576")
-    props.put("receive.buffer.bytes", "32768")
-    props.put("send.buffer.bytes", "131072")
-    props.put("timeout.ms", "30000")
-    props.put("block.on.buffer.full", "false")
-    props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
-    props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
-    props
-  }
-
-  type Key = String
-  type Val = String
-
-  def toKafkaMessage(topic: String = failTopic, element: GraphElement, 
originalString: Option[String] = None) = {
-    KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey,
-      originalString.getOrElse(element.toLogString())))
-  }
-
-  case class KafkaMessage(msg: ProducerRecord[Key, Val])
-
-  case class Message(topic: String, msg: String)
-
-  case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], 
bufferSize: Int)
-
-  case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: 
Int)
-
-  case object FlushBuffer
-
-  case class UpdateHealth(isHealty: Boolean)
-
-  case object ShowMetrics
-
-}
-
-class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends 
Stash with ActorLogging {
-
-  import ExceptionHandler._
-
-  val failedCount = new AtomicLong(0L)
-  val successCount = new AtomicLong(0L)
-  val stashCount = new AtomicLong(0L)
-
-  implicit val ex = context.system.dispatcher
-
-  context.system.scheduler.schedule(0 millis, 10 seconds) {
-    self ! ShowMetrics
-  }
-
-  override def receive = {
-    case ShowMetrics =>
-      log.info(s"[Stats]: failed[${failedCount.get}], 
stashed[${stashCount.get}], success[${successCount.get}]")
-
-    case m: KafkaMessage =>
-      val replayTo = self
-      try {
-        kafkaProducer.send(m.msg, new Callback() {
-          override def onCompletion(meta: RecordMetadata, e: Exception) = {
-            if (e == null) {
-              // success
-              successCount.incrementAndGet()
-              unstashAll()
-              stashCount.set(0L)
-            } else {
-              // failure
-              log.error(s"onCompletion: $e", e)
-              failedCount.incrementAndGet()
-              replayTo ! m
-            }
-          }
-        })
-      } catch {
-        case e@(_: org.apache.kafka.clients.producer.BufferExhaustedException 
| _: Throwable) =>
-          log.error(s"$e", e)
-          log.info(s"stash")
-          stash()
-          stashCount.incrementAndGet()
-      }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
deleted file mode 100644
index fdc8553..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
+++ /dev/null
@@ -1,381 +0,0 @@
-package com.kakao.s2graph.core
-
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-
-import com.google.common.cache.CacheBuilder
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.parsers.WhereParser
-import com.kakao.s2graph.core.storage.Storage
-import com.kakao.s2graph.core.storage.hbase._
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.logger
-import com.typesafe.config.{Config, ConfigFactory}
-
-import scala.collection.JavaConversions._
-import scala.collection._
-import scala.collection.mutable.ListBuffer
-import scala.concurrent._
-import scala.util.Try
-
-object Graph {
-  val DefaultScore = 1.0
-
-  private val DefaultConfigs: Map[String, AnyRef] = Map(
-    "hbase.zookeeper.quorum" -> "localhost",
-    "hbase.table.name" -> "s2graph",
-    "hbase.table.compression.algorithm" -> "gz",
-    "phase" -> "dev",
-    "db.default.driver" -> "com.mysql.jdbc.Driver",
-    "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev",
-    "db.default.password" -> "graph",
-    "db.default.user" -> "graph",
-    "cache.max.size" -> java.lang.Integer.valueOf(10000),
-    "cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
-    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
-    "hbase.rpcs.buffered_flush_interval" -> 
java.lang.Short.valueOf(100.toShort),
-    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
-    "max.retry.number" -> java.lang.Integer.valueOf(100),
-    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
-    "max.back.off" -> java.lang.Integer.valueOf(100),
-    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
-    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
-    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
-    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
-    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "s2graph.storage.backend" -> "hbase"
-  )
-
-  var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
-
-  /** helpers for filterEdges */
-  type HashKey = (Int, Int, Int, Int, Boolean)
-  type FilterHashKey = (Int, Int)
-  type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
-    ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
-    ListBuffer[(HashKey, FilterHashKey, Edge, Double)])
-
-  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): 
(HashKey, FilterHashKey) = {
-    val src = edge.srcVertex.innerId.hashCode()
-    val tgt = edge.tgtVertex.innerId.hashCode()
-    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, 
isDegree)
-    val filterHashKey = (src, tgt)
-
-    (hashKey, filterHashKey)
-  }
-
-  def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): 
Map[(LabelWithDirection, Vertex), Boolean] = {
-    val vertices = for {
-      queryResult <- queryResultLs
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      edge = edgeWithScore.edge
-      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) 
edge.tgtVertex else edge.srcVertex
-    } yield (edge.labelWithDir, vertex) -> true
-
-    vertices.toMap
-  }
-
-  /** common methods for filter out, transform, aggregate queryResult */
-  def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: 
Option[Step]): Seq[Edge] = {
-    for {
-      convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if 
!edge.isDegree
-    } yield convertedEdge
-  }
-
-  def processTimeDecay(queryParam: QueryParam, edge: Edge) = {
-    /** process time decay */
-    val tsVal = queryParam.timeDecay match {
-      case None => 1.0
-      case Some(timeDecay) =>
-        val tsVal = try {
-          val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq)
-          val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq)
-          innerValWithTsOpt.map { innerValWithTs =>
-            val innerVal = innerValWithTs.innerVal
-            labelMeta.dataType match {
-              case InnerVal.LONG => innerVal.value match {
-                case n: BigDecimal => n.bigDecimal.longValue()
-                case _ => innerVal.toString().toLong
-              }
-              case _ => innerVal.toString().toLong
-            }
-          } getOrElse(edge.ts)
-        } catch {
-          case e: Exception =>
-            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
-            edge.ts
-        }
-        val timeDiff = queryParam.timestamp - tsVal
-        timeDecay.decay(timeDiff)
-    }
-
-    tsVal
-  }
-
-  def aggregateScore(newScore: Double,
-                     resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, 
Edge, Double)],
-                     duplicateEdges: ConcurrentHashMap[HashKey, 
ListBuffer[(Edge, Double)]],
-                     edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, 
Edge, Double)],
-                     hashKey: HashKey,
-                     filterHashKey: FilterHashKey,
-                     queryParam: QueryParam,
-                     convertedEdge: Edge) = {
-
-    /** skip duplicate policy check if consistencyLevel is strong */
-    if (queryParam.label.consistencyLevel != "strong" && 
resultEdges.containsKey(hashKey)) {
-      val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
-      //TODO:
-      queryParam.duplicatePolicy match {
-        case Query.DuplicatePolicy.First => // do nothing
-        case Query.DuplicatePolicy.Raw =>
-          if (duplicateEdges.containsKey(hashKey)) {
-            duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
-          } else {
-            val newBuffer = new ListBuffer[(Edge, Double)]
-            newBuffer.append(convertedEdge -> newScore)
-            duplicateEdges.put(hashKey, newBuffer)
-          }
-        case Query.DuplicatePolicy.CountSum =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
-        case _ =>
-          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 
newScore))
-      }
-    } else {
-      resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
-      edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, 
newScore))
-    }
-  }
-
-  def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
-                       queryParamResult: Result,
-                       edgesToInclude: util.HashSet[FilterHashKey],
-                       edgesToExclude: util.HashSet[FilterHashKey]): 
QueryRequestWithResult = {
-    val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (query, stepIdx, _, queryParam) = 
QueryRequest.unapply(queryRequest).get
-
-    val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
-    val edgesWithScores = for {
-      (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if 
!edgesToExclude.contains(filterHashKey) || 
edgesToInclude.contains(filterHashKey)
-      score = resultEdges.get(hashKey)._3
-      (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, 
hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
-    } yield EdgeWithScore(duplicateEdge, aggregatedScore)
-
-    QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
-  }
-
-  def fetchDuplicatedEdges(edge: Edge,
-                           score: Double,
-                           hashKey: HashKey,
-                           duplicateEdges: ConcurrentHashMap[HashKey, 
ListBuffer[(Edge, Double)]]) = {
-    (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) 
duplicateEdges.get(hashKey) else Seq.empty)
-  }
-
-  def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
-    val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-    val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
-    val whereFilter = queryParam.where.get
-    if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
-    else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => 
whereFilter.filter(edgeWithScore.edge))
-  }
-
-  def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
-                  alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = 
Map.empty[(LabelWithDirection, Vertex), Boolean])
-                 (implicit ec: scala.concurrent.ExecutionContext): 
Future[Seq[QueryRequestWithResult]] = {
-
-    queryResultLsFuture.map { queryRequestWithResultLs =>
-      if (queryRequestWithResultLs.isEmpty) Nil
-      else {
-        val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
-        val (q, stepIdx, srcVertex, queryParam) = 
QueryRequest.unapply(queryRequest).get
-        val step = q.steps(stepIdx)
-
-        val nextStepOpt = if (stepIdx < q.steps.size - 1) 
Option(q.steps(stepIdx + 1)) else None
-
-        val excludeLabelWithDirSet = new util.HashSet[(Int, Int)]
-        val includeLabelWithDirSet = new util.HashSet[(Int, Int)]
-        step.queryParams.filter(_.exclude).foreach(l => 
excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
-        step.queryParams.filter(_.include).foreach(l => 
includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
-
-        val edgesToExclude = new util.HashSet[FilterHashKey]()
-        val edgesToInclude = new util.HashSet[FilterHashKey]()
-
-        val queryParamResultLs = new ListBuffer[Result]
-        queryRequestWithResultLs.foreach { queryRequestWithResult =>
-          val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-          val queryParam = queryRequest.queryParam
-          val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, 
ListBuffer[(Edge, Double)]]()
-          val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, 
(FilterHashKey, Edge, Double)]()
-          val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, 
Edge, Double)]
-          val labelWeight = 
step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-
-          // store degree value with Array.empty so if degree edge exist, it 
comes at very first.
-          def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { 
edgeWithScore =>
-            edgeWithScore.edge.isDegree
-          }
-          var isDegree = checkDegree()
-
-          val includeExcludeKey = queryParam.labelWithDir.labelId -> 
queryParam.labelWithDir.dir
-          val shouldBeExcluded = 
excludeLabelWithDirSet.contains(includeExcludeKey)
-          val shouldBeIncluded = 
includeLabelWithDirSet.contains(includeExcludeKey)
-
-          queryResultWithFilter(queryRequestWithResult).foreach { 
edgeWithScore =>
-            val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-            if (queryParam.transformer.isDefault) {
-              val convertedEdge = edge
-
-              val (hashKey, filterHashKey) = toHashKey(queryParam, 
convertedEdge, isDegree)
-
-              /** check if this edge should be exlcuded. */
-              if (shouldBeExcluded && !isDegree) {
-                edgesToExclude.add(filterHashKey)
-              } else {
-                if (shouldBeIncluded && !isDegree) {
-                  edgesToInclude.add(filterHashKey)
-                }
-                val tsVal = processTimeDecay(queryParam, convertedEdge)
-                val newScore = labelWeight * score * tsVal
-                aggregateScore(newScore, resultEdges, duplicateEdges, 
edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
-              }
-            } else {
-              convertEdges(queryParam, edge, nextStepOpt).foreach { 
convertedEdge =>
-                val (hashKey, filterHashKey) = toHashKey(queryParam, 
convertedEdge, isDegree)
-
-                /** check if this edge should be exlcuded. */
-                if (shouldBeExcluded && !isDegree) {
-                  edgesToExclude.add(filterHashKey)
-                } else {
-                  if (shouldBeIncluded && !isDegree) {
-                    edgesToInclude.add(filterHashKey)
-                  }
-                  val tsVal = processTimeDecay(queryParam, convertedEdge)
-                  val newScore = labelWeight * score * tsVal
-                  aggregateScore(newScore, resultEdges, duplicateEdges, 
edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
-                }
-              }
-            }
-            isDegree = false
-          }
-          val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
-          queryParamResultLs.append(ret)
-        }
-
-        val aggregatedResults = for {
-          (queryRequestWithResult, queryParamResult) <- 
queryRequestWithResultLs.zip(queryParamResultLs)
-        } yield {
-            aggregateResults(queryRequestWithResult, queryParamResult, 
edgesToInclude, edgesToExclude)
-          }
-
-        aggregatedResults
-      }
-    }
-  }
-
-  def toGraphElement(s: String, labelMapping: Map[String, String] = 
Map.empty): Option[GraphElement] = Try {
-    val parts = GraphUtil.split(s)
-    val logType = parts(2)
-    val element = if (logType == "edge" | logType == "e") {
-      /** current only edge is considered to be bulk loaded */
-      labelMapping.get(parts(5)) match {
-        case None =>
-        case Some(toReplace) =>
-          parts(5) = toReplace
-      }
-      toEdge(parts)
-    } else if (logType == "vertex" | logType == "v") {
-      toVertex(parts)
-    } else {
-      throw new GraphExceptions.JsonParseException("log type is not exist in 
log.")
-    }
-
-    element
-  } recover {
-    case e: Exception =>
-      logger.error(s"$e", e)
-      None
-  } get
-
-
-  def toVertex(s: String): Option[Vertex] = {
-    toVertex(GraphUtil.split(s))
-  }
-
-  def toEdge(s: String): Option[Edge] = {
-    toEdge(GraphUtil.split(s))
-  }
-
-  //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
-  //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, 
"weight":10},"timestamp":1417616431},
-  def toEdge(parts: Array[String]): Option[Edge] = Try {
-    val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), 
parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
-    val tempDirection = if (parts.length >= 8) parts(7) else "out"
-    val direction = if (tempDirection != "out" && tempDirection != "in") "out" 
else tempDirection
-
-    val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, 
direction, props)
-    //            logger.debug(s"toEdge: $edge")
-    Some(edge)
-  } recover {
-    case e: Exception =>
-      logger.error(s"toEdge: $e", e)
-      throw e
-  } get
-
-  //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
-  def toVertex(parts: Array[String]): Option[Vertex] = Try {
-    val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), 
parts(1), parts(2), parts(3), parts(4), parts(5))
-    val props = if (parts.length >= 7) parts(6) else "{}"
-    Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, 
colName, props))
-  } recover {
-    case e: Throwable =>
-      logger.error(s"toVertex: $e", e)
-      throw e
-  } get
-
-  def initStorage(config: Config)(ec: ExecutionContext) = {
-    config.getString("s2graph.storage.backend") match {
-      case "hbase" => new AsynchbaseStorage(config)(ec)
-      case _ => throw new RuntimeException("not supported storage.")
-    }
-  }
-}
-
-class Graph(_config: Config)(implicit val ec: ExecutionContext) {
-  val config = _config.withFallback(Graph.DefaultConfig)
-
-  Model.apply(config)
-  Model.loadCache()
-
-  // TODO: Make storage client by config param
-  val storage = Graph.initStorage(config)(ec)
-
-
-  for {
-    entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey)
-    (k, v) = (entry.getKey, entry.getValue)
-  } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
-
-  /** select */
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): 
Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params)
-
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = 
storage.getEdges(q)
-
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = 
storage.getVertices(vertices)
-
-  /** write */
-  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], 
dir: Int, ts: Long): Future[Boolean] =
-    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
-
-  def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): 
Future[Seq[Boolean]] =
-    storage.mutateElements(elements, withWait)
-
-  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): 
Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
-
-  def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): 
Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait)
-
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait)
-
-  def shutdown(): Unit = {
-    storage.flush()
-    Model.shutdown()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala
deleted file mode 100644
index e6c043d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.kakao.s2graph.core
-
-import org.hbase.async.{HBaseRpc}
-
-trait GraphElement {
-  def serviceName: String
-  def ts: Long
-  def isAsync: Boolean
-  def queueKey: String
-  def queuePartitionKey: String
-  def toLogString(): String
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala
deleted file mode 100644
index fa186c2..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.kakao.s2graph.core
-
-
-object GraphExceptions {
-
-  case class JsonParseException(msg: String) extends Exception(msg)
-
-  case class LabelNotExistException(msg: String) extends Exception(msg)
-
-  case class ModelNotFoundException(msg: String) extends Exception(msg)
-
-  case class MaxPropSizeReachedException(msg: String) extends Exception(msg)
-
-  case class LabelAlreadyExistException(msg: String) extends Exception(msg)
-
-  case class InternalException(msg: String) extends Exception(msg)
-
-  case class IllegalDataTypeException(msg: String) extends Exception(msg)
-
-  case class WhereParserException(msg: String, ex: Exception = null) extends 
Exception(msg, ex)
-
-  case class BadQueryException(msg: String, ex: Throwable = null) extends 
Exception(msg, ex)
-
-  case class InvalidHTableException(msg: String) extends Exception(msg)
-
-  case class FetchTimeoutException(msg: String) extends Exception(msg)
-
-  case class DropRequestException(msg: String) extends Exception(msg)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala
deleted file mode 100644
index 0359963..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.kakao.s2graph.core
-
-import scala.util.Random
-import scala.util.hashing.MurmurHash3
-import java.util.regex.Pattern
-import play.api.libs.json.Json
-
-object GraphUtil {
-  private val TOKEN_DELIMITER = Pattern.compile("[\t]")
-  val operations = Map("i" -> 0, "insert" -> 0, "u" -> 1, "update" -> 1,
-    "increment" -> 2, "d" -> 3, "delete" -> 3,
-    "deleteAll" -> 4, "insertBulk" -> 5, "incrementCount" -> 6).map {
-    case (k, v) =>
-      k -> v.toByte
-  }
-  val BitsForMurMurHash = 16
-  val bytesForMurMurHash = 2
-  val defaultOpByte = operations("insert")
-  val directions = Map("out" -> 0, "in" -> 1, "undirected" -> 2, "u" -> 2, 
"directed" -> 0, "d" -> 0)
-  val consistencyLevel = Map("weak" -> 0, "strong" -> 1)
-
-  def toType(t: String) = {
-    t.trim().toLowerCase match {
-      case "e" | "edge" => "edge"
-      case "v" | "vertex" => "vertex"
-    }
-  }
-
-  def toDir(direction: String): Option[Byte] = {
-    val d = direction.trim().toLowerCase match {
-      case "directed" | "d" => Some(0)
-      case "undirected" | "u" => Some(2)
-      case "out" => Some(0)
-      case "in" => Some(1)
-      case _ => None
-    }
-    d.map(x => x.toByte)
-  }
-
-  def toDirection(direction: String): Int = {
-    direction.trim().toLowerCase match {
-      case "directed" | "d" => 0
-      case "undirected" | "u" => 2
-      case "out" => 0
-      case "in" => 1
-      case _ => 2
-    }
-  }
-
-  def fromDirection(direction: Int) = {
-    direction match {
-      case 0 => "out"
-      case 1 => "in"
-      case 2 => "undirected"
-    }
-  }
-
-  def toggleDir(dir: Int) = {
-    dir match {
-      case 0 => 1
-      case 1 => 0
-      case 2 => 2
-      case _ => throw new UnsupportedOperationException(s"toggleDirection: 
$dir")
-    }
-  }
-
-  def toOp(op: String): Option[Byte] = {
-    op.trim() match {
-      case "i" | "insert" => Some(0)
-      case "d" | "delete" => Some(3)
-      case "u" | "update" => Some(1)
-      case "increment" => Some(2)
-      case "deleteAll" => Some(4)
-      case "insertBulk" => Some(5)
-      case "incrementCount" => Option(6)
-      case _ => None
-    }
-  }
-
-  def fromOp(op: Byte): String = {
-    op match {
-      case 0 => "insert"
-      case 3 => "delete"
-      case 1 => "update"
-      case 2 => "increment"
-      case 4 => "deleteAll"
-      case 5 => "insertBulk"
-      case 6 => "incrementCount"
-      case _ =>
-        throw new UnsupportedOperationException(s"op : $op (only support 
0(insert),1(delete),2(updaet),3(increment))")
-    }
-  }
-
-  //  def toggleOp(op: Byte): Byte = {
-  //    val ret = op match {
-  //      case 0 => 1
-  //      case 1 => 0
-  //      case x: Byte => x
-  //    }
-  //    ret.toByte
-  //  }
-  // 2^31 - 1
-
-  def transformHash(h: Int): Int = {
-    //    h / 2 + (Int.MaxValue / 2 - 1)
-    if (h < 0) -1 * (h + 1) else h
-  }
-  def murmur3Int(s: String): Int = {
-    val hash = MurmurHash3.stringHash(s)
-    transformHash(hash)
-  }
-  def murmur3(s: String): Short = {
-    val hash = MurmurHash3.stringHash(s)
-    val positiveHash = transformHash(hash) >> BitsForMurMurHash
-    positiveHash.toShort
-//    Random.nextInt(Short.MaxValue).toShort
-  }
-
-  def smartSplit(s: String, delemiter: String) = {
-    val trimed_string = s.trim()
-    if (trimed_string.equals("")) {
-      Seq[String]()
-    } else {
-      trimed_string.split(delemiter).toList
-    }
-  }
-
-  def split(line: String) = TOKEN_DELIMITER.split(line)
-
-  def parseString(s: String): List[String] = {
-    if (!s.startsWith("[")) {
-      s.split("\n").toList
-    } else {
-      Json.parse(s).asOpt[List[String]].getOrElse(List.empty[String])
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala
deleted file mode 100644
index 9663f69..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.kakao.s2graph.core
-
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike}
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json._
-
-
-trait JSONParser {
-
-  //TODO: check result notation on bigDecimal.
-  def innerValToJsValue(innerVal: InnerValLike, dataType: String): 
Option[JsValue] = {
-    try {
-      val dType = InnerVal.toInnerDataType(dataType)
-      val jsValue = dType match {
-        case InnerVal.STRING => JsString(innerVal.value.asInstanceOf[String])
-        case InnerVal.BOOLEAN => 
JsBoolean(innerVal.value.asInstanceOf[Boolean])
-        case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | 
InnerVal.FLOAT | InnerVal.DOUBLE =>
-          //        case t if InnerVal.NUMERICS.contains(t) =>
-          innerVal.value match {
-            case l: Long => JsNumber(l)
-            case i: Int => JsNumber(i)
-            case s: Short => JsNumber(s.toLong)
-            case b: Byte => JsNumber(b.toLong)
-            case f: Float => JsNumber(f.toDouble)
-            case d: Double =>
-              //              JsNumber(d)
-              dType match {
-                case InnerVal.BYTE => JsNumber(d.toInt)
-                case InnerVal.SHORT => JsNumber(d.toInt)
-                case InnerVal.INT => JsNumber(d.toInt)
-                case InnerVal.LONG => JsNumber(d.toLong)
-                case InnerVal.FLOAT => JsNumber(d.toDouble)
-                case InnerVal.DOUBLE => JsNumber(d.toDouble)
-                case _ => throw new RuntimeException(s"$innerVal, $dType => 
$dataType")
-              }
-            case num: BigDecimal =>
-              //              JsNumber(num)
-              //              
JsNumber(InnerVal.scaleNumber(num.asInstanceOf[BigDecimal], dType))
-              dType match {
-                case InnerVal.BYTE => JsNumber(num.toInt)
-                case InnerVal.SHORT => JsNumber(num.toInt)
-                case InnerVal.INT => JsNumber(num.toInt)
-                case InnerVal.LONG => JsNumber(num.toLong)
-                case InnerVal.FLOAT => JsNumber(num.toDouble)
-                case InnerVal.DOUBLE => JsNumber(num.toDouble)
-                case _ => throw new RuntimeException(s"$innerVal, $dType => 
$dataType")
-              }
-            //              JsNumber(num.toLong)
-            case _ => throw new RuntimeException(s"$innerVal, Numeric Unknown 
=> $dataType")
-          }
-        //          
JsNumber(InnerVal.scaleNumber(innerVal.asInstanceOf[BigDecimal], dType))
-        case _ => throw new RuntimeException(s"$innerVal, Unknown => 
$dataType")
-      }
-      Some(jsValue)
-    } catch {
-      case e: Exception =>
-        logger.info(s"JSONParser.innerValToJsValue: $e")
-        None
-    }
-  }
-
-  //  def innerValToString(innerVal: InnerValLike, dataType: String): String = 
{
-  //    val dType = InnerVal.toInnerDataType(dataType)
-  //    InnerVal.toInnerDataType(dType) match {
-  //      case InnerVal.STRING => innerVal.toString
-  //      case InnerVal.BOOLEAN => innerVal.toString
-  //      //      case t if InnerVal.NUMERICS.contains(t)  =>
-  //      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | 
InnerVal.FLOAT | InnerVal.DOUBLE =>
-  //        BigDecimal(innerVal.toString).bigDecimal.toPlainString
-  //      case _ => innerVal.toString
-  //      //        throw new RuntimeException("innerVal to jsValue failed.")
-  //    }
-  //  }
-
-  def toInnerVal(str: String, dataType: String, version: String): InnerValLike 
= {
-    //TODO:
-    //        logger.error(s"toInnerVal: $str, $dataType, $version")
-    val s =
-      if (str.startsWith("\"") && str.endsWith("\"")) str.substring(1, 
str.length - 1)
-      else str
-    val dType = InnerVal.toInnerDataType(dataType)
-
-    dType match {
-      case InnerVal.STRING => InnerVal.withStr(s, version)
-      //      case t if InnerVal.NUMERICS.contains(t) => 
InnerVal.withNumber(BigDecimal(s), version)
-      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | 
InnerVal.FLOAT | InnerVal.DOUBLE =>
-        InnerVal.withNumber(BigDecimal(s), version)
-      case InnerVal.BOOLEAN => InnerVal.withBoolean(s.toBoolean, version)
-      case InnerVal.BLOB => InnerVal.withBlob(s.getBytes, version)
-      case _ =>
-        //        InnerVal.withStr("")
-        throw new RuntimeException(s"illegal datatype for string: dataType is 
$dataType for $s")
-    }
-  }
-
-  def jsValueToInnerVal(jsValue: JsValue, dataType: String, version: String): 
Option[InnerValLike] = {
-    val ret = try {
-      val dType = InnerVal.toInnerDataType(dataType.toLowerCase())
-      jsValue match {
-        case n: JsNumber =>
-          dType match {
-            case InnerVal.STRING => Some(InnerVal.withStr(jsValue.toString, 
version))
-            //            case t if InnerVal.NUMERICS.contains(t) =>
-            case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG 
| InnerVal.FLOAT | InnerVal.DOUBLE =>
-              Some(InnerVal.withNumber(n.value, version))
-            case _ => None
-          }
-        case s: JsString =>
-          dType match {
-            case InnerVal.STRING => Some(InnerVal.withStr(s.value, version))
-            case InnerVal.BOOLEAN => 
Some(InnerVal.withBoolean(s.as[String].toBoolean, version))
-            //            case t if InnerVal.NUMERICS.contains(t) =>
-            case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG 
| InnerVal.FLOAT | InnerVal.DOUBLE =>
-              Some(InnerVal.withNumber(BigDecimal(s.value), version))
-            case _ => None
-          }
-        case b: JsBoolean =>
-          dType match {
-            case InnerVal.STRING => Some(InnerVal.withStr(b.toString, version))
-            case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(b.value, 
version))
-            case _ => None
-          }
-        case _ =>
-          None
-      }
-    } catch {
-      case e: Exception =>
-        logger.error(e.getMessage)
-        None
-    }
-
-    ret
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
deleted file mode 100644
index ccf9d1f..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
+++ /dev/null
@@ -1,368 +0,0 @@
-package com.kakao.s2graph.core
-
-
-import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, 
LabelAlreadyExistException, LabelNotExistException}
-import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.types.HBaseType._
-import com.kakao.s2graph.core.types._
-import play.api.libs.json.Reads._
-import play.api.libs.json._
-import scala.util.Try
-
-/**
- * This is designed to be bridge between rest to s2core.
- * s2core never use this for finding models.
- */
-object Management extends JSONParser {
-
-  object JsonModel {
-
-    case class Prop(name: String, defaultValue: String, datatType: String)
-
-    object Prop extends ((String, String, String) => Prop)
-
-    case class Index(name: String, propNames: Seq[String])
-
-  }
-
-  import HBaseType._
-
-  val DefaultCompressionAlgorithm = "gz"
-
-
-  def findService(serviceName: String) = {
-    Service.findByName(serviceName, useCache = false)
-  }
-
-  def deleteService(serviceName: String) = {
-    Service.findByName(serviceName).foreach { service =>
-      //      service.deleteAll()
-    }
-  }
-
-  def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try {
-    val targetLabel = Label.findByName(labelName).getOrElse(throw new 
LabelNotExistException(s"Target label $labelName does not exist."))
-    if (targetLabel.hTableName == newHTableName) throw new 
InvalidHTableException(s"New HTable name is already in use for target label.")
-
-    Label.updateHTableName(targetLabel.label, newHTableName)
-  }
-
-
-
-  def createServiceColumn(serviceName: String,
-                          columnName: String,
-                          columnType: String,
-                          props: Seq[Prop],
-                          schemaVersion: String = DEFAULT_VERSION) = {
-
-    Model withTx { implicit session =>
-      val serviceOpt = Service.findByName(serviceName)
-      serviceOpt match {
-        case None => throw new RuntimeException(s"create service $serviceName 
has not been created.")
-        case Some(service) =>
-          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, 
columnName, Some(columnType), schemaVersion)
-          for {
-            Prop(propName, defaultValue, dataType) <- props
-          } yield {
-            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType)
-          }
-      }
-    }
-  }
-
-  def deleteColumn(serviceName: String, columnName: String, schemaVersion: 
String = DEFAULT_VERSION) = {
-    Model withTx { implicit session =>
-      val service = Service.findByName(serviceName, useCache = 
false).getOrElse(throw new RuntimeException("Service not Found"))
-      val serviceColumns = ServiceColumn.find(service.id.get, columnName, 
useCache = false)
-      val columnNames = serviceColumns.map { serviceColumn =>
-        ServiceColumn.delete(serviceColumn.id.get)
-        serviceColumn.columnName
-      }
-
-      columnNames.getOrElse(throw new RuntimeException("column not found"))
-    }
-  }
-
-  def findLabel(labelName: String): Option[Label] = {
-    Label.findByName(labelName, useCache = false)
-  }
-
-  def deleteLabel(labelName: String) = {
-    Model withTx { implicit session =>
-      Label.findByName(labelName, useCache = false).foreach { label =>
-        Label.deleteAll(label)
-      }
-      labelName
-    }
-  }
-
-  def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
-    Model withTx { implicit session =>
-      val label = Label.findByName(labelStr).getOrElse(throw 
LabelNotExistException(s"$labelStr not found"))
-      val labelMetaMap = label.metaPropsInvMap
-
-      indices.foreach { index =>
-        val metaSeq = index.propNames.map { name => labelMetaMap(name).seq }
-        LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, 
"none")
-      }
-
-      label
-    }
-  }
-
-  def addProp(labelStr: String, prop: Prop) = {
-    Model withTx { implicit session =>
-      val labelOpt = Label.findByName(labelStr)
-      val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
-
-      LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, 
prop.datatType)
-    }
-  }
-
-  def addProps(labelStr: String, props: Seq[Prop]) = {
-    Model withTx { implicit session =>
-      val labelOpt = Label.findByName(labelStr)
-      val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
-
-      props.map {
-        case Prop(propName, defaultValue, dataType) =>
-          LabelMeta.findOrInsert(label.id.get, propName, defaultValue, 
dataType)
-      }
-    }
-  }
-
-  def addVertexProp(serviceName: String,
-                    columnName: String,
-                    propsName: String,
-                    propsType: String,
-                    schemaVersion: String = DEFAULT_VERSION): ColumnMeta = {
-    val result = for {
-      service <- Service.findByName(serviceName, useCache = false)
-      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
-    } yield {
-        ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType)
-      }
-    result.getOrElse({
-      throw new RuntimeException(s"add property on vertex failed")
-    })
-  }
-
-  def getServiceLable(label: String): Option[Label] = {
-    Label.findByName(label, useCache = true)
-  }
-
-  /**
-   *
-   */
-
-  def toLabelWithDirectionAndOp(label: Label, direction: String): 
Option[LabelWithDirection] = {
-    for {
-      labelId <- label.id
-      dir = GraphUtil.toDirection(direction)
-    } yield LabelWithDirection(labelId, dir)
-  }
-
-  def tryOption[A, R](key: A, f: A => Option[R]) = {
-    f(key) match {
-      case None => throw new GraphExceptions.InternalException(s"$key is not 
found in DB. create $key first.")
-      case Some(r) => r
-    }
-  }
-
-  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
-             labelStr: String, direction: String = "", props: String): Edge = {
-
-    val label = tryOption(labelStr, getServiceLable)
-    val dir =
-      if (direction == "")
-//        GraphUtil.toDirection(label.direction)
-        GraphUtil.directions("out")
-      else
-        GraphUtil.toDirection(direction)
-
-    //    logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}")
-    //    logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}")
-
-    val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, 
label.schemaVersion)
-    val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, 
label.schemaVersion)
-
-    val srcColId = label.srcColumn.id.get
-    val tgtColId = label.tgtColumn.id.get
-    val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) {
-      (Vertex(SourceVertexId(srcColId, srcVertexId), 
System.currentTimeMillis()),
-        Vertex(TargetVertexId(tgtColId, tgtVertexId), 
System.currentTimeMillis()))
-    } else {
-      (Vertex(SourceVertexId(tgtColId, tgtVertexId), 
System.currentTimeMillis()),
-        Vertex(TargetVertexId(srcColId, srcVertexId), 
System.currentTimeMillis()))
-    }
-
-    //    val dir = if (direction == "") 
GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction)
-    val labelWithDir = LabelWithDirection(label.id.get, dir)
-    val op = tryOption(operation, GraphUtil.toOp)
-
-    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-    val parsedProps = toProps(label, jsObject.fields).toMap
-    val propsWithTs = parsedProps.map(kv => (kv._1 -> 
InnerValLikeWithTs(kv._2, ts))) ++
-      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, 
label.schemaVersion), ts))
-
-    Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = 
propsWithTs)
-
-  }
-
-  def toVertex(ts: Long, operation: String, id: String, serviceName: String, 
columnName: String, props: String): Vertex = {
-    Service.findByName(serviceName) match {
-      case None => throw new RuntimeException(s"$serviceName does not exist. 
create service first.")
-      case Some(service) =>
-        ServiceColumn.find(service.id.get, columnName) match {
-          case None => throw new RuntimeException(s"$columnName is not exist. 
create service column first.")
-          case Some(col) =>
-            val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
-            val op = tryOption(operation, GraphUtil.toOp)
-            val jsObject = 
Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
-            val parsedProps = toProps(col, jsObject).toMap
-            Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
-        }
-    }
-  }
-
-  def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = 
{
-
-    val props = for {
-      (k, v) <- js.fields
-      meta <- column.metasInvMap.get(k)
-    } yield {
-        val innerVal = jsValueToInnerVal(v, meta.dataType, 
column.schemaVersion).getOrElse(
-          throw new RuntimeException(s"$k is not defined. create schema for 
vertex."))
-
-        (meta.seq.toInt, innerVal)
-      }
-    props
-
-  }
-
-  def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, 
InnerValLike)] = {
-    val props = for {
-      (k, v) <- js
-      meta <- label.metaPropsInvMap.get(k)
-      innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion)
-    } yield (meta.seq, innerVal)
-
-    props
-  }
-
-
-  /**
-   * update label name.
-   */
-  def updateLabelName(oldLabelName: String, newLabelName: String) = {
-    for {
-      old <- Label.findByName(oldLabelName)
-    } {
-      Label.findByName(newLabelName) match {
-        case None =>
-          Label.updateName(oldLabelName, newLabelName)
-        case Some(_) =>
-        //          throw new RuntimeException(s"$newLabelName already exist")
-      }
-    }
-  }
-}
-
-class Management(graph: Graph) {
-  import Management._
-  val storage = graph.storage
-
-  def createTable(zkAddr: String,
-                  tableName: String,
-                  cfs: List[String],
-                  regionMultiplier: Int,
-                  ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm): 
Unit =
-    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, 
compressionAlgorithm)
-
-  /** HBase specific code */
-  def createService(serviceName: String,
-                    cluster: String, hTableName: String,
-                    preSplitSize: Int, hTableTTL: Option[Int],
-                    compressionAlgorithm: String = 
DefaultCompressionAlgorithm): Try[Service] = {
-
-    Model withTx { implicit session =>
-      val service = Service.findOrInsert(serviceName, cluster, hTableName, 
preSplitSize, hTableTTL, compressionAlgorithm)
-      /** create hbase table for service */
-      storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, 
hTableTTL, compressionAlgorithm)
-      service
-    }
-  }
-
-  /** HBase specific code */
-  def createLabel(label: String,
-                  srcServiceName: String,
-                  srcColumnName: String,
-                  srcColumnType: String,
-                  tgtServiceName: String,
-                  tgtColumnName: String,
-                  tgtColumnType: String,
-                  isDirected: Boolean = true,
-                  serviceName: String,
-                  indices: Seq[Index],
-                  props: Seq[Prop],
-                  consistencyLevel: String,
-                  hTableName: Option[String],
-                  hTableTTL: Option[Int],
-                  schemaVersion: String = DEFAULT_VERSION,
-                  isAsync: Boolean,
-                  compressionAlgorithm: String = "gz"): Try[Label] = {
-
-    val labelOpt = Label.findByName(label, useCache = false)
-
-    Model withTx { implicit session =>
-      labelOpt match {
-        case Some(l) =>
-          throw new GraphExceptions.LabelAlreadyExistException(s"Label name 
${l.label} already exist.")
-        case None =>
-          /** create all models */
-          val newLabel = Label.insertAll(label,
-            srcServiceName, srcColumnName, srcColumnType,
-            tgtServiceName, tgtColumnName, tgtColumnType,
-            isDirected, serviceName, indices, props, consistencyLevel,
-            hTableName, hTableTTL, schemaVersion, isAsync, 
compressionAlgorithm)
-
-          /** create hbase table */
-          val service = newLabel.service
-          (hTableName, hTableTTL) match {
-            case (None, None) => // do nothing
-            case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if 
want to specify ttl, give hbaseTableName also")
-            case (Some(hbaseTableName), None) =>
-              // create own hbase table with default ttl on service level.
-              storage.createTable(service.cluster, hbaseTableName, List("e", 
"v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
-            case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
-              // create own hbase table with own ttl.
-              storage.createTable(service.cluster, hbaseTableName, List("e", 
"v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
-          }
-          newLabel
-      }
-    }
-  }
-
-  /**
-   * label
-   */
-  /**
-   * copy label when if oldLabel exist and newLabel do not exist.
-   * copy label: only used by bulk load job. not sure if we need to 
parameterize hbase cluster.
-   */
-  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: 
Option[String]) = {
-    val old = Label.findByName(oldLabelName).getOrElse(throw new 
LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
-    if (Label.findByName(newLabelName).isDefined) throw new 
LabelAlreadyExistException(s"New label $newLabelName already exists.")
-
-    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, 
labelMeta.defaultValue, labelMeta.dataType) }
-    val allIndices = old.indices.map { index => Index(index.name, 
index.propNames) }
-
-    createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, 
old.srcColumnType,
-      old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
-      old.isDirected, old.serviceName,
-      allIndices, allProps,
-      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, 
old.isAsync, old.compressionAlgorithm)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
deleted file mode 100644
index 33ec7d9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.kakao.s2graph.core
-
-import com.kakao.s2graph.core.types.InnerValLike
-import play.api.libs.json.{JsNumber, JsString, JsValue}
-
-object OrderingUtil {
-
-  implicit object JsValueOrdering extends Ordering[JsValue] {
-    override def compare(x: JsValue, y: JsValue): Int = {
-      (x, y) match {
-        case (JsNumber(xv), JsNumber(yv)) =>
-          Ordering.BigDecimal.compare(xv, yv)
-        case (JsString(xv), JsString(yv)) =>
-          Ordering.String.compare(xv, yv)
-        case _ => throw new Exception(s"unsupported type")
-      }
-    }
-  }
-
-  implicit object InnerValLikeOrdering extends Ordering[InnerValLike] {
-    override def compare(x: InnerValLike, y: InnerValLike): Int = {
-      x.compare(y)
-    }
-  }
-
-  implicit object MultiValueOrdering extends Ordering[Any] {
-    override def compare(x: Any, y: Any): Int = {
-      (x, y) match {
-        case (xv: Int, yv: Int) => implicitly[Ordering[Int]].compare(xv, yv)
-        case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
-        case (xv: Double, yv: Double) => 
implicitly[Ordering[Double]].compare(xv, yv)
-        case (xv: String, yv: String) => 
implicitly[Ordering[String]].compare(xv, yv)
-        case (xv: JsValue, yv: JsValue) => 
implicitly[Ordering[JsValue]].compare(xv, yv)
-        case (xv: InnerValLike, yv: InnerValLike) => 
implicitly[Ordering[InnerValLike]].compare(xv, yv)
-      }
-    }
-  }
-
-  def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: 
Ordering[T]): Ordering[(T, T, T, T)] = {
-    new Ordering[(T, T, T, T)] {
-      override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
-        val len = ascendingLs.length
-        val it = ascendingLs.iterator
-        if (len >= 1) {
-          val (x, y) = it.next() match {
-            case true => tx -> ty
-            case false => ty -> tx
-          }
-          val compare1 = ord.compare(x._1, y._1)
-          if (compare1 != 0) return compare1
-        }
-
-        if (len >= 2) {
-          val (x, y) = it.next() match {
-            case true => tx -> ty
-            case false => ty -> tx
-          }
-          val compare2 = ord.compare(x._2, y._2)
-          if (compare2 != 0) return compare2
-        }
-
-        if (len >= 3) {
-          val (x, y) = it.next() match {
-            case true => tx -> ty
-            case false => ty -> tx
-          }
-          val compare3 = ord.compare(x._3, y._3)
-          if (compare3 != 0) return compare3
-        }
-
-        if (len >= 4) {
-          val (x, y) = it.next() match {
-            case true => tx -> ty
-            case false => ty -> tx
-          }
-          val compare4 = ord.compare(x._4, y._4)
-          if (compare4 != 0) return compare4
-        }
-        0
-      }
-    }
-  }
-}
-
-class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean 
= true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] {
-  override def compare(x: Seq[T], y: Seq[T]): Int = {
-    val xe = x.iterator
-    val ye = y.iterator
-    val oe = ascendingLs.iterator
-
-    while (xe.hasNext && ye.hasNext) {
-      val ascending = if (oe.hasNext) oe.next() else defaultAscending
-      val (xev, yev) = ascending match {
-        case true => xe.next() -> ye.next()
-        case false => ye.next() -> xe.next()
-      }
-      val res = ord.compare(xev, yev)
-      if (res != 0) return res
-    }
-
-    Ordering.Boolean.compare(xe.hasNext, ye.hasNext)
-  }
-}
-
-//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: 
Ordering[T]) extends Ordering[(T, T, T, T)] {
-//  override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
-//    val len = ascendingLs.length
-//    val it = ascendingLs.iterator
-//    if (len >= 1) {
-//      val (x, y) = it.next() match {
-//        case true => tx -> ty
-//        case false => ty -> tx
-//      }
-//      val compare1 = ord.compare(x._1, y._1)
-//      if (compare1 != 0) return compare1
-//    }
-//
-//    if (len >= 2) {
-//      val (x, y) = it.next() match {
-//        case true => tx -> ty
-//        case false => ty -> tx
-//      }
-//      val compare2 = ord.compare(x._2, y._2)
-//      if (compare2 != 0) return compare2
-//    }
-//
-//    if (len >= 3) {
-//      val (x, y) = it.next() match {
-//        case true => tx -> ty
-//        case false => ty -> tx
-//      }
-//      val compare3 = ord.compare(x._3, y._3)
-//      if (compare3 != 0) return compare3
-//    }
-//
-//    if (len >= 4) {
-//      val (x, y) = it.next() match {
-//        case true => tx -> ty
-//        case false => ty -> tx
-//      }
-//      val compare4 = ord.compare(x._4, y._4)
-//      if (compare4 != 0) return compare4
-//    }
-//    0
-//  }
-//}

Reply via email to