Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b5908311a -> 8dbb9a3ee


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index fda9991..6054d67 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -20,27 +20,27 @@
 package org.apache.s2graph.core.Integrate
 
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, Json}
 
 class CrudTest extends IntegrateCommon {
   import CrudHelper._
   import TestUtil._
 
-  test("test CRUD") {
-    var tcNum = 0
-    var tcString = ""
-    var bulkQueries = List.empty[(Long, String, String)]
-    var expected = Map.empty[String, String]
-
-    val curTime = System.currentTimeMillis
-    val t1 = curTime + 0
-    val t2 = curTime + 1
-    val t3 = curTime + 2
-    val t4 = curTime + 3
-    val t5 = curTime + 4
-
-    val tcRunner = new CrudTestRunner()
-    tcNum = 1
+  var tcString = ""
+  var bulkQueries = List.empty[(Long, String, String)]
+  var expected = Map.empty[String, String]
+
+  val curTime = System.currentTimeMillis
+  val t1 = curTime + 0
+  val t2 = curTime + 1
+  val t3 = curTime + 2
+  val t4 = curTime + 3
+  val t5 = curTime + 4
+
+  val tcRunner = new CrudTestRunner()
+  test("1: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+    val tcNum = 1
     tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) 
test "
 
     bulkQueries = List(
@@ -50,8 +50,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 2
+  }
+  test("2: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") {
+    val tcNum = 2
     tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) 
test "
     bulkQueries = List(
       (t1, "insert", "{\"time\": 10}"),
@@ -60,8 +61,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 3
+  }
+  test("3: [t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test") {
+    val tcNum = 3
     tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) 
test "
     bulkQueries = List(
       (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -70,8 +72,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 4
+  }
+  test("4: [t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test") {
+    val tcNum = 4
     tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) 
test "
     bulkQueries = List(
       (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
@@ -80,8 +83,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 5
+  }
+  test("5: [t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test") {
+    val tcNum = 5
     tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) 
test"
     bulkQueries = List(
       (t2, "delete", ""),
@@ -90,8 +94,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 6
+  }
+  test("6: [t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test") {
+    val tcNum = 6
     tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) 
test "
     bulkQueries = List(
       (t2, "delete", ""),
@@ -100,8 +105,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 7
+  }
+  test("7: [t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ") 
{
+    val tcNum = 7
     tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) 
test "
     bulkQueries = List(
       (t1, "update", "{\"time\": 10}"),
@@ -110,7 +116,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 8
+  }
+  test("8: [t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ") 
{
+    val tcNum = 8
     tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) 
test "
     bulkQueries = List(
       (t1, "update", "{\"time\": 10}"),
@@ -119,7 +127,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 9
+  }
+  test("9: [t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test") {
+    val tcNum = 9
     tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) 
test "
     bulkQueries = List(
       (t2, "delete", ""),
@@ -128,7 +138,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 10
+  }
+  test("10: [t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test") 
{
+    val tcNum = 10
     tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) 
test"
     bulkQueries = List(
       (t2, "delete", ""),
@@ -137,7 +149,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 11
+  }
+  test("11: [t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test") 
{
+    val tcNum = 11
     tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) 
test "
     bulkQueries = List(
       (t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -146,7 +160,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-    tcNum = 12
+  }
+  test("12: [t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test") 
{
+    val tcNum = 12
     tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) 
test "
     bulkQueries = List(
       (t3, "update", "{\"time\": 10, \"weight\": 20}"),
@@ -155,8 +171,9 @@ class CrudTest extends IntegrateCommon {
     expected = Map("time" -> "10", "weight" -> "20")
 
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
-
-    tcNum = 13
+  }
+  test("13: [t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) 
insert(t3) delete(t2) update(t4) test ") {
+    val tcNum = 13
     tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) 
insert(t3) delete(t2) update(t4) test "
     bulkQueries = List(
       (t5, "update", "{\"is_blocked\": true}"),
@@ -169,6 +186,15 @@ class CrudTest extends IntegrateCommon {
     tcRunner.run(tcNum, tcString, bulkQueries, expected)
   }
 
+  test("14 - test lock expire") {
+    for {
+      labelName <- List(testLabelName, testLabelName2)
+    } {
+      val id = 0
+      tcRunner.expireTC(labelName, id)
+    }
+  }
+
 
   object CrudHelper {
 
@@ -191,7 +217,7 @@ class CrudTest extends IntegrateCommon {
           val bulkEdges = (for ((ts, op, props) <- opWithProps) yield {
             TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
           })
-
+          println(s"${bulkEdges.mkString("\n")}")
           TestUtil.insertEdgesSync(bulkEdges: _*)
 
           for {
@@ -210,6 +236,7 @@ class CrudTest extends IntegrateCommon {
             val jsResult = TestUtil.getEdgesSync(query)
 
             val results = jsResult \ "results"
+
             val deegrees = (jsResult \ "degrees").as[List[JsObject]]
             val propsLs = (results \\ "props").seq
             (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1)
@@ -229,6 +256,63 @@ class CrudTest extends IntegrateCommon {
         }
       }
 
+      def expireTC(labelName: String, id: Int) = {
+        var i = 1
+        val label = Label.findByName(labelName).get
+        val serviceName = label.serviceName
+        val columnName = label.srcColumnName
+        val id = 0
+
+        while (i < 1000) {
+          val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString()))
+          val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+
+
+          val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id)
+
+          if (!rets.forall(identity)) {
+            Thread.sleep(graph.storage.LockExpireDuration + 100)
+            /** expect current request would be ignored */
+            val bulkEdges = Seq(TestUtil.toEdge(i-1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString()))
+            val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+            if (rets.forall(identity)) {
+              // check
+              val jsResult = TestUtil.getEdgesSync(queryJson)
+              (jsResult \\ "time").head.as[Int] should be(10)
+              logger.debug(jsResult)
+              i = 100000
+            }
+          }
+
+          i += 1
+        }
+
+        i = 1
+        while (i < 1000) {
+          val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString()))
+          val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+
+
+          val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id)
+
+          if (!rets.forall(identity)) {
+            Thread.sleep(graph.storage.LockExpireDuration + 100)
+            /** expect current request would be applied */
+            val bulkEdges = Seq(TestUtil.toEdge(i+1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString()))
+            val rets = TestUtil.insertEdgesSync(bulkEdges: _*)
+            if (rets.forall(identity)) {
+              // check
+              val jsResult = TestUtil.getEdgesSync(queryJson)
+              (jsResult \\ "time").head.as[Int] should be(20)
+              logger.debug(jsResult)
+              i = 100000
+            }
+          }
+
+          i += 1
+        }
+      }
+
       def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse(
         s""" { "srcVertices": [
              { "serviceName": "$serviceName",
@@ -240,6 +324,15 @@ class CrudTest extends IntegrateCommon {
              "offset": 0,
              "limit": 10,
              "cacheTTL": $cacheTTL }]]}""")
+
+      def querySnapshotEdgeJson(serviceName: String, columnName: String, labelName: String, id: Int) = Json.parse(
+        s""" { "srcVertices": [
+             { "serviceName": "$serviceName",
+               "columnName": "$columnName",
+               "id": $id } ],
+             "steps": [ [ {
+             "label": "$labelName",
+             "_to": $id }]]}""")
     }
   }
 }
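The new lock-expire test (case 14 above) boils down to a retry pattern: keep issuing updates until one is rejected because a lock is still held, sleep past the storage lock-expire duration, then check how a follow-up request with a shifted timestamp is resolved against the snapshot edge. A condensed sketch of that pattern, assuming the TestUtil helpers, querySnapshotEdgeJson, graph.storage.LockExpireDuration, and the serviceName/columnName resolved from the label as in expireTC above (the runLockExpireCheck name is illustrative only):

  // Sketch only, not part of this commit.
  def runLockExpireCheck(delta: Int, expectedTime: Int): Unit = {
    var i = 1
    var done = false
    while (i < 1000 && !done) {
      // keep updating until a request fails, i.e. a lock is still held
      val edge = TestUtil.toEdge(i, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 10).toString())
      if (!TestUtil.insertEdgesSync(edge).forall(identity)) {
        Thread.sleep(graph.storage.LockExpireDuration + 100)
        // after the lock expires, retry with a shifted timestamp
        val retry = TestUtil.toEdge(i + delta, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())
        if (TestUtil.insertEdgesSync(retry).forall(identity)) {
          val js = TestUtil.getEdgesSync(querySnapshotEdgeJson(serviceName, columnName, testLabelName, 0))
          (js \\ "time").head.as[Int] should be(expectedTime)
          done = true
        }
      }
      i += 1
    }
  }
  // delta = -1: the retried (older) request should be ignored, so "time" stays 10
  // delta = +1: the retried (newer) request should be applied, so "time" becomes 20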

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index 225d396..b341ec5 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -43,7 +43,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     config = ConfigFactory.load()
     graph = new Graph(config)(ExecutionContext.Implicits.global)
     management = new Management(graph)
-    parser = new RequestParser(graph.config)
+    parser = new RequestParser(graph)
     initTestData()
   }
 
@@ -120,7 +120,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     def deleteAllSync(jsValue: JsValue) = {
       val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
         val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json)
-        val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+        val srcVertices = vertices
+        val future = graph.deleteAllAdjacentEdges(srcVertices.toList, labels, GraphUtil.directions(direction), ts)
 
         future
       })
@@ -131,10 +132,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     def getEdgesSync(queryJson: JsValue): JsValue = {
       logger.info(Json.prettyPrint(queryJson))
       val restHandler = new RestHandler(graph)
-      Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime)
+      val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson), HttpRequestWaitingTime)
+      logger.debug(s"${Json.prettyPrint(result)}")
+      result
     }
 
     def insertEdgesSync(bulkEdges: String*) = {
+      logger.debug(s"${bulkEdges.mkString("\n")}")
       val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
       val jsResult = Await.result(req, HttpRequestWaitingTime)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index 9c52b32..54bb12c 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -307,7 +307,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
               "label": "$testLabelName",
               "direction": "in",
               "offset": 0,
-              "limit": 10
+              "limit": 1000
             }
           ]]
         }""".stripMargin)
@@ -328,54 +328,54 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
 
 
-//  test("pagination and _to") {
-//    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
-//      s"""
-//        { "srcVertices": [
-//          { "serviceName": "${testServiceName}",
-//            "columnName": "${testColumnName}",
-//            "id": ${id}
-//           }],
-//          "steps": [
-//          [ {
-//              "label": "${testLabelName}",
-//              "direction": "out",
-//              "offset": $offset,
-//              "limit": $limit,
-//              "_to": $to
-//            }
-//          ]]
-//        }
-//        """)
-//
-//    val src = System.currentTimeMillis().toInt
-//
-//    val bulkEdges = Seq(
-//      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
-//      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
-//      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
-//      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
-//    )
-//    insertEdgesSync(bulkEdges: _*)
-//
-//    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
-//    var edges = (result \ "results").as[List[JsValue]]
-//
-//    edges.size should be(2)
-//    (edges(0) \ "to").as[Long] should be(4)
-//    (edges(1) \ "to").as[Long] should be(3)
-//
-//    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
-//
-//    edges = (result \ "results").as[List[JsValue]]
-//    edges.size should be(2)
-//    (edges(0) \ "to").as[Long] should be(3)
-//    (edges(1) \ "to").as[Long] should be(2)
-//
-//    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
-//    edges = (result \ "results").as[List[JsValue]]
-//    edges.size should be(1)
-//  }
+  test("pagination and _to") {
+    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": $offset,
+              "limit": $limit,
+              "_to": $to
+            }
+          ]]
+        }
+        """)
+
+    val src = System.currentTimeMillis().toInt
+
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+
+    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+    var edges = (result \ "results").as[List[JsValue]]
+
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(4)
+    (edges(1) \ "to").as[Long] should be(3)
+
+    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(3)
+    (edges(1) \ "to").as[Long] should be(2)
+
+    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(1)
+  }
 
   test("order by") {
     def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
@@ -907,7 +907,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
        """.stripMargin
     )
 
-    def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = Json.parse(
+    def queryWithOp(ids: Seq[String], op: String, shrinkageVal: Long) = Json.parse(
       s"""{
          |  "limit": 10,
          |  "groupBy": ["from"],
@@ -916,7 +916,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
          |    {
          |      "serviceName": "$testServiceName",
          |      "columnName": "$testColumnName",
-         |      "id": $id
+         |      "ids": [${ids.mkString(",")}]
          |    }
          |  ],
          |  "steps": [
@@ -949,47 +949,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
        """.stripMargin
     )
 
-    def queryMultiVerticesWithOp(id: String, id2: String, op: String, shrinkageVal: Long) = Json.parse(
-      s"""{
-         |  "limit": 10,
-         |  "groupBy": ["from"],
-         |  "duplicate": "sum",
-         |  "srcVertices": [
-         |    {
-         |      "serviceName": "$testServiceName",
-         |      "columnName": "$testColumnName",
-         |      "ids": [$id, $id2]
-         |    }
-         |  ],
-         |  "steps": [
-         |    {
-         |      "step": [
-         |        {
-         |          "label": "$testLabelName",
-         |          "direction": "out",
-         |          "offset": 0,
-         |          "limit": 10,
-         |          "groupBy": ["from"],
-         |          "duplicate": "countSum",
-         |          "transform": [["_from"]]
-         |        }
-         |      ]
-         |    }, {
-         |      "step": [
-         |        {
-         |          "label": "$testLabelName2",
-         |          "direction": "out",
-         |          "offset": 0,
-         |          "limit": 10,
-         |          "scorePropagateOp": "$op",
-         |          "scorePropagateShrinkage": $shrinkageVal
-         |        }
-         |      ]
-         |    }
-         |  ]
-         |}
-       """.stripMargin
-    )
     val testId = "-30000"
     val testId2 = "-4000"
 
@@ -1014,15 +973,15 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     val secondStepEdgeCount = 4l
 
     var shrinkageVal = 10l
-    var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", 
shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    var rs = getEdgesSync(queryWithOp(Seq(testId), "divide", shrinkageVal))
+
     var results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
     (results(0) \ "scoreSum").as[Double] should be(scoreSum)
 
-    rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    rs = getEdgesSync(queryWithOp(Seq(testId, testId2), "divide", shrinkageVal))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(2)
     scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal)
@@ -1033,21 +992,21 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
     // check for divide zero case
     shrinkageVal = 30l
     rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     (results(0) \ "scoreSum").as[Double] should be(0)
 
     // "plus" operation
-    rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal))
-    logger.debug(Json.prettyPrint(rs))
+    rs = getEdgesSync(queryWithOp(Seq(testId), "plus", shrinkageVal))
+
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)
     scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount
     (results(0) \ "scoreSum").as[Long] should be(scoreSum)
 
     // "multiply" operation
-    rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", 
shrinkageVal))
+    rs = getEdgesSync(queryWithOp(Seq(testId), "multiply", shrinkageVal))
     logger.debug(Json.prettyPrint(rs))
     results = (rs \ "results").as[List[JsValue]]
     results.size should be(1)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
index 99b56f7..6092fe4 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -141,11 +141,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   test("large degrees") {
     val labelName = testLabelName2
     val dir = "out"
+    val minSize = 0
     val maxSize = 100
     val deleteSize = 10
     val numOfConcurrentBatch = 100
-    val src = System.currentTimeMillis()
-    val tgts = (0 until maxSize).map { ith => src + ith }
+    val src = 1092983
+    val tgts = (minSize until maxSize).map { ith => src + ith }
     val deleteTgts = Random.shuffle(tgts).take(deleteSize)
     val insertRequests = tgts.map { tgt =>
       Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -181,11 +182,12 @@ class StrongLabelDeleteTest extends IntegrateCommon {
   test("deleteAll") {
     val labelName = testLabelName2
     val dir = "out"
-    val maxSize = 100
+    val minSize = 200
+    val maxSize = 300
     val deleteSize = 10
     val numOfConcurrentBatch = 100
-    val src = System.currentTimeMillis()
-    val tgts = (0 until maxSize).map { ith => src + ith }
+    val src = 192338237
+    val tgts = (minSize until maxSize).map { ith => src + ith }
     val deleteTgts = Random.shuffle(tgts).take(deleteSize)
     val insertRequests = tgts.map { tgt =>
       Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
@@ -220,7 +222,7 @@ class StrongLabelDeleteTest extends IntegrateCommon {
 //    val labelName = testLabelName
     val maxTgtId = 10
     val batchSize = 10
-    val testNum = 100
+    val testNum = 10
     val numOfBatch = 10
 
     def testInner(startTs: Long, src: Long) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
index a1bff68..603ca12 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
@@ -31,6 +31,8 @@ class VertexTestHelper extends IntegrateCommon {
   import TestUtil._
   import VertexTestHelper._
 
+
+
   test("vertex") {
     val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0)
     val (serviceName, columnName) = (testServiceName, testColumnName)
@@ -40,9 +42,10 @@ class VertexTestHelper extends IntegrateCommon {
     println(payload)
 
     val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName))
-    Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime)
+    val srcVertices = vertices
+    Await.result(graph.mutateVertices(srcVertices, withWait = true), HttpRequestWaitingTime)
 
-    val res = graph.getVertices(vertices).map { vertices =>
+    val res = graph.getVertices(srcVertices).map { vertices =>
       PostProcess.verticesToJson(vertices)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index 3f76d59..d62dee8 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,11 +33,12 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
   import WeakLabelDeleteHelper._
 
   test("test weak consistency select") {
+    insertEdgesSync(bulkEdges(): _*)
     var result = getEdgesSync(query(0))
-    println(result)
+
     (result \ "results").as[List[JsValue]].size should be(4)
     result = getEdgesSync(query(10))
-    println(result)
+
     (result \ "results").as[List[JsValue]].size should be(2)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
index 419e9c4..bab6e03 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
 import org.scalatest.{FunSuite, Matchers}
 
+
 class JsonParserTest extends FunSuite with Matchers with TestCommon {
 
   import InnerVal._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
index 06af38c..61d1096 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.LabelWithDirection
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection}
 import org.scalatest.{FunSuite, Matchers}
 
 class QueryParamTest extends FunSuite with Matchers with TestCommon {
@@ -101,5 +101,59 @@ class QueryParamTest extends FunSuite with Matchers with TestCommon {
 
     println(s">> diff: $duration")
   }
+  test("QueryParam interval min/max bytes padding test") {
+    import HBaseSerializable._
+    val queryParam = QueryParam.Empty
+    def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = {
+      val len = _from.length.toByte
 
+      val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+      val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+      val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal }
+
+      val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to)
+      val valueBytes = propsToBytes(value)
+
+      val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0
+      val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0
+
+      val res = validFrom && validTo
+      //      if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}")
+      res
+    }
+
+    val v = "v3"
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v))) shouldBe true
+
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(1L, v))) shouldBe false
+
+    compare(
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(0L, v))) shouldBe false
+
+    compare(
+      Seq(InnerVal.withLong(0L, v)),
+      Seq(InnerVal.withLong(1L, v)),
+      Seq(InnerVal.withLong(2L, v))) shouldBe false
+
+    val testNum = 100000
+    val tests = for {
+      n <- 0 to testNum
+      min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1
+      max = min + scala.util.Random.nextInt(min)
+      value = min + scala.util.Random.nextInt(max - min + 1)
+    } yield compare(
+        Seq(InnerVal.withLong(min, v)),
+        Seq(InnerVal.withLong(max, v)),
+        Seq(InnerVal.withLong(value, v)))
+
+    tests.forall(identity) shouldBe true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 33a901d..584a641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -146,7 +146,7 @@ trait TestCommonWithModels {
       isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None)
 
     management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
-      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
+      isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None)
 
     management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
index 8ba9ea2..05dcd30 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala
@@ -23,41 +23,55 @@ import play.api.libs.json.JsNumber
 import play.libs.Json
 
 class JsonBenchmarkSpec extends BenchmarkCommon {
-  "to json" >> {
-    "json benchmark" >> {
 
-      duration("map to json") {
-        (0 to 10) foreach { n =>
-          val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
-        }
+  "JsonBenchSpec" should {
+
+    "Json Append" >> {
+      import play.api.libs.json.{Json, _}
+      val numberJson = Json.toJson((0 to 1000).map { i => s"$i" -> JsNumber(i * i) }.toMap).as[JsObject]
+
+      /** dummy warm-up **/
+      (0 to 10000) foreach { n =>
+        Json.obj(s"$n" -> "dummy") ++ numberJson
+      }
+      (0 to 10000) foreach { n =>
+        Json.obj(s"$n" -> numberJson)
       }
 
-      duration("directMakeJson") {
-        (0 to 10) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
+      duration("Append by JsObj ++ JsObj ") {
+        (0 to 100000) foreach { n =>
+          numberJson ++ Json.obj(s"$n" -> "dummy")
         }
       }
 
-      duration("map to json 2") {
-        (0 to 50) foreach { n =>
-          val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap
-          Json.toJson(numberMaps)
+      duration("Append by Json.obj(newJson -> JsObj)") {
+        (0 to 100000) foreach { n =>
+          Json.obj(s"$n" -> numberJson)
         }
       }
+      true
+    }
+  }
 
-      duration("directMakeJson 2") {
-        (0 to 50) foreach { n =>
-          var jsObj = play.api.libs.json.Json.obj()
-          (0 to 10).foreach { n =>
-            jsObj += (n.toString -> JsNumber(n * n))
-          }
+  "Make Json" >> {
+    duration("map to json") {
+      (0 to 10000) foreach { n =>
+        val numberMaps = (0 to 100).map { n =>
+          n.toString -> JsNumber(n * n)
+        }.toMap
+
+        Json.toJson(numberMaps)
+      }
+    }
+
+    duration("direct") {
+      (0 to 10000) foreach { n =>
+        var jsObj = play.api.libs.json.Json.obj()
+
+        (0 to 100).foreach { n =>
+          jsObj += (n.toString -> JsNumber(n * n))
         }
       }
-      true
     }
     true
   }
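The benchmark bodies above run inside duration { ... } blocks provided by BenchmarkCommon, which is not part of this diff. A minimal sketch of what such a timing helper typically looks like, as an assumption rather than the project's actual implementation:

  // Hypothetical sketch; BenchmarkCommon's real duration helper is not shown in this commit.
  def duration[T](label: String)(block: => T): T = {
    val startedAt = System.currentTimeMillis()
    val result = block                           // run the benchmarked body
    val elapsed = System.currentTimeMillis() - startedAt
    println(s"[$label] took $elapsed ms")        // report wall-clock time
    result
  }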

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
index a8777fb..ec19641 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala
@@ -1,102 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.benchmark
-
-import scala.annotation.tailrec
-import scala.util.Random
-
-class SamplingBenchmarkSpec extends BenchmarkCommon {
-  "sample" should {
-
-    "sample benchmark" in {
-      @tailrec
-      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-        if (set.size == n) set
-        else randomInt(n, range, set + Random.nextInt(range))
-      }
-
-      // sample using random array
-      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
-        val randomNum = randomInt(num, ls.size)
-        var sample = List.empty[T]
-        var idx = 0
-        ls.foreach { e =>
-          if (randomNum.contains(idx)) sample = e :: sample
-          idx += 1
-        }
-        sample
-      }
-
-      // sample using shuffle
-      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
-        Random.shuffle(ls).take(num)
-      }
-
-      // sample using random number generation
-      def rngSample[T](num: Int, ls: List[T]): List[T] = {
-        var sampled = List.empty[T]
-        val N = ls.size // population
-        var t = 0 // total input records dealt with
-        var m = 0 // number of items selected so far
-
-        while (m < num) {
-          val u = Random.nextDouble()
-          if ((N - t) * u < num - m) {
-            sampled = ls(t) :: sampled
-            m += 1
-          }
-          t += 1
-        }
-        sampled
-      }
-
-      // test data
-      val testLimit = 10000
-      val testNum = 10
-      val testData = (0 to 1000).toList
-
-      // dummy for warm-up
-      (0 to testLimit) foreach { n =>
-        randomArraySample(testNum, testData)
-        shuffleSample(testNum, testData)
-        rngSample(testNum, testData)
-      }
-
-      duration("Random Array Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = randomArraySample(testNum, testData)
-        }
-      }
-
-      duration("Shuffle Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = shuffleSample(testNum, testData)
-        }
-      }
-
-      duration("RNG Sampling") {
-        (0 to testLimit) foreach { _ =>
-          val sampled = rngSample(testNum, testData)
-        }
-      }
-      true
-    }
-  }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//
+//package org.apache.s2graph.rest.play.benchmark
+//
+//import play.api.test.{FakeApplication, PlaySpecification, WithApplication}
+//
+//import scala.annotation.tailrec
+//import scala.util.Random
+//
+//class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification {
+//  "sample" should {
+//    implicit val app = FakeApplication()
+//
+//    "sample benchmark" in new WithApplication(app) {
+//      @tailrec
+//      def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+//        if (set.size == n) set
+//        else randomInt(n, range, set + Random.nextInt(range))
+//      }
+//
+//      // sample using random array
+//      def randomArraySample[T](num: Int, ls: List[T]): List[T] = {
+//        val randomNum = randomInt(num, ls.size)
+//        var sample = List.empty[T]
+//        var idx = 0
+//        ls.foreach { e =>
+//          if (randomNum.contains(idx)) sample = e :: sample
+//          idx += 1
+//        }
+//        sample
+//      }
+//
+//      // sample using shuffle
+//      def shuffleSample[T](num: Int, ls: List[T]): List[T] = {
+//        Random.shuffle(ls).take(num)
+//      }
+//
+//      // sample using random number generation
+//      def rngSample[T](num: Int, ls: List[T]): List[T] = {
+//        var sampled = List.empty[T]
+//        val N = ls.size // population
+//        var t = 0 // total input records dealt with
+//        var m = 0 // number of items selected so far
+//
+//        while (m < num) {
+//          val u = Random.nextDouble()
+//          if ((N - t) * u < num - m) {
+//            sampled = ls(t) :: sampled
+//            m += 1
+//          }
+//          t += 1
+//        }
+//        sampled
+//      }
+//
+//      // test data
+//      val testLimit = 10000
+//      val testNum = 10
+//      val testData = (0 to 1000).toList
+//
+//      // dummy for warm-up
+//      (0 to testLimit) foreach { n =>
+//        randomArraySample(testNum, testData)
+//        shuffleSample(testNum, testData)
+//        rngSample(testNum, testData)
+//      }
+//
+//      duration("Random Array Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = randomArraySample(testNum, testData)
+//        }
+//      }
+//
+//      duration("Shuffle Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = shuffleSample(testNum, testData)
+//        }
+//      }
+//
+//      duration("RNG Sampling") {
+//        (0 to testLimit) foreach { _ =>
+//          val sampled = rngSample(testNum, testData)
+//        }
+//      }
+//      true
+//    }
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a654a83..c8f65bf 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory
 import io.netty.bootstrap.ServerBootstrap
 import io.netty.buffer.{ByteBuf, Unpooled}
 import io.netty.channel._
-import io.netty.channel.epoll.{EpollServerSocketChannel, EpollEventLoopGroup}
+import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
 import io.netty.channel.nio.NioEventLoopGroup
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.socket.nio.NioServerSocketChannel
@@ -36,14 +36,14 @@ import io.netty.util.CharsetUtil
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
 import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
-import org.apache.s2graph.core.rest.RestHandler.HandlerResult
+import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
 import org.apache.s2graph.core.utils.Extensions._
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.{Graph, JSONParser, PostProcess}
 import play.api.libs.json._
 
 import scala.collection.mutable
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source
 import scala.util.{Failure, Success, Try}
 import scala.language.existentials
@@ -58,6 +58,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
   val NotFound = HttpResponseStatus.NOT_FOUND
   val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
 
+  implicit val nettyHeadersLookup = new CanLookup[HttpHeaders] {
+    override def lookup(m: HttpHeaders, key: String) = Option(m.get(key))
+  }
+
   def badRoute(ctx: ChannelHandlerContext) =
     simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
 
@@ -82,7 +86,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
     }
   }
 
-  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: String, result: HandlerResult, startedAt: Long) = {
     var closeOpt = CloseOpt
     var headers = mutable.ArrayBuilder.make[(String, String)]
 
@@ -119,43 +123,68 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
     }
   }
 
+  private def healthCheck(ctx: ChannelHandlerContext)(predicate: Boolean): Unit = {
+    if (predicate) {
+      val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+      simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+    } else {
+      simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+    }
+  }
+
+  private def updateHealthCheck(ctx: ChannelHandlerContext)(newValue: Boolean)(updateOp: Boolean => Unit): Unit = {
+    updateOp(newValue)
+    val newHealthCheckMsg = Unpooled.copiedBuffer(newValue.toString, CharsetUtil.UTF_8)
+    simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
+  }
+
   override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
     val uri = req.getUri
     val startedAt = System.currentTimeMillis()
-
+    val checkFunc = healthCheck(ctx) _
+    val updateFunc = updateHealthCheck(ctx) _
     req.getMethod match {
       case HttpMethod.GET =>
         uri match {
-          case "/health_check.html" =>
-            if (NettyServer.isHealthy) {
-              val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
-              simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+          case "/health_check.html" => checkFunc(NettyServer.isHealthy)
+          case "/fallback_check.html" => 
checkFunc(NettyServer.isFallbackHealthy)
+          case "/query_fallback_check.html" => 
checkFunc(NettyServer.isQueryFallbackHealthy)
+          case s if s.startsWith("/graphs/getEdge/") =>
+            if (!NettyServer.isQueryFallbackHealthy) {
+              val result = HandlerResult(body = 
Future.successful(PostProcess.emptyResults))
+              toResponse(ctx, req, s, result, startedAt)
             } else {
-              simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+              val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+              val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+              val result = s2rest.checkEdges(params)
+              toResponse(ctx, req, s, result, startedAt)
             }
-
-          case s if s.startsWith("/graphs/getEdge/") =>
-            // src, tgt, label, dir
-            val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
-            val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
-            val result = s2rest.checkEdges(params)
-            toResponse(ctx, req, params, result, startedAt)
           case _ => badRoute(ctx)
         }
 
       case HttpMethod.PUT =>
         if (uri.startsWith("/health_check/")) {
-          val newHealthCheck = uri.split("/").last.toBoolean
-          NettyServer.isHealthy = newHealthCheck
-          val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
-        } else badRoute(ctx)
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isHealthy = v }
+        } else if (uri.startsWith("/query_fallback_check/")) {
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isQueryFallbackHealthy = v }
+        } else if (uri.startsWith("/fallback_check/")) {
+          val newValue = uri.split("/").last.toBoolean
+          updateFunc(newValue) { v => NettyServer.isFallbackHealthy = v }
+        } else {
+          badRoute(ctx)
+        }
 
       case HttpMethod.POST =>
         val body = req.content.toString(CharsetUtil.UTF_8)
-
-        val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
-        toResponse(ctx, req, Json.parse(body), result, startedAt)
+        if (!NettyServer.isQueryFallbackHealthy) {
+          val result = HandlerResult(body = Future.successful(PostProcess.emptyResults))
+          toResponse(ctx, req, body, result, startedAt)
+        } else {
+          val result = s2rest.doPost(uri, body, req.headers())
+          toResponse(ctx, req, body, result, startedAt)
+        }
 
       case _ =>
         simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
@@ -163,9 +192,14 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends
   }
 
   override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
-    cause.printStackTrace()
-    logger.error(s"exception on query.", cause)
-    simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    cause match {
+      case e: java.io.IOException =>
+        ctx.channel().close().addListener(CloseOpt.get)
+      case _ =>
+        cause.printStackTrace()
+        logger.error(s"exception on query.", cause)
+        simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    }
   }
 }
 
@@ -178,6 +212,7 @@ object NettyServer {
   val config = ConfigFactory.load()
   val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
   val transport = Try(config.getString("netty.transport")).recover { case _ => 
"jdk" }.get
+  val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 
65536 * 2 }.get
 
   // init s2graph with config
   val s2graph = new Graph(config)(ec)
@@ -185,6 +220,8 @@ object NettyServer {
 
   val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover 
{ case _ => "release info not found\n" }.get
   var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+  var isFallbackHealthy = true
+  var isQueryFallbackHealthy = true
 
   logger.info(s"starts with num of thread: $numOfThread, 
${threadPool.getClass.getSimpleName}")
   logger.info(s"transport: $transport")
@@ -201,14 +238,13 @@ object NettyServer {
     try {
       val b: ServerBootstrap = new ServerBootstrap()
         .option(ChannelOption.SO_BACKLOG, Int.box(2048))
-
       b.group(bossGroup, workerGroup).channel(channelClass)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer[SocketChannel] {
           override def initChannel(ch: SocketChannel) {
             val p = ch.pipeline()
             p.addLast(new HttpServerCodec())
-            p.addLast(new HttpObjectAggregator(65536))
+            p.addLast(new HttpObjectAggregator(maxBodySize))
             p.addLast(new S2RestHandler(rest)(ec))
           }
         })
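Both the Netty handler above and the Play controller further below now hand raw headers to RestHandler.doPost through the new CanLookup typeclass instead of extracting the impression key up front. The typeclass itself lives in RestHandler and is not shown in this diff; judging from the two instances added in this commit, its shape is roughly the following sketch, not the exact source:

  // Inferred sketch: a minimal typeclass for reading one header value out of an
  // arbitrary header container (HttpHeaders for Netty, Headers for Play).
  trait CanLookup[M] {
    def lookup(m: M, key: String): Option[String]
  }

With an implicit instance in scope, doPost can presumably accept whichever header type each server module uses, so each module only needs the one-line instance shown in its diff.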

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 30a5ee4..692ab1e 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -52,7 +52,7 @@ object Global extends WithFilters(new GzipFilter()) {
     // init s2graph with config
     s2graph = new Graph(config)(ec)
     storageManagement = new Management(s2graph)
-    s2parser = new RequestParser(s2graph.config) // merged config
+    s2parser = new RequestParser(s2graph) 
     s2rest = new RestHandler(s2graph)(ec)
 
     logger.info(s"starts with num of thread: $numOfThread, 
${threadPool.getClass.getSimpleName}")
@@ -83,7 +83,7 @@ object Global extends WithFilters(new GzipFilter()) {
     wallLogHandler.shutdown()
     QueueActor.shutdown()
 
-    /*
+    /**
      * shutdown hbase client for flush buffers.
      */
     shutdown()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
index 3c488fe..3c49954 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
@@ -43,8 +43,12 @@ object Config {
   lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost")
 
   lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
+  lazy val KAFKA_LOG_TOPIC_JSON = s"s2graphIn${PHASE}Json"
   lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
+  lazy val KAFKA_LOG_TOPIC_ASYNC_JSON = s"s2graphIn${PHASE}AsyncJson"
   lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
+  lazy val KAFKA_FAIL_TOPIC_JSON = s"s2graphIn${PHASE}FailedJson"
+  lazy val KAFKA_MUTATE_FAIL_TOPIC = s"mutateFailed_${PHASE}"
 
   // is query or write
   lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index 13639b9..b32c16a 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -22,10 +22,10 @@ package org.apache.s2graph.rest.play.controllers
 import akka.util.ByteString
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
 import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.rest.RestHandler.CanLookup
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.rest.play.config.Config
 import play.api.http.HttpEntity
-import play.api.libs.iteratee.Enumerator
 import play.api.libs.json.{JsString, JsValue}
 import play.api.mvc._
 
@@ -42,6 +42,10 @@ object ApplicationController extends Controller {
 
   val jsonText: BodyParser[String] = s2parse.jsonText
 
+  implicit val oneTupleLookup = new CanLookup[Headers] {
+    override def lookup(m: Headers, key: String) = m.get(key)
+  }
+
   private def badQueryExceptionResults(ex: Exception) =
     
Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
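
Note (sketch, not part of this commit): the oneTupleLookup instance above implies CanLookup
is a small typeclass roughly of the form
trait CanLookup[A] { def lookup(m: A, key: String): Option[String] }, which lets
RestHandler.doPost read a header from any carrier type uniformly. A hypothetical use:

    def impressionId[A](m: A)(implicit c: CanLookup[A]): Option[String] =
      c.lookup(m, "S2-Impression-Id") // header key shown here is illustrative only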
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index 53f3fce..b3ac89d 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -156,9 +156,11 @@ object CounterController extends Controller {
           useProfile = useProfile, bucketImpId, useRank = useRank, ttl, 
dailyTtl, Some(hbaseTable), intervalUnit,
           rateActionId, rateBaseId, rateThreshold)
 
-        // prepare exact storage
-        exactCounter(version).prepare(policy)
-        if (useRank) {
+        if (rateAction.isEmpty) {
+          // prepare exact storage
+          exactCounter(version).prepare(policy)
+        }
+        if (useRank || rateAction.isDefined) {
           // prepare ranking storage
           rankingCounter(version).prepare(policy)
         }
@@ -253,8 +255,11 @@ object CounterController extends Controller {
           // change table name
           val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) 
++ policy.dailyTtl mkString "_"
           val newPolicy = policy.copy(version = version, hbaseTable = 
Some(newTableName))
-          exactCounter(version).prepare(newPolicy)
-          if (newPolicy.useRank) {
+
+          if (newPolicy.rateActionId.isEmpty) {
+            exactCounter(version).prepare(newPolicy)
+          }
+          if (newPolicy.useRank || newPolicy.rateActionId.isDefined) {
             rankingCounter(version).prepare(newPolicy)
           }
           Ok(Json.toJson(Map("msg" -> s"prepare storage v$version 
$service/$action")))
@@ -272,8 +277,10 @@ object CounterController extends Controller {
         policy <- counterModel.findByServiceAction(service, action, useCache = 
false)
       } yield {
         Try {
-          exactCounter(policy.version).destroy(policy)
-          if (policy.useRank) {
+          if (policy.rateActionId.isEmpty) {
+            exactCounter(policy.version).destroy(policy)
+          }
+          if (policy.useRank || policy.rateActionId.isDefined) {
             rankingCounter(policy.version).destroy(policy)
           }
           counterModel.deleteServiceAction(policy)
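
Note (sketch, not part of this commit): the prepare/destroy changes above all follow one
rule, restated here with hypothetical names: rate-action policies skip the exact counter
storage, and any policy that ranks or rates gets the ranking storage.

    case class StorageChoice(exact: Boolean, ranking: Boolean)

    def storagesFor(useRank: Boolean, hasRateAction: Boolean): StorageChoice =
      StorageChoice(exact = !hasRateAction, ranking = useRank || hasRateAction)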

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index da88c3d..8000cf8 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.rest.play.controllers
 
+import com.fasterxml.jackson.databind.JsonMappingException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.core.rest.RequestParser
@@ -30,19 +31,19 @@ import play.api.mvc.{Controller, Result}
 
 import scala.collection.Seq
 import scala.concurrent.Future
-import scala.util.Random
 
 object EdgeController extends Controller {
 
   import ApplicationController._
+  import ExceptionHandler._
   import play.api.libs.concurrent.Execution.Implicits._
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = 
org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = 
org.apache.s2graph.rest.play.Global.wallLogHandler
 
-  private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
-    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, 
Option(tsv))
+  private def enqueue(topic: String, elem: GraphElement, tsv: String, 
publishJson: Boolean = false) = {
+    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, 
Option(tsv), publishJson)
     walLogHandler.enqueue(kafkaMessage)
   }
 
@@ -50,63 +51,129 @@ object EdgeController extends Controller {
     val kafkaTopic = toKafkaTopic(graphElem.isAsync)
 
     graphElem match {
-      case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+      case v: Vertex =>
+        enqueue(kafkaTopic, graphElem, tsv)
       case e: Edge =>
         e.label.extraOptions.get("walLog") match {
-          case None => enqueue(kafkaTopic, e, tsv)
+          case None =>
+            enqueue(kafkaTopic, e, tsv)
           case Some(walLogOpt) =>
-            (walLogOpt \ "method").as[JsValue] match {
+            (walLogOpt \ "method").get match {
               case JsString("drop") => // pass
               case JsString("sample") =>
                 val rate = (walLogOpt \ "rate").as[Double]
-                if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, 
e, tsv)
-              case _ => enqueue(kafkaTopic, e, tsv)
+                if (scala.util.Random.nextDouble() < rate) {
+                  enqueue(kafkaTopic, e, tsv)
+                }
+              case _ =>
+                enqueue(kafkaTopic, e, tsv)
             }
         }
+      case _ => logger.error(s"Unknown graph element type: ${graphElem}")
     }
   }
 
+  private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: 
Seq[Label], dir: Int, ts: Long ) = {
+    for {
+      vertex <- srcVertices
+      id = vertex.id.toString
+      label <- labels
+    } yield {
+      val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
GraphUtil.fromOp(dir.toByte)).mkString("\t")
+      ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, tsv)
+    }
+  }
+
+  private def publishFailTopic(kafkaMessages: Seq[KafkaMessage]): Unit ={
+    kafkaMessages.foreach(walLogHandler.enqueue)
+  }
+
+  def mutateElementsWithFailLog(elements: Seq[(GraphElement, String)]) ={
+    val result = s2.mutateElements(elements.map(_._1), true)
+    result onComplete { results =>
+      results.get.zip(elements).map {
+        case (false, (e: Edge, tsv: String)) =>
+          val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){
+            toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), 
e.labelWithDir.dir, e.ts)
+          } else{
+            
Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, 
Some(tsv)))
+          }
+          publishFailTopic(kafkaMessages)
+        case _ =>
+      }
+    }
+    result
+  }
+
   private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], 
withWait: Boolean): Future[Result] = {
     if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
     else {
-      try {
-        elementsWithTsv.foreach { case (graphElem, tsv) =>
-          publish(graphElem, tsv)
-        }
+      elementsWithTsv.foreach { case (graphElem, tsv) =>
+        publish(graphElem, tsv)
+      }
 
-        val elementsToStore = for {
-          (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
-        } yield e
-
-        if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
-        else {
-          if (withWait) {
-            val rets = s2.mutateElements(elementsToStore, withWait)
-            rets.map(Json.toJson(_)).map(jsonResponse(_))
-          } else {
-            val rets = elementsToStore.map { element => QueueActor.router ! 
element; true }
-            Future.successful(jsonResponse(Json.toJson(rets)))
+      if (elementsWithTsv.isEmpty) Future.successful(jsonResponse(JsArray()))
+      else {
+        val elementWithIdxs = elementsWithTsv.zipWithIndex
+        if (withWait) {
+          val (elementSync, elementAsync) = elementWithIdxs.partition { case 
((element, tsv), idx) =>
+            !skipElement(element.isAsync)
+          }
+          val retToSkip = elementAsync.map(_._2 -> true)
+          val elementsToStore = elementSync.map(_._1)
+          val elementsIdxToStore = elementSync.map(_._2)
+          mutateElementsWithFailLog(elementsToStore).map { rets =>
+            elementsIdxToStore.zip(rets) ++ retToSkip
+          }.map { rets =>
+            Json.toJson(rets.sortBy(_._1).map(_._2))
+          }.map(jsonResponse(_))
+        } else {
+          val rets = elementWithIdxs.map { case ((element, tsv), idx) =>
+            if (!skipElement(element.isAsync)) QueueActor.router ! (element, 
tsv)
+            true
           }
+          Future.successful(jsonResponse(Json.toJson(rets)))
         }
-      } catch {
-        case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
-        case e: Throwable =>
-          logger.error(s"tryMutate: ${e.getMessage}", e)
-          Future.successful(InternalServerError(s"${e.getStackTrace}"))
       }
     }
   }
 
   def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean 
= false): Future[Result] = {
     logger.debug(s"$jsValue")
-    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
-    tryMutate(edgesWithTsv, withWait)
+
+    try {
+      val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+      tryMutate(edgesWithTsv, withWait)
+    } catch {
+      case e: JsonMappingException =>
+        logger.malformed(jsValue, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: GraphExceptions.JsonParseException  =>
+        logger.malformed(jsValue, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: Exception =>
+        logger.malformed(jsValue, e)
+        Future.failed(e)
+    }
   }
 
   def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] 
= {
     logger.debug(s"$str")
-    val elementsWithTsv = requestParser.parseBulkFormat(str)
-    tryMutate(elementsWithTsv, withWait)
+
+    try {
+      val elementsWithTsv = requestParser.parseBulkFormat(str)
+      tryMutate(elementsWithTsv, withWait)
+    } catch {
+      case e: JsonMappingException =>
+        logger.malformed(str, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: GraphExceptions.JsonParseException  =>
+        logger.malformed(str, e)
+        Future.successful(BadRequest(s"${e.getMessage}"))
+      case e: Exception =>
+        logger.malformed(str, e)
+        Future.failed(e)
+    }
   }
 
   def mutateBulk() = withHeaderAsync(parse.text) { request =>
@@ -163,29 +230,34 @@ object EdgeController extends Controller {
 
     if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
     else {
+
       s2.incrementCounts(edges, withWait = true).map { results =>
         val json = results.map { case (isSuccess, resultCount) =>
           Json.obj("success" -> isSuccess, "result" -> resultCount)
         }
+
         jsonResponse(Json.toJson(json))
       }
     }
   }
 
   def deleteAll() = withHeaderAsync(jsonParser) { request =>
-//    deleteAllInner(request.body, withWait = false)
     deleteAllInner(request.body, withWait = true)
   }
 
+  def deleteAllWithOutWait() = withHeaderAsync(jsonParser) { request =>
+    deleteAllInner(request.body, withWait = false)
+  }
+
   def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
 
-    /* logging for delete all request */
+    /** logging for delete all request */
     def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, 
direction: String, topicOpt: Option[String]) = {
       val kafkaMessages = for {
         id <- ids
         label <- labels
       } yield {
-        val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), 
requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+        val tsv = Seq(ts, "deleteAll", "e", RequestParser.jsToStr(id), 
RequestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
         val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
 
         ExceptionHandler.toKafkaMessage(topic, tsv)
@@ -194,10 +266,18 @@ object EdgeController extends Controller {
       kafkaMessages.foreach(walLogHandler.enqueue)
     }
 
-    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], 
ts: Long, vertices: Seq[Vertex]) = {
-      enqueueLogMessage(ids, labels, ts, direction, None)
+    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue],
+                   ts: Long, vertices: Seq[Vertex]) = {
+
       val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
       if (withWait) {
+        future onComplete {
+          case ret =>
+            if (!ret.get) {
+              val messages = toDeleteAllFailMessages(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
+              publishFailTopic(messages)
+            }
+        }
         future
       } else {
         Future.successful(true)
@@ -205,9 +285,13 @@ object EdgeController extends Controller {
     }
 
     val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
-      val (labels, direction, ids, ts, vertices) = 
requestParser.toDeleteParam(json)
+      val (_labels, direction, ids, ts, vertices) = 
requestParser.toDeleteParam(json)
+      val srcVertices = vertices
+      enqueueLogMessage(ids, _labels, ts, direction, None)
+      val labels = _labels.filterNot(e => skipElement(e.isAsync))
+
       if (labels.isEmpty || ids.isEmpty) Future.successful(true)
-      else deleteEach(labels, direction, ids, ts, vertices)
+      else deleteEach(labels, direction, ids, ts, srcVertices)
     }
 
     val deleteResults = Future.sequence(deleteFutures)
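
Note (sketch, not part of this commit): both mutateElementsWithFailLog and the deleteAll
path above apply the same fail-log pattern; when a synchronous mutation reports failure,
the original TSV line is re-published to the mutate-fail topic so it can be replayed later.
A minimal restatement with hypothetical names:

    import scala.concurrent.{ExecutionContext, Future}

    def withFailLog(mutate: => Future[Boolean], tsv: String,
                    publishFail: String => Unit)
                   (implicit ec: ExecutionContext): Future[Boolean] = {
      val result = mutate
      result.onComplete { ret =>
        // A failed future or a false result both count as a failed mutation.
        if (!ret.getOrElse(false)) publishFail(tsv) // e.g. enqueue to mutateFailed_<phase>
      }
      result
    }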

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
index 9c4a061..760211a 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
@@ -19,7 +19,6 @@
 
 package org.apache.s2graph.rest.play.controllers
 
-import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import play.api.mvc._
 
@@ -30,10 +29,10 @@ object ExperimentController extends Controller {
 
   import ApplicationController._
 
+  def experiments() = experiment("", "", "")
   def experiment(accessToken: String, experimentName: String, uuid: String) = 
withHeaderAsync(jsonText) { request =>
     val body = request.body
-
-    val res = rest.doPost(request.uri, body, 
request.headers.get(Experiment.impressionKey))
+    val res = rest.doPost(request.uri, body, request.headers)
     res.body.map { case js =>
       val headers = res.headers :+ ("result_size" -> 
rest.calcSize(js).toString)
       jsonResponse(js, headers: _*)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 0260b7a..6a7d4f7 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -63,12 +63,4 @@ object PublishController extends Controller {
 
   def publish(topic: String) = publishOnly(topic)
 
-  //  def mutateBulk(topic: String) = Action.async(parse.text) { request =>
-  //    EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, 
Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
-  //      result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> 
"timeout=10, max=10")
-  //    }
-  //  }
-  def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
-    EdgeController.mutateBulkFormat(request.body)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
index 495cf7b..1e49ca7 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
@@ -19,8 +19,6 @@
 
 package org.apache.s2graph.rest.play.controllers
 
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import play.api.libs.json.Json
 import play.api.mvc._
@@ -31,13 +29,11 @@ object QueryController extends Controller {
 
   import ApplicationController._
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
-
   private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest
 
   def delegate(request: Request[String]) = {
-    rest.doPost(request.uri, request.body, 
request.headers.get(Experiment.impressionKey)).body.map {
-      js =>
-        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+    rest.doPost(request.uri, request.body, request.headers).body.map { js =>
+      jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
     } recoverWith ApplicationController.requestFallback(request.body)
   }
 
@@ -58,13 +54,12 @@ object QueryController extends Controller {
   def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
 
   def getEdge(srcId: String, tgtId: String, labelName: String, direction: 
String) =
-    withHeaderAsync(jsonText) {
-      request =>
-        val params = Json.arr(Json.obj("label" -> labelName, "direction" -> 
direction, "from" -> srcId, "to" -> tgtId))
-        rest.checkEdges(params).body.map {
-          js =>
-            jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
-        } recoverWith ApplicationController.requestFallback(request.body)
+    withHeaderAsync(jsonText) { request =>
+      val params = Json.arr(Json.obj("label" -> labelName, "direction" -> 
direction, "from" -> srcId, "to" -> tgtId))
+      rest.checkEdges(params).body.map {
+        js =>
+          jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+      } recoverWith ApplicationController.requestFallback(request.body)
     }
 
   def getVertices() = withHeaderAsync(jsonText)(delegate)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
index 0fdbe43..72e6e82 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -50,14 +50,16 @@ object VertexController extends Controller {
         }
 
         //FIXME:
-        val verticesToStore = vertices.filterNot(v => v.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateVertices(verticesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = verticesToStore.map { vertex => QueueActor.router ! 
vertex; true }
-          Future.successful(jsonResponse(Json.toJson(rets)))
+        val verticesToStore = vertices.filterNot(v => skipElement(v.isAsync))
+        if (verticesToStore.isEmpty) 
Future.successful(jsonResponse(Json.toJson(Seq.empty[Boolean])))
+        else {
+          if (withWait) {
+            val rets = s2.mutateVertices(verticesToStore, withWait = true)
+            rets.map(Json.toJson(_)).map(jsonResponse(_))
+          } else {
+            val rets = verticesToStore.map { vertex => QueueActor.router ! 
vertex; true }
+            Future.successful(jsonResponse(Json.toJson(rets)))
+          }
         }
       } catch {
         case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"e"))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf
index bda503c..c3b716b 100644
--- a/s2rest_play/conf/reference.conf
+++ b/s2rest_play/conf/reference.conf
@@ -125,6 +125,7 @@ local.queue.actor.rate.limit=1000000
 # local retry number
 max.retry.number=100
 max.back.off=50
+back.off.timeout=1000
 delete.all.fetch.size=10000
 hbase.fail.prob=-1.0
 lock.expire.time=600000
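
Note (sketch, not part of this commit): how a key like the new back.off.timeout could be
read with Typesafe Config; whether s2core consumes it exactly this way is not shown in
this diff.

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.load()
    val backOffTimeout: Int =
      if (conf.hasPath("back.off.timeout")) conf.getInt("back.off.timeout") else 1000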

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/routes
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes
index ee49c69..c360e6f 100644
--- a/s2rest_play/conf/routes
+++ b/s2rest_play/conf/routes
@@ -23,7 +23,7 @@
 
 
 # publish
-POST        /publish/:topic                                               
org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
+#POST        /publish/:topic                                               
org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
 POST        /publishOnly/:topic                                           
org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic)
 
 #### Health Check
@@ -37,6 +37,7 @@ POST        /graphs/edges/insertBulk                          
            org.ap
 POST        /graphs/edges/delete                                          
org.apache.s2graph.rest.play.controllers.EdgeController.deletes()
 POST        /graphs/edges/deleteWithWait                                  
org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait()
 POST        /graphs/edges/deleteAll                                       
org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll()
+POST        /graphs/edges/deleteAllWithOutWait                            
org.apache.s2graph.rest.play.controllers.EdgeController.deleteAllWithOutWait()
 POST        /graphs/edges/update                                          
org.apache.s2graph.rest.play.controllers.EdgeController.updates()
 POST        /graphs/edges/updateWithWait                                  
org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait()
 POST        /graphs/edges/increment                                       
org.apache.s2graph.rest.play.controllers.EdgeController.increments()
@@ -81,7 +82,7 @@ GET         /graphs/getLabels/:serviceName                    
            org.ap
 POST        /graphs/createLabel                                           
org.apache.s2graph.rest.play.controllers.AdminController.createLabel()
 POST        /graphs/addIndex                                              
org.apache.s2graph.rest.play.controllers.AdminController.addIndex()
 GET         /graphs/getLabel/:labelName                                   
org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName)
-PUT         /graphs/deleteLabel/:labelName                                
org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
+PUT         /graphs/deleteLabelReally/:labelName                          
org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
 
 POST        /graphs/addProp/:labelName                                    
org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName)
 POST        /graphs/createServiceColumn                                   
org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn()
@@ -117,7 +118,7 @@ POST    /counter/v1/mget                                    
            org.apac
 
 # Experiment API
 POST        /graphs/experiment/:accessToken/:experimentName/:uuid         
org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken,
 experimentName, uuid)
-
+POST        /graphs/experiments                                           
org.apache.s2graph.rest.play.controllers.ExperimentController.experiments()
 
 # Map static resources from the /public folder to the /assets URL path
 GET         /images/*file                                                 
controllers.Assets.at(path="/public/images", file)
