http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
new file mode 100644
index 0000000..b641ec5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -0,0 +1,150 @@
+package org.apache.s2graph.core.mysqls
+
+/**
+ * Created by shon on 6/3/15.
+ */
+
+import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
+import org.apache.s2graph.core.{GraphExceptions, JSONParser}
+import play.api.libs.json.Json
+import scalikejdbc._
+
object LabelMeta extends Model[LabelMeta] with JSONParser {

  /** dummy sequences */

  // Reserved seq values sit outside the user range (user props are assigned
  // 1..N by insert(); 0 is the timestamp) so they can never collide with a
  // stored property sequence.
  val fromSeq = -4.toByte
  val toSeq = -5.toByte
  val lastOpSeq = -3.toByte
  val lastDeletedAt = -2.toByte
  val timeStampSeq = 0.toByte
  val countSeq = (Byte.MaxValue - 2).toByte
  val degreeSeq = (Byte.MaxValue - 1).toByte
  val maxValue = Byte.MaxValue
  val emptyValue = Byte.MaxValue

  /** reserved sequences */
  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
  // Built-in metas shared by every label; they have no backing DB row
  // (labelId re-uses the dummy seq / -1 as a placeholder).
  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long")
  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
    seq = toSeq, defaultValue = toSeq.toString, dataType = "long")
  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
    seq = timeStampSeq, defaultValue = "0", dataType = "long")
  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
    seq = degreeSeq, defaultValue = "0", dataType = "long")
  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
    seq = countSeq, defaultValue = "-1", dataType = "long")

  // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority
  val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
  val reservedMetasInner = List(from, to, degree, timestamp, count)

  /** Maps one `label_metas` row to a LabelMeta; data_type is normalized to lower case. */
  def apply(rs: WrappedResultSet): LabelMeta = {
    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase)
  }

  // Valid user-visible seq range (countSeq/degreeSeq sit at the top end;
  // the admin variant excludes timestamp(0) and countSeq itself).
  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq
  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq

  /** Looks up by primary key through the option cache.
    * Throws NoSuchElementException when the id does not exist. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
    val cacheKey = "id=" + id

    withCache(cacheKey) {
      sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
    }.get
  }

  /** All metas for a label ordered by seq; `useCache = false` forces a DB read. */
  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
    val cacheKey = "labelId=" + labelId
    // lazy: on a cache hit the query is never built or executed
    lazy val labelMetas = sql"""select *
         from label_metas
         where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()

    if (useCache) withCaches(cacheKey)(labelMetas)
    else labelMetas
  }

  /** Finds a meta by name; the reserved names (_timestamp/_from/_to) resolve
    * without touching the DB via stable-identifier patterns. */
  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
    name match {
      case timestamp.name => Some(timestamp)
      case from.name => Some(from)
      case to.name => Some(to)
      case _ =>
        val cacheKey = "labelId=" + labelId + ":name=" + name
        // lazy: only executed on a cache miss (or when the cache is bypassed)
        lazy val labelMeta = sql"""
            select *
            from label_metas where label_id = ${labelId} and name = ${name}"""
          .map { rs => LabelMeta(rs) }.single.apply()

        if (useCache) withCache(cacheKey)(labelMeta)
        else labelMeta
    }
  }

  /** Adds a new property meta; seq is assigned as (current meta count + 1).
    * NOTE(review): assumes metas are never deleted individually and inserts
    * for one label are not concurrent — otherwise seq values could collide;
    * verify against callers. */
  def insert(labelId: Int, name: String, defaultValue: String, dataType: String)(implicit session: DBSession = AutoSession) = {
    val ls = findAllByLabelId(labelId, useCache = false)
    val seq = ls.size + 1

    if (seq < maxValue) {
      sql"""insert into label_metas(label_id, name, seq, default_value, data_type)
      select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply()
    } else {
      throw MaxPropSizeReachedException("max property size reached")
    }
  }

  /** Returns the existing meta or inserts it, evicting the name and per-label
    * caches so the follow-up read sees the new row. */
  def findOrInsert(labelId: Int,
                   name: String,
                   defaultValue: String,
                   dataType: String)(implicit session: DBSession = AutoSession): LabelMeta = {

    findByName(labelId, name) match {
      case Some(c) => c
      case None =>
        insert(labelId, name, defaultValue, dataType)
        val cacheKey = "labelId=" + labelId + ":name=" + name
        val cacheKeys = "labelId=" + labelId
        expireCache(cacheKey)
        expireCaches(cacheKeys)
        findByName(labelId, name, useCache = false).get
    }
  }

  /** Deletes one meta row and evicts every cache entry that may reference it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelMeta = findById(id)
    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
    sql"""delete from label_metas where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Loads every meta and pre-populates the id/name/seq and per-label caches
    * (used at startup by Model.loadCache). */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      cacheKey -> x
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
      cacheKey -> x
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
      cacheKey -> x
    })

    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
      val cacheKey = s"labelId=${labelId}"
      cacheKey -> ls
    }.toList)
  }
}
+
/** One property definition of a label: its sequence number, declared data
  * type and the default used when an edge lacks the property. */
case class LabelMeta(id: Option[Int], labelId: Int, name: String, seq: Byte, defaultValue: String, dataType: String) extends JSONParser {
  /** JSON view exposing only the client-facing fields (id/labelId/seq are internal). */
  lazy val toJson = Json.obj(
    "name" -> name,
    "defaultValue" -> defaultValue,
    "dataType" -> dataType
  )
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
new file mode 100644
index 0000000..700b8d7
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
@@ -0,0 +1,100 @@
+package org.apache.s2graph.core.mysqls
+
+import java.util.concurrent.Executors
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.language.{higherKinds, implicitConversions}
+import scala.util.{Failure, Try}
+
object Model {
  // Cache tuning knobs; overwritten from config in apply(). They must be set
  // before any concrete model object is initialized, because trait Model
  // reads them once at object-construction time.
  var maxSize = 10000
  var ttl = 60
  val numOfThread = Runtime.getRuntime.availableProcessors()
  val threadPool = Executors.newFixedThreadPool(numOfThread)
  val ec = ExecutionContext.fromExecutor(threadPool)

  /** Initializes cache settings and the global scalikejdbc connection pool
    * from `config` (db.default.* keys). */
  def apply(config: Config) = {
    maxSize = config.getInt("cache.max.size")
    ttl = config.getInt("cache.ttl.seconds")
    Class.forName(config.getString("db.default.driver"))

    val settings = ConnectionPoolSettings(
      initialSize = 10,
      maxSize = 10,
      connectionTimeoutMillis = 30000L,
      validationQuery = "select 1;")

    ConnectionPool.singleton(
      config.getString("db.default.url"),
      config.getString("db.default.user"),
      config.getString("db.default.password"),
      settings)
  }

  /** Runs `block` inside a transaction, committing on success and rolling
    * back on any exception; the connection is always returned to the pool. */
  def withTx[T](block: DBSession => T): Try[T] = {
    using(DB(ConnectionPool.borrow())) { conn =>
      val res = Try {
        conn.begin()
        val session = conn.withinTxSession()
        val result = block(session)

        conn.commit()

        result
      } recoverWith {
        case e: Exception =>
          conn.rollbackIfActive()
          Failure(e)
      }

      res
    }
  }

  /** Releases all pooled DB connections and the cache thread pool. */
  def shutdown() = {
    ConnectionPool.closeAll()
    // FIX: the fixed thread pool was never shut down before; its non-daemon
    // threads kept the JVM alive after shutdown() was called.
    threadPool.shutdown()
  }

  /** Warms every model cache (called once at startup). */
  def loadCache() = {
    Service.findAll()
    ServiceColumn.findAll()
    Label.findAll()
    LabelMeta.findAll()
    LabelIndex.findAll()
    ColumnMeta.findAll()
  }
}
+
// Base trait for all meta-store model objects: pairs each model with a
// single-value cache (Option[V]) and a list cache (List[V]).
trait Model[V] extends SQLSyntaxSupport[V] {

  import Model._

  implicit val ec: ExecutionContext = Model.ec

  // Cache namespace: one cache pair per concrete model object.
  val cName = this.getClass.getSimpleName()
  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")

  // NOTE(review): maxSize/ttl are read here at object-initialization time, so
  // Model(config) must run before any model object is first touched — confirm
  // the startup order guarantees this.
  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)

  // Eta-expanded aliases so subclasses write e.g. withCache(key)(query).
  val withCache = optionCache.withCache _

  val withCaches = listCache.withCache _

  val expireCache = optionCache.invalidate _

  val expireCaches = listCache.invalidate _

  /** Bulk-populates the single-value cache (used by findAll warm-up). */
  def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
    case (key, value) => optionCache.put(key, Option(value))
  }

  /** Bulk-populates the list cache. */
  def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
    case (key, values) => listCache.put(key, values)
  }
}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
new file mode 100644
index 0000000..98e1866
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
@@ -0,0 +1,94 @@
+package org.apache.s2graph.core.mysqls
+
+import java.util.UUID
+
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.Json
+import scalikejdbc._
+
object Service extends Model[Service] {
  /** Maps one `services` row to a Service. */
  def apply(rs: WrappedResultSet): Service = {
    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
  }

  /** Finds a service by its access token (cached). */
  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = s"accessToken=$accessToken"
    withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service(rs) }.single.apply)
  }

  /** Finds a service by primary key; throws NoSuchElementException for an unknown id. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
    val cacheKey = "id=" + id
    withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service(rs) }.single.apply).get
  }

  /** Finds a service by name; `useCache = false` forces a DB read. */
  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = "serviceName=" + serviceName
    // lazy: never executed when the cache already holds this key
    lazy val serviceOpt = sql"""
        select * from services where service_name = ${serviceName}
      """.map { rs => Service(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(serviceOpt)
    else serviceOpt
  }

  /** Inserts a new service with a freshly generated access token.
    * NOTE(review): `compressionAlgorithm` is accepted but never persisted —
    * confirm whether the services table is meant to store it. */
  def insert(serviceName: String, cluster: String,
             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
    val accessToken = UUID.randomUUID().toString()
    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
  }

  /** Deletes the service row and evicts every cache entry that references it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val service = findById(id)
    val serviceName = service.serviceName
    // BUG FIX: this used to run `delete from service_columns where id = ...`,
    // deleting an unrelated column row and leaving the service in place.
    sql"""delete from services where id = ${id}""".execute.apply()
    // also evict the access-token entry populated by findByAccessToken
    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName", s"accessToken=${service.accessToken}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Returns the existing service or inserts one, then re-reads it past the stale cache. */
  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Service = {
    findByName(serviceName) match {
      case Some(s) => s
      case None =>
        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
        val cacheKey = "serviceName=" + serviceName
        expireCache(cacheKey)
        findByName(serviceName).get
    }
  }

  /** Loads every service and warms the id and name caches. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from services""".map { rs => Service(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      (cacheKey -> x)
    })

    putsToCache(ls.map { x =>
      val cacheKey = s"serviceName=${x.serviceName}"
      (cacheKey -> x)
    })
  }

  /** Distinct cluster connection strings across all services. */
  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
  }
}
+
/** One row of the `services` table plus its HBase storage settings. */
case class Service(id: Option[Int], serviceName: String, accessToken: String, cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int]) {
  /** JSON view; a service that was never persisted (no id) renders as an empty object. */
  lazy val toJson =
    id.fold(Json.parse("{}")) { _id =>
      Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
        "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
new file mode 100644
index 0000000..6d33fe2
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -0,0 +1,82 @@
+package org.apache.s2graph.core.mysqls
+
+/**
+ * Created by shon on 6/3/15.
+ */
+
+import org.apache.s2graph.core.JSONParser
+import play.api.libs.json.Json
+import scalikejdbc._
object ServiceColumn extends Model[ServiceColumn] {

  /** Maps one `service_columns` row; column_type is normalized to lower case. */
  def apply(rs: WrappedResultSet): ServiceColumn = {
    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
  }

  /** Looks up by primary key; throws NoSuchElementException when absent. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = {
    val cacheKey = "id=" + id
    withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn(x) }.single.apply).get
  }

  /** Finds a column by (serviceId, columnName); `useCache = false` forces a DB read. */
  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
    val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
    // The query used to be duplicated in both branches; a lazy val keeps one
    // copy and is only evaluated on a cache miss (or when bypassing the cache).
    lazy val columnOpt = sql"""
        select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
      """.map { rs => ServiceColumn(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(columnOpt)
    else columnOpt
  }

  /** Inserts a new column row. */
  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
  }

  /** Deletes one column row and evicts its cache entries. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val serviceColumn = findById(id)
    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
    sql"""delete from service_columns where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Returns the existing column or inserts one, then re-reads it past the stale cache. */
  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession): ServiceColumn = {
    find(serviceId, columnName) match {
      case Some(sc) => sc
      case None =>
        insert(serviceId, columnName, columnType, schemaVersion)
        val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
        expireCache(cacheKey)
        find(serviceId, columnName).get
    }
  }

  /** Loads every column and warms the id and (serviceId, name) caches. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from service_columns""".map { rs => ServiceColumn(rs) }.list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}" // was a `var`; never reassigned
      (cacheKey -> x)
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
      (cacheKey -> x)
    })
  }
}
/** One vertex-column definition of a service, with lazy lookups into the
  * related service and column-meta tables. */
case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) extends JSONParser {

  // Lazily resolved relations; each hits the (cached) meta store on first use.
  lazy val service = Service.findById(serviceId)
  // NOTE(review): id.get throws if this column was never persisted (id == None)
  lazy val metas = ColumnMeta.findAllByColumn(id.get)
  lazy val metasInvMap = metas.map { meta => meta.name -> meta }.toMap
  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)).toMap
  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
new file mode 100644
index 0000000..d5a3687
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -0,0 +1,207 @@
+package org.apache.s2graph.core.parsers
+
+import org.apache.s2graph.core.GraphExceptions.WhereParserException
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.InnerValLike
+import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser}
+
+import scala.annotation.tailrec
+import scala.util.Try
+import scala.util.parsing.combinator.JavaTokenParsers
+
// Resolves where-clause keys (optionally "_parent."-prefixed) to comparable
// InnerVal values on an edge or one of its ancestor edges.
trait ExtractValue extends JSONParser {
  val parent = "_parent."

  /** Resolves `key` to the InnerVal it denotes on the appropriate edge:
    * _from/_to map to vertex ids, other props fall back to their declared
    * default when absent from the edge. Throws WhereParserException for an
    * unknown property name. */
  def propToInnerVal(edge: Edge, key: String) = {
    val (propKey, parentEdge) = findParentEdge(edge, key)

    val label = parentEdge.label
    val metaPropInvMap = label.metaPropsInvMap
    val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
    val metaSeq = labelMeta.seq

    metaSeq match {
      case LabelMeta.from.seq => parentEdge.srcVertex.innerId
      case LabelMeta.to.seq => parentEdge.tgtVertex.innerId
      case _ => parentEdge.propsWithTs.get(metaSeq) match {
        // missing property: use the meta's declared default value
        case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion)
        case Some(edgeVal) => edgeVal.innerVal
      }
    }
  }

  /** Converts the RHS literal `value` into an InnerVal comparable with `key`'s
    * property; the RHS may itself name another property instead of a literal. */
  def valueToCompare(edge: Edge, key: String, value: String) = {
    val label = edge.label
    if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
    else {
      val (propKey, _) = findParentEdge(edge, key)

      val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
      val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir)
      // _from/_to compare against vertex ids, whose type comes from the column
      val dataType = propKey match {
        case "_to" | "to" => tgtColumn.columnType
        case "_from" | "from" => srcColumn.columnType
        case _ => labelMeta.dataType
      }
      toInnerVal(value, dataType, label.schemaVersion)
    }
  }

  // Walks `depth` levels up the parent-edge chain.
  // NOTE(review): assumes every level has at least one parent edge — `.head`
  // throws on malformed input; confirm callers guarantee the depth.
  @tailrec
  private def findParent(edge: Edge, depth: Int): Edge =
    if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1)
    else edge

  /** Strips any "_parent." prefixes from `key` and returns the bare property
    * name together with the ancestor edge it refers to. */
  private def findParentEdge(edge: Edge, key: String): (String, Edge) = {
    if (!key.startsWith(parent)) (key, edge)
    else {
      // BUG FIX: String.split takes a regex, and "_parent." contains ".",
      // which previously matched any character; quote it so the split is
      // a literal match on the prefix.
      val split = key.split(java.util.regex.Pattern.quote(parent))
      val depth = split.length - 1
      val propKey = split.last

      val parentEdge = findParent(edge, depth)

      (propKey, parentEdge)
    }
  }
}
+
// One node of a parsed where-clause expression tree.
trait Clause extends ExtractValue {
  /** Conjunction of this clause with another. */
  def and(otherField: Clause): Clause = And(this, otherField)

  /** Disjunction of this clause with another. */
  def or(otherField: Clause): Clause = Or(this, otherField)

  /** True when the edge satisfies this clause. */
  def filter(edge: Edge): Boolean

  // Shared comparison plumbing: resolve both sides of the predicate to
  // InnerValLike, then apply the operator supplied by the concrete clause.
  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: Edge): Boolean = {
    val propValue = propToInnerVal(edge, propKey)
    val compValue = valueToCompare(edge, propKey, value)

    binOp(propValue, compValue)
  }
}
+
/** A parsed where-clause: a conjunction of top-level clauses. */
case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
  /** True when every clause accepts the edge; an empty where-clause accepts
    * everything (`forall` on an empty Seq is true, so the explicit isEmpty
    * check of the original was redundant). Short-circuits on the first
    * rejecting clause instead of evaluating all of them. */
  def filter(edge: Edge) = clauses.forall(_.filter(edge))
}
+
/** prop > value */
case class Gt(propKey: String, value: String) extends Clause {
  override def filter(edge: Edge): Boolean =
    binaryOp((lhs, rhs) => lhs > rhs)(propKey, value)(edge)
}
+
/** prop < value */
case class Lt(propKey: String, value: String) extends Clause {
  override def filter(edge: Edge): Boolean =
    binaryOp((lhs, rhs) => lhs < rhs)(propKey, value)(edge)
}
+
/** prop == value */
case class Eq(propKey: String, value: String) extends Clause {
  override def filter(edge: Edge): Boolean =
    binaryOp((lhs, rhs) => lhs == rhs)(propKey, value)(edge)
}
+
// IN-clause for non-parent keys: the candidate values are converted once,
// eagerly, using the label known at parse time (hence "without parent":
// no per-edge label resolution is needed).
case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause {
  // NOTE(review): throws WhereParserException at construction time (not at
  // filter time) when propKey is not a known property of the label.
  val innerValLikeLs = values.map { value =>
    val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
    // _from/_to compare against vertex ids, whose type comes from the column
    val dataType = propKey match {
      case "_to" | "to" => label.tgtColumn.columnType
      case "_from" | "from" => label.srcColumn.columnType
      case _ => labelMeta.dataType
    }
    toInnerVal(value, dataType, label.schemaVersion)
  }

  /** True when the edge's property value is one of the pre-converted candidates. */
  override def filter(edge: Edge): Boolean = {
    val propVal = propToInnerVal(edge, propKey)
    innerValLikeLs.contains(propVal)
  }
}
+
/** IN-clause for "_parent."-prefixed keys: candidates are converted per edge
  * because the relevant label is only known once the ancestor edge is found. */
case class IN(propKey: String, values: Set[String]) extends Clause {
  override def filter(edge: Edge): Boolean = {
    val target = propToInnerVal(edge, propKey)
    values.exists(candidate => valueToCompare(edge, propKey, candidate) == target)
  }
}
+
/** Inclusive range check: minValue <= prop <= maxValue. */
case class Between(propKey: String, minValue: String, maxValue: String) extends Clause {
  override def filter(edge: Edge): Boolean = {
    val current = propToInnerVal(edge, propKey)
    val lowerBound = valueToCompare(edge, propKey, minValue)
    val upperBound = valueToCompare(edge, propKey, maxValue)

    lowerBound <= current && current <= upperBound
  }
}
+
/** Logical negation of a clause. */
case class Not(self: Clause) extends Clause {
  override def filter(edge: Edge) = !self.filter(edge)
}
+
/** Logical conjunction; short-circuits on a rejecting left clause. */
case class And(left: Clause, right: Clause) extends Clause {
  override def filter(edge: Edge) = left.filter(edge) && right.filter(edge)
}
+
/** Logical disjunction; short-circuits on an accepting left clause. */
case class Or(left: Clause, right: Clause) extends Clause {
  override def filter(edge: Edge) = left.filter(edge) || right.filter(edge)
}
+
object WhereParser {
  // Shared "always true" where-clause (no predicates); used as the default
  // result when a query carries no where condition.
  val success = Where()
}
+
// Combinator parser for edge-filter where-clauses, e.g.
//   "age >= 20 and _parent.gender in (M, F)".
case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {

  // A bare value token: anything up to whitespace, parens or comma.
  // NOTE(review): values containing spaces or commas cannot be expressed.
  val anyStr = "[^\\s(),]+".r

  val and = "and|AND".r

  val or = "or|OR".r

  val between = "between|BETWEEN".r

  val in = "in|IN".r

  val notIn = "not in|NOT IN".r

  /** Top level: a sequence of clauses (normally exactly one). */
  def where: Parser[Where] = rep(clause) ^^ (Where(_))

  /** Parenthesized sub-expression. */
  def paren: Parser[Clause] = "(" ~> clause <~ ")"

  // Left-associative chain of predicates joined by and/or; both operators
  // share the same precedence, so use parentheses to force grouping.
  def clause: Parser[Clause] = (predicate | paren) * (and ^^^ { (a: Clause, b: Clause) => And(a, b) } | or ^^^ { (a: Clause, b: Clause) => Or(a, b) })

  // Dotted identifiers, e.g. "_parent.age".
  def identWithDot: Parser[String] = repsep(ident, ".") ^^ { case values => values.mkString(".") }

  // One comparison: =/!= first, then ordering ops (>= before > so the longer
  // token wins), then BETWEEN..AND, then (NOT) IN (...).
  def predicate = {
    identWithDot ~ ("!=" | "=") ~ anyStr ^^ {
      case f ~ op ~ s =>
        if (op == "=") Eq(f, s)
        else Not(Eq(f, s))
    } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ {
      case f ~ op ~ s => op match {
        case ">" => Gt(f, s)
        case ">=" => Or(Gt(f, s), Eq(f, s))
        case "<" => Lt(f, s)
        case "<=" => Or(Lt(f, s), Eq(f, s))
      }
    } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ {
      case f ~ minV ~ maxV => Between(f, minV, maxV)
    } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ {
      case f ~ op ~ values =>
        // _parent keys need per-edge label resolution (IN); otherwise the
        // values can be pre-converted once against this parser's label.
        val inClause =
          if (f.startsWith("_parent")) IN(f, values.toSet)
          else InWithoutParent(label, f, values.toSet)
        if (op.toLowerCase == "in") inClause
        else Not(inClause)

      // NOTE(review): unreachable — the pattern above is total for this parser.
      case _ => throw WhereParserException(s"Failed to parse where clause. ")
    }
  }

  /** Parses `sql` into a Where; the whole input must be consumed or the
    * resulting Try fails with a WhereParserException. */
  def parse(sql: String): Try[Where] = Try {
    parseAll(where, sql) match {
      case Success(r, q) => r
      case fail => throw WhereParserException(s"Where parsing error: ${fail.toString}")
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
new file mode 100644
index 0000000..b0af967
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -0,0 +1,610 @@
+package org.apache.s2graph.core.rest
+
+import java.util.concurrent.{Callable, TimeUnit}
+
+import com.google.common.cache.CacheBuilder
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException, 
ModelNotFoundException}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.parsers.{Where, WhereParser}
+import org.apache.s2graph.core.types._
+import play.api.libs.json._
+
+import scala.util.{Failure, Success, Try}
+
object TemplateHelper {
  // Matches ${...} template variables, optionally wrapped in double quotes
  // (the quotes, when present, are consumed and replaced along with the variable).
  val findVar = """\"?\$\{(.*?)\}\"?""".r
  // Matches "<pivot> <signed amount> <unit>", every part optional, e.g. "next_day - 3 hour".
  val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
  val hour = 60 * 60 * 1000L
  val day = hour * 24L
  val week = day * 7L

  /**
   * Returns `now` shifted by `n` units, in milliseconds.
   * Unrecognized units fall back to days.
   */
  def calculate(now: Long, n: Int, unit: String): Long = {
    val offset = unit match {
      case "hour" | "HOUR" => n * hour
      case "day" | "DAY" => n * day
      case "week" | "WEEK" => n * week
      case _ => n * day
    }
    now + offset
  }

  /**
   * Expands every ${...} time expression in `body` relative to `now`.
   * Pivots: now / next_hour / next_day / next_week (next_* truncate to the unit boundary, then add one unit).
   * NOTE: replacement strings here are all-numeric, so Regex group-reference
   * expansion in replaceAllIn is not a concern for current inputs.
   */
  def replaceVariable(now: Long, body: String): String = {
    findVar.replaceAllIn(body, outer => {
      val expression = outer group 1

      num.replaceSomeIn(expression, inner => {
        val (pivot, amount, unit) = (inner.group(1), inner.group(2), inner.group(3))
        val baseTs = pivot match {
          case null => now
          case "now" | "NOW" => now
          case "next_week" | "NEXT_WEEK" => now / week * week + week
          case "next_day" | "NEXT_DAY" => now / day * day + day
          case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour
        }

        if (pivot == null && amount == null && unit == null) None
        else if (amount == null || unit == null) Some(baseTs.toString)
        else Some(calculate(baseTs, amount.replaceAll(" ", "").toInt, unit).toString)
      })
    })
  }
}
+
/**
 * Translates incoming JSON request bodies into core query/mutation model objects
 * (Query, MultiQuery, Edge, Vertex, label/service/index creation tuples).
 * Defaults for cluster/timeout/compression settings are read from `config`.
 */
class RequestParser(config: Config) extends JSONParser {

  import Management.JsonModel._

  // Per-label limit is clamped to hardLimit; a negative requested limit means "maximum".
  val hardLimit = 100000
  val defaultLimit = 100
  val maxLimit = Int.MaxValue - 1
  val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
  val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
  val DefaultCluster = config.getString("hbase.zookeeper.quorum")
  val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm")
  val DefaultPhase = config.getString("phase")
  // Caches parsed where-clauses (keyed by label + expanded clause text) for 10s.
  val parserCache = CacheBuilder.newBuilder()
    .expireAfterAccess(10000, TimeUnit.MILLISECONDS)
    .expireAfterWrite(10000, TimeUnit.MILLISECONDS)
    .maximumSize(10000)
    .initialCapacity(1000)
    .build[String, Try[Where]]

  /**
   * Reads the optional "scoring" object: each known label-meta name maps to a
   * numeric weight. Returns Option of (meta seq, weight) pairs; a non-numeric
   * weight throws.
   */
  private def extractScoring(labelId: Int, value: JsValue) = {
    val ret = for {
      js <- parse[Option[JsObject]](value, "scoring")
    } yield {
      for {
        (k, v) <- js.fields
        labelOrderType <- LabelMeta.findByName(labelId, k)
      } yield {
        // NOTE(review): this local `value` shadows the method parameter `value`.
        val value = v match {
          case n: JsNumber => n.as[Double]
          case _ => throw new Exception("scoring weight should be double.")
        }
        (labelOrderType.seq, value)
      }
    }
    ret
  }

  /**
   * Reads the optional "interval" object ({"from": ..., "to": ...}) after
   * expanding ${...} time templates, converting both endpoints to label props.
   */
  def extractInterval(label: Label, _jsValue: JsValue) = {
    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
    val jsValue = Json.parse(replaced)

    // Accepts either a single object or an array of objects of key/value pairs.
    def extractKv(js: JsValue) = js match {
      case JsObject(obj) => obj
      case JsArray(arr) => arr.flatMap {
        case JsObject(obj) => obj
        case _ => throw new RuntimeException(s"cannot support json type $js")
      }
      case _ => throw new RuntimeException(s"cannot support json type: $js")
    }

    val ret = for {
      js <- parse[Option[JsObject]](jsValue, "interval")
      fromJs <- (js \ "from").asOpt[JsValue]
      toJs <- (js \ "to").asOpt[JsValue]
    } yield {
      val from = Management.toProps(label, extractKv(fromJs))
      val to = Management.toProps(label, extractKv(toJs))
      (from, to)
    }

    ret
  }

  /**
   * Reads the optional "duration" object as a (from, to) timestamp pair after
   * template expansion.
   * NOTE(review): the defaults look inverted — a missing "from" defaults to
   * Long.MaxValue and a missing "to" to Long.MinValue, so a duration object
   * that omits either bound always trips the `minTs > maxTs` check and throws.
   * Confirm whether requiring both bounds is intentional.
   */
  def extractDuration(label: Label, _jsValue: JsValue) = {
    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString())
    val jsValue = Json.parse(replaced)

    for {
      js <- parse[Option[JsObject]](jsValue, "duration")
    } yield {
      val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue)
      val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue)

      if (minTs > maxTs) {
        throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.")
      }

      (minTs, maxTs)
    }
  }

  /**
   * Reads the optional "has" object into a (meta seq -> inner value) filter map.
   * Unknown prop names or unconvertible values are silently dropped by the
   * for-comprehension; absence yields an empty map.
   */
  def extractHas(label: Label, jsValue: JsValue) = {
    val ret = for {
      js <- parse[Option[JsObject]](jsValue, "has")
    } yield {
      for {
        (k, v) <- js.fields
        labelMeta <- LabelMeta.findByName(label.id.get, k)
        value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion)
      } yield {
        labelMeta.seq -> value
      }
    }
    ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
  }

  /**
   * Parses an optional where clause (with template expansion), memoized in
   * parserCache per label + expanded text. A parse failure is rethrown from the
   * cache loader as BadQueryException.
   */
  def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = {
    whereClauseOpt match {
      case None => Success(WhereParser.success)
      case Some(_where) =>
        val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where)
        val whereParserKey = s"${label.label}_${where}"
        parserCache.get(whereParserKey, new Callable[Try[Where]] {
          override def call(): Try[Where] = {
            WhereParser(label).parse(where) match {
              case s@Success(_) => s
              case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
            }
          }
        })
    }
  }

  /**
   * Builds source vertices for a label from raw id JSON values, using the
   * label's src column for "out" direction and tgt column otherwise.
   * Unknown labels or unconvertible ids yield an empty/partial result.
   */
  def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = {
    val vertices = for {
      label <- Label.findByName(labelName).toSeq
      serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn
      id <- ids
      innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion)
    } yield {
      Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
    }
    vertices.toSeq
  }

  /**
   * Parses a multi-query body: "queries" (each parsed via toQuery) plus optional
   * per-query "weights" (default 1.0 each).
   */
  def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = {
    val queries = for {
      queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
    } yield {
      toQuery(queryJson, isEdgeQuery)
    }
    val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
    MultiQuery(queries = queries, weights = weights,
      queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
  }

  /**
   * Extracts top-level query options (select/groupBy/orderBy/filterOut/limit/...)
   * with their documented defaults. The optional "filterOut" sub-query is parsed
   * recursively via toQuery and inherits this query's filterOutFields.
   */
  def toQueryOption(jsValue: JsValue): QueryOption = {
    val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q =>
      q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields))
    }
    val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
    val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty)
    val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty)
    // orderBy entries are {column: "ASC"|"DESC"} objects; any other value throws MatchError.
    val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs =>
      for {
        js <- jsLs
        (column, orderJs) <- js.fields
      } yield {
        val ascending = orderJs.as[String].toUpperCase match {
          case "ASC" => true
          case "DESC" => false
        }
        column -> ascending
      }
    }.getOrElse(List("score" -> false, "timestamp" -> false))
    val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true)
    val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false)
    //TODO: Refactor this
    val limitOpt = (jsValue \ "limit").asOpt[Int]
    val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true)
    val scoreThreshold = (jsValue \ "scoreThreshold").asOpt[Double].getOrElse(Double.MinValue)
    val returnDegree = (jsValue \ "returnDegree").asOpt[Boolean].getOrElse(true)

    QueryOption(removeCycle = removeCycle,
      selectColumns = selectColumns,
      groupByColumns = groupByColumns,
      orderByColumns = orderByColumns,
      filterOutQuery = filterOutQuery,
      filterOutFields = filterOutFields,
      withScore = withScore,
      returnTree = returnTree,
      limitOpt = limitOpt,
      returnAgg = returnAgg,
      scoreThreshold = scoreThreshold,
      returnDegree = returnDegree
    )
  }

  /**
   * Parses a full query body: "srcVertices" (service/column/id(s)) and "steps"
   * (each step a JSON array of label groups or an object with "step" plus
   * step-level weights/thresholds/limits). All failures are normalized to
   * BadQueryException.
   */
  def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
    try {
      val vertices =
        (for {
          value <- parse[List[JsValue]](jsValue, "srcVertices")
          serviceName = parse[String](value, "serviceName")
          column = parse[String](value, "columnName")
        } yield {
          val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found"))
          val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name"))
          // Accepts a single "id" and/or an "ids" array.
          val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]])
          for {
            idVal <- idOpt ++ idsOpt.toSeq.flatten

            /** bug, need to use labels schemaVersion  */
            innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion)
          } yield {
            Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis())
          }
        }).flatten

      if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
      val steps = parse[Vector[JsValue]](jsValue, "steps")

      val queryOption = toQueryOption(jsValue)

      val querySteps =
        steps.zipWithIndex.map { case (step, stepIdx) =>
          // Optional per-label weights for this step (object form only).
          val labelWeights = step match {
            case obj: JsObject =>
              val converted = for {
                (k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields
                l <- Label.findByName(k)
              } yield {
                l.id.get -> v.toString().toDouble
              }
              converted.toMap
            case _ => Map.empty[Int, Double]
          }
          // A step is either a bare array of label groups or {"step": [...], ...options}.
          val queryParamJsVals = step match {
            case arr: JsArray => arr.as[List[JsValue]]
            case obj: JsObject => (obj \ "step").as[List[JsValue]]
            case _ => List.empty[JsValue]
          }
          val nextStepScoreThreshold = step match {
            case obj: JsObject => (obj \ "nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
            case _ => QueryParam.DefaultThreshold
          }
          val nextStepLimit = step match {
            case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1)
            case _ => -1
          }
          val cacheTTL = step match {
            case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1)
            case _ => -1
          }
          val queryParams =
            for {
              labelGroup <- queryParamJsVals
              queryParam <- parseQueryParam(labelGroup)
            } yield {
              // The first step's labels must start from a column present in srcVertices.
              val (_, columnName) =
                if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) {
                  (queryParam.label.srcService.serviceName, queryParam.label.srcColumnName)
                } else {
                  (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName)
                }
              //FIXME:
              if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) {
                throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.")
              }

              queryParam
            }
          Step(queryParams.toList, labelWeights = labelWeights,
            //            scoreThreshold = stepThreshold,
            nextStepScoreThreshold = nextStepScoreThreshold,
            nextStepLimit = nextStepLimit,
            cacheTTL = cacheTTL)

        }

      val ret = Query(vertices, querySteps, queryOption, jsValue)
      //      logger.debug(ret.toString)
      ret
    } catch {
      case e: BadQueryException =>
        throw e
      case e: ModelNotFoundException =>
        throw BadQueryException(e.getMessage, e)
      case e: Exception =>
        throw BadQueryException(s"$jsValue, $e", e)
    }
  }

  /**
   * Parses one label group of a step into a QueryParam, returning None when no
   * "label" field is present. Reads limit/offset/interval/duration/scoring/
   * where/index/timeDecay/etc. with their defaults and applies them in the
   * builder chain below (order matters per the FIXME).
   */
  private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = {
    for {
      labelName <- parse[Option[String]](labelGroup, "label")
    } yield {
      val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found"))
      val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0)
      // Negative limit means "as many as possible"; otherwise clamp to hardLimit.
      val limit = {
        parse[Option[Int]](labelGroup, "limit") match {
          case None => defaultLimit
          case Some(l) if l < 0 => maxLimit
          case Some(l) if l >= 0 =>
            val default = hardLimit
            Math.min(l, default)
        }
      }
      val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0)
      val interval = extractInterval(label, labelGroup)
      val duration = extractDuration(label, labelGroup)
      val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList
      val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false)
      val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false)
      val hasFilter = extractHas(label, labelGroup)
      val labelWithDir = LabelWithDirection(label.id.get, direction)
      // Explicit "index" name wins; otherwise pick the index matching the scoring props.
      val indexNameOpt = (labelGroup \ "index").asOpt[String]
      val indexSeq = indexNameOpt match {
        case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq)
        case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index"))
      }
      val whereClauseOpt = (labelGroup \ "where").asOpt[String]
      val where = extractWhere(label, whereClauseOpt)
      val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true)
      val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout)
      val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt)
      // Optional fixed target vertex ("_to") converted with the direction's target column type.
      val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal =>
        jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion)
      }
      val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L)
      val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal =>
        val propName = (jsVal \ "propName").asOpt[String].getOrElse(LabelMeta.timestamp.name)
        val propNameSeq = label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq)
        val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0)
        val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1)
        if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0")
        val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0)
        TimeDecay(initial, decayRate, timeUnit, propNameSeq)
      }
      val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
      // TODO: refactor this. dirty
      val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s))

      // "outputField" is shorthand for a single-field transform and takes precedence.
      val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s)))
      val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue]
      val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply")
      val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
      val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false)
      val cursorOpt = (labelGroup \ "cursor").asOpt[String]
      // FIXME: Order of command matter
      QueryParam(labelWithDir)
        .sample(sample)
        .limit(offset, limit)
        .rank(RankParam(label.id.get, scoring))
        .exclude(exclude)
        .include(include)
        .duration(duration)
        .has(hasFilter)
        .labelOrderSeq(indexSeq)
        .interval(interval)
        .where(where)
        .duplicatePolicy(duplicate)
        .includeDegree(includeDegree)
        .rpcTimeout(rpcTimeout)
        .maxAttempt(maxAttempt)
        .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt)
        .cacheTTLInMillis(cacheTTL)
        .timeDecay(timeDecayFactor)
        .threshold(threshold)
        .transformer(transformer)
        .scorePropagateOp(scorePropagateOp)
        .shouldNormalize(shouldNormalize)
        .cursorOpt(cursorOpt)
    }
  }

  /**
   * Validates `js \ key` as R or throws JsonParseException naming the key.
   * NOTE(review): `msg` and `e` are computed but unused — the thrown exception
   * only carries the key, discarding the detailed validation errors.
   */
  private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = {
    (js \ key).validate[R]
      .fold(
        errors => {
          val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg")
          val e = Json.obj("args" -> key, "error" -> msg)
          throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString)
        },
        r => {
          r
        })
  }

  /** Normalizes a JSON payload to a list: object -> singleton, array -> elements, else empty. */
  def toJsValues(jsValue: JsValue): List[JsValue] = {
    jsValue match {
      case obj: JsObject => List(obj)
      case arr: JsArray => arr.as[List[JsValue]]
      case _ => List.empty[JsValue]
    }

  }

  /** Like toEdges, but also returns the original per-edge JSON values. */
  def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = {
    val jsValues = toJsValues(jsValue)
    val edges = jsValues.flatMap(toEdge(_, operation))

    (edges, jsValues)
  }

  /** Parses one or many edge JSON objects into Edge instances for `operation`. */
  def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
    toJsValues(jsValue).flatMap { edgeJson =>
      toEdge(edgeJson, operation)
    }
  }


  /**
   * Parses a single edge JSON object; "from"/"froms" x "to"/"tos" produce the
   * cartesian product of edges.
   */
  private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {

    // Ids are carried as strings; non-string JSON values keep their JSON rendering.
    def parseId(js: JsValue) = js match {
      case s: JsString => s.as[String]
      case o@_ => s"${o}"
    }
    val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
    val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId
    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId

    val label = parse[String](jsValue, "label")
    val timestamp = parse[Long](jsValue, "timestamp")
    val direction = parse[Option[String]](jsValue, "direction").getOrElse("")
    // NOTE(review): getOrElse("{}") widens `props` to Any (JsValue vs String);
    // it works because only props.toString is used below.
    val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
    for {
      srcId <- srcIds
      tgtId <- tgtIds
    } yield {
      Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString)
    }
  }

  /** Parses one or many vertex JSON objects; service/column may be supplied externally. */
  def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = {
    toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName))
  }

  /**
   * Parses a single vertex JSON object. Explicit serviceName/columnName
   * arguments override the JSON fields; timestamp defaults to now.
   */
  def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = {
    val id = parse[JsValue](jsValue, "id")
    val ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis())
    val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
    val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
    Management.toVertex(ts, operation, id.toString, sName, cName, props.toString)
  }

  /**
   * Parses a single prop definition {name, dataType, defaultValue} into Prop.
   * NOTE(review): duplicates the per-element logic of toPropsElements below.
   */
  def toPropElements(jsObj: JsValue) = Try {
    val propName = (jsObj \ "name").as[String]
    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
    // String defaults are taken verbatim; other JSON values keep their JSON rendering.
    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
      case JsString(s) => s
      case _@js => js.toString
    }
    Prop(propName, defaultValue, dataType)
  }

  /** Parses an array of prop definitions; a missing/invalid array yields Nil. */
  def toPropsElements(jsValue: JsValue): Seq[Prop] = for {
    jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
  } yield {
    val propName = (jsObj \ "name").as[String]
    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
      case JsString(s) => s
      case _@js => js.toString
    }
    Prop(propName, defaultValue, dataType)
  }

  /** Parses an array of index definitions {name, propNames}; throws if not an array. */
  def toIndicesElements(jsValue: JsValue): Seq[Index] = for {
    jsObj <- jsValue.as[Seq[JsValue]]
    indexName = (jsObj \ "name").as[String]
    propNames = (jsObj \ "propNames").as[Seq[String]]
  } yield Index(indexName, propNames)

  /**
   * Parses a create-label request into the tuple consumed by Management,
   * applying documented defaults (serviceName falls back to tgtServiceName,
   * consistency "weak", schema version default, compression from config).
   */
  def toLabelElements(jsValue: JsValue) = Try {
    val labelName = parse[String](jsValue, "label")
    val srcServiceName = parse[String](jsValue, "srcServiceName")
    val tgtServiceName = parse[String](jsValue, "tgtServiceName")
    val srcColumnName = parse[String](jsValue, "srcColumnName")
    val tgtColumnName = parse[String](jsValue, "tgtColumnName")
    val srcColumnType = parse[String](jsValue, "srcColumnType")
    val tgtColumnType = parse[String](jsValue, "tgtColumnType")
    val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName)
    val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true)

    val allProps = toPropsElements(jsValue \ "props")
    val indices = toIndicesElements(jsValue \ "indices")

    val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak")

    // expect new label don`t provide hTableName
    val hTableName = (jsValue \ "hTableName").asOpt[String]
    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
    val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
    val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
    val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)

    (labelName, srcServiceName, srcColumnName, srcColumnType,
      tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
      indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
  }

  /** Parses an add-index request: label name plus its new index definitions. */
  def toIndexElements(jsValue: JsValue) = Try {
    val labelName = parse[String](jsValue, "label")
    val indices = toIndicesElements(jsValue \ "indices")
    (labelName, indices)
  }

  /** Parses a create-service request with cluster/table defaults from config. */
  def toServiceElements(jsValue: JsValue) = {
    val serviceName = parse[String](jsValue, "serviceName")
    val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster)
    val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}")
    val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1)
    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
    val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
    (serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
  }

  /** Parses a create-service-column request (service, column, type, props). */
  def toServiceColumnElements(jsValue: JsValue) = Try {
    val serviceName = parse[String](jsValue, "serviceName")
    val columnName = parse[String](jsValue, "columnName")
    val columnType = parse[String](jsValue, "columnType")
    val props = toPropsElements(jsValue \ "props")
    (serviceName, columnName, columnType, props)
  }

  /**
   * Parses a checkEdges request into (src, tgt, queryParam) triples.
   * "in"-direction entries are flipped to src/tgt with direction 0 and the
   * returned flag records that at least one entry was reverted.
   */
  def toCheckEdgeParam(jsValue: JsValue) = {
    val params = jsValue.as[List[JsValue]]
    var isReverted = false
    // NOTE(review): labelWithDirs is accumulated but never read after the loop.
    val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
    val quads = for {
      param <- params
      labelName <- (param \ "label").asOpt[String]
      direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out"))
      label <- Label.findByName(labelName)
      srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
      tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
    } yield {
      val labelWithDir = LabelWithDirection(label.id.get, direction)
      labelWithDirs += labelWithDir
      val (src, tgt, dir) = if (direction == 1) {
        isReverted = true
        (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)),
          Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0)
      } else {
        (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)),
          Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0)
      }
      (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
    }
    (quads, isReverted)
  }

  /** Parses newline-separated TSV lines into graph elements, skipping unparsable lines. */
  def toGraphElements(str: String): Seq[GraphElement] = {
    val edgeStrs = str.split("\\n")

    for {
      edgeStr <- edgeStrs
      str <- GraphUtil.parseString(edgeStr)
      element <- Graph.toGraphElement(str)
    } yield element
  }

  /** Parses a delete-all request: label (async labels excluded), direction, ids, timestamp. */
  def toDeleteParam(json: JsValue) = {
    val labelName = (json \ "label").as[String]
    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync)
    val direction = (json \ "direction").asOpt[String].getOrElse("out")

    val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
    val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
    val vertices = toVertices(labelName, direction, ids)
    (labels, direction, ids, ts, vertices)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
new file mode 100644
index 0000000..a48bc7c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -0,0 +1,244 @@
+package org.apache.s2graph.core.rest
+
+import java.net.URL
+
+import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+
object RestHandler {
  // Pairs the asynchronous JSON response body with any extra HTTP headers
  // the handler wants returned (e.g. the experiment impression id).
  case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
}
+
/**
  * Public API, only return Future.successful or Future.failed
  * Don't throw exception
  */
class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {

  import RestHandler._

  // Translates raw JSON payloads into internal Query/MultiQuery objects.
  val requestParser = new RequestParser(graph.config)

  /**
    * Public APIS
    */

  /**
    * Routes a POST request to the handler matching its URI.
    * Routing or parsing failures are captured and returned as a failed Future
    * inside HandlerResult, honoring the class contract above.
    *
    * @param uri       request path (e.g. "/graphs/getEdges")
    * @param body      raw JSON request body, parsed with play-json
    * @param impKeyOpt by-name impression key; only evaluated for experiment routes
    */
  def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
    try {
      val jsQuery = Json.parse(body)

      uri match {
        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
        case "/graphs/checkEdges" => checkEdges(jsQuery)
        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
        case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
        case uri if uri.startsWith("/graphs/experiment") =>
          // Experiment URIs end with /<accessToken>/<experimentName>/<uuid>.
          val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
          experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
        case _ => throw new RuntimeException("route is not found")
      }
    } catch {
      // Uphold the "never throw" contract: surface errors as a failed Future.
      case e: Exception => HandlerResult(Future.failed(e))
    }
  }

  // TODO: Refactor to doGet
  /**
    * Checks whether the edges described by the request exist.
    * If the request was parsed in reverted form (see toCheckEdgeParam), each
    * matched edge is flipped back with duplicateEdge before serialization.
    */
  def checkEdges(jsValue: JsValue): HandlerResult = {
    try {
      val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)

      HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
        val edgeJsons = for {
          queryRequestWithResult <- queryRequestWithResultLs
          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
          edgeWithScore <- queryResult.edgeWithScoreLs
          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
          convertedEdge = if (isReverted) edge.duplicateEdge else edge
          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
        } yield Json.toJson(edgeJson)

        Json.toJson(edgeJsons)
      })
    } catch {
      case e: Exception => HandlerResult(Future.failed(e))
    }
  }


  /**
    * Private APIS
    */

  /**
    * Resolves the experiment bucket for (accessToken, experimentName, uuid) and
    * replays the bucket's stored request. Only graph-query buckets are supported;
    * the bucket's impression id is attached as a response header.
    */
  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {

    try {
      val bucketOpt = for {
        service <- Service.findByAccessToken(accessToken)
        experiment <- Experiment.findBy(service.id.get, experimentName)
        bucket <- experiment.findBucket(uuid, impKeyOpt)
      } yield bucket

      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
      if (bucket.isGraphQuery) {
        val ret = buildRequestInner(contentsBody, bucket, uuid)
        HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
      }
      else throw new RuntimeException("not supported yet")
    } catch {
      case e: Exception => HandlerResult(Future.failed(e))
    }
  }

  /**
    * Renders the bucket's request template and re-dispatches it through doPost
    * against the path of the bucket's configured API URL.
    * Empty buckets short-circuit to an empty result.
    */
  private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = {
    if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults))
    else {
      val body = buildRequestBody(Option(contentsBody), bucket, uuid)
      val url = new URL(bucket.apiPath)
      val path = url.getPath

      // dummy log for sampling
      val experimentLog = s"POST $path took -1 ms 200 -1 $body"
      logger.debug(experimentLog)

      doPost(path, body)
    }
  }

  /**
    * Runs one query plus its optional filter-out query and folds both result
    * sets through the supplied post-processing function.
    * The filter-out future is created before the for-comprehension, so the two
    * fetches are in flight concurrently.
    */
  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
    val filterOutQueryResultsLs = q.filterOutQuery match {
      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
      case None => Future.successful(Seq.empty)
    }

    for {
      queryResultsLs <- graph.getEdges(q)
      filterOutResultsLs <- filterOutQueryResultsLs
    } yield {
      val json = post(queryResultsLs, filterOutResultsLs)
      json
    }
  }

  /**
    * Entry point for getEdges-style routes. Accepted payload shapes:
    * - JsArray: each element is an independent query; results are a JsArray.
    * - JsObject without "queries": a single query.
    * - JsObject with "queries": a weighted multi-query; every sub-query's edge
    *   scores are multiplied by that sub-query's weight before merging.
    * Throws BadQueryException synchronously for any other shape (doPost turns
    * that into a failed Future for HTTP callers).
    */
  def getEdgesAsync(jsonQuery: JsValue)
                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {

    val fetch = eachQuery(post) _
    jsonQuery match {
      case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
      case obj@JsObject(_) =>
        (obj \ "queries").asOpt[JsValue] match {
          case None => fetch(requestParser.toQuery(obj))
          case _ =>
            val multiQuery = requestParser.toMultiQuery(obj)
            // Top-level filter-out query, shared across all sub-queries.
            val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
              case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
              case None => Future.successful(Seq.empty)
            }
            val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
              // Per-sub-query filter-out, started before the query's own fetch
              // is awaited so both run concurrently.
              val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
                case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
                case None => Future.successful(Seq.empty)
              }
              for {
                queryRequestWithResultLs <- graph.getEdges(query)
                filterOutResultsLs <- filterOutQueryResultsLs
              } yield {
                // Rescale every edge score by this sub-query's weight.
                val newQueryRequestWithResult = for {
                  queryRequestWithResult <- queryRequestWithResultLs
                  queryResult = queryRequestWithResult.queryResult
                } yield {
                  val newEdgesWithScores = for {
                    edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
                  } yield {
                    edgeWithScore.copy(score = edgeWithScore.score * weight)
                  }
                  queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
                }
                logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
                (newQueryRequestWithResult, filterOutResultsLs)
              }
            }
            for {
              filterOut <- filterOutFuture
              resultWithExcludeLs <- Future.sequence(futures)
            } yield {
              PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
              //              val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
              //              val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
              //                (prevResults ++= results, prevExcludes ++= excludes)
              //              }
              //              PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
            }
        }
      case _ => throw BadQueryException("Cannot support")
    }
  }

  /**
    * Fetches edges for the query while concurrently running a second query made
    * of only the last step; the post function uses the latter as the exclusion
    * set. Both futures are created before the for-comprehension.
    */
  private def getEdgesExcludedAsync(jsonQuery: JsValue)
                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
    val q = requestParser.toQuery(jsonQuery)
    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))

    val fetchFuture = graph.getEdges(q)
    val excludeFuture = graph.getEdges(filterOutQuery)

    for {
      queryResultLs <- fetchFuture
      exclude <- excludeFuture
    } yield {
      post(queryResultLs, exclude)
    }
  }

  /**
    * Looks up vertices. Expects a JSON array of objects with "serviceName",
    * "columnName" and "ids"; returns the matched vertices serialized to JSON.
    */
  private def getVertices(jsValue: JsValue) = {
    val jsonQuery = jsValue
    val ts = System.currentTimeMillis()
    val props = "{}"

    val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
      val serviceName = (js \ "serviceName").as[String]
      val columnName = (js \ "columnName").as[String]
      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
      }
    }

    graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
  }

  /**
    * Renders the bucket's stored request template: "#uuid" and every key of the
    * caller-supplied JSON object are substituted verbatim into the template body.
    * NOTE(review): keys are replaced anywhere they occur in the template —
    * presumably templates use distinctive placeholder names; verify against
    * stored bucket bodies.
    */
  private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
    var body = bucket.requestBody.replace("#uuid", uuid)

    //    // replace variable
    //    body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)

    // replace param
    for {
      requestKeyJson <- requestKeyJsonOpt
      jsObj <- requestKeyJson.asOpt[JsObject]
      (key, value) <- jsObj.fieldSet
    } {
      // JsString values are substituted unquoted; other values use their JSON text.
      val replacement = value match {
        case JsString(s) => s
        case _ => value.toString
      }
      body = body.replace(key, replacement)
    }

    body
  }

  /**
    * Sums the "size" fields of a JSON object, or of each element of a JSON
    * array; any other shape (or missing field) counts as 0.
    */
  def calcSize(js: JsValue): Int = js match {
    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
    case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
    case _ => 0
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
new file mode 100644
index 0000000..12e9547
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.types.{LabelWithDirection, SourceVertexId, 
VertexId}
+
+
trait Deserializable[E] extends StorageDeserializable[E] {
  import StorageDeserializable._

  // (source vertex id, label+direction, label index seq, isInverted flag, row-key length)
  type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)

  /** version 1 and version 2 share same code for parsing row key part.
    * Layout: [source vertex id][4-byte label+direction][1 byte index seq / inverted bit].
    */
  def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
    val row = kv.row
    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(row, 0, row.length, version)
    val labelWithDir = LabelWithDirection(Bytes.toInt(row, srcIdLen, 4))
    val idxOffset = srcIdLen + 4
    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(row, idxOffset)

    // Total consumed length: id bytes + 4 label bytes + 1 index/inverted byte.
    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, idxOffset + 1)
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
new file mode 100644
index 0000000..5310248
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -0,0 +1,47 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.hbase.async.KeyValue
+
+
object SKeyValue {
  // Operation codes carried by an SKeyValue so the storage layer knows
  // which kind of mutation to apply.
  val Put = 1
  val Delete = 2
  val Increment = 3
  // Plain puts are the default mutation.
  val Default = Put
}
/**
  * Storage-neutral representation of a single cell
  * (table / row / column family / qualifier / value / timestamp) together with
  * the mutation operation to perform (see SKeyValue.Put etc.).
  * NOTE(review): Array fields make the generated equals/hashCode compare by
  * reference, not content — confirm callers do not rely on structural equality.
  */
case class SKeyValue(table: Array[Byte],
                     row: Array[Byte],
                     cf: Array[Byte],
                     qualifier: Array[Byte],
                     value: Array[Byte],
                     timestamp: Long,
                     operation: Int = SKeyValue.Default) {
  // Debug rendering: byte arrays as explicit byte lists, cf decoded as a string.
  def toLogString = {
    Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf),
      "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp,
      "operation" -> operation).toString
  }
  override def toString(): String = toLogString
}
+
// Type class: evidence that values of T can be converted into an SKeyValue.
trait CanSKeyValue[T] {
  def toSKeyValue(from: T): SKeyValue
}
+
object CanSKeyValue {

  // For asyncbase KeyValues. The originating table is not available on an
  // asyncbase KeyValue, so it is left empty.
  implicit val asyncKeyValue = new CanSKeyValue[KeyValue] {
    def toSKeyValue(kv: KeyValue): SKeyValue = {
      SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
    }
  }

  // For SKeyValue itself: identity conversion.
  implicit val sKeyValue = new CanSKeyValue[SKeyValue] {
    def toSKeyValue(kv: SKeyValue): SKeyValue = kv
  }

  // For hbase KeyValues
  // TODO(review): instance for org.apache.hadoop.hbase.KeyValue not implemented yet.
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
new file mode 100644
index 0000000..08a3f73
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
@@ -0,0 +1,8 @@
+package org.apache.s2graph.core.storage
+
object Serializable {
  // Column-family names used by the storage layer. An explicit charset avoids
  // depending on the JVM's platform-default encoding; "v"/"e" encode to the
  // same single byte in any ASCII-compatible charset, so stored data is unaffected.
  val vertexCf = "v".getBytes("UTF-8")
  val edgeCf = "e".getBytes("UTF-8")
}
+
+trait Serializable[E] extends StorageSerializable[E]

Reply via email to