http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
new file mode 100644
index 0000000..6b3babb
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -0,0 +1,66 @@
package org.apache.s2graph.core

import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
import org.scalatest.{FunSuite, Matchers}

/**
 * Round-trip tests for JSONParser: converting an InnerVal to a JsValue and
 * back must be the identity for every supported data type, per storage version.
 */
class JsonParserTest extends FunSuite with Matchers with TestCommon with JSONParser {

  import InnerVal._
  import types.HBaseType._

  /**
   * (InnerVal, dataType) fixtures for each storage version.
   * Floating-point types are only exercised for VERSION2 — presumably VERSION1
   * does not support them (TODO confirm against InnerVal).
   */
  val innerValsPerVersion = for {
    version <- List(VERSION2, VERSION1)
  } yield {
    val innerVals = List(
      (InnerVal.withStr("ABC123", version), STRING),
      (InnerVal.withNumber(23, version), BYTE),
      (InnerVal.withNumber(23, version), INT),
      (InnerVal.withNumber(Int.MaxValue, version), INT),
      (InnerVal.withNumber(Int.MinValue, version), INT),
      (InnerVal.withNumber(Long.MaxValue, version), LONG),
      (InnerVal.withNumber(Long.MinValue, version), LONG),
      (InnerVal.withBoolean(true, version), BOOLEAN)
    )
    val doubleVals = if (version == VERSION2) {
      List(
        (InnerVal.withDouble(Double.MaxValue, version), DOUBLE),
        (InnerVal.withDouble(Double.MinValue, version), DOUBLE),
        (InnerVal.withDouble(0.1, version), DOUBLE),
        (InnerVal.withFloat(Float.MinValue, version), FLOAT),
        (InnerVal.withFloat(Float.MaxValue, version), FLOAT),
        (InnerVal.withFloat(0.9f, version), FLOAT)
      )
    } else {
      List.empty[(InnerValLike, String)]
    }
    (innerVals ++ doubleVals, version)
  }

  /** Asserts that each InnerVal survives the JsValue round trip unchanged. */
  def testInnerValToJsValue(innerValWithDataTypes: Seq[(InnerValLike, String)],
                            version: String) = {
    for {
      (innerVal, dataType) <- innerValWithDataTypes
    } {
      val jsValueOpt = innerValToJsValue(innerVal, dataType)
      val decodedOpt = jsValueOpt.flatMap { jsValue =>
        jsValueToInnerVal(jsValue, dataType, version)
      }
      // `shouldBe Some(innerVal)` asserts both that decoding succeeded and that
      // the decoded value equals the original, with a readable failure message
      // (the original `(isDefined && ==) shouldBe true` only reported "false").
      decodedOpt shouldBe Some(innerVal)
    }
  }

  test("innerValToJsValue produces a JsValue for a string InnerVal") {
    // Was a placeholder test named "aa" that only printed the result;
    // now it actually asserts the conversion succeeds.
    val innerVal = InnerVal.withStr("abc", VERSION2)
    innerValToJsValue(innerVal, "string") shouldBe defined
  }

  test("JsValue <-> InnerVal with dataType") {
    for {
      (innerValWithDataTypes, version) <- innerValsPerVersion
    } {
      testInnerValToJsValue(innerValWithDataTypes, version)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala
new file mode 100644
index 0000000..e3f1ed5
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/OrderingUtilTest.scala
@@ -0,0 +1,130 @@
package org.apache.s2graph.core

import org.apache.s2graph.core.OrderingUtil._
import org.scalatest.{FunSuite, Matchers}
import play.api.libs.json.JsString

/**
 * Tests for OrderingUtil's multi-key orderings. Each key position sorts
 * independently; the Seq[Boolean] flag list means `true` = ascending,
 * `false` = descending for the corresponding position.
 *
 * All assertions compare the sorted collections directly rather than their
 * toString (the original toString comparison gave poor failure messages and
 * a weaker notion of equality).
 */
class OrderingUtilTest extends FunSuite with Matchers {
  test("test SeqMultiOrdering") {
    val jsLs: Seq[Seq[Any]] = Seq(
      Seq(0, "a"),
      Seq(0, "b"),
      Seq(1, "a"),
      Seq(1, "b"),
      Seq(2, "c")
    )

    // number descending, string ascending
    val sortedJsLs: Seq[Seq[Any]] = Seq(
      Seq(2, "c"),
      Seq(1, "a"),
      Seq(1, "b"),
      Seq(0, "a"),
      Seq(0, "b")
    )

    val ascendingLs: Seq[Boolean] = Seq(false, true)
    val resultJsLs = jsLs.sorted(new SeqMultiOrdering[Any](ascendingLs))

    resultJsLs should equal(sortedJsLs)
  }

  test("test tuple 1 TupleMultiOrdering") {
    val jsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, None, None, None),
      (0, None, None, None),
      (1, None, None, None),
      (1, None, None, None),
      (2, None, None, None)
    )

    // single key, descending
    val sortedJsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (2, None, None, None),
      (1, None, None, None),
      (1, None, None, None),
      (0, None, None, None),
      (0, None, None, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(false)
    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))

    resultJsLs should equal(sortedJsLs)
  }

  test("test tuple 2 TupleMultiOrdering") {
    val jsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", None, None),
      (0, "b", None, None),
      (1, "a", None, None),
      (1, "b", None, None),
      (2, "c", None, None)
    )

    // number descending, string ascending
    val sortedJsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (2, "c", None, None),
      (1, "a", None, None),
      (1, "b", None, None),
      (0, "a", None, None),
      (0, "b", None, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(false, true)
    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))

    resultJsLs should equal(sortedJsLs)
  }

  test("test tuple 3 TupleMultiOrdering") {
    // Long literals use uppercase L (lowercase `l` is easily misread as `1`).
    val jsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, None),
      (0, "a", 1L, None),
      (0, "b", 0L, None),
      (1, "a", 0L, None),
      (1, "b", 0L, None),
      (2, "c", 0L, None)
    )

    // first two keys ascending, long descending
    val sortedJsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 1L, None),
      (0, "a", 0L, None),
      (0, "b", 0L, None),
      (1, "a", 0L, None),
      (1, "b", 0L, None),
      (2, "c", 0L, None)
    )

    val ascendingLs: Seq[Boolean] = Seq(true, true, false)
    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))

    resultJsLs should equal(sortedJsLs)
  }

  test("test tuple 4 TupleMultiOrdering") {
    val jsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, JsString("a")),
      (0, "a", 0L, JsString("b")),
      (0, "a", 1L, JsString("a")),
      (0, "b", 0L, JsString("b")),
      (1, "a", 0L, JsString("b")),
      (1, "b", 0L, JsString("b")),
      (2, "c", 0L, JsString("b"))
    )

    // first three keys ascending, JsString descending
    val sortedJsLs: Seq[(Any, Any, Any, Any)] = Seq(
      (0, "a", 0L, JsString("b")),
      (0, "a", 0L, JsString("a")),
      (0, "a", 1L, JsString("a")),
      (0, "b", 0L, JsString("b")),
      (1, "a", 0L, JsString("b")),
      (1, "b", 0L, JsString("b")),
      (2, "c", 0L, JsString("b"))
    )

    val ascendingLs: Seq[Boolean] = Seq(true, true, true, false)
    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))

    resultJsLs should equal(sortedJsLs)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
new file mode 100644
index 0000000..5b324e1
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
@@ -0,0 +1,86 @@
package org.apache.s2graph.core

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.types.LabelWithDirection
import org.scalatest.{FunSuite, Matchers}

/**
 * Tests that QueryParam.toCacheKey yields distinct keys for distinct request
 * bytes, stable keys for identical requests, and different keys when the
 * pagination parameters (offset / limit) change.
 */
class QueryParamTest extends FunSuite with Matchers with TestCommon {
//  val version = HBaseType.VERSION2
//  val testEdge = Management.toEdge(ts, "insert", "1", "10", labelNameV2, "out", Json.obj("is_blocked" -> true, "phone_number" -> "xxxx", "age" -> 20).toString)
//  test("EdgeTransformer toInnerValOpt") {
//
//    /** only labelNameV2 has string type output */
//    val jsVal = Json.arr(Json.arr("_to"), Json.arr("phone_number.$", "phone_number"), Json.arr("age.$", "age"))
//    val transformer = EdgeTransformer(queryParamV2, jsVal)
//    val convertedLs = transformer.transform(testEdge, None)
//
//    convertedLs(0).tgtVertex.innerId.toString == "10" shouldBe true
//    convertedLs(1).tgtVertex.innerId.toString == "phone_number.xxxx" shouldBe true
//    convertedLs(2).tgtVertex.innerId.toString == "age.20" shouldBe true
//    true
//  }

  /** 1000 distinct byte-array requests (the serialized ints 0..999). */
  val dummyRequests = {
    for {
      id <- 0 until 1000
    } yield {
      Bytes.toBytes(id)
    }
  }

  test("QueryParam toCacheKey bytes") {
    val startedAt = System.nanoTime()
    val queryParam = QueryParam(LabelWithDirection(1, 0))

    // Hash each request exactly once up front: O(n) toCacheKey calls instead
    // of the O(n^2) recomputation the original nested loop performed.
    val cacheKeys = dummyRequests.map(queryParam.toCacheKey)

    // Distinct requests must yield pairwise-distinct cache keys.
    for {
      i <- cacheKeys.indices
      j <- cacheKeys.indices if i != j
    } {
      cacheKeys(i) should not equal cacheKeys(j)
    }

    // The same request must always yield the same cache key (determinism).
    // (The original zipped dummyRequests with itself, so each pair was (x, x).)
    dummyRequests.foreach { x =>
      queryParam.toCacheKey(x) should be(queryParam.toCacheKey(x))
    }
    val duration = System.nanoTime() - startedAt

    println(s">> bytes: $duration")
  }

  test("QueryParam toCacheKey with variable params") {
    val startedAt = System.nanoTime()
    val queryParam = QueryParam(LabelWithDirection(1, 0))

    // NOTE: QueryParam.limit mutates the param in place, so each toCacheKey
    // call below reflects the most recently set offset/limit.
    dummyRequests.foreach { x =>
      queryParam.limit(0, 10)
      var xHash = queryParam.toCacheKey(x)
      // Same request + same params => same key.
      xHash shouldBe queryParam.toCacheKey(x)

      // Changing the offset must change the cache key.
      queryParam.limit(1, 10)
      var yHash = queryParam.toCacheKey(x)
      queryParam.toCacheKey(x) shouldBe yHash
      xHash should not be yHash

      // Changing the limit must change the cache key.
      queryParam.limit(0, 10)
      xHash = queryParam.toCacheKey(x)
      queryParam.limit(0, 11)
      yHash = queryParam.toCacheKey(x)

      xHash should not be yHash
    }

    val duration = System.nanoTime() - startedAt

    println(s">> diff: $duration")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
new file mode 100644
index 0000000..2c16972
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
@@ -0,0 +1,188 @@
package org.apache.s2graph.core

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLikeWithTs, LabelWithDirection}


/**
 * Shared test fixtures: dummy ids, byte-array comparison helpers, and
 * pre-built InnerVal collections spanning both storage versions (V1/V2)
 * for serialization/ordering tests.
 */
trait TestCommon {
  // Fixed reference timestamp for the whole test run.
  val ts = System.currentTimeMillis()
  // Arbitrary dummy ids; they never need to exist in a real store.
  val testServiceId = 1
  val testColumnId = 1
  val testLabelId = 1
  val testDir = GraphUtil.directions("out")
  val testOp = GraphUtil.operations("insert")
  val testLabelOrderSeq = LabelIndex.DefaultSeq
  val testLabelWithDir = LabelWithDirection(testLabelId, testDir)
  val labelMeta = LabelMeta

  // Unsigned lexicographic byte-array comparisons (HBase row-key order).
  def equalsTo(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) == 0

  def largerThan(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) > 0

  def lessThan(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) < 0

  def lessThanEqual(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) <= 0

  /** Paired small/large InnerVal values per type, used to build ordered index-prop fixtures. */
  import HBaseType.{VERSION1, VERSION2}
  private val tsValSmall = InnerVal.withLong(ts, VERSION1)
  private val tsValLarge = InnerVal.withLong(ts + 1, VERSION1)
  private val boolValSmall = InnerVal.withBoolean(false, VERSION1)
  private val boolValLarge = InnerVal.withBoolean(true, VERSION1)
  private val doubleValSmall = InnerVal.withDouble(-0.1, VERSION1)
  private val doubleValLarge = InnerVal.withDouble(0.1, VERSION1)
  // Sequence number of the reserved "_to" label meta.
  private val toSeq = LabelMeta.toSeq.toInt
  private val toVal = InnerVal.withLong(Long.MinValue, VERSION1)


  // Same fixture values encoded with the VERSION2 serialization.
  private val tsValSmallV2 = InnerVal.withLong(ts, VERSION2)
  private val tsValLargeV2 = InnerVal.withLong(ts + 1, VERSION2)
  private val boolValSmallV2 = InnerVal.withBoolean(false, VERSION2)
  private val boolValLargeV2 = InnerVal.withBoolean(true, VERSION2)
  private val doubleValSmallV2 = InnerVal.withDouble(-0.1, VERSION2)
  private val doubleValLargeV2 = InnerVal.withDouble(0.1, VERSION2)
  private val toValV2 = InnerVal.withLong(Long.MinValue, VERSION2)

  // Int samples around the extremes, the byte boundaries (+-126..129) and zero.
  val intVals = (Int.MinValue until Int.MinValue + 10) ++ (-129 to -126) ++ (-1 to 1) ++ (126 to 129) ++
    (Int.MaxValue - 10 until Int.MaxValue)
  val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v), VERSION1) }

  val intInnerValsV2 = intVals.map { v => InnerVal.withNumber(BigDecimal(v), VERSION2) }

  // Strings chosen so prefix relationships exercise lexicographic ordering.
  val stringVals = List("abc", "abd", "ac", "aca", "b")
  val stringInnerVals = stringVals.map { s => InnerVal.withStr(s, VERSION1)}
  val stringInnerValsV2 = stringVals.map { s => InnerVal.withStr(s, VERSION2)}

  // Numeric samples near the Long and Int boundaries.
  val numVals = (Long.MinValue until Long.MinValue + 10).map(BigDecimal(_)) ++
    (Int.MinValue until Int.MinValue + 10).map(BigDecimal(_)) ++
    (Int.MaxValue - 10 until Int.MaxValue).map(BigDecimal(_)) ++
    (Long.MaxValue - 10 until Long.MaxValue).map(BigDecimal(_))
  val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong, VERSION1)}
  val numInnerValsV2 = numVals.map { n => InnerVal.withNumber(n, VERSION2)}

  // Double samples: a coarse sweep of the full range plus dense clusters
  // around small negative, byte-boundary, and mid-range values.
  val doubleStep = Double.MaxValue / 5
  val doubleVals = (Double.MinValue until 0 by doubleStep).map(BigDecimal(_)) ++
    (-9999.9 until -9994.1 by 1.1).map(BigDecimal(_)) ++
    (-128.0 until 128.0 by 1.2).map(BigDecimal(_)) ++
    (129.0 until 142.0 by 1.1).map(BigDecimal(_)) ++
    (doubleStep until Double.MaxValue by doubleStep).map(BigDecimal(_))
  val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble, VERSION1)}
  val doubleInnerValsV2 = doubleVals.map { d => InnerVal.withDouble(d.toDouble, VERSION2)}

  /** version 1 string order is broken */
  // Index-prop rows (seq -> value) listed in ascending index order;
  // the trailing (toSeq -> toVal) entry mimics "_to" participating in the index.
  val idxPropsLs = Seq(
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac", VERSION1)),(toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2-> InnerVal.withStr("b", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b", VERSION1)), (toSeq -> toVal)),
    Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a", VERSION1)), (toSeq -> toVal))
  ).map(seq => seq.map(t => t._1.toByte -> t._2 ))

  // V2 variant adds a double-valued prop at seq 3.
  val idxPropsLsV2 = Seq(
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValSmallV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("ab", VERSION2)), (3 -> doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValSmallV2), (2-> InnerVal.withStr("b", VERSION2)), (3 ->doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValSmallV2), (1 -> boolValLargeV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 ->doubleValLargeV2), (toSeq -> toValV2)),
    Seq((0 -> tsValLargeV2), (1 -> boolValSmallV2), (2 -> InnerVal.withStr("a", VERSION2)), (3 ->doubleValLargeV2), (toSeq -> toValV2))
  ).map(seq => seq.map(t => t._1.toByte -> t._2 ) )

  // Same prop rows wrapped with the shared reference timestamp.
  val idxPropsWithTsLs = idxPropsLs.map { idxProps =>
    idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }
  }
  val idxPropsWithTsLsV2 = idxPropsLsV2.map { idxProps =>
    idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }
  }
  //
  //  def testOrder(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]],
  //                innerVals: Iterable[InnerValLike], skipHashBytes: Boolean = false)
  //               (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
  //                fromBytesFunc: Array[Byte] => HBaseSerializable) = {
  //    /** check if increasing target vertex id is ordered properly with same indexProps */
  //    val rets = for {
  //      idxProps <- idxPropsLs
  //    } yield {
  //        val head = createFunc(idxProps, innerVals.head)
  //        val start = head
  //        var prev = head
  //        val rets = for {
  //          innerVal <- innerVals.tail
  //        } yield {
  //            val current = createFunc(idxProps, innerVal)
  //            val bytes = current.bytes
  //            val decoded = fromBytesFunc(bytes)
  //            println(s"current: $current")
  //            println(s"decoded: $decoded")
  //
  //            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
  //            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
  //            val (isSame, orderPreserved) = (current, decoded) match {
  //              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) =>
  //                /** _to is used in indexProps */
  //                (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
  //              case _ =>
  //                (current == decoded, lessThan(currentBytes, prevBytes))
  //            }
  //
  //            println(s"$current ${bytes.toList}")
  //            println(s"$prev ${prev.bytes.toList}")
  //            println(s"SerDe[$isSame], Order[$orderPreserved]")
  //            prev = current
  //            isSame && orderPreserved
  //          }
  //        rets.forall(x => x)
  //      }
  //    rets.forall(x => x)
  //  }
  //  def testOrderReverse(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], innerVals: Iterable[InnerValLike],
  //                       skipHashBytes: Boolean = false)
  //                      (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
  //                       fromBytesFunc: Array[Byte] => HBaseSerializable) = {
  //    /** check if increasing target vertex id is ordered properly with same indexProps */
  //    val rets = for {
  //      innerVal <- innerVals
  //    } yield {
  //        val head = createFunc(idxPropsLs.head, innerVal)
  //        val start = head
  //        var prev = head
  //        val rets = for {
  //          idxProps <- idxPropsLs.tail
  //        } yield {
  //            val current = createFunc(idxProps, innerVal)
  //            val bytes = current.bytes
  //            val decoded = fromBytesFunc(bytes)
  //            println(s"current: $current")
  //            println(s"decoded: $decoded")
  //
  //            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
  //            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
  //            val (isSame, orderPreserved) = (current, decoded) match {
  //              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) =>
  //                /** _to is used in indexProps */
  //                (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
  //              case _ =>
  //                (current == decoded, lessThan(currentBytes, prevBytes))
  //            }
  //
  //            println(s"$current ${bytes.toList}")
  //            println(s"$prev ${prev.bytes.toList}")
  //            println(s"SerDe[$isSame], Order[$orderPreserved]")
  //            prev = current
  //            isSame && orderPreserved
  //          }
  //
  //        rets.forall(x => x)
  //      }
  //
  //    rets.forall(x => x)
  //  }
  //
  //
  //  def putToKeyValues(put: PutRequest) = {
  //    val ts = put.timestamp()
  //    for ((q, v) <- put.qualifiers().zip(put.values)) yield {
  //      new KeyValue(put.key(), put.family(), q, ts, v)
  //    }
  //  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
new file mode 100644
index 0000000..74b4197
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -0,0 +1,194 @@
package org.apache.s2graph.core

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import scalikejdbc.AutoSession

import scala.concurrent.ExecutionContext

/**
 * Shared fixture for tests that need real Service/Label metadata.
 * Call initTests() (e.g. from beforeAll) to drop and recreate the test
 * services and labels for every supported storage version (V1-V4).
 */
trait TestCommonWithModels {

  import InnerVal._
  import types.HBaseType._

  var graph: Graph = _
  var config: Config = _
  var management: Management = _

  /** Builds Graph/Management from config and recreates all test metadata. */
  def initTests() = {
    config = ConfigFactory.load()
    graph = new Graph(config)(ExecutionContext.Implicits.global)
    management = new Management(graph)

    implicit val session = AutoSession

    // Delete labels first: labels reference services.
    deleteTestLabel()
    deleteTestService()

    createTestService()
    createTestLabel()
  }

  def zkQuorum = config.getString("hbase.zookeeper.quorum")

  def cluster = config.getString("hbase.zookeeper.quorum")

  implicit val session = AutoSession

  // One service/column/label family per storage version.
  val serviceName = "_test_service"
  val serviceNameV2 = "_test_service_v2"
  val serviceNameV3 = "_test_service_v3"
  val serviceNameV4 = "_test_service_v4"

  val columnName = "user_id"
  val columnNameV2 = "user_id_v2"
  val columnNameV3 = "user_id_v3"
  val columnNameV4 = "user_id_v4"

  val columnType = "long"
  val columnTypeV2 = "long"
  val columnTypeV3 = "long"
  val columnTypeV4 = "long"

  // NOTE(review): "itme_id" looks like a typo for "item_id" (the V2-V4
  // variants use "item_id_*"), but it is used consistently for both column
  // creation and lookup in this trait — confirm nothing outside this file
  // depends on the literal before renaming.
  val tgtColumnName = "itme_id"
  val tgtColumnNameV2 = "item_id_v2"
  val tgtColumnNameV3 = "item_id_v3"
  val tgtColumnNameV4 = "item_id_v4"

  val tgtColumnType = "string"
  val tgtColumnTypeV2 = "string"
  val tgtColumnTypeV3 = "string"
  val tgtColumnTypeV4 = "string"

  val hTableName = "_test_cases"
  val preSplitSize = 0

  val labelName = "_test_label"
  val labelNameV2 = "_test_label_v2"
  val labelNameV3 = "_test_label_v3"
  val labelNameV4 = "_test_label_v4"

  val undirectedLabelName = "_test_label_undirected"
  val undirectedLabelNameV2 = "_test_label_undirected_v2"

  // Default property schema shared by every test label.
  val testProps = Seq(
    Prop("affinity_score", "0.0", DOUBLE),
    Prop("is_blocked", "false", BOOLEAN),
    Prop("time", "0", INT),
    Prop("weight", "0", INT),
    Prop("is_hidden", "true", BOOLEAN),
    Prop("phone_number", "xxx-xxx-xxxx", STRING),
    Prop("score", "0.1", FLOAT),
    Prop("age", "10", INT)
  )
  val testIdxProps = Seq(Index("_PK", Seq("_timestamp", "affinity_score")))
  val consistencyLevel = "strong"
  val hTableTTL = None


  /** Creates one test service per storage version (all share the same hTable). */
  def createTestService() = {
    implicit val session = AutoSession
    management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV3, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
    management.createService(serviceNameV4, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
  }

  /** Drops every service created by createTestService. */
  def deleteTestService() = {
    implicit val session = AutoSession
    Management.deleteService(serviceName)
    Management.deleteService(serviceNameV2)
    Management.deleteService(serviceNameV3)
    Management.deleteService(serviceNameV4)
  }

  /** Drops every label created by createTestLabel. */
  def deleteTestLabel() = {
    implicit val session = AutoSession
    Management.deleteLabel(labelName)
    Management.deleteLabel(labelNameV2)
    // BUG FIX: the V3/V4 labels are created by createTestLabel but were never
    // deleted here, leaving stale label metadata between test runs.
    Management.deleteLabel(labelNameV3)
    Management.deleteLabel(labelNameV4)
    Management.deleteLabel(undirectedLabelName)
    Management.deleteLabel(undirectedLabelNameV2)
  }

  /** Creates one directed label per storage version plus two undirected labels. */
  def createTestLabel() = {
    implicit val session = AutoSession
    management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
      isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")

    management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
      isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")

    management.createLabel(labelNameV3, serviceNameV3, columnNameV3, columnTypeV3, serviceNameV3, tgtColumnNameV3, tgtColumnTypeV3,
      isDirected = true, serviceNameV3, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")

    management.createLabel(labelNameV4, serviceNameV4, columnNameV4, columnTypeV4, serviceNameV4, tgtColumnNameV4, tgtColumnTypeV4,
      isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4")

    management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4")

    // NOTE(review): the owning-service argument here is serviceName, not
    // serviceNameV2, unlike every other V2 creation above — confirm intended.
    management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
  }

  // Accessors below bypass caches so each call sees freshly created metadata;
  // .get intentionally fails fast if initTests() has not run.
  def service = Service.findByName(serviceName, useCache = false).get

  def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get

  def serviceV3 = Service.findByName(serviceNameV3, useCache = false).get

  def serviceV4 = Service.findByName(serviceNameV4, useCache = false).get

  def column = ServiceColumn.find(service.id.get, columnName, useCache = false).get

  def columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = false).get

  def columnV3 = ServiceColumn.find(serviceV3.id.get, columnNameV3, useCache = false).get

  def columnV4 = ServiceColumn.find(serviceV4.id.get, columnNameV4, useCache = false).get

  def tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = false).get

  def tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, useCache = false).get

  def tgtColumnV3 = ServiceColumn.find(serviceV3.id.get, tgtColumnNameV3, useCache = false).get

  def tgtColumnV4 = ServiceColumn.find(serviceV4.id.get, tgtColumnNameV4, useCache = false).get

  def label = Label.findByName(labelName, useCache = false).get

  def labelV2 = Label.findByName(labelNameV2, useCache = false).get

  def labelV3 = Label.findByName(labelNameV3, useCache = false).get

  def labelV4 = Label.findByName(labelNameV4, useCache = false).get

  def undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get

  def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get

  def dir = GraphUtil.directions("out")

  def op = GraphUtil.operations("insert")

  def labelOrderSeq = LabelIndex.DefaultSeq

  def labelWithDir = LabelWithDirection(label.id.get, dir)

  def labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir)

  def labelWithDirV3 = LabelWithDirection(labelV3.id.get, dir)

  def labelWithDirV4 = LabelWithDirection(labelV4.id.get, dir)

  def queryParam = QueryParam(labelWithDir)

  def queryParamV2 = QueryParam(labelWithDirV2)

  def queryParamV3 = QueryParam(labelWithDirV3)

  def queryParamV4 = QueryParam(labelWithDirV4)

}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
new file mode 100644
index 0000000..5b32a0f
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
@@ -0,0 +1,130 @@
+package org.apache.s2graph.core.models
+
+import org.apache.s2graph.core.TestCommonWithModels
+import org.apache.s2graph.core.mysqls.Label
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {

  /** Creates the test services/columns/labels declared in TestCommonWithModels. */
  override def beforeAll(): Unit = {
    initTests()
  }

  /** Releases the graph resources acquired in beforeAll. */
  override def afterAll(): Unit = {
    graph.shutdown()
  }

  //  val serviceName = "testService"
  //  val newServiceName = "newTestService"
  //  val cluster = "localhost"
  //  val hbaseTableName = "s2graph-dev"
  //  val columnName = "user_id"
  //  val columnType = "long"
  //  val labelName = "model_test_label"
  //  val newLabelName = "new_model_test_label"
  //  val columnMetaName = "is_valid_user"
  //  val labelMetaName = "is_hidden"
  //  val hbaseTableTTL = -1
  //  val id = 1
  //
  //  val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster,
  //    "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1))
  //  val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" -> service.id.get,
  //    "columnName" -> columnName, "columnType" -> columnType))
  //  val columnMeta = HColumnMeta(Map("id" -> id, "columnId" -> serviceColumn.id.get, "name" -> columnMetaName,  "seq" -> 1.toByte))
  //  val label = HLabel(Map("id" -> id, "label" -> labelName,
  //    "srcServiceId" -> service.id.get, "srcColumnName" -> columnName, "srcColumnType" -> columnType,
  //    "tgtServiceId" -> service.id.get, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType,
  //    "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" -> service.id.get,
  //    "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1
  //  ))
  //  val labelMeta = HLabelMeta(Map("id" -> id, "labelId" -> label.id.get, "name" -> labelMetaName, "seq" -> 1.toByte,
  //    "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false))
  //  val labelIndex = HLabelIndex(Map("id" -> id, "labelId" -> label.id.get, "seq" -> 1.toByte,
  //    "metaSeqs" -> "0", "formular" -> "none"))

  /** Smoke test: a freshly loaded label exposes its indices, metas and related models. */
  test("test Label.findByName") {
    val labelOpt = Label.findByName(labelName, useCache = false)
    println(labelOpt)
    labelOpt.isDefined shouldBe true

    // avoid repeating labelOpt.get on every access below
    val foundLabel = labelOpt.get

    val indices = foundLabel.indices
    indices.size > 0 shouldBe true
    println(indices)
    val defaultIndexOpt = foundLabel.defaultIndex
    println(defaultIndexOpt)
    defaultIndexOpt.isDefined shouldBe true
    val metas = foundLabel.metaProps
    println(metas)
    metas.size > 0 shouldBe true
    val srcService = foundLabel.srcService
    println(srcService)
    val tgtService = foundLabel.tgtService
    println(tgtService)
    val service = foundLabel.service
    println(service)
    // FIX: these two previously re-read srcService/tgtService (copy-paste bug)
    // instead of the columns their names promise.
    val srcColumn = foundLabel.srcColumn
    println(srcColumn)
    val tgtColumn = foundLabel.tgtColumn
    println(tgtColumn)
  }

  //  test("test create") {
  //    service.create()
  //    HService.findByName(serviceName, useCache = false) == Some(service)
  //
  //    serviceColumn.create()
  //    HServiceColumn.findsByServiceId(service.id.get, useCache = false).headOption == Some(serviceColumn)
  //
  //    columnMeta.create()
  //    HColumnMeta.findByName(serviceColumn.id.get, columnMetaName, useCache = false) == Some(columnMeta)
  //
  //    label.create()
  //    HLabel.findByName(labelName, useCache = false) == Some(label)
  //
  //    labelMeta.create()
  //    HLabelMeta.findByName(label.id.get, labelMetaName, useCache = false) == Some(labelMeta)
  //
  //    labelIndex.create()
  //    HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).headOption == Some(labelIndex)
  //  }
  //
  //  test("test update") {
  //    service.update("cluster", "...")
  //    HService.findById(service.id.get, useCache = false).cluster == "..."
  //
  //    service.update("serviceName", newServiceName)
  //    assert(HService.findByName(serviceName, useCache = false) == None)
  //    HService.findByName(newServiceName, useCache = false).map { service => service.id.get == service.id.get}
  //
  //    label.update("label", newLabelName)
  //    HLabel.findById(label.id.get, useCache = false).label == "newLabelName"
  //
  //    label.update("consistencyLevel", "strong")
  //    HLabel.findById(label.id.get, useCache = false).consistencyLevel == "strong" &&
  //    HLabel.findByName(newLabelName).isDefined &&
  //    HLabel.findByName(labelName) == None
  //
  //  }
  //  test("test read by index") {
  //    val labels = HLabel.findBySrcServiceId(service.id.get, useCache = false)
  //    val idxs = HLabelIndex.findByLabelIdAll(label.id.get, useCache = false)
  //    labels.length == 1 &&
  //    labels.head == label
  //    idxs.length == 1 &&
  //    idxs.head == labelIndex
  //  }
  //  test("test delete") {
  ////    HLabel.findByName(labelName).foreach { label =>
  ////      label.deleteAll()
  ////    }
  //    HLabel.findByName(newLabelName).foreach { label =>
  //      label.deleteAll()
  //    }
  //    HLabelMeta.findAllByLabelId(label.id.get, useCache = false).isEmpty &&
  //    HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).isEmpty
  //
  //    service.deleteAll()
  //  }

  //  test("test labelIndex") {
  //    println(HLabelIndex.findByLabelIdAll(1))
  //  }

}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala
new file mode 100644
index 0000000..ccae6b6
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/mysqls/ExperimentSpec.scala
@@ -0,0 +1,64 @@
+package org.apache.s2graph.core.mysqls
+
+import java.util.Properties
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import scalikejdbc._
+
class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

  /** Cache TTL (seconds) handed to Model via "cache.ttl.seconds". */
  val Ttl = 2

  override def beforeAll(): Unit = {
    // Model reads "cache.ttl.seconds" (and "cache.max.size") from its config;
    // shrink the TTL so the refresh test below completes quickly.
    val props = new Properties()
    props.setProperty("cache.ttl.seconds", Ttl.toString)
    Model.apply(ConfigFactory.load(ConfigFactory.parseProperties(props)))

    implicit val session = AutoSession

    // Start from a clean slate, then seed one experiment holding a single bucket.
    sql"""DELETE FROM buckets""".update().apply()
    sql"""DELETE FROM experiments""".update().apply()

    val expId = sql"""INSERT INTO experiments(service_id, service_name, `name`, description) VALUES(1, "s1", "exp1", "")""".updateAndReturnGeneratedKey().apply()
    sql"""INSERT INTO
           buckets(experiment_id, modular, http_verb, api_path, request_body, impression_id)
           VALUES($expId, "1~100", "POST", "/a/b/c", "None", "imp1")""".update().apply()
  }

  "Experiment" should "find bucket list" in {
    Experiment.findBy(1, "exp1") should not be empty

    Experiment.findBy(1, "exp1").foreach { experiment =>
      experiment.buckets.head.impressionId should equal("imp1")
    }
  }

  it should "update bucket list after cache ttl time" in {
    // Mutate the row behind the cached bucket while the cache still holds "imp1".
    Experiment.findBy(1, "exp1").foreach { experiment =>
      val seeded = experiment.buckets.head
      seeded.impressionId should equal("imp1")

      implicit val session = AutoSession

      sql"""UPDATE buckets SET impression_id = "imp2" WHERE id = ${seeded.id}""".update().apply()
    }

    // Let the cached entry expire.
    Thread.sleep((Ttl + 1) * 1000)

    // Touch the experiment so the expired entry is reloaded.
    Experiment.findBy(1, "exp1").foreach(exp => exp.buckets)

    // Give the asynchronous cache refresh a moment to land.
    Thread.sleep(1 * 1000)

    // The reloaded bucket must now carry the updated impression id.
    Experiment.findBy(1, "exp1").foreach { experiment =>
      experiment.buckets.head.impressionId should equal("imp2")
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
new file mode 100644
index 0000000..f251654
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -0,0 +1,234 @@
+package org.apache.s2graph.core.parsers
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.rest.TemplateHelper
+import org.apache.s2graph.core.types._
+import org.scalatest.{FunSuite, Matchers}
+import play.api.libs.json.Json
+
class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
  // FIX: initTests() was previously invoked twice (leftover duplicate);
  // a single call fully sets up the dummy models/edges used below.
  initTests()

  import HBaseType.{VERSION1, VERSION2}

  val ts = System.currentTimeMillis()

  // Every edge needs a timestamp prop; share one entry keyed by timeStampSeq.
  val dummyTs = (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))

  /**
   * Builds the version-specific vertex ids and vertices used as edge endpoints.
   * Returns both numeric (1 -> 2) and string ("abc" -> "def") id variants
   * plus the schema version, so callers can iterate over versions uniformly.
   */
  def ids(version: String) = {
    val colId = if (version == VERSION2) columnV2.id.get else column.id.get
    val srcId = SourceVertexId(colId, InnerVal.withLong(1, version))
    val tgtId = TargetVertexId(colId, InnerVal.withLong(2, version))

    val srcIdStr = SourceVertexId(colId, InnerVal.withStr("abc", version))
    val tgtIdStr = TargetVertexId(colId, InnerVal.withStr("def", version))

    val srcVertex = Vertex(srcId, ts)
    val tgtVertex = Vertex(tgtId, ts)
    val srcVertexStr = Vertex(srcIdStr, ts)
    val tgtVertexStr = Vertex(tgtIdStr, ts)
    (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, version)
  }

  /**
   * Parses `sql` against `label`, applies the resulting predicate to `edge`,
   * and asserts the outcome equals `expected`. Dumps diagnostics on mismatch.
   */
  def validate(label: Label)(edge: Edge)(sql: String)(expected: Boolean) = {
    val whereOpt = WhereParser(label).parse(sql)
    whereOpt.isSuccess shouldBe true

    println("=================================================================")
    println(sql)
    println(whereOpt.get)

    val ret = whereOpt.get.filter(edge)
    if (ret != expected) {
      println("==================")
      println(s"$whereOpt")
      println(s"$edge")
      println("==================")
    }
    ret shouldBe expected
  }

  test("check where clause not nested") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner)
      val f = validate(label)(edge) _

      /** labelName label is long-long relation */
      f(s"_to=${tgtVertex.innerId.toString}")(true)

      // _to on this label is long-typed; a non-matching numeric id filters the edge out.
      f(s"_to=19230495")(false)
      f(s"_to!=19230495")(true)
    }
  }

  test("check where clause nested") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner)

      val f = validate(label)(edge) _

      // time == 3 in the props above
      f("time >= 3")(true)
      f("time > 2")(true)
      f("time <= 3")(true)
      f("time < 2")(false)

      f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = false")(false)
      f("(time in (1, 2, 3) or is_blocked = true) or is_hidden = false")(true)
      f("(time in (1, 2, 3) and is_blocked = true) or is_hidden = true")(true)
      f("(time in (1, 2, 3) or is_blocked = true) and is_hidden = true")(true)

      f("((time in (  1, 2, 3) and weight between 1 and 10) or is_hidden=false)")(true)
      f("(time in (1, 2, 4 ) or weight between 1 and 9) or (is_hidden = false)")(false)
      f("(time in ( 1,2,4 ) or weight between 1 and 9) or is_hidden= true")(true)
      f("(time in (1,2,3) or weight between 1 and 10) and is_hidden =false")(false)
    }
  }

  test("check where clause with from/to long") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 3, "name" -> "abc")
      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val labelWithDirection = if (schemaVer == VERSION2) labelWithDirV2 else labelWithDir
      val edge = Edge(srcVertex, tgtVertex, labelWithDirection, 0.toByte, ts, propsInner)
      // (removed an unused `lname` local that selected the version label name)
      val f = validate(label)(edge) _

      f(s"_from = -1 or _to = ${tgtVertex.innerId.value}")(true)
      f(s"_from = ${srcVertex.innerId.value} and _to = ${tgtVertex.innerId.value}")(true)
      f(s"_from = ${tgtVertex.innerId.value} and _to = 102934")(false)
      f(s"_from = -1")(false)
      f(s"_from in (-1, -0.1)")(false)
    }
  }


  test("check where clause with parent") {
    for {
      (srcId, tgtId, srcIdStr, tgtIdStr, srcVertex, tgtVertex, srcVertexStr, tgtVertexStr, schemaVer) <- List(ids(VERSION1), ids(VERSION2))
    } {
      /** test for each version */
      val js = Json.obj("is_hidden" -> true, "is_blocked" -> false, "weight" -> 10, "time" -> 1, "name" -> "abc")
      val parentJs = Json.obj("is_hidden" -> false, "is_blocked" -> false, "weight" -> 20, "time" -> 3, "name" -> "a")

      val propsInner = Management.toProps(label, js.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs
      val parentPropsInner = Management.toProps(label, parentJs.fields).map { case (k, v) => k -> InnerValLikeWithTs(v, ts) }.toMap + dummyTs

      // Three-level chain: edge -> parentEdge -> grandParentEdge.
      val grandParentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner)
      val parentEdge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, parentPropsInner,
        parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0)))
      val edge = Edge(srcVertex, tgtVertex, labelWithDir, 0.toByte, ts, propsInner,
        parentEdges = Seq(EdgeWithScore(parentEdge, 1.0)))

      println(edge.toString)
      println(parentEdge.toString)
      println(grandParentEdge.toString)

      val f = validate(label)(edge) _

      // Compare edge ids against literals and against other props
      f("_from = 1")(true)
      f("_to = 2")(true)
      f("_from = 123")(false)
      f("_from = time")(true)

      // Compare edge's prop(`weight`) with edge's prop(`time`)
      f("weight = time")(false)
      f("weight = is_blocked")(false)

      // Compare edge's props against parent edge's props via `_parent.`
      f("_parent.is_blocked = is_blocked")(true)
      f("is_hidden = _parent.is_hidden")(false)
      f("_parent.weight = weight")(false)

      // Compare against grandparent props via `_parent._parent.`
      f("_parent._parent.is_hidden = is_hidden")(false)
      f("_parent._parent.is_blocked = is_blocked")(true)
      f("_parent._parent.weight = weight")(false)
      f("_parent._parent.weight = _parent.weight")(true)
    }
  }

  test("replace reserved") {
    val ts = 0
    import TemplateHelper._

    calculate(ts, 1, "hour") should be(hour + ts)
    calculate(ts, 1, "day") should be(day + ts)

    // unit keyword is case-insensitive
    calculate(ts + 10, 1, "HOUR") should be(hour + ts + 10)
    calculate(ts + 10, 1, "DAY") should be(day + ts + 10)

    val body = """{
               "day": ${1day},
          "hour": ${1hour},
          "-day": "${-10 day}",
          "-hour": ${-10 hour},
          "now": "${now}"
        }
      """

    val parsed = replaceVariable(ts, body)
    val json = Json.parse(parsed)

    (json \ "day").as[Long] should be (1 * day + ts)
    (json \ "hour").as[Long] should be (1 * hour + ts)

    (json \ "-day").as[Long] should be (-10 * day + ts)
    (json \ "-hour").as[Long] should be (-10 * hour + ts)

    (json \ "now").as[Long] should be (ts)

    val otherBody = """{
          "nextday": "${next_day}",
          "3dayago": "${next_day - 3 day}",
          "nexthour": "${next_hour}"
        }"""

    // next_day/next_hour round the *current* time up, so they must be stable
    // for timestamps within the same day/hour.
    val currentTs = System.currentTimeMillis()
    val expectedDayTs = currentTs / day * day + day
    val expectedHourTs = currentTs / hour * hour + hour
    val threeDayAgo = expectedDayTs - 3 * day
    val currentTsLs = (1 until 1000).map(currentTs + _)

    currentTsLs.foreach { ts =>
      val parsed = replaceVariable(ts, otherBody)
      val json = Json.parse(parsed)

      (json \ "nextday").as[Long] should be(expectedDayTs)
      (json \ "nexthour").as[Long] should be(expectedHourTs)
      (json \ "3dayago").as[Long] should be(threeDayAgo)
    }
  }

  //  test("time decay") {
  //    val ts = System.currentTimeMillis()
  //
  //    for {
  //      i <- (0 until 10)
  //    } {
  //      val timeUnit = 60 * 60
  //      val diff = i * timeUnit
  //      val x = TimeDecay(1.0, 0.05, timeUnit)
  //      println(x.decay(diff))
  //    }
  //  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
new file mode 100644
index 0000000..7b601b6
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.core.storage.hbase
+
+import org.scalatest.{FunSuite, Matchers}
+
class AsynchbaseStorageTest extends FunSuite with Matchers {

  // Placeholder suite: the lone test below requires a kerberized (secured)
  // HBase cluster plus local krb5/JAAS config files, so it stays disabled.
//  test("test secure cluster connection") {
//    val config = ConfigFactory.parseMap(
//      Map(
//        "hbase.zookeeper.quorum" -> "localhost",
//        "hbase.security.auth.enable" -> "true",
//        "hbase.security.authentication" -> "kerberos",
//        "hbase.kerberos.regionserver.principal" -> "hbase/[email protected]",
//        "hbase.sasl.clientconfig" -> "Client",
//        "java.security.krb5.conf" -> "krb5.conf",
//        "java.security.auth.login.config" -> "async-client.jaas.conf")
//    )
//
//    val client = AsynchbaseStorage.makeClient(config)
//    val table = "test".getBytes()
//
//    val putRequest = new PutRequest(table, "a".getBytes(), "e".getBytes, "a".getBytes, "a".getBytes)
//    val getRequest = new GetRequest(table, "a".getBytes(), "e".getBytes)
//    val ret = client.put(putRequest).join()
//    val kvs = client.get(getRequest).join()
//    for {
//      kv <- kvs
//    } {
//      println(kv.toString)
//    }
//  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
new file mode 100644
index 0000000..84e7458
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -0,0 +1,81 @@
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.{IndexEdge, TestCommonWithModels, Vertex}
+import org.scalatest.{FunSuite, Matchers}
+
+
class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels {
  initTests()

  /**
   * Round-trips an index edge through the storage serializer/deserializer
   * and verifies the decoded edge equals the original.
   *
   * @param l     label the edge belongs to.
   * @param ts    timestamp for the edge.
   * @param to    target vertex id value.
   * @param props expected props of the edge (must be complete for equality).
   */
  def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = {
    val from = InnerVal.withLong(1, l.schemaVersion)
    val srcVertex = Vertex(SourceVertexId(HBaseType.DEFAULT_COL_ID, from), ts)
    val dstVertex = Vertex(TargetVertexId(HBaseType.DEFAULT_COL_ID, to), ts)
    val labelWithDir = LabelWithDirection(l.id.get, 0)

    val original = IndexEdge(srcVertex, dstVertex, labelWithDir, 0, ts, LabelIndex.DefaultSeq, props)
    val serialized = graph.storage.indexEdgeSerializer(original).toKeyValues
    val decoded = graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam,
      serialized, l.schemaVersion, None)

    decoded should not be empty
    original should be(decoded.get)
  }


  /** note that props have to be properly set up for equals */
  test("test serializer/deserializer for index edge.") {
    val ts = System.currentTimeMillis()
    Seq(label, labelV2, labelV3, labelV4).foreach { l =>
      val to = InnerVal.withLong(101, l.schemaVersion)
      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
      val props = Map(LabelMeta.timeStampSeq -> tsInnerVal,
        1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion))

      check(l, ts, to, props)
    }
  }

  test("test serializer/deserializer for degree edge.") {
    val ts = System.currentTimeMillis()
    Seq(label, labelV2, labelV3, labelV4).foreach { l =>
      // degree edges target the pseudo id "0" and carry a degree prop
      val to = InnerVal.withStr("0", l.schemaVersion)
      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
      val props = Map(
        LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion),
        LabelMeta.timeStampSeq -> tsInnerVal)

      check(l, ts, to, props)
    }
  }

  test("test serializer/deserializer for incrementCount index edge.") {
    val ts = System.currentTimeMillis()
    Seq(label, labelV2, labelV3, labelV4).foreach { l =>
      val to = InnerVal.withLong(101, l.schemaVersion)
      val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion)
      // count edges additionally carry the countSeq prop
      val props = Map(LabelMeta.timeStampSeq -> tsInnerVal,
        1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion),
        LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion))

      check(l, ts, to, props)
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala
new file mode 100644
index 0000000..607f46d
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/types/InnerValTest.scala
@@ -0,0 +1,128 @@
+package org.apache.s2graph.core.types
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.TestCommonWithModels
+import org.scalatest.{FunSuite, Matchers}
+
class InnerValTest extends FunSuite with Matchers with TestCommonWithModels {
  initTests()

  import HBaseType.VERSION2

  // Numeric boundary values for encode/decode round-trip checks.
  val decimals = List(
    BigDecimal(Long.MinValue),
    BigDecimal(Int.MinValue),
    BigDecimal(Double.MinValue),
    BigDecimal(Float.MinValue),
    BigDecimal(Short.MinValue),
    BigDecimal(Byte.MinValue),

    BigDecimal(-1),
    BigDecimal(0),
    BigDecimal(1),
    BigDecimal(Long.MaxValue),
    BigDecimal(Int.MaxValue),
    BigDecimal(Double.MaxValue),
    BigDecimal(Float.MaxValue),
    BigDecimal(Short.MaxValue),
    BigDecimal(Byte.MaxValue)
  )
  val booleans = List(
    false, true
  )
  val strings = List(
    "abc", "abd", "ac", "aca"
  )
  // a 1000-char string to exercise long-string encoding
  val texts = List(
    (0 until 1000).map(x => "a").mkString
  )
  // a 1000-byte blob
  val blobs = List(
    (0 until 1000).map(x => Byte.MaxValue).toArray
  )

  /**
   * Encodes each innerVal, decodes the bytes back, and asserts both the
   * round-trip equality and that decoding consumed every byte produced.
   */
  def testEncodeDecode(ranges: List[InnerValLike], version: String) = {
    for {
      innerVal <- ranges
    } {
      val bytes = innerVal.bytes
      val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version)
      // FIX: assert on the values themselves (`x shouldBe y`) instead of
      // `x == y shouldBe true`, so failures report the mismatching values.
      decoded shouldBe innerVal
      numOfBytesUsed shouldBe bytes.length
    }
  }
  //  test("big decimal") {
  //    for {
  //      version <- List(VERSION2, VERSION1)
  //    } {
  //      val innerVals = decimals.map { num => InnerVal.withNumber(num, version)}
  //      testEncodeDecode(innerVals, version)
  //    }
  //  }
  //  test("text") {
  //    for {
  //      version <- List(VERSION2)
  //    } {
  //      val innerVals = texts.map { t => InnerVal.withStr(t, version) }
  //      testEncodeDecode(innerVals, version)
  //    }
  //  }
  //  test("string") {
  //    for {
  //      version <- List(VERSION2, VERSION1)
  //    } {
  //      val innerVals = strings.map { t => InnerVal.withStr(t, version) }
  //      testEncodeDecode(innerVals, version)
  //    }
  //  }
  //  test("blob") {
  //    for {
  //      version <- List(VERSION2)
  //    } {
  //      val innerVals = blobs.map { t => InnerVal.withBlob(t, version) }
  //      testEncodeDecode(innerVals, version)
  //    }
  //  }
  //  test("boolean") {
  //    for {
  //      version <- List(VERSION2, VERSION1)
  //    } {
  //      val innerVals = booleans.map { t => InnerVal.withBoolean(t, version) }
  //      testEncodeDecode(innerVals, version)
  //    }
  //  }
  test("korean") {
    val small = InnerVal.withStr("가", VERSION2)
    val large = InnerVal.withStr("나", VERSION2)
    val smallBytes = small.bytes
    val largeBytes = large.bytes

    println(Bytes.compareTo(smallBytes, largeBytes))
    // FIX: the test previously ended with a dangling `true` and asserted
    // nothing. VERSION2 string encoding is order-preserving, so the encoded
    // bytes of "가" must compare strictly less than those of "나".
    Bytes.compareTo(smallBytes, largeBytes) should be < 0
  }
  //  test("innerVal") {
  //    val srcVal = InnerVal.withLong(44391298, VERSION2)
  //    val srcValV1 = InnerVal.withLong(44391298, VERSION1)
  //    val tgtVal = InnerVal.withLong(7295564, VERSION2)
  //
  //    val a = VertexId(0, srcVal)
  //    val b = SourceVertexId(0, srcVal)
  //    val c = TargetVertexId(0, srcVal)
  //    val aa = VertexId(0, srcValV1)
  //    val bb = SourceVertexId(0, srcValV1)
  //    val cc = TargetVertexId(0, srcValV1)
  //    println(a.bytes.toList)
  //    println(b.bytes.toList)
  //    println(c.bytes.toList)
  //
  //    println(aa.bytes.toList)
  //    println(bb.bytes.toList)
  //    println(cc.bytes.toList)
  //  }
  //  test("aa") {
  //    val bytes = InnerVal.withLong(Int.MaxValue, VERSION2).bytes
  //    val pbr = new SimplePositionedByteRange(bytes)
  //    pbr.setOffset(1)
  //    println(pbr.getPosition)
  //    val num = OrderedBytes.decodeNumericAsBigDecimal(pbr)
  //    println(pbr.getPosition)
  //    true
  //  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala
new file mode 100644
index 0000000..530d775
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/TrxLog.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.counter
+
+import play.api.libs.json.Json
+
/**
 * Transaction log for one counter mutation,
 * e.g. item1 -> likedCount -> (month:2015-10, 1).
 *
 * @param success  whether the counter mutation succeeded.
 * @param policyId counter policy id (Label.findByName(likedCount).id.get).
 * @param item     counted item key (edge.srcVertexId).
 * @param results  per-interval/dimension mutation results.
 */
case class TrxLog(success: Boolean, policyId: Int, item: String, results: Iterable[TrxLogResult])
+
/**
 * Single counter-mutation result,
 * e.g. interval = "m", ts = 2015-10, dimension = "age.gender.20.M", value = 1, result = 2.
 *
 * @param result counter value after the mutation; -1 when unknown/failed.
 */
case class TrxLogResult(interval: String, ts: Long, dimension: String, value: Long, result: Long = -1)
+
object TrxLogResult {
  // `formats` (a Format, subtype of both Reads and Writes) is the most
  // specific implicit and wins resolution; the explicit reads/writes are
  // kept for callers that reference them by name.
  implicit val writes = Json.writes[TrxLogResult]
  implicit val reads = Json.reads[TrxLogResult]
  implicit val formats = Json.format[TrxLogResult]
}
+
object TrxLog {
  // JSON codecs; `formats` is the most specific implicit (Format <: Reads/Writes)
  // and is what implicit resolution selects at use sites.
  implicit val writes = Json.writes[TrxLog]
  implicit val reads = Json.reads[TrxLog]
  implicit val formats = Json.format[TrxLog]
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala
new file mode 100644
index 0000000..219568b
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/ConfigFunctions.scala
@@ -0,0 +1,30 @@
+package org.apache.s2graph.counter.config
+
+import com.typesafe.config.Config
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
abstract class ConfigFunctions(conf: Config) {
   /**
    * Returns the config value at `key` converted to the runtime type of
    * `default`, or `default` when the key is absent or the type is unsupported.
    *
    * @param key     config path to look up.
    * @param default fallback value; its runtime class selects the typed getter.
    */
   def getOrElse[T: ClassTag](key: String, default: T): T = {
     val ret = if (conf.hasPath(key)) (default match {
       case _: String => conf.getString(key)
       case _: Int | _: Integer => conf.getInt(key)
       // FIX: Long previously fell through to `default`, silently ignoring
       // a present config value.
       case _: Long => conf.getLong(key)
       // FIX: Float was read via getDouble and then force-cast, which throws
       // ClassCastException at the call site; convert explicitly instead.
       case _: Float => conf.getDouble(key).toFloat
       case _: Double => conf.getDouble(key)
       case _: Boolean => conf.getBoolean(key)
       case _ => default
     }).asInstanceOf[T]
     else default
     // NOTE(review): println used as ad-hoc logging; consider a real logger.
     println(s"${this.getClass.getName}: $key -> $ret")
     ret
   }

   /**
    * Flattens the config subtree at `path` into a map of fully-qualified
    * keys ("path.sub.key") to their string values.
    */
   def getConfigMap(path: String): Map[String, String] = {
     conf.getConfig(path).entrySet().map { entry =>
       val key = s"$path.${entry.getKey}"
       val value = conf.getString(key)
       println(s"${this.getClass.getName}: $key -> $value")
       key -> value
     }.toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala
new file mode 100644
index 0000000..c9e1e38
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/config/S2CounterConfig.scala
@@ -0,0 +1,44 @@
+package org.apache.s2graph.counter.config
+
+import com.typesafe.config.Config
+
+import scala.collection.JavaConversions._
+
/**
 * Typed view over the s2counter configuration: HBase, MySQL, Redis,
 * graph endpoint and cache settings. All values are resolved lazily.
 */
class S2CounterConfig(config: Config) extends ConfigFunctions(config) {
  // --- HBase ---
  lazy val HBASE_ZOOKEEPER_QUORUM = getOrElse("hbase.zookeeper.quorum", "")
  lazy val HBASE_TABLE_NAME = getOrElse("hbase.table.name", "s2counter")
  lazy val HBASE_TABLE_POOL_SIZE = getOrElse("hbase.table.pool.size", 100)
  lazy val HBASE_CONNECTION_POOL_SIZE = getOrElse("hbase.connection.pool.size", 10)

  lazy val HBASE_CLIENT_IPC_POOL_SIZE = getOrElse("hbase.client.ipc.pool.size", 5)
  lazy val HBASE_CLIENT_MAX_TOTAL_TASKS = getOrElse("hbase.client.max.total.tasks", 100)
  lazy val HBASE_CLIENT_MAX_PERSERVER_TASKS = getOrElse("hbase.client.max.perserver.tasks", 5)
  lazy val HBASE_CLIENT_MAX_PERREGION_TASKS = getOrElse("hbase.client.max.perregion.tasks", 1)
  lazy val HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = getOrElse("hbase.client.scanner.timeout.period", 300)
  lazy val HBASE_CLIENT_OPERATION_TIMEOUT = getOrElse("hbase.client.operation.timeout", 100)
  lazy val HBASE_CLIENT_RETRIES_NUMBER = getOrElse("hbase.client.retries.number", 1)

  // --- MySQL ---
  lazy val DB_DEFAULT_DRIVER = getOrElse("db.default.driver", "com.mysql.jdbc.Driver")
  lazy val DB_DEFAULT_URL = getOrElse("db.default.url", "")
  lazy val DB_DEFAULT_USER = getOrElse("db.default.user", "graph")
  lazy val DB_DEFAULT_PASSWORD = getOrElse("db.default.password", "graph")

  // --- Redis ---
  // Each entry is "host" or "host:port"; the port defaults to 6379.
  lazy val REDIS_INSTANCES = config.getStringList("redis.instances").toList.map { addr =>
    addr.split(':') match {
      case Array(host) => (host, 6379)
      case Array(host, port, _*) => (host, port.toInt)
    }
  }

  // --- Graph ---
  lazy val GRAPH_URL = getOrElse("s2graph.url", "http://localhost:9000")
  lazy val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL)

  // --- Cache ---
  lazy val CACHE_TTL_SECONDS = getOrElse("cache.ttl.seconds", 600)
  lazy val CACHE_MAX_SIZE = getOrElse("cache.max.size", 10000)
  lazy val CACHE_NEGATIVE_TTL_SECONDS = getOrElse("cache.negative.ttl.seconds", CACHE_TTL_SECONDS)
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala
new file mode 100644
index 0000000..0f48b0b
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/BytesUtil.scala
@@ -0,0 +1,13 @@
+package org.apache.s2graph.counter.core
+
+
/**
 * Serialization contract between counter keys/qualifiers and their
 * storage-level byte representations. Implementations are expected to be
 * mutually inverse: to* methods must decode what toBytes produced.
 */
trait BytesUtil {
  // Row-key prefix derived from a policy id; used for prefix scans.
  def getRowKeyPrefix(id: Int): Array[Byte]

  def toBytes(key: ExactKeyTrait): Array[Byte]
  def toBytes(eq: ExactQualifier): Array[Byte]
  def toBytes(tq: TimedQualifier): Array[Byte]

  // Decoders; assume `bytes` was produced by the matching toBytes above.
  def toExactQualifier(bytes: Array[Byte]): ExactQualifier
  def toTimedQualifier(bytes: Array[Byte]): TimedQualifier
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala
new file mode 100644
index 0000000..a7e40ae
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactCounter.scala
@@ -0,0 +1,243 @@
+package org.apache.s2graph.counter.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
+import org.apache.s2graph.counter.{TrxLogResult, TrxLog}
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
+import org.apache.s2graph.counter.decay.ExpDecayFormula
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.util.{FunctionParser, CollectionCacheConfig, 
CollectionCache}
+import org.slf4j.LoggerFactory
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+
+
// One stored row: a counter key with its per-qualifier counts.
case class ExactCounterRow(key: ExactKeyTrait, value: Map[ExactQualifier, Long])
// Raw fetch result for a single key.
case class FetchedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Long])
// Fetch result after a decay formula has been applied (scores, not raw counts).
case class DecayedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Double])
// Fetch result grouped by (interval unit, dimension key/values).
case class FetchedCountsGrouped(exactKey: ExactKeyTrait, intervalWithCountMap: Map[(IntervalUnit, Map[String, String]), Map[ExactQualifier, Long]])
+
/**
 * Facade over a pluggable [[ExactStorage]] backend for exact (non-sampled)
 * counter values. Read methods come in asynchronous (Future) and blocking
 * flavors; the blocking ones Await up to `syncDuration`.
 */
class ExactCounter(config: Config, storage: ExactStorage) {
  import ExactCounter._

  // Timeout used by the blocking variants below.
  val syncDuration = Duration(10, SECONDS)
  private val log = LoggerFactory.getLogger(getClass)

  // Caches storage.ready(policy) per policy id to avoid hammering the backend.
  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))

  /** Fetch grouped counts for one item over the given intervals and [from, to] range. */
  def getCountAsync(policy: Counter,
                    itemId: String,
                    intervals: Seq[IntervalUnit],
                    from: Long,
                    to: Long,
                    dimension: Map[String, Set[String]])
                   (implicit ex: ExecutionContext): Future[Option[FetchedCountsGrouped]] = {
    for {
      fetchedCounts <- getCountsAsync(policy, Seq(itemId),
        intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension)
    } yield {
      fetchedCounts.headOption
    }
  }

  /** Multi item, time range and multi dimension fetch. */
  def getCountsAsync(policy: Counter,
                     items: Seq[String],
                     timeRange: Seq[(TimedQualifier, TimedQualifier)],
                     dimQuery: Map[String, Set[String]])
                    (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
    storage.get(policy, items, timeRange, dimQuery)
  }

  /** Blocking variant of [[getCountAsync]] with a dimension filter. */
  def getCount(policy: Counter,
               itemId: String,
               intervals: Seq[IntervalUnit],
               from: Long,
               to: Long,
               dimension: Map[String, Set[String]])
              (implicit ex: ExecutionContext): Option[FetchedCountsGrouped] = {
    getCounts(policy, Seq(itemId),
      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension).headOption
  }

  /** Blocking fetch without dimension filtering. NOTE: blocks up to syncDuration. */
  def getCount(policy: Counter,
               itemId: String,
               intervals: Seq[IntervalUnit],
               from: Long,
               to: Long)
              (implicit ex: ExecutionContext): Option[FetchedCounts] = {
    val future = storage.get(policy,
      Seq(itemId),
      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))))
    Await.result(future, syncDuration).headOption
  }

  /** Blocking multi item, time range and multi dimension fetch. */
  def getCounts(policy: Counter,
                items: Seq[String],
                timeRange: Seq[(TimedQualifier, TimedQualifier)],
                dimQuery: Map[String, Set[String]])
               (implicit ex: ExecutionContext): Seq[FetchedCountsGrouped] = {
    Await.result(storage.get(policy, items, timeRange, dimQuery), syncDuration)
  }

  /** Fetch counts for explicit (itemKey, qualifiers) pairs, keyed by item. */
  def getRelatedCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
                      (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
    val queryKeyWithQualifiers = {
      for {
        (itemKey, qualifiers) <- keyWithQualifiers
      } yield {
        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
        (relKey, qualifiers)
      }
    }
    val future = storage.get(policy, queryKeyWithQualifiers)

    for {
      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
    } yield {
      exactKey.itemKey -> exactQualifierToLong
    }
  }.toMap

  /**
   * Like [[getRelatedCounts]] but shifted one time unit into the past:
   * each qualifier is queried at tq.add(-1) and restored before returning.
   */
  def getPastCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
                   (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
    // query past count: shift each qualifier back one interval
    val queryKeyWithQualifiers = {
      for {
        (itemKey, qualifiers) <- keyWithQualifiers
      } yield {
        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
        (relKey, qualifiers.map(eq => eq.copy(tq = eq.tq.add(-1))))
      }
    }
    val future = storage.get(policy, queryKeyWithQualifiers)

    for {
      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
    } yield {
      // restore tq so callers see the qualifiers they asked about
      exactKey.itemKey -> exactQualifierToLong.map { case (eq, v) =>
        eq.copy(tq = eq.tq.add(1)) -> v
      }
    }
  }.toMap

  /**
   * Fetch counts and fold each (interval, dimension) group into a single
   * score, optionally applying a decay function parsed from `qsSum`
   * (currently only "exp_decay(arg)" is supported).
   */
  def getDecayedCountsAsync(policy: Counter,
                            items: Seq[String],
                            timeRange: Seq[(TimedQualifier, TimedQualifier)],
                            dimQuery: Map[String, Set[String]],
                            qsSum: Option[String])(implicit ex: ExecutionContext): Future[Seq[DecayedCounts]] = {
    val groupedTimeRange = timeRange.groupBy(_._1.q)
    getCountsAsync(policy, items, timeRange, dimQuery).map { seq =>
      for {
        FetchedCountsGrouped(k, intervalWithCountMap) <- seq
      } yield {
        DecayedCounts(k, {
          for {
            ((interval, dimKeyValues), grouped) <- intervalWithCountMap
          } yield {
            val (tqFrom, tqTo) = groupedTimeRange(interval).head
            val formula = {
              for {
                strSum <- qsSum
                (func, arg) <- FunctionParser(strSum)
              } yield {
                // apply function
                func.toLowerCase match {
                  case "exp_decay" => ExpDecayFormula.byMeanLifeTime(arg.toLong * TimedQualifier.getTsUnit(interval))
                  case _ => throw new UnsupportedOperationException(s"unknown function: $strSum")
                }
              }
            }
            ExactQualifier(tqFrom, dimKeyValues) -> {
              grouped.map { case (eq, count) =>
                formula match {
                  case Some(decay) =>
                    // weight each count by its age relative to the range end
                    decay(count, tqTo.ts - eq.tq.ts)
                  case None =>
                    count
                }
              }.sum
            }
          }
        })
      }
    }
  }

  /**
   * Apply increments and return one TrxLog per key; a log with
   * success=false means the storage reported no applied results.
   * Returns Nil when the policy's storage is not ready.
   */
  def updateCount(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Seq[TrxLog] = {
    if (ready(policy)) {
      val updateResults = storage.update(policy, counts)
      for {
        (exactKey, values) <- counts
        results = updateResults.getOrElse(exactKey, Map.empty[ExactQualifier, Long])
      } yield {
        TrxLog(results.nonEmpty, exactKey.policyId, exactKey.itemKey, makeTrxLogResult(values, results))
      }
    } else {
      Nil
    }
  }

  def deleteCount(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
    storage.delete(policy, keys)
  }

  // Pair each requested value with the storage result; -1 marks "no result".
  private def makeTrxLogResult(values: ExactValueMap, results: ExactValueMap): Seq[TrxLogResult] = {
    for {
      (eq, value) <- values
    } yield {
      val result = results.getOrElse(eq, -1L)
      TrxLogResult(eq.tq.q.toString, eq.tq.ts, eq.dimension, value, result)
    }
  }.toSeq

  def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
    storage.insertBlobValue(policy, keys)
  }

  def getBlobValue(policy: Counter, blobId: String): Option[String] = {
    storage.getBlobValue(policy, blobId)
  }

  def prepare(policy: Counter) = {
    storage.prepare(policy)
  }

  def destroy(policy: Counter) = {
    storage.destroy(policy)
  }

  /** Cached readiness check; logs a warning on a cache miss that finds storage unready. */
  def ready(policy: Counter): Boolean = {
    storageStatusCache.withCache(s"${policy.id}") {
      val ready = storage.ready(policy)
      if (!ready) {
        // if key is not in cache, log message
        log.warn(s"${policy.service}.${policy.action} storage is not ready.")
      }
      Some(ready)
    }.getOrElse(false)
  }
}
+
object ExactCounter {
  // Column families for short-lived vs long-lived intervals.
  // NOTE: Enumeration Value ids depend on declaration order; do not reorder.
  object ColumnFamily extends Enumeration {
    type ColumnFamily = Value

    val SHORT = Value("s")
    val LONG = Value("l")
  }
  import IntervalUnit._
  // Which column family each interval unit is stored in.
  val intervalsMap = Map(MINUTELY -> ColumnFamily.SHORT, HOURLY -> ColumnFamily.SHORT,
    DAILY -> ColumnFamily.LONG, MONTHLY -> ColumnFamily.LONG, TOTAL -> ColumnFamily.LONG)

  // Column family / column name used for blob values.
  val blobCF = ColumnFamily.LONG.toString.getBytes
  val blobColumn = "b".getBytes

  // Per-qualifier count values for a single key.
  type ExactValueMap = Map[ExactQualifier, Long]
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala
new file mode 100644
index 0000000..b472649
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactKey.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.counter.core
+
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.apache.s2graph.counter.models.Counter.ItemType.ItemType
+import org.apache.s2graph.counter.util.Hashes
+
/** Identifies one counted item under a policy (and storage version). */
trait ExactKeyTrait {
  def policyId: Int
  def version: Byte
  def itemType: ItemType
  def itemKey: String
}

// Standard key: itemKey is the item id itself.
case class ExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String) extends ExactKeyTrait
// Blob key: itemKey is a digest; the original id is kept in itemId.
case class BlobExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String, itemId: String) extends ExactKeyTrait
+
object ExactKey {
  /**
   * Build the storage key for `itemId` under `policy`. When `checkItemType`
   * is set and the policy stores blobs, the key carries a SHA-1 digest of
   * the id (the raw id is preserved on the BlobExactKey); otherwise the id
   * is used directly.
   */
  def apply(policy: Counter, itemId: String, checkItemType: Boolean): ExactKeyTrait = {
    val isBlob = checkItemType && policy.itemType == ItemType.BLOB
    if (isBlob) BlobExactKey(policy.id, policy.version, ItemType.BLOB, Hashes.sha1(itemId), itemId)
    else ExactKey(policy.id, policy.version, policy.itemType, itemId)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala
new file mode 100644
index 0000000..96104bf
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactQualifier.scala
@@ -0,0 +1,73 @@
+package org.apache.s2graph.counter.core
+
+import java.util
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
+import scala.collection.JavaConversions._
+
/**
 * A timed qualifier plus its dimension, kept both as a parsed key/value
 * map and as the serialized dimension string.
 */
case class ExactQualifier(tq: TimedQualifier, dimKeyValues: Map[String, String], dimension: String) {
  /**
   * True when this qualifier's dimensions exactly satisfy `dimQuery`:
   * the query has the same number of keys, and every stored (k, v) has a
   * query entry for k whose value set is either empty (wildcard) or
   * contains v.
   */
  def checkDimensionEquality(dimQuery: Map[String, Set[String]]): Boolean = {
    // idiom: direct forall instead of for/yield followed by forall(identity)
    dimQuery.size == dimKeyValues.size &&
      dimKeyValues.forall { case (k, v) =>
        dimQuery.get(k).exists(qv => qv.isEmpty || qv.contains(v))
      }
  }
}
+
object ExactQualifier {
  // Memoizes dimension-string parsing; entries are immutable maps.
  val cache: LoadingCache[String, Map[String, String]] = CacheBuilder.newBuilder()
    .maximumSize(10000)
    .build(
      new CacheLoader[String, Map[String, String]]() {
        def load(s: String): Map[String, String] = {
          strToDimensionMap(s)
        }
      }
    )

  /** Construct from a serialized dimension string (parsed via the cache). */
  def apply(tq: TimedQualifier, dimension: String): ExactQualifier = {
    ExactQualifier(tq, cache.get(dimension), dimension)
  }

  /** Construct from a dimension map, deriving the canonical dimension string. */
  def apply(tq: TimedQualifier, dimKeyValues: Map[String, String]): ExactQualifier = {
    ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues))
  }

  // Canonical ordering: all keys (sorted) followed by their values in the same order.
  def makeSortedDimension(dimKeyValues: Map[String, String]): Iterator[String] = {
    val sortedDimKeyValues = new util.TreeMap[String, String](dimKeyValues)
    sortedDimKeyValues.keysIterator ++ sortedDimKeyValues.valuesIterator
  }

  def makeDimensionStr(dimKeyValues: Map[String, String]): String = {
    makeSortedDimension(dimKeyValues).mkString(".")
  }

  /** One qualifier per interval at timestamp `ts`, sharing `dimKeyValues`. */
  def getQualifiers(intervals: Seq[IntervalUnit], ts: Long, dimKeyValues: Map[String, String]): Seq[ExactQualifier] = {
    // consistency: delegate to apply instead of re-deriving the dimension string
    for {
      tq <- TimedQualifier.getQualifiers(intervals, ts)
    } yield {
      ExactQualifier(tq, dimKeyValues)
    }
  }

  /**
   * Inverse of makeDimensionStr: first half of the '.'-separated tokens are
   * keys, second half the values. Trailing-dot and "." inputs encode empty
   * trailing components that split() would drop, hence the special cases.
   */
  def strToDimensionMap(dimension: String): Map[String, String] = {
    val dimSp = {
      val sp = dimension.split('.')
      if (dimension == ".") {
        Array("", "")
      }
      else if (dimension.nonEmpty && dimension.last == '.') {
        sp ++ Array("")
      } else {
        sp
      }
    }
    val dimKey = dimSp.take(dimSp.length / 2)
    val dimVal = dimSp.takeRight(dimSp.length / 2)
    dimKey.zip(dimVal).toMap
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala
new file mode 100644
index 0000000..c34e631
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/ExactStorage.scala
@@ -0,0 +1,32 @@
+package org.apache.s2graph.counter.core
+
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.models.Counter
+
+import scala.concurrent.{ExecutionContext, Future}
+
/**
 * Backend abstraction for exact counter persistence. All read methods are
 * asynchronous; mutation methods are synchronous.
 */
trait ExactStorage {
  // for range query and check dimension
  def get(policy: Counter,
          items: Seq[String],
          timeRange: Seq[(TimedQualifier, TimedQualifier)],
          dimQuery: Map[String, Set[String]])
         (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]]
  // for range query
  def get(policy: Counter,
          items: Seq[String],
          timeRange: Seq[(TimedQualifier, TimedQualifier)])
         (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]]
  // for query exact qualifier
  def get(policy: Counter,
          queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])])
         (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]]
  // Apply increments; returns the per-key results actually applied.
  def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap]
  def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit
  def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean]
  def getBlobValue(policy: Counter, blobId: String): Option[String]

  // Lifecycle of the physical storage backing `policy`.
  def prepare(policy: Counter): Unit
  def destroy(policy: Counter): Unit
  def ready(policy: Counter): Boolean
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala
new file mode 100644
index 0000000..e3ea1b8
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingCounter.scala
@@ -0,0 +1,101 @@
+package org.apache.s2graph.counter.core
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache}
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+
// One ranking row: bucket key -> item id to ranking value.
case class RankingRow(key: RankingKey, value: Map[String, RankingValue])
// Rate-based variant (action score over base score) of a ranking row.
case class RateRankingRow(key: RankingKey, value: Map[String, RateRankingValue])
+
/**
 * Facade over a [[RankingStorage]] backend for top-K rankings, with a
 * read-through Guava cache for buckets whose time window has closed.
 */
class RankingCounter(config: Config, storage: RankingStorage) {
  private val log = LoggerFactory.getLogger(getClass)

  // Caches storage.ready(policy) per policy id (1000 entries, 60s TTL).
  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))

  // Read-through cache for closed time windows; entries expire after 10 minutes.
  val cache: LoadingCache[RankingKey, RankingResult] = CacheBuilder.newBuilder()
    .maximumSize(1000000)
    .expireAfterWrite(10L, TimeUnit.MINUTES)
    .build(
      new CacheLoader[RankingKey, RankingResult]() {
        def load(rankingKey: RankingKey): RankingResult = {
          // totalScore = -1 marks "nothing found" so empty results are cached too
          storage.getTopK(rankingKey, Int.MaxValue).getOrElse(RankingResult(-1, Nil))
        }
      }
    )

  /**
   * Top-K for one bucket. Buckets whose qualifier is still the current time
   * window bypass the cache, since their contents are still changing.
   */
  def getTopK(rankingKey: RankingKey, k: Int = Int.MaxValue): Option[RankingResult] = {
    val tq = rankingKey.eq.tq
    if (TimedQualifier.getQualifiers(Seq(tq.q), System.currentTimeMillis()).head == tq) {
      // do not use cache for the live window
      storage.getTopK(rankingKey, k)
    }
    else {
      val result = cache.get(rankingKey)
      if (result.values.nonEmpty) {
        Some(result.copy(values = result.values.take(k)))
      }
      else {
        None
      }
    }
  }

  def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
    storage.update(key, value, k)
  }

  def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
    storage.update(values, k)
  }

  def delete(key: RankingKey): Unit = {
    storage.delete(key)
  }

  /**
   * Distinct item ids across all buckets. Closed-window buckets are served
   * from cache when present; everything else is fetched and cached.
   */
  def getAllItems(keys: Seq[RankingKey], k: Int = Int.MaxValue): Seq[String] = {
    // only closed windows are eligible for the cache
    val oldKeys = keys.filter(key => TimedQualifier.getQualifiers(Seq(key.eq.tq.q), System.currentTimeMillis()).head != key.eq.tq)
    val cached = cache.getAllPresent(oldKeys)
    val missed = keys.diff(cached.keys.toSeq)
    val found = storage.getTopK(missed, k)

    // back-fill the cache with what we just fetched
    found.foreach { case (key, result) =>
      cache.put(key, result)
    }

    for {
      (_, RankingResult(_, values)) <- cached ++ found
      (item, _) <- values
    } yield {
      item
    }
  }.toSeq.distinct

  def prepare(policy: Counter): Unit = {
    storage.prepare(policy)
  }

  def destroy(policy: Counter): Unit = {
    storage.destroy(policy)
  }

  /** Cached readiness check for the policy's backing storage. */
  def ready(policy: Counter): Boolean = {
    storageStatusCache.withCache(s"${policy.id}") {
      Some(storage.ready(policy))
    }.getOrElse(false)
  }
}
+
object RankingCounter {
  // Per-item ranking values for one bucket.
  type RankingValueMap = Map[String, RankingValue]
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala
new file mode 100644
index 0000000..9e3dc9a
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingKey.scala
@@ -0,0 +1,4 @@
+package org.apache.s2graph.counter.core
+
+
// Identifies one ranking bucket: policy, storage version, and time/dimension qualifier.
case class RankingKey(policyId: Int, version: Byte, eq: ExactQualifier)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala
new file mode 100644
index 0000000..e738614
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingResult.scala
@@ -0,0 +1,3 @@
+package org.apache.s2graph.counter.core
+
// Top-K query result: bucket-wide total score plus (item, score) pairs in rank order.
case class RankingResult(totalScore: Double, values: Seq[(String, Double)])

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala
new file mode 100644
index 0000000..34af94f
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingStorage.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.counter.core
+
+import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.models.Counter
+
+
/**
 * Backend abstraction for top-K ranking persistence.
 */
trait RankingStorage {
  /** Top `k` items for one bucket; None when the bucket does not exist. */
  def getTopK(key: RankingKey, k: Int): Option[RankingResult]
  /** Top `k` items for several buckets; missing buckets are omitted. */
  def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)]
  def update(key: RankingKey, value: RankingValueMap, k: Int): Unit
  def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit
  // consistency: explicit Unit result type (was deprecated procedure syntax)
  def delete(key: RankingKey): Unit

  // Lifecycle of the physical storage backing `policy`.
  def prepare(policy: Counter): Unit
  def destroy(policy: Counter): Unit
  def ready(policy: Counter): Boolean
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala
new file mode 100644
index 0000000..7691f35
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RankingValue.scala
@@ -0,0 +1,15 @@
+package org.apache.s2graph.counter.core
+
/**
 * Ranking score together with the increment that produced it.
 * @param score ranking score
 * @param increment increment value (used by v1 storage)
 */
case class RankingValue(score: Double, increment: Double)

object RankingValue {
  /** Merge two values: keep the larger score, sum the increments. */
  def reduce(r1: RankingValue, r2: RankingValue): RankingValue =
    RankingValue(r1.score max r2.score, r1.increment + r2.increment)
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala
new file mode 100644
index 0000000..370926f
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/RateRankingValue.scala
@@ -0,0 +1,15 @@
+package org.apache.s2graph.counter.core
+
/**
 * A rate expressed as an action score over a base score.
 * @param actionScore numerator (e.g. click count)
 * @param baseScore denominator (e.g. impression count)
 */
case class RateRankingValue(actionScore: Double, baseScore: Double) {
  /**
   * Rate as a RankingValue; the base is floored at 1 so the ratio never
   * divides by less than one. The increment component is unused for rates.
   */
  lazy val rankingValue: RankingValue =
    RankingValue(actionScore / (1d max baseScore), 0)
}

object RateRankingValue {
  /** Merge two rates by taking the component-wise maximum of both scores. */
  def reduce(r1: RateRankingValue, r2: RateRankingValue): RateRankingValue =
    RateRankingValue(r1.actionScore max r2.actionScore, r1.baseScore max r2.baseScore)
}

Reply via email to