Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 36d5485bd -> e207f676f


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
new file mode 100644
index 0000000..17d9e8f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
@@ -0,0 +1,82 @@
+package com.kakao.s2graph.core.utils
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.CacheBuilder
+import com.typesafe.config.Config
+
+import scala.concurrent.{Promise, Future, ExecutionContext}
+
+
+class FutureCache[R](config: Config)(implicit ex: ExecutionContext) {
+
+  type Value = (Long, Future[R])
+
+  private val maxSize = config.getInt("future.cache.max.size")
+  private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  private val futureCache = CacheBuilder.newBuilder()
+  .initialCapacity(maxSize)
+  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+  .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]()
+
+
+  def asMap() = futureCache.asMap()
+
+  def getIfPresent(cacheKey: Long): Value = {
+    val (cachedAt, promise) = futureCache.getIfPresent(cacheKey)
+    (cachedAt, promise.future)
+  }
+
+  private def checkAndExpire(cacheKey: Long,
+                             cachedAt: Long,
+                             cacheTTL: Long,
+                             oldFuture: Future[R])(op: => Future[R]): Future[R] = {
+    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
+      // the cached future is too old, so expire it and fetch fresh data from storage.
+      futureCache.asMap().remove(cacheKey)
+
+      val newPromise = Promise[R]
+      val now = System.currentTimeMillis()
+
+      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+        case null =>
+          // only one thread succeeds in getting here concurrently
+          // initiate the fetch to storage, then add a callback on completion to finish the promise.
+          op.onSuccess { case value =>
+            newPromise.success(value)
+            value
+          }
+          newPromise.future
+        case (cachedAt, oldPromise) => oldPromise.future
+      }
+    } else {
+      // the cached future is not too old, so reuse it.
+      oldFuture
+    }
+  }
+  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = {
+    val cacheVal = futureCache.getIfPresent(cacheKey)
+    cacheVal match {
+      case null =>
+        val promise = Promise[R]
+        val now = System.currentTimeMillis()
+        val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+          case null =>
+            op.onSuccess { case value =>
+              promise.success(value)
+              value
+            }
+            (now, promise)
+          case oldVal => oldVal
+        }
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, cachedPromise.future)(op)
+
+      case (cachedAt, cachedPromise) =>
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, cachedPromise.future)(op)
+    }
+  }
+}

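For reference, a minimal usage sketch of the FutureCache added above. The config values, the Int type parameter, and fetchFromStorage are illustrative assumptions; only the config keys and the method signatures come from the class itself:

    import com.kakao.s2graph.core.utils.FutureCache
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.{ExecutionContext, Future}

    object FutureCacheExample extends App {
      implicit val ec = ExecutionContext.Implicits.global

      // the keys below are the ones FutureCache reads; the values are made up
      val config = ConfigFactory.parseString(
        """
          |future.cache.max.size=10000
          |future.cache.expire.after.write=60000
          |future.cache.expire.after.access=60000
        """.stripMargin)

      val cache = new FutureCache[Int](config)

      def fetchFromStorage(): Future[Int] = Future { 42 } // hypothetical storage call

      // concurrent callers with the same key share a single in-flight future;
      // once an entry is older than cacheTTL milliseconds, op is evaluated again
      val f: Future[Int] = cache.getOrElseUpdate(cacheKey = 1L, cacheTTL = 1000L)(fetchFromStorage())
    }

Note the putIfAbsent-based double check in getOrElseUpdate: under contention only one caller's op runs per key, and every other caller receives the same promise's future.
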
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
index 438b97d..1c09778 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
@@ -173,7 +173,7 @@ class CrudTest extends IntegrateCommon {
             TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
           })
 
-          TestUtil.mutateEdgesSync(bulkEdges: _*)
+          TestUtil.insertEdgesSync(bulkEdges: _*)
 
           for {
             label <- Label.findByName(labelName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
index 751a7a7..ae9e514 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
@@ -2,7 +2,7 @@ package com.kakao.s2graph.core.Integrate
 
 import com.kakao.s2graph.core._
 import com.kakao.s2graph.core.mysqls.Label
-import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.rest.{RequestParser, RestHandler}
 import com.kakao.s2graph.core.utils.logger
 import com.typesafe.config._
 import org.scalatest._
@@ -33,8 +33,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   /**
-    * Make Service, Label, Vertex for integrate test
-    */
+   * Make Service, Label, Vertex for integrate test
+   */
   def initTestData() = {
     println("[init start]: 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
     Management.deleteService(testServiceName)
@@ -82,8 +82,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
 
 
   /**
-    * Test Helpers
-    */
+   * Test Helpers
+   */
   object TestUtil {
     implicit def ec = scala.concurrent.ExecutionContext.global
 
@@ -111,22 +111,18 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
 
     def getEdgesSync(queryJson: JsValue): JsValue = {
       logger.info(Json.prettyPrint(queryJson))
-
-      val ret = graph.getEdges(parser.toQuery(queryJson))
-      val result = Await.result(ret, HttpRequestWaitingTime)
-      val jsResult = PostProcess.toSimpleVertexArrJson(result)
-
-      jsResult
+      val restHandler = new RestHandler(graph)
+      Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime)
     }
 
-    def mutateEdgesSync(bulkEdges: String*) = {
+    def insertEdgesSync(bulkEdges: String*) = {
+      val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
       val jsResult = Await.result(req, HttpRequestWaitingTime)
 
       jsResult
     }
 
-    def mutateEdgesAsync(bulkEdges: String*) = {
+    def insertEdgesAsync(bulkEdges: String*) = {
+      val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
       req
     }

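The getEdgesSync change above is the core of this refactor: rather than parsing the query and calling graph.getEdges directly, the tests now go through the same RestHandler entry point the REST layer uses, so post-processing is exercised as well. A minimal sketch of that call shape, using only the signature visible in the diff (imports elided as in the test trait; the timeout value is an illustrative assumption):

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import play.api.libs.json.JsValue

    def getEdges(graph: Graph, queryJson: JsValue): JsValue = {
      val restHandler = new RestHandler(graph)
      // getEdgesAsync takes the raw query JSON plus a function that renders
      // the query result to JSON (here PostProcess.toSimpleVertexArrJson)
      Await.result(
        restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson),
        10.seconds)
    }
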
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala
index 0d2d82e..0b26608 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala
@@ -1,14 +1,17 @@
 package com.kakao.s2graph.core.Integrate
 
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core.utils.logger
 import org.scalatest.BeforeAndAfterEach
-import play.api.libs.json._
+import play.api.libs.json.{JsNull, JsNumber, JsValue, Json}
+
+import scala.util.{Success, Try}
 
 class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   import TestUtil._
 
   val insert = "insert"
-  val delete = "delete"
   val e = "e"
   val weight = "weight"
   val is_hidden = "is_hidden"
@@ -132,7 +135,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
     val result = getEdgesSync(queryGroupBy(0, Seq("weight")))
     (result \ "size").as[Int] should be(2)
-    val weights = (result \\ "groupBy").map { js =>
+    val weights = (result \ "results" \\ "groupBy").map { js =>
       (js \ "weight").as[Int]
     }
     weights should contain(30)
@@ -163,11 +166,10 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     (result \ "results").as[List[JsValue]].size should be(2)
 
     result = getEdgesSync(queryTransform(0, "[[\"weight\"]]"))
-    (result \\ "to").map(_.toString).sorted should be((result \\ 
"weight").map(_.toString).sorted)
+    (result \ "results" \\ "to").map(_.toString).sorted should be((result \ 
"results" \\ "weight").map(_.toString).sorted)
 
     result = getEdgesSync(queryTransform(0, "[[\"_from\"]]"))
-    val results = (result \ "results").as[JsValue]
-    (result \\ "to").map(_.toString).sorted should be((results \\ "from").map(_.toString).sorted)
+    (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "from").map(_.toString).sorted)
   }
 
   test("index") {
@@ -217,6 +219,57 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
   //      }
   //    }
 
+
+
+  test("duration") {
+    def queryDuration(ids: Seq[Int], from: Int, to: Int) = {
+      val $from = Json.arr(
+        Json.obj("serviceName" -> testServiceName,
+          "columnName" -> testColumnName,
+          "ids" -> ids))
+
+      val $step = Json.arr(Json.obj(
+        "label" -> testLabelName, "direction" -> "out", "offset" -> 0, "limit" 
-> 100,
+        "duration" -> Json.obj("from" -> from, "to" -> to)))
+
+      val $steps = Json.arr(Json.obj("step" -> $step))
+
+      Json.obj("srcVertices" -> $from, "steps" -> $steps)
+    }
+
+    // get all
+    var result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000))
+    (result \ "results").as[List[JsValue]].size should be(4)
+    // inclusive, exclusive
+    result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000))
+    (result \ "results").as[List[JsValue]].size should be(3)
+
+    result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000))
+    (result \ "results").as[List[JsValue]].size should be(1)
+
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+
+    // duration test after update
+    // get all
+    result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000))
+    (result \ "results").as[List[JsValue]].size should be(4)
+
+    // inclusive, exclusive
+    result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000))
+    (result \ "results").as[List[JsValue]].size should be(3)
+
+    result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000))
+    (result \ "results").as[List[JsValue]].size should be(1)
+
+  }
+
+
   test("return tree") {
     def queryParents(id: Long) = Json.parse(
       s"""
@@ -246,7 +299,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     val src = 100
     val tgt = 200
 
-    mutateEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName))
+    insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName))
 
     val result = TestUtil.getEdgesSync(queryParents(src))
     val parents = (result \ "results").as[Seq[JsValue]]
@@ -259,54 +312,55 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
 
 
-  test("pagination and _to") {
-    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
-      s"""
-        { "srcVertices": [
-          { "serviceName": "${testServiceName}",
-            "columnName": "${testColumnName}",
-            "id": ${id}
-           }],
-          "steps": [
-          [ {
-              "label": "${testLabelName}",
-              "direction": "out",
-              "offset": $offset,
-              "limit": $limit,
-              "_to": $to
-            }
-          ]]
-        }
-        """)
-
-    val src = System.currentTimeMillis().toInt
-
-    val bulkEdges = Seq(
-      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
-      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
-      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
-      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
-    )
-    mutateEdgesSync(bulkEdges: _*)
-
-    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
-    var edges = (result \ "results").as[List[JsValue]]
-
-    edges.size should be(2)
-    (edges(0) \ "to").as[Long] should be(4)
-    (edges(1) \ "to").as[Long] should be(3)
-
-    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
-
-    edges = (result \ "results").as[List[JsValue]]
-    edges.size should be(2)
-    (edges(0) \ "to").as[Long] should be(3)
-    (edges(1) \ "to").as[Long] should be(2)
+//  test("pagination and _to") {
+//    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
+//      s"""
+//        { "srcVertices": [
+//          { "serviceName": "${testServiceName}",
+//            "columnName": "${testColumnName}",
+//            "id": ${id}
+//           }],
+//          "steps": [
+//          [ {
+//              "label": "${testLabelName}",
+//              "direction": "out",
+//              "offset": $offset,
+//              "limit": $limit,
+//              "_to": $to
+//            }
+//          ]]
+//        }
+//        """)
+//
+//    val src = System.currentTimeMillis().toInt
+//
+//    val bulkEdges = Seq(
+//      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+//      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+//      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+//      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+//    )
+//    insertEdgesSync(bulkEdges: _*)
+//
+//    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+//    var edges = (result \ "results").as[List[JsValue]]
+//
+//    edges.size should be(2)
+//    (edges(0) \ "to").as[Long] should be(4)
+//    (edges(1) \ "to").as[Long] should be(3)
+//
+//    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+//
+//    edges = (result \ "results").as[List[JsValue]]
+//    edges.size should be(2)
+//    (edges(0) \ "to").as[Long] should be(3)
+//    (edges(1) \ "to").as[Long] should be(2)
+//
+//    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
+//    edges = (result \ "results").as[List[JsValue]]
+//    edges.size should be(1)
+//  }
 
-    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
-    edges = (result \ "results").as[List[JsValue]]
-    edges.size should be(1)
-  }
   test("order by") {
     def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
       "srcVertices" -> Json.arr(
@@ -351,7 +405,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
       toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))
     )
 
-    mutateEdgesSync(bulkEdges: _*)
+    insertEdgesSync(bulkEdges: _*)
 
     // get edges
     val edges = getEdgesSync(queryScore(0, Map("weight" -> 1)))
@@ -368,39 +422,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     edgesTo.reverse should be(ascOrderByTo)
   }
 
-  test("query with '_to' option after delete") {
-    val from = 90210
-    val to = 90211
-    val inserts = Seq(toEdge(1, insert, e, from, to, testLabelName))
-    mutateEdgesSync(inserts: _*)
-
-    val deletes = Seq(toEdge(2, delete, e, from, to, testLabelName))
-    mutateEdgesSync(deletes: _*)
-
-    def queryWithTo = Json.parse(
-      s"""
-        { "srcVertices": [
-          { "serviceName": "$testServiceName",
-            "columnName": "$testColumnName",
-            "id": $from
-           }],
-          "steps": [
-            {
-              "step": [{
-                "label": "$testLabelName",
-                "direction": "out",
-                "offset": 0,
-                "limit": 10,
-                "_to": $to
-                }]
-            }
-          ]
-        }
-      """)
-    val result = getEdgesSync(queryWithTo)
-    (result \ "results").as[List[JsValue]].size should be(0)
-
-  }
 
   test("query with sampling") {
     def queryWithSampling(id: Int, sample: Int) = Json.parse(
@@ -502,32 +523,324 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
       toEdge(ts, insert, e, 322, 3322, testLabelName)
     )
 
-    mutateEdgesSync(bulkEdges: _*)
+    insertEdgesSync(bulkEdges: _*)
 
-    var result = getEdgesSync(queryWithSampling(testId, sampleSize))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should 
be(scala.math.min(sampleSize, bulkEdges.size))
+    val result1 = getEdgesSync(queryWithSampling(testId, sampleSize))
+    (result1 \ "results").as[List[JsValue]].size should 
be(math.min(sampleSize, bulkEdges.size))
 
-    result = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should 
be(scala.math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size))
+    val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize))
+    (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize 
* sampleSize, bulkEdges.size * bulkEdges.size))
 
-    result = getEdgesSync(twoQueryWithSampling(testId, sampleSize))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should be(sampleSize + 3) // 
edges in testLabelName2 = 3
+    val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize))
+    (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // 
edges in testLabelName2 = 3
+  }
+  test("test query with filterOut query") {
+    def queryWithFilterOut(id1: String, id2: String) = Json.parse(
+      s"""{
+         |     "limit": 10,
+         |     "filterOut": {
+         |             "srcVertices": [{
+         |                     "serviceName": "$testServiceName",
+         |                     "columnName": "$testColumnName",
+         |                     "id": $id1
+         |             }],
+         |             "steps": [{
+         |                     "step": [{
+         |                             "label": "$testLabelName",
+         |                             "direction": "out",
+         |                             "offset": 0,
+         |                             "limit": 10
+         |                     }]
+         |             }]
+         |     },
+         |     "srcVertices": [{
+         |             "serviceName": "$testServiceName",
+         |             "columnName": "$testColumnName",
+         |             "id": $id2
+         |     }],
+         |     "steps": [{
+         |             "step": [{
+         |                     "label": "$testLabelName",
+         |                     "direction": "out",
+         |                     "offset": 0,
+         |                     "limit": 5
+         |             }]
+         |     }]
+         |}
+       """.stripMargin
+    )
+
+    val testId1 = "-23"
+    val testId2 = "-25"
+
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)),
+      toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)),
+      toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)),
+      toEdge(4, insert, e, testId2, 111, testLabelName, Json.obj(weight -> 1)),
+      toEdge(5, insert, e, testId2, 333, testLabelName, Json.obj(weight -> 1)),
+      toEdge(6, insert, e, testId2, 555, testLabelName, Json.obj(weight -> 1))
+    )
+    logger.debug(s"${bulkEdges.mkString("\n")}")
+    insertEdgesSync(bulkEdges: _*)
+
+    val rs = getEdgesSync(queryWithFilterOut(testId1, testId2))
+    logger.debug(Json.prettyPrint(rs))
+    val results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    (results(0) \ "to").toString should be("555")
+  }
 
-    result = getEdgesSync(queryWithSampling(testId, 0))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should be(0) // edges in 
testLabelName2 = 3
 
-    result = getEdgesSync(queryWithSampling(testId, 10))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should be(3) // edges in 
testLabelName2 = 3
+  /** note that this merge two different label result into one */
+  test("weighted union") {
+    def queryWithWeightedUnion(id1: String, id2: String) = Json.parse(
+      s"""
+                               |{
+                               |  "limit": 10,
+                               |  "weights": [
+                               |    10,
+                               |    1
+                               |  ],
+                               |  "groupBy": ["weight"],
+                               |  "queries": [
+                               |    {
+                               |      "srcVertices": [
+                               |        {
+                               |          "serviceName": "$testServiceName",
+                               |          "columnName": "$testColumnName",
+                               |          "id": $id1
+                               |        }
+                               |      ],
+                               |      "steps": [
+                               |        {
+                               |          "step": [
+                               |            {
+                               |              "label": "$testLabelName",
+                               |              "direction": "out",
+                               |              "offset": 0,
+                               |              "limit": 5
+                               |            }
+                               |          ]
+                               |        }
+                               |      ]
+                               |    },
+                               |    {
+                               |      "srcVertices": [
+                               |        {
+                               |          "serviceName": "$testServiceName",
+                               |          "columnName": "$testColumnName",
+                               |          "id": $id2
+                               |        }
+                               |      ],
+                               |      "steps": [
+                               |        {
+                               |          "step": [
+                               |            {
+                               |              "label": "$testLabelName2",
+                               |              "direction": "out",
+                               |              "offset": 0,
+                               |              "limit": 5
+                               |            }
+                               |          ]
+                               |        }
+                               |      ]
+                               |    }
+                               |  ]
+                               |}
+       """.stripMargin
+    )
 
-    result = getEdgesSync(queryWithSampling(testId, -1))
-    println(Json.toJson(result))
-    (result \ "results").as[List[JsValue]].size should be(3) // edges in 
testLabelName2 = 3
+    val testId1 = "1"
+    val testId2 = "2"
 
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)),
+      toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)),
+      toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)),
+      toEdge(4, insert, e, testId2, 444, testLabelName2, Json.obj(weight -> 1)),
+      toEdge(5, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)),
+      toEdge(6, insert, e, testId2, 666, testLabelName2, Json.obj(weight -> 1))
+    )
+
+    insertEdgesSync(bulkEdges: _*)
+
+    val rs = getEdgesSync(queryWithWeightedUnion(testId1, testId2))
+    logger.debug(Json.prettyPrint(rs))
+    val results = (rs \ "results").as[List[JsValue]]
+    results.size should be(2)
+    (results(0) \ "scoreSum").as[Float] should be(30.0)
+    (results(0) \ "agg").as[List[JsValue]].size should be(3)
+    (results(1) \ "scoreSum").as[Float] should be(3.0)
+    (results(1) \ "agg").as[List[JsValue]].size should be(3)
+  }
+
+  test("weighted union with options") {
+    def queryWithWeightedUnionWithOptions(id1: String, id2: String) = Json.parse(
+      s"""
+         |{
+         |  "limit": 10,
+         |  "weights": [
+         |    10,
+         |    1
+         |  ],
+         |  "groupBy": ["to"],
+         |  "select": ["to", "weight"],
+         |  "filterOut": {
+         |    "srcVertices": [
+         |      {
+         |        "serviceName": "$testServiceName",
+         |        "columnName": "$testColumnName",
+         |        "id": $id1
+         |      }
+         |    ],
+         |    "steps": [
+         |      {
+         |        "step": [
+         |          {
+         |            "label": "$testLabelName",
+         |            "direction": "out",
+         |            "offset": 0,
+         |            "limit": 10
+         |          }
+         |        ]
+         |      }
+         |    ]
+         |  },
+         |  "queries": [
+         |    {
+         |      "srcVertices": [
+         |        {
+         |          "serviceName": "$testServiceName",
+         |          "columnName": "$testColumnName",
+         |          "id": $id1
+         |        }
+         |      ],
+         |      "steps": [
+         |        {
+         |          "step": [
+         |            {
+         |              "label": "$testLabelName",
+         |              "direction": "out",
+         |              "offset": 0,
+         |              "limit": 5
+         |            }
+         |          ]
+         |        }
+         |      ]
+         |    },
+         |    {
+         |      "srcVertices": [
+         |        {
+         |          "serviceName": "$testServiceName",
+         |          "columnName": "$testColumnName",
+         |          "id": $id2
+         |        }
+         |      ],
+         |      "steps": [
+         |        {
+         |          "step": [
+         |            {
+         |              "label": "$testLabelName2",
+         |              "direction": "out",
+         |              "offset": 0,
+         |              "limit": 5
+         |            }
+         |          ]
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+    )
+
+    val testId1 = "-192848"
+    val testId2 = "-193849"
+
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)),
+      toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)),
+      toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)),
+      toEdge(4, insert, e, testId2, 111, testLabelName2, Json.obj(weight -> 1)),
+      toEdge(5, insert, e, testId2, 333, testLabelName2, Json.obj(weight -> 1)),
+      toEdge(6, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1))
+    )
+
+    insertEdgesSync(bulkEdges: _*)
+
+    val rs = getEdgesSync(queryWithWeightedUnionWithOptions(testId1, testId2))
+    logger.debug(Json.prettyPrint(rs))
+    val results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+
+  }
+
+  test("scoreThreshold") {
+    def queryWithScoreThreshold(id: String, scoreThreshold: Int) = Json.parse(
+      s"""{
+         |  "limit": 10,
+         |  "scoreThreshold": $scoreThreshold,
+         |  "groupBy": ["to"],
+         |  "srcVertices": [
+         |    {
+         |      "serviceName": "$testServiceName",
+         |      "columnName": "$testColumnName",
+         |      "id": $id
+         |    }
+         |  ],
+         |  "steps": [
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10
+         |        }
+         |      ]
+         |    },
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+    )
+
+    val testId = "-23903"
+
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 101, 102, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 101, 103, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 101, 104, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 102, 103, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 102, 104, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, 103, 105, testLabelName, Json.obj(weight -> 10))
+    )
+    // expected: 104 -> 2, 103 -> 2, 102 -> 1, 105 -> 1
+    insertEdgesSync(bulkEdges: _*)
+
+    var rs = getEdgesSync(queryWithScoreThreshold(testId, 2))
+    logger.debug(Json.prettyPrint(rs))
+    var results = (rs \ "results").as[List[JsValue]]
+    results.size should be(2)
+
+    rs = getEdgesSync(queryWithScoreThreshold(testId, 1))
+    logger.debug(Json.prettyPrint(rs))
+
+    results = (rs \ "results").as[List[JsValue]]
+    results.size should be(4)
   }
 
   def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse(
@@ -548,6 +861,23 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
           }
           """)
 
+  def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj(
+    "limit" -> limit,
+    "srcVertices" -> Json.arr(
+      Json.obj("serviceName" -> testServiceName, "columnName" -> 
testColumnName, "id" -> id)
+    ),
+    "steps" -> Json.arr(
+      Json.obj(
+        "step" -> Json.arr(
+          Json.obj(
+            "label" -> testLabelName
+          )
+        )
+      )
+    )
+  )
+
+
   // called by each test, each
   override def beforeEach = initTestData()
 
@@ -555,7 +885,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
   override def initTestData(): Unit = {
     super.initTestData()
 
-    mutateEdgesSync(
+    insertEdgesSync(
      toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)),
      toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)),
       toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
index bdcdb9e..aae108e 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -14,7 +14,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   import TestUtil._
 
   test("Strong consistency select") {
-    mutateEdgesSync(bulkEdges(): _*)
+    insertEdgesSync(bulkEdges(): _*)
 
     var result = getEdgesSync(query(0))
     (result \ "results").as[List[JsValue]].size should be(2)
@@ -56,7 +56,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     println(result)
     (result \ "results").as[List[JsValue]].size should be(0)
 
-    mutateEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
 
    result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
     println(result)
@@ -69,7 +69,8 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     val ret = for {
       i <- 0 until testNum
     } yield {
-      val src = System.currentTimeMillis()
+      val src = (i + 1) * 10000
+//      val src = System.currentTimeMillis()
 
       val (ret, last) = testInner(i, src)
       ret should be(true)
@@ -136,7 +137,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
     //        val allRequests = insertRequests ++ deleteRequests
    val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests =>
-      mutateEdgesAsync(bulkRequests: _*)
+      insertEdgesAsync(bulkRequests: _*)
     }
 
     Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
@@ -175,7 +176,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     }
     val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
    val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests =>
-      mutateEdgesAsync(bulkRequests: _*)
+      insertEdgesAsync(bulkRequests: _*)
     }
 
     Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
@@ -199,7 +200,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
     val labelName = testLabelName2
     val maxTgtId = 10
     val batchSize = 10
-    val testNum = 3
+    val testNum = 100
     val numOfBatch = 10
 
     def testInner(startTs: Long, src: Long) = {
@@ -217,13 +218,13 @@ class StrongLabelDeleteTest extends IntegrateCommon {
         val op = if (Random.nextDouble() < 0.5) "delete" else "update"
 
         lastOps(tgt) = op
-        Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t")
+        Seq(currentTs, op, "e", src, tgt, labelName, "{}").mkString("\t")
       }
 
       allRequests.foreach(println(_))
 
      val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests =>
-        mutateEdgesAsync(bulkRequests: _*)
+        insertEdgesAsync(bulkRequests: _*)
       }
 
       Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index 1e2d836..b80d9c7 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -76,7 +76,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
     println(result)
     (result \ "results").as[List[JsValue]].size should be(0)
 
-    mutateEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
 
     result = getEdgesSync(query(20, "in", testTgtColumnName))
     (result \ "results").as[List[JsValue]].size should be(3)
@@ -90,7 +90,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
 
   override def initTestData(): Unit = {
     super.initTestData()
-    mutateEdgesSync(bulkEdges(): _*)
+    insertEdgesSync(bulkEdges(): _*)
   }
 
   object WeakLabelDeleteHelper {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala
deleted file mode 100644
index e5b3e51..0000000
--- a/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-//package com.kakao.s2graph.core
-//import com.kakao.s2graph.core.Graph
-//import com.kakao.s2graph.core.models.{HLabel, HService, HServiceColumn, HBaseModel}
-//import com.typesafe.config.ConfigFactory
-//import org.scalatest.{FunSuite, Matchers}
-//import play.api.libs.json.{JsString, JsBoolean, JsNumber, Json}
-//
-//import scala.concurrent.ExecutionContext
-//
-///**
-// * Created by shon on 5/15/15.
-// */
-//class ManagementTest extends FunSuite with Matchers {
-//  val labelName = "test_label"
-//  val serviceName = "test_service"
-//  val columnName = "test_column"
-//  val columnType = "long"
-//  val indexProps = Seq("weight" -> JsNumber(5), "is_hidden" -> JsBoolean(true))
-//  val props = Seq("is_blocked" -> JsBoolean(true), "category" -> JsString("sports"))
-//  val consistencyLevel = "weak"
-//  val hTableName = Some("testHTable")
-//  val hTableTTL = Some(86000)
-//  val preSplitSize = 10
-//  val zkQuorum = "localhost"
-//
-//  val config = ConfigFactory.parseString(s"hbase.zookeeper.quorum=$zkQuorum")
-//  Graph(config)(ExecutionContext.Implicits.global)
-//  HBaseModel(zkQuorum)
-//  val current = System.currentTimeMillis()
-//  val serviceNames = (0 until 2).map { i => s"$serviceName-${current + i}" }
-//  val labelNames = (0 until 2).map { i => s"$labelName-${current + i}" }
-//
-////  def runTC[T <: HBaseModel](prevSeq: Long, prevSize: Int, prefix: String)(testSize: Int)(createF: String => T)(fetchF: String => Option[T])(deleteF: String => Boolean) = {
-////    var lastSeq = prevSeq
-////    val createds = collection.mutable.Map.empty[String, T]
-////    val names = (0 until testSize) map { i => s"$prefix-${current + i}"}
-////
-////    val rets = for {
-////      name <- names
-////    } yield {
-////        val created = createF(name)
-////        val testSeq = created.id.get > lastSeq
-////        lastSeq = created.id.get
-////        createds += (name -> created)
-////        val fetched = fetchF(name)
-////        fetched.isDefined && created == fetched.get && testSeq
-////      }
-////
-////    val deletes = for {
-////      name <- names
-////    } yield {
-////        deleteF(name)
-////      }
-////
-////    (rets ++ deletes).forall(_)
-////  }
-//  test("test create service") {
-//
-//    var prevSeq = Management.getSequence("HService")
-//    val prevSize = HService.findAllServices().size
-//    val createds = collection.mutable.Map.empty[String, HService]
-//
-//    val rets = for {
-//      serviceName <- serviceNames
-//    } yield {
-//      val service = Management.createService(serviceName, zkQuorum, hTableName.get, preSplitSize, hTableTTL)
-//      val testSeq = service.id.get > prevSeq
-//      prevSeq = service.id.get
-//      createds += (service.serviceName -> service)
-//      val other = Management.findService(service.serviceName)
-//      other.isDefined && service == other.get && testSeq
-//    }
-//
-//    val deletes = for {
-//      serviceName <- serviceNames
-//    } yield {
-//      Management.deleteService(serviceName)
-//    }
-//    (rets ++ deletes).forall(_)
-//
-//    HService.findAllServices().size == prevSize
-//  }
-//  test("test create label") {
-//    val service = Management.createService(serviceName, zkQuorum, hTableName.get, preSplitSize, hTableTTL)
-//    var prevSeq = Management.getSequence("HLabel")
-//    val prevSize = HLabel.findAll(useCache = false)
-//    val createds = collection.mutable.Map.empty[String, HLabel]
-//
-//    val rets = for {
-//      lName <- labelNames
-//    } yield {
-//      val label = Management.createLabel(lName, serviceName, columnName, columnType,
-//        serviceName, columnName, columnType,
-//        true, serviceName, indexProps, props,
-//        consistencyLevel, hTableName, hTableTTL
-//      )
-//      val testSeq = label.id.get > prevSeq
-//      prevSeq = label.id.get
-//
-//      createds += (label.label -> label)
-//      val other = Management.findLabel(label.label)
-//      other.isDefined && label == other.get && testSeq
-//    }
-//    println(HLabel.findAll(useCache = false))
-//    val deletes = for {
-//      lName <- labelNames
-//    } yield {
-//        Management.deleteLabel(lName)
-//      }
-//    (rets ++ deletes).forall(_)
-//    HLabel.findAll(useCache = false).size == prevSize
-//  }
-//  test("test update label") {
-//    HLabel.updateLabel(labelName, Seq("is_blocked" -> JsBoolean(false)))
-//    for {
-//      label <- HLabel.findByName(labelName, useCache = false)
-//    } yield {
-//      println(label)
-//    }
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
index b431cc2..febad3d 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
@@ -12,7 +12,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
 
   // dummy data for dummy edge
   initTests()
-
+  
   import HBaseType.{VERSION1, VERSION2}
 
   val ts = System.currentTimeMillis()
@@ -167,6 +167,58 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
     }
   }
 
+  test("replace reserved") {
+    val ts = 0
+    import com.kakao.s2graph.core.rest.TemplateHelper._
+
+    calculate(ts, 1, "hour") should be(hour + ts)
+    calculate(ts, 1, "day") should be(day + ts)
+
+    calculate(ts + 10, 1, "HOUR") should be(hour + ts + 10)
+    calculate(ts + 10, 1, "DAY") should be(day + ts + 10)
+
+    val body = """{
+               "day": ${1day},
+          "hour": ${1hour},
+          "-day": "${-10 day}",
+          "-hour": ${-10 hour},
+          "now": "${now}"
+        }
+      """
+
+    val parsed = replaceVariable(ts, body)
+    val json = Json.parse(parsed)
+
+    (json \ "day").as[Long] should be (1 * day + ts)
+    (json \ "hour").as[Long] should be (1 * hour + ts)
+
+    (json \ "-day").as[Long] should be (-10 * day + ts)
+    (json \ "-hour").as[Long] should be (-10 * hour + ts)
+
+    (json \ "now").as[Long] should be (ts)
+
+    val otherBody = """{
+          "nextday": "${next_day}",
+          "3dayago": "${next_day - 3 day}",
+          "nexthour": "${next_hour}"
+        }"""
+
+    val currentTs = System.currentTimeMillis()
+    val expectedDayTs = currentTs / day * day + day
+    val expectedHourTs = currentTs / hour * hour + hour
+    val threeDayAgo = expectedDayTs - 3 * day
+    val currentTsLs = (1 until 1000).map(currentTs + _)
+
+    currentTsLs.foreach { ts =>
+      val parsed = replaceVariable(ts, otherBody)
+      val json = Json.parse(parsed)
+
+      (json \ "nextday").as[Long] should be(expectedDayTs)
+      (json \ "nexthour").as[Long] should be(expectedHourTs)
+      (json \ "3dayago").as[Long] should be(threeDayAgo)
+    }
+  }
+
   //  test("time decay") {
   //    val ts = System.currentTimeMillis()
   //

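The "replace reserved" test above pins down the template semantics: ${now} resolves to the request timestamp, ${1hour}/${1day} offset it by whole units (case-insensitive, negative offsets allowed), and ${next_hour}/${next_day} round up to the next unit boundary, optionally minus an offset. The following is a minimal sketch consistent with those assertions, not the actual TemplateHelper implementation:

    object TemplateHelperSketch {
      val hour: Long = 60 * 60 * 1000L
      val day: Long = 24 * hour

      private def unitMillis(unit: String): Long = unit match {
        case "hour" => hour
        case "day"  => day
      }

      // e.g. calculate(ts, 1, "HOUR") == ts + hour
      def calculate(now: Long, n: Long, unit: String): Long =
        now + n * unitMillis(unit.toLowerCase)

      // first unit boundary strictly after now
      private def nextBoundary(now: Long, unit: String): Long = {
        val m = unitMillis(unit)
        now / m * m + m
      }

      private val offset    = """(-?\d+)\s*(hour|day)""".r
      private val next      = """next_(hour|day)""".r
      private val nextMinus = """next_(hour|day)\s*-\s*(\d+)\s*(hour|day)""".r

      private def eval(now: Long, expr: String): Long = expr.trim.toLowerCase match {
        case "now"              => now
        case nextMinus(u, n, s) => nextBoundary(now, u) - n.toLong * unitMillis(s)
        case next(u)            => nextBoundary(now, u)
        case offset(n, u)       => calculate(now, n.toLong, u)
      }

      // substitute every ${...} occurrence in body with its evaluated value
      def replaceVariable(now: Long, body: String): String =
        """\$\{([^}]+)\}""".r.replaceAllIn(body, m => eval(now, m.group(1)).toString)
    }

Under this sketch, replaceVariable(0L, "${1day}") yields day.toString, matching the first assertions in the test.
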
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala
index 38b5b99..128f2d7 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala
@@ -1,53 +1,53 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.Graph
-import com.typesafe.config.ConfigFactory
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.GetRequest
-import org.scalatest.{FunSuite, Matchers}
-
-import scala.concurrent.ExecutionContext
-
-class AsynchbaseQueryBuilderTest extends FunSuite with Matchers {
-
-  val dummyRequests = {
-    for {
-      id <- 0 until 1000
-    } yield {
-      new GetRequest("a", Bytes.toBytes(id))
-    }
-  }
-
-  implicit val ec = ExecutionContext.Implicits.global
-  val config = ConfigFactory.load()
-  val graph = new Graph(config)
-
-  val qb = new AsynchbaseQueryBuilder(graph.storage.asInstanceOf[AsynchbaseStorage])
-
-  test("test toCacheKeyBytes") {
-    val startedAt = System.nanoTime()
-
-    for {
-      i <- dummyRequests.indices
-      x = qb.toCacheKeyBytes(dummyRequests(i))
-    } {
-      for {
-        j <- dummyRequests.indices if i != j
-        y = qb.toCacheKeyBytes(dummyRequests(j))
-      } {
-        x should not equal y
-      }
-    }
-
-    dummyRequests.zip(dummyRequests).foreach { case (x, y) =>
-      val xHash = qb.toCacheKeyBytes(x)
-      val yHash = qb.toCacheKeyBytes(y)
-      //      println(xHash, yHash)
-      xHash should be(yHash)
-    }
-    val duration = System.nanoTime() - startedAt
-
-    println(s">> bytes: $duration")
-  }
-}
+//package com.kakao.s2graph.core.storage.hbase
+//
+//import com.kakao.s2graph.core.Graph
+//import com.typesafe.config.ConfigFactory
+//
+//import org.apache.hadoop.hbase.util.Bytes
+//import org.hbase.async.GetRequest
+//import org.scalatest.{FunSuite, Matchers}
+//
+//import scala.concurrent.ExecutionContext
+//
+//class AsynchbaseQueryBuilderTest extends FunSuite with Matchers {
+//
+//  val dummyRequests = {
+//    for {
+//      id <- 0 until 1000
+//    } yield {
+//      new GetRequest("a", Bytes.toBytes(id))
+//    }
+//  }
+//
+//  implicit val ec = ExecutionContext.Implicits.global
+//  val config = ConfigFactory.load()
+//  val graph = new Graph(config)
+//
+//  val qb = new AsynchbaseQueryBuilder(graph.storage.asInstanceOf[AsynchbaseStorage])
+//
+//  test("test toCacheKeyBytes") {
+//    val startedAt = System.nanoTime()
+//
+//    for {
+//      i <- dummyRequests.indices
+//      x = qb.toCacheKeyBytes(dummyRequests(i))
+//    } {
+//      for {
+//        j <- dummyRequests.indices if i != j
+//        y = qb.toCacheKeyBytes(dummyRequests(j))
+//      } {
+//        x should not equal y
+//      }
+//    }
+//
+//    dummyRequests.zip(dummyRequests).foreach { case (x, y) =>
+//      val xHash = qb.toCacheKeyBytes(x)
+//      val yHash = qb.toCacheKeyBytes(y)
+//      //      println(xHash, yHash)
+//      xHash should be(yHash)
+//    }
+//    val duration = System.nanoTime() - startedAt
+//
+//    println(s">> bytes: $duration")
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
index 32e3d0c..a36b55f 100644
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
@@ -128,7 +128,7 @@ object CounterFunctions extends Logging with WithKafka {
       itemRankingRdd.unpersist(false)
     }
   }
-  
+
  def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
     rdd.mapPartitions { part =>
       assert(initialize)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
new file mode 100644
index 0000000..4970399
--- /dev/null
+++ b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
@@ -0,0 +1,196 @@
+package s2.counter.stream
+
+import com.kakao.s2graph.core.GraphUtil
+import com.kakao.s2graph.core.mysqls.Label
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import play.api.libs.json.Json
+import s2.config.{S2ConfigFactory, S2CounterConfig}
+import s2.counter.core.CounterFunctions.HashMapAccumulable
+import s2.counter.core.TimedQualifier.IntervalUnit
+import s2.counter.core._
+import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
+import s2.helper.CounterAdmin
+import s2.models.{Counter, DBModel, DefaultCounterModel}
+import s2.spark.HashMapParam
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+/**
+  * Created by hsleep([email protected]) on 2015. 11. 19..
+  */
+class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  private val master = "local[2]"
+  private val appName = "exact_counter_streaming"
+  private val batchDuration = Seconds(1)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+
+  val admin = new CounterAdmin(S2ConfigFactory.config)
+  val graphOp = new GraphOperation(S2ConfigFactory.config)
+  val s2config = new S2CounterConfig(S2ConfigFactory.config)
+
+  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
+  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
+
+  val service = "test"
+  val action = "test_case"
+
+  override def beforeAll(): Unit = {
+    DBModel.initialize(S2ConfigFactory.config)
+
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    ssc = new StreamingContext(conf, batchDuration)
+
+    sc = ssc.sparkContext
+
+    // create test_case label
+    com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+    if (Label.findByName(action, useCache = false).isEmpty) {
+      val strJs =
+        s"""
+           |{
+           |  "label": "$action",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "src",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "$action",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |  ],
+           |  "props": [
+           |  ]
+           |}
+       """.stripMargin
+      graphOp.createLabel(Json.parse(strJs))
+    }
+
+    // action
+    admin.deleteCounter(service, action).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true))
+  }
+
+  override def afterAll(): Unit = {
+    admin.deleteCounter(service, action)
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
+  "ExactCounter" should "update" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+    val data =
+      s"""
+        |1434534565675 $service        $action 70362200_94013572857366866      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
+        |1434534565675 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
+        |1434534566220 $service        $action 51223360_94013140590929619      {"is_shared":"false","relationship":"FE"}       {"userId":"312383","userIdType":"profile_id","value":"1"}
+        |1434534566508 $service        $action 63808459_94013420047377826      {"is_shared":"false","relationship":"FE"}       {"userId":"21968241","userIdType":"profile_id","value":"1"}
+        |1434534566210 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"6062217","userIdType":"profile_id","value":"1"}
+        |1434534566459 $service        $action 49699692_94012186431261763      {"is_shared":"false","relationship":"FE"}       {"userId":"67863471","userIdType":"profile_id","value":"1"}
+        |1434534565681 $service        $action 64556827_94012311028641810      {"is_shared":"false","relationship":"FE"}       {"userId":"19381218","userIdType":"profile_id","value":"1"}
+        |1434534565865 $service        $action 41814266_94012477588942163      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
+        |1434534565865 $service        $action 66697741_94007840665633458      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
+        |1434534566142 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"11917195","userIdType":"profile_id","value":"1"}
+        |1434534566077 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"37709890","userIdType":"profile_id","value":"1"}
+        |1434534565938 $service        $action 40921487_94012905738975266      {"is_shared":"false","relationship":"FE"}       {"userId":"59869223","userIdType":"profile_id","value":"1"}
+        |1434534566033 $service        $action 64506628_93994707216829506      {"is_shared":"false","relationship":"FE"}       {"userId":"50375575","userIdType":"profile_id","value":"1"}
+        |1434534566451 $service        $action 40748868_94013448321919139      {"is_shared":"false","relationship":"FE"}       {"userId":"12249539","userIdType":"profile_id","value":"1"}
+        |1434534566669 $service        $action 64499956_94013227717457106      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
+        |1434534566669 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
+        |1434534566318 $service        $action 64774665_94012837889027027      {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"}
+        |1434534566274 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"57931860","userIdType":"profile_id","value":"1"}
+        |1434534566659 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"19990823","userIdType":"profile_id","value":"1"}
+        |1434534566250 $service        $action 70670053_93719933175630611      {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"}
+        |1434534566402 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"15541439","userIdType":"profile_id","value":"1"}
+        |1434534566122 $service        $action 48890741_94013463616012786      {"is_shared":"false","relationship":"FE"}       {"userId":"48040409","userIdType":"profile_id","value":"1"}
+        |1434534566055 $service        $action 64509008_94002318232678546      {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"}
+        |1434534565994 $service        $action 66644368_94009163363033795      {"is_shared":"false","relationship":"FE"}       {"userId":"4143147","userIdType":"profile_id","value":"1"}
+        |1434534566448 $service        $action 64587644_93938555963733954      {"is_shared":"false","relationship":"FE"}       {"userId":"689042","userIdType":"profile_id","value":"1"}
+        |1434534565935 $service        $action 52812511_94012009551561315      {"is_shared":"false","relationship":"FE"}       {"userId":"35509692","userIdType":"profile_id","value":"1"}
+        |1434534566544 $service        $action 70452048_94008573197583762      {"is_shared":"false","relationship":"FE"}       {"userId":"5172421","userIdType":"profile_id","value":"1"}
+        |1434534565929 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"33556498","userIdType":"profile_id","value":"1"}
+        |1434534566358 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"8987346","userIdType":"profile_id","value":"1"}
+        |1434534566057 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"35134964","userIdType":"profile_id","value":"1"}
+        |1434534566140 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"11900315","userIdType":"profile_id","value":"1"}
+        |1434534566158 $service        $action 64639374_93888330176053635      {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"}
+        |1434534566025 $service        $action 67265128_94009084771192002      {"is_shared":"false","relationship":"FE"}       {"userId":"37801480","userIdType":"profile_id","value":"1"}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+    val resultRdd = CounterFunctions.makeExactRdd(rdd, 2)
+    val result = resultRdd.collect().toMap
+
+    //    result.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val parsed = {
+      for {
+        line <- GraphUtil.parseString(data)
+        item <- CounterEtlItem(line).toSeq
+        ev <- CounterFunctions.exactMapper(item).toSeq
+      } yield {
+        ev
+      }
+    }
+    val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L)))
+
+    //    parsedResult.foreach { case (k, v) =>
+    //      println(k, v)
+    //    }
+
+    result should not be empty
+    result should equal (parsedResult)
+
+    val itemId = "46889329_94013502934177075"
+    val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true)
+    val value = result.get(key)
+
+    value should not be empty
+    value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L))
+
+    exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None)
+
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+    resultRdd.foreachPartition { part =>
+      CounterFunctions.updateExactCounter(part.toSeq, acc)
+    }
+
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6L)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6L)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6L)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6L)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected))
+    }
+  }
+}
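
Editor's note: the grouped-count assertions above only make sense once you see how a dimension map is flattened into the ExactQualifier key string, e.g. Map("is_shared" -> "false", "relationship" -> "FE") becoming "is_shared.relationship.false.FE". The actual encoding lives in the counter module and is not part of this diff; the following is a hypothetical sketch that merely reproduces the key strings asserted in the test, assuming keys are sorted and emitted before their values:

    // Hypothetical sketch, not the project's code: reproduce the dimension-key
    // strings asserted above ("", "is_shared.false", "relationship.FE",
    // "is_shared.relationship.false.FE") by sorting keys, then appending values.
    object DimensionKeySketch {
      def flatten(dims: Map[String, String]): String = {
        val sorted = dims.toSeq.sortBy(_._1)  // deterministic key order
        (sorted.map(_._1) ++ sorted.map(_._2)).mkString(".")
      }

      def main(args: Array[String]): Unit = {
        assert(flatten(Map.empty) == "")
        assert(flatten(Map("is_shared" -> "false")) == "is_shared.false")
        assert(flatten(Map("relationship" -> "FE")) == "relationship.FE")
        assert(flatten(Map("is_shared" -> "false", "relationship" -> "FE")) == "is_shared.relationship.false.FE")
      }
    }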

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/build.sbt
----------------------------------------------------------------------
diff --git a/s2rest_netty/build.sbt b/s2rest_netty/build.sbt
new file mode 100644
index 0000000..666fd80
--- /dev/null
+++ b/s2rest_netty/build.sbt
@@ -0,0 +1,7 @@
+name := "s2rest_netty"
+
+enablePlugins(JavaAppPackaging)
+
+libraryDependencies ++= Seq(
+  "io.netty" % "netty-all" % "4.0.33.Final"
+)
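
Editor's note: this module's only dependency is netty-all 4.0.33.Final, so the REST endpoint is served straight off Netty's event loop rather than through Play. For orientation, a Netty 4.0 HTTP server is bootstrapped roughly as below; this is an illustrative sketch only (MinimalHttpServer, the port, and the pipeline are placeholders, and the module's real wiring is in Server.scala further down):

    import io.netty.bootstrap.ServerBootstrap
    import io.netty.channel.ChannelInitializer
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.SocketChannel
    import io.netty.channel.socket.nio.NioServerSocketChannel
    import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}

    object MinimalHttpServer {
      def main(args: Array[String]): Unit = {
        val boss = new NioEventLoopGroup(1)   // accepts connections
        val worker = new NioEventLoopGroup()  // handles I/O
        try {
          val bootstrap = new ServerBootstrap()
            .group(boss, worker)
            .channel(classOf[NioServerSocketChannel])
            .childHandler(new ChannelInitializer[SocketChannel] {
              override def initChannel(ch: SocketChannel): Unit = {
                ch.pipeline()
                  .addLast(new HttpServerCodec())                 // HTTP encode/decode
                  .addLast(new HttpObjectAggregator(512 * 1024))  // aggregate chunks into full requests
                // an application handler (S2RestHandler in this commit) would be appended here
              }
            })
          // bind and block until the server channel is closed
          bootstrap.bind(9000).sync().channel().closeFuture().sync()
        } finally {
          boss.shutdownGracefully()
          worker.shutdownGracefully()
        }
      }
    }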

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/conf/logger.xml
----------------------------------------------------------------------
diff --git a/s2rest_netty/conf/logger.xml b/s2rest_netty/conf/logger.xml
new file mode 100644
index 0000000..2d767c2
--- /dev/null
+++ b/s2rest_netty/conf/logger.xml
@@ -0,0 +1,83 @@
+<configuration>
+
+    <conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel"/>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>logs/application.log</file>
+
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>logs/application.%i.log</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>9</maxIndex>
+        </rollingPolicy>
+
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>500MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern>
+        </encoder>
+    </appender>
+
+
+    <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>logs/error.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>logs/error.log.%d.%i</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>500MB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <maxHistory>3</maxHistory>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%coloredLevel %logger{15} - %message%n%xException%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="ACTOR" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>logs/actor.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>logs/actor.log.%d.%i</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>200MB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <maxHistory>7</maxHistory>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="play" level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="application" level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <logger name="error" level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="ERROR"/>
+    </logger>
+
+    <logger name="actor" level="INFO">
+        <appender-ref ref="ACTOR"/>
+    </logger>
+
+    <logger name="akka" level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+</configuration>
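
Editor's note: the configuration above routes by logger name: "play" and "application" write to stdout and the rolling application.log, "error" additionally lands in error.log, and "actor" goes to actor.log alone. A minimal sketch of how code picks these named loggers up via SLF4J, assuming logback and slf4j are on the classpath (the project presumably wraps this in its own logger utility):

    import org.slf4j.LoggerFactory

    object LoggerUsageSketch {
      // logger names must match the <logger name="..."> entries in logger.xml
      private val appLog = LoggerFactory.getLogger("application")
      private val actorLog = LoggerFactory.getLogger("actor")

      def main(args: Array[String]): Unit = {
        appLog.info("reaches STDOUT and logs/application.log")
        actorLog.info("reaches logs/actor.log only")
      }
    }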

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/conf/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_netty/conf/reference.conf b/s2rest_netty/conf/reference.conf
new file mode 100644
index 0000000..a992e5d
--- /dev/null
+++ b/s2rest_netty/conf/reference.conf
@@ -0,0 +1,131 @@
+# This is the main configuration file for the application.
+# ~~~~~
+
+# Secret key
+# ~~~~~
+# The secret key is used to secure cryptographic functions.
+#
+# This must be changed for production, but we recommend not changing it in this file.
+#
+# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details.
+application.secret="/`==g^yr2DNnZGK_L^rguLZeR`60uLOVgY@OhyTv:maatl:Tl>9or/d1xME3b/Pi"
+
+# The application languages
+# ~~~~~
+application.langs="en"
+
+# Global object class
+# ~~~~~
+# Define the Global object class for this application.
+# Default to Global in the root package.
+# application.global=Global
+
+# Router
+# ~~~~~
+# Define the Router object to use for this application.
+# This router will be looked up first when the application is starting up,
+# so make sure this is the entry point.
+# Furthermore, it's assumed your route file is named properly.
+# So for an application router like `my.application.Router`,
+# you may need to define a router file `conf/my.application.routes`.
+# Default to Routes in the root package (and conf/routes)
+# application.router=my.application.Routes
+
+# Database configuration
+# ~~~~~
+# You can declare as many datasources as you want.
+# By convention, the default datasource is named `default`
+#
+# db.default.driver=org.h2.Driver
+# db.default.url="jdbc:h2:mem:play"
+# db.default.user=sa
+# db.default.password=""
+
+# Evolutions
+# ~~~~~
+# You can disable evolutions if needed
+# evolutionplugin=disabled
+
+# Logger
+# ~~~~~
+# You can also configure logback (http://logback.qos.ch/),
+# by providing an application-logger.xml file in the conf directory.
+
+# Root logger:
+logger.root=ERROR
+
+# Logger used by the framework:
+logger.play=INFO
+
+# Logger provided to your application:
+logger.application=DEBUG
+
+# APP PHASE
+phase=dev
+host=localhost
+
+# DB
+s2graph.models.table.name="models-dev"
+hbase.zookeeper.quorum=${host}
+db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
+# Query server
+is.query.server=true
+is.write.server=true
+query.hard.limit=100000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
+# HBASE
+#hbase.client.operation.timeout=1000
+#async.hbase.client.flush.interval=100
+hbase.table.compression.algorithm="gz"
+
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+#hbase.nsre.high_watermark=1000000
+#hbase.timer.tick=5
+#hbase.timer.ticks_per_wheel=5
+
+# Kafka
+kafka.metadata.broker.list=${host}
+kafka.producer.pool.size=0
+
+# HTTP
+parsers.text.maxLength=512K
+parsers.json.maxLength=512K
+trustxforwarded=false
+
+# Local Queue Actor
+local.queue.actor.max.queue.size=100000
+local.queue.actor.rate.limit=1000000
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+delete.all.fetch.size=10000
+hbase.fail.prob=-1.0
+
+# The maximum number of edges allowed for deleteAll is the product of the two settings above.
+
+# set global object package, TODO: remove global
+application.global=com.kakao.s2graph.rest.Global
+
+akka {
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  loglevel = "DEBUG"
+}
+
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+
+
+# Counter
+redis.instances = [${host}]
+
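
Editor's note: this file is HOCON, so ${host} is a substitution resolved when the configuration is loaded; overriding host once (for example with -Dhost=my-box) rewrites the ZooKeeper quorum, the JDBC URL, and the Kafka broker list together. A minimal sketch with Typesafe Config, assuming this file sits on the classpath as reference.conf:

    import com.typesafe.config.ConfigFactory

    object ConfigSketch {
      def main(args: Array[String]): Unit = {
        // load() merges reference.conf with any application.conf and system
        // properties, resolving ${host} substitutions in the process
        val conf = ConfigFactory.load()
        println(conf.getString("hbase.zookeeper.quorum"))  // "localhost" unless host is overridden
        println(conf.getString("db.default.url"))          // "jdbc:mysql://localhost:3306/graph_dev"
        println(conf.getInt("future.cache.max.size"))      // 100000
      }
    }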

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/resources/application.conf b/s2rest_netty/src/main/resources/application.conf
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/resources/reference.conf b/s2rest_netty/src/main/resources/reference.conf
new file mode 100644
index 0000000..a992e5d
--- /dev/null
+++ b/s2rest_netty/src/main/resources/reference.conf
@@ -0,0 +1,131 @@
+# This is the main configuration file for the application.
+# ~~~~~
+
+# Secret key
+# ~~~~~
+# The secret key is used to secure cryptographic functions.
+#
+# This must be changed for production, but we recommend not changing it in this file.
+#
+# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details.
+application.secret="/`==g^yr2DNnZGK_L^rguLZeR`60uLOVgY@OhyTv:maatl:Tl>9or/d1xME3b/Pi"
+
+# The application languages
+# ~~~~~
+application.langs="en"
+
+# Global object class
+# ~~~~~
+# Define the Global object class for this application.
+# Default to Global in the root package.
+# application.global=Global
+
+# Router
+# ~~~~~
+# Define the Router object to use for this application.
+# This router will be looked up first when the application is starting up,
+# so make sure this is the entry point.
+# Furthermore, it's assumed your route file is named properly.
+# So for an application router like `my.application.Router`,
+# you may need to define a router file `conf/my.application.routes`.
+# Default to Routes in the root package (and conf/routes)
+# application.router=my.application.Routes
+
+# Database configuration
+# ~~~~~
+# You can declare as many datasources as you want.
+# By convention, the default datasource is named `default`
+#
+# db.default.driver=org.h2.Driver
+# db.default.url="jdbc:h2:mem:play"
+# db.default.user=sa
+# db.default.password=""
+
+# Evolutions
+# ~~~~~
+# You can disable evolutions if needed
+# evolutionplugin=disabled
+
+# Logger
+# ~~~~~
+# You can also configure logback (http://logback.qos.ch/),
+# by providing an application-logger.xml file in the conf directory.
+
+# Root logger:
+logger.root=ERROR
+
+# Logger used by the framework:
+logger.play=INFO
+
+# Logger provided to your application:
+logger.application=DEBUG
+
+# APP PHASE
+phase=dev
+host=localhost
+
+# DB
+s2graph.models.table.name="models-dev"
+hbase.zookeeper.quorum=${host}
+db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
+# Query server
+is.query.server=true
+is.write.server=true
+query.hard.limit=100000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
+# HBASE
+#hbase.client.operation.timeout=1000
+#async.hbase.client.flush.interval=100
+hbase.table.compression.algorithm="gz"
+
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+#hbase.nsre.high_watermark=1000000
+#hbase.timer.tick=5
+#hbase.timer.ticks_per_wheel=5
+
+# Kafka
+kafka.metadata.broker.list=${host}
+kafka.producer.pool.size=0
+
+# HTTP
+parsers.text.maxLength=512K
+parsers.json.maxLength=512K
+trustxforwarded=false
+
+# Local Queue Actor
+local.queue.actor.max.queue.size=100000
+local.queue.actor.rate.limit=1000000
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+delete.all.fetch.size=10000
+hbase.fail.prob=-1.0
+
+# The maximum number of edges allowed for deleteAll is the product of the two settings above.
+
+# set global object package, TODO: remove global
+application.global=com.kakao.s2graph.rest.Global
+
+akka {
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  loglevel = "DEBUG"
+}
+
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+
+
+# Counter
+redis.instances = [${host}]
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/scala/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala
index 7290b8a..16477b1 100644
--- a/s2rest_netty/src/main/scala/Server.scala
+++ b/s2rest_netty/src/main/scala/Server.scala
@@ -1,9 +1,12 @@
 package com.kakao.s2graph.rest.netty
 
+import java.util.Map.Entry
 import java.util.concurrent.Executors
+import java.util.function.Consumer
 
 import com.kakao.s2graph.core.GraphExceptions.BadQueryException
 import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.Experiment
 import com.kakao.s2graph.core.rest.RestHandler.HandlerResult
 import com.kakao.s2graph.core.rest._
 import com.kakao.s2graph.core.utils.Extensions._
@@ -88,11 +91,11 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
         case e: BadQueryException =>
           logger.error(s"{$requestBody}, ${e.getMessage}", e)
           val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
         case e: Exception =>
           logger.error(s"${requestBody}, ${e.getMessage}", e)
           val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+          simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
       }
     }
   }
@@ -130,11 +133,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
         } else badRoute(ctx)
 
       case HttpMethod.POST =>
-        val jsonString = req.content.toString(CharsetUtil.UTF_8)
-        val jsQuery = Json.parse(jsonString)
+        val body = req.content.toString(CharsetUtil.UTF_8)
 
-        val result = s2rest.doPost(uri, jsQuery)
-        toResponse(ctx, req, jsQuery, result, startedAt)
+        val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
+        toResponse(ctx, req, Json.parse(body), result, startedAt)
 
       case _ =>
         simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
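
Editor's note: on the changed POST path, the handler now forwards the raw body together with the experiment impression header. Netty's HttpHeaders.get returns null when a header is absent, which is why the lookup is wrapped in Option(...). A small illustrative sketch (the literal header name below is hypothetical; the code above uses Experiment.impressionKey):

    import io.netty.handler.codec.http.{DefaultHttpHeaders, HttpHeaders}

    object HeaderOptionSketch {
      def main(args: Array[String]): Unit = {
        val headers: HttpHeaders = new DefaultHttpHeaders()
        headers.add("X-S2GRAPH-IMPRESSION-ID", "exp-1")  // hypothetical header name
        // Option(null) is None, so absent headers become None instead of a null String
        assert(Option(headers.get("X-S2GRAPH-IMPRESSION-ID")).contains("exp-1"))
        assert(Option(headers.get("missing")).isEmpty)
      }
    }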

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
index 8005f79..6ce3ac4 100644
--- a/s2rest_play/app/Bootstrap.scala
+++ b/s2rest_play/app/Bootstrap.scala
@@ -7,7 +7,7 @@ import com.kakao.s2graph.core.rest._
 import com.kakao.s2graph.core.utils.logger
 import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
 import config.Config
-import controllers.{AdminController, ApplicationController}
+import controllers.ApplicationController
 import play.api.Application
 import play.api.mvc.{WithFilters, _}
 import play.filters.gzip.GzipFilter

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/app/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala
index 2b3b32e..a8e1a41 100644
--- a/s2rest_play/app/controllers/EdgeController.scala
+++ b/s2rest_play/app/controllers/EdgeController.scala
@@ -1,11 +1,9 @@
 package controllers
 
 import actors.QueueActor
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
 import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
+import com.kakao.s2graph.core.mysqls.Label
 import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.types.LabelWithDirection
 import com.kakao.s2graph.core.utils.logger
 import config.Config
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -14,7 +12,6 @@ import play.api.mvc.{Controller, Result}
 
 import scala.collection.Seq
 import scala.concurrent.Future
-import scala.util.{Failure, Success}
 
 object EdgeController extends Controller {
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/build.sbt
----------------------------------------------------------------------
diff --git a/s2rest_play/build.sbt b/s2rest_play/build.sbt
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
index bf24ed7..a387ba5 100644
--- a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
+++ b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala
@@ -4,43 +4,43 @@ import play.api.libs.json.JsNumber
 import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
 import play.libs.Json
 
-class JsonBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
-  "to json" should {
-    implicit val app = FakeApplication()
-
-    "json benchmark" in new WithApplication(app) {
+class JsonBenchmarkSpec extends BenchmarkCommon {
+  "to json" >> {
+    "json benchmark" >> {
 
       duration("map to json") {
-        (0 to 100) foreach { n =>
+        (0 to 10) foreach { n =>
           val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
           Json.toJson(numberMaps)
         }
       }
 
       duration("directMakeJson") {
-        (0 to 100) foreach { n =>
+        (0 to 10) foreach { n =>
           var jsObj = play.api.libs.json.Json.obj()
-          (0 to 100).foreach { n =>
+          (0 to 10).foreach { n =>
             jsObj += (n.toString -> JsNumber(n * n))
           }
         }
       }
 
       duration("map to json 2") {
-        (0 to 500) foreach { n =>
-          val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
+        (0 to 50) foreach { n =>
+          val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
           Json.toJson(numberMaps)
         }
       }
 
       duration("directMakeJson 2") {
-        (0 to 500) foreach { n =>
+        (0 to 50) foreach { n =>
           var jsObj = play.api.libs.json.Json.obj()
-          (0 to 100).foreach { n =>
+          (0 to 10).foreach { n =>
             jsObj += (n.toString -> JsNumber(n * n))
           }
         }
       }
+      true
     }
+    true
   }
 }
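
Editor's note: both benchmark specs lean on a duration helper inherited from BenchmarkCommon, which is not part of this diff. Judging from the call sites, it takes a label and a by-name block, reports the elapsed time, and returns the block's result; presumably something like this sketch:

    trait BenchmarkCommonSketch {
      // hedged reconstruction of BenchmarkCommon.duration, inferred from usage only
      def duration[A](label: String)(block: => A): A = {
        val startedAt = System.currentTimeMillis()
        val result = block
        println(s"$label took ${System.currentTimeMillis() - startedAt} ms")
        result
      }
    }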

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
index d38ff5e..d2d3624 100644
--- a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
+++ b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala
@@ -8,10 +8,7 @@ import play.api.{Application => PlayApplication}
 
 import scala.util.Random
 
-/**
-  * Created by hsleep([email protected]) on 2015. 11. 9..
-  */
-class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+class OrderingUtilBenchmarkSpec extends BenchmarkCommon {
   "OrderingUtilBenchmarkSpec" should {
 
     "performance MultiOrdering any" >> {
@@ -24,7 +21,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
       }
 
       val sorted1 = duration("TupleOrdering double,long") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           tupLs.sortBy { case (x, y) =>
             -x -> -y
           }
@@ -35,7 +32,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
       }.map { x => x._1 }
 
       val sorted2 = duration("MultiOrdering double,long") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
         }
         seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false)))
@@ -45,7 +42,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
     }
 
     "performance MultiOrdering double" >> {
-      val tupLs = (0 until 500) map { i =>
+      val tupLs = (0 until 50) map { i =>
         Random.nextDouble() -> Random.nextDouble()
       }
 
@@ -54,13 +51,13 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
       }
 
       duration("MultiOrdering double") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false)))
         }
       }
 
       duration("TupleOrdering double") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           tupLs.sortBy { case (x, y) =>
             -x -> -y
           }
@@ -71,7 +68,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
     }
 
     "performance MultiOrdering jsvalue" >> {
-      val tupLs = (0 until 500) map { i =>
+      val tupLs = (0 until 50) map { i =>
         Random.nextDouble() -> Random.nextLong()
       }
 
@@ -80,7 +77,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
       }
 
       val sorted1 = duration("TupleOrdering double,long") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           tupLs.sortBy { case (x, y) =>
             -x -> -y
           }
@@ -91,7 +88,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
       }
 
       val sorted2 = duration("MultiOrdering jsvalue") {
-        (0 until 10000) foreach { _ =>
+        (0 until 1000) foreach { _ =>
           seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
         }
         seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
