http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
new file mode 100644
index 0000000..d67de7c
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -0,0 +1,535 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
+import org.apache.s2graph.core.utils.logger
+import org.scalatest.FunSuite
+
+/**
+ * Unit tests for Edge: the bulk-log string format produced by `toLogString`, and the
+ * snapshot/insert/delete decisions returned by `Edge.buildMutation`.
+ */
+class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
+  initTests()
+
+  /**
+   * Fixtures shared by the buildMutation tests. Everything except the version string is a
+   * `def`, so each test builds fresh values just like the per-test locals it replaces.
+   */
+  private object MutationFixtures {
+    val schemaVersion = "v2"
+
+    /** Vertex with column id 0 and the string id "dummy". */
+    def dummyVertex = Vertex(VertexId(0, InnerVal.withStr("dummy", schemaVersion)))
+
+    /** Timestamp property: long value 0 written at ts 1. */
+    def timestampProp =
+      LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+
+    /** lastDeletedAt property: long value 0 written at ts 3. */
+    def lastDeletedAtProp =
+      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3)
+
+    /** Property 1: boolean `false` written at ts 1. */
+    def falseFlagProp =
+      1.toByte -> InnerValLikeWithTs(InnerVal.withBoolean(false, schemaVersion), 1)
+  }
+
+  /**
+   * Calls Edge.buildMutation with no snapshot edge. The request edge and the "old" props
+   * carry only the timestamp property; the "new" props are the old props plus `extraProps`.
+   * The result is logged and returned.
+   */
+  private def mutateWithoutSnapshot(extraProps: (Byte, InnerValLikeWithTs)*) = {
+    val vertex = MutationFixtures.dummyVertex
+    val propsWithTs = Map(MutationFixtures.timestampProp)
+    val requestEdge = Edge(vertex, vertex, labelWithDirV2, propsWithTs = propsWithTs)
+    // With no extras the very same map is passed as both old and new props.
+    val newPropsWithTs = if (extraProps.isEmpty) propsWithTs else propsWithTs ++ extraProps
+
+    val edgeMutate = Edge.buildMutation(None, requestEdge, 0L, propsWithTs, newPropsWithTs)
+    logger.info(edgeMutate.toLogString)
+    edgeMutate
+  }
+
+  /**
+   * Calls Edge.buildMutation against a snapshot edge whose op is "delete" and whose
+   * lastDeletedAt was written at ts 3. The request additionally carries property 3 set to
+   * `requestedProp`. The result is logged and returned.
+   */
+  private def mutateDeletedSnapshot(requestedProp: InnerValLikeWithTs) = {
+    val vertex = MutationFixtures.dummyVertex
+    val timestampProp = MutationFixtures.timestampProp
+    val lastDeletedAtProp = MutationFixtures.lastDeletedAtProp
+
+    val oldPropsWithTs = Map(timestampProp, lastDeletedAtProp)
+    val propsWithTs = Map(timestampProp, 3.toByte -> requestedProp, lastDeletedAtProp)
+
+    val snapshotEdge = Option(
+      Edge(vertex, vertex, labelWithDirV2,
+        op = GraphUtil.operations("delete"), propsWithTs = oldPropsWithTs))
+    val requestEdge = Edge(vertex, vertex, labelWithDirV2, propsWithTs = propsWithTs)
+
+    val edgeMutate =
+      Edge.buildMutation(snapshotEdge, requestEdge, 0L, oldPropsWithTs, propsWithTs)
+    logger.info(edgeMutate.toLogString)
+    edgeMutate
+  }
+
+  test("toLogString") {
+    val testLabelName = labelNameV2
+    val bulkQueries = List(
+      ("1445240543366", "update", "{\"is_blocked\":true}"),
+      ("1445240543362", "insert", "{\"is_hidden\":false}"),
+      ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"),
+      ("1445240543363", "delete", "{}"),
+      ("1445240543365", "update", "{\"time\":1, \"weight\":-10}"))
+
+    val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
+
+    // One log line per operation, in the order given above.
+    val bulkEdge = bulkQueries.map { case (ts, op, props) =>
+      Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString
+    }.mkString("\n")
+
+    // Tab-separated columns; the delete row has no props column, and the last row's props
+    // are expected without the space that the input JSON contains.
+    val expected = Seq(
+      Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"),
+      Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"),
+      Seq("1445240543364", "insert", "e", "1", "2", testLabelName,
+        "{\"is_hidden\":false,\"weight\":10}"),
+      Seq("1445240543363", "delete", "e", "1", "2", testLabelName),
+      Seq("1445240543365", "update", "e", "1", "2", testLabelName,
+        "{\"time\":1,\"weight\":-10}")
+    ).map(_.mkString("\t")).mkString("\n")
+
+    assert(bulkEdge === expected)
+  }
+
+  // Exercises the same scenario as the next test; kept so the reported test names
+  // stay unchanged.
+  test("buildOperation") {
+    val edgeMutate = mutateWithoutSnapshot(MutationFixtures.falseFlagProp)
+
+    assert(edgeMutate.newSnapshotEdge.isDefined)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: snapshotEdge: None with newProps") {
+    val edgeMutate = mutateWithoutSnapshot(MutationFixtures.falseFlagProp)
+
+    assert(edgeMutate.newSnapshotEdge.isDefined)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
+    val edgeMutate = mutateWithoutSnapshot()
+
+    assert(edgeMutate.newSnapshotEdge.isEmpty)
+    assert(edgeMutate.edgesToInsert.isEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
+    // Property 3 is written at ts 2, before the snapshot's lastDeletedAt ts of 3.
+    val edgeMutate = mutateDeletedSnapshot(
+      InnerValLikeWithTs(InnerVal.withLong(0, MutationFixtures.schemaVersion), 2))
+
+    assert(edgeMutate.newSnapshotEdge.nonEmpty)
+    assert(edgeMutate.edgesToInsert.isEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
+    // Property 3 is written at ts 4, after the snapshot's lastDeletedAt ts of 3.
+    val edgeMutate = mutateDeletedSnapshot(
+      InnerValLikeWithTs(InnerVal.withLong(0, MutationFixtures.schemaVersion), 4))
+
+    assert(edgeMutate.newSnapshotEdge.nonEmpty)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+}
+
+//import com.kakao.s2graph.core.types._
+//import org.hbase.async.PutRequest
+//import org.scalatest.{FunSuite, Matchers}
+//
+///**
+// * Created by shon on 5/29/15.
+// */
+//class EdgeTest extends FunSuite with Matchers with TestCommon with 
TestCommonWithModels {
+//
+//
+//  import HBaseType.{VERSION1, VERSION2}
+//
+//
+//  def srcVertex(innerVal: InnerValLike)(version: String) = {
+//    val colId = if (version == VERSION1) column.id.get else columnV2.id.get
+//    Vertex(SourceVertexId(colId, innerVal), ts)
+//  }
+//
+//  def tgtVertex(innerVal: InnerValLike)(version: String) = {
+//    val colId = if (version == VERSION1) column.id.get else columnV2.id.get
+//    Vertex(TargetVertexId(colId, innerVal), ts)
+//  }
+//
+//
+//  def testEdges(version: String) = {
+//    val innerVals = if (version == VERSION1) intInnerVals else intInnerValsV2
+//    val tgtV = tgtVertex(innerVals.head)(version)
+//    innerVals.tail.map { intInnerVal =>
+//      val labelWithDirection = if (version == VERSION1) labelWithDir else 
labelWithDirV2
+//      val idxPropsList = if (version == VERSION1) idxPropsLs else 
idxPropsLsV2
+//      idxPropsList.map { idxProps =>
+//        val currentTs = idxProps.toMap.get(0.toByte).get.toString.toLong
+//        val idxPropsWithTs = idxProps.map { case (k, v) => k -> 
InnerValLikeWithTs(v, currentTs) }
+//
+//        Edge(srcVertex(intInnerVal)(version), tgtV, labelWithDirection, op, 
currentTs, currentTs, idxPropsWithTs.toMap)
+//      }
+//    }
+//  }
+//
+//  def testPropsUpdate(oldProps: Map[Byte, InnerValLikeWithTs],
+//                      newProps: Map[Byte, InnerValLikeWithTs],
+//                      expected: Map[Byte, Any],
+//                      expectedShouldUpdate: Boolean)
+//                     (f: PropsPairWithTs => (Map[Byte, InnerValLikeWithTs], 
Boolean))(version: String) = {
+//
+//    val timestamp = newProps.toList.head._2.ts
+//    val (updated, shouldUpdate) = f((oldProps, newProps, timestamp, version))
+//    val rets = for {
+//      (k, v) <- expected
+//    } yield {
+//        v match {
+//          case v: String =>
+//            v match {
+//              case "left" => updated.get(k).isDefined && updated(k) == 
oldProps(k)
+//              case "right" => updated.get(k).isDefined && updated(k) == 
newProps(k)
+//              case "none" => updated.get(k).isEmpty
+//            }
+//          case value: InnerValLikeWithTs => updated.get(k).get == value
+//          case _ => throw new RuntimeException(s"not supported keyword: $v")
+//        }
+//      }
+//    println(rets)
+//    rets.forall(x => x) && shouldUpdate == expectedShouldUpdate
+//  }
+//
+//  def testEdgeWithIndex(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = {
+//    val rets = for {
+//      edgeForSameTgtVertex <- edges
+//    } yield {
+//        val head = edgeForSameTgtVertex.head
+//        val start = head
+//        var prev = head
+//        val rets = for {
+//          edge <- edgeForSameTgtVertex.tail
+//        } yield {
+//            println(s"prevEdge: $prev")
+//            println(s"currentEdge: $edge")
+//            val prevEdgeWithIndex = edge.edgesWithIndex
+//            val edgesWithIndex = edge.edgesWithIndex
+//
+//            /** test encoding decoding */
+//            for {
+//              edgeWithIndex <- edge.edgesWithIndex
+//              put <- edgeWithIndex.buildPutsAsync()
+//              kv <- putToKeyValues(put.asInstanceOf[PutRequest])
+//            } {
+//              val decoded = Edge.toEdge(kv, queryParam, None, Seq())
+//              val comp = decoded.isDefined && decoded.get == edge
+//              println(s"${decoded.get}")
+//              println(s"$edge")
+//              println(s"${decoded.get == edge}")
+//              comp shouldBe true
+//
+//              /** test order
+//                * same source, target vertex. same indexProps keys.
+//                * only difference is indexProps values so comparing 
qualifier is good enough
+//                * */
+//              for {
+//                prevEdgeWithIndex <- prev.edgesWithIndex
+//              } {
+//                println(edgeWithIndex.qualifier)
+//                println(prevEdgeWithIndex.qualifier)
+//                println(edgeWithIndex.qualifier.bytes.toList)
+//                println(prevEdgeWithIndex.qualifier.bytes.toList)
+//                /** since index of this test label only use 0, 1 as 
indexProps
+//                  * if 0, 1 is not different then qualifier bytes should be 
same
+//                  * */
+//                val comp = lessThanEqual(edgeWithIndex.qualifier.bytes, 
prevEdgeWithIndex.qualifier.bytes)
+//                comp shouldBe true
+//              }
+//            }
+//            prev = edge
+//          }
+//      }
+//  }
+//
+//  def testInvertedEdge(edges: Seq[Seq[Edge]])(queryParam: QueryParam) = {
+//    val rets = for {
+//      edgeForSameTgtVertex <- edges
+//    } yield {
+//        val head = edgeForSameTgtVertex.head
+//        val start = head
+//        var prev = head
+//        val rets = for {
+//          edge <- edgeForSameTgtVertex.tail
+//        } yield {
+//            println(s"prevEdge: $prev")
+//            println(s"currentEdge: $edge")
+//            val prevEdgeWithIndexInverted = prev.toSnapshotEdge
+//            val edgeWithInvertedIndex = edge.toSnapshotEdge
+//            /** test encode decoding */
+//
+//            val put = edgeWithInvertedIndex.buildPutAsync()
+//            for {
+//              kv <- putToKeyValues(put)
+//            } yield {
+//              val decoded = Edge.toSnapshotEdge(kv, queryParam, None, 
isInnerCall = false, Seq())
+//              val comp = decoded.isDefined && decoded.get == edge
+//              println(s"${decoded.get}")
+//              println(s"$edge")
+//              println(s"${decoded.get == edge}")
+//              comp shouldBe true
+//
+//              /** no need to test ordering because qualifier only use 
targetVertexId */
+//            }
+//            prev = edge
+//          }
+//      }
+//  }
+//
+//  test("insert for edgesWithIndex version 2") {
+//    val version = VERSION2
+//    testEdgeWithIndex(testEdges(version))(queryParamV2)
+//  }
+//  test("insert for edgesWithIndex version 1") {
+//    val version = VERSION1
+//    testEdgeWithIndex(testEdges(version))(queryParam)
+//  }
+//
+//  test("insert for edgeWithInvertedIndex version 1") {
+//    val version = VERSION1
+//    testInvertedEdge(testEdges(version))(queryParam)
+//  }
+//
+//  test("insert for edgeWithInvertedIndex version 2") {
+//    val version = VERSION2
+//    testInvertedEdge(testEdges(version))(queryParamV2)
+//  }
+//
+//
+//  //  /** test cases for each operation */
+//
+//  def oldProps(timestamp: Long, version: String) = {
+//    Map(
+//      labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp - 2, 
timestamp - 2, version),
+//      1.toByte -> InnerValLikeWithTs.withLong(0L, timestamp, version),
+//      2.toByte -> InnerValLikeWithTs.withLong(1L, timestamp - 1, version),
+//      4.toByte -> InnerValLikeWithTs.withStr("old", timestamp - 1, version)
+//    )
+//  }
+//
+//  def newProps(timestamp: Long, version: String) = {
+//    Map(
+//      2.toByte -> InnerValLikeWithTs.withLong(-10L, timestamp, version),
+//      3.toByte -> InnerValLikeWithTs.withLong(20L, timestamp, version)
+//    )
+//  }
+//
+//  def deleteProps(timestamp: Long, version: String) = Map(
+//    labelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(timestamp, 
timestamp, version)
+//  )
+//
+//  /** upsert */
+//  test("Edge.buildUpsert") {
+//    val shouldUpdate = true
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = newProps(ts + 1, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "none",
+//      2.toByte -> "right",
+//      3.toByte -> "right",
+//      4.toByte -> "none")
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true
+//  }
+//  test("Edge.buildUpsert shouldUpdate false") {
+//    val shouldUpdate = false
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = newProps(ts - 10, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> "left",
+//      3.toByte -> "none",
+//      4.toByte -> "left")
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildUpsert)(VERSION2) shouldBe true
+//  }
+//
+//  /** update */
+//  test("Edge.buildUpdate") {
+//    val shouldUpdate = true
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = newProps(ts + 1, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> "right",
+//      3.toByte -> "right",
+//      4.toByte -> "left"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true
+//  }
+//  test("Edge.buildUpdate shouldUpdate false") {
+//    val shouldUpdate = false
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = newProps(ts - 10, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> "left",
+//      3.toByte -> "none",
+//      4.toByte -> "left"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildUpdate)(VERSION2) shouldBe true
+//  }
+//
+//  /** delete */
+//  test("Edge.buildDelete") {
+//    val shouldUpdate = true
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = deleteProps(ts + 1, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "right",
+//      1.toByte -> "none",
+//      2.toByte -> "none",
+//      4.toByte -> "none"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true
+//  }
+//  test("Edge.buildDelete shouldUpdate false") {
+//    val shouldUpdate = false
+//    val oldState = oldProps(ts, VERSION2)
+//    val newState = deleteProps(ts - 10, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> "left",
+//      4.toByte -> "left"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildDelete)(VERSION2) shouldBe true
+//  }
+//
+//  /** increment */
+//  test("Edge.buildIncrement") {
+//    val shouldUpdate = true
+//    val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte)
+//    val newState = newProps(ts + 1, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> InnerValLikeWithTs.withLong(-9L, ts - 1, VERSION2),
+//      3.toByte -> "right"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true
+//  }
+//  test("Edge.buildIncrement shouldReplace false") {
+//    val shouldUpdate = false
+//    val oldState = oldProps(ts, VERSION2).filterNot(kv => kv._1 == 4.toByte)
+//    val newState = newProps(ts - 10, VERSION2)
+//    val expected = Map(
+//      labelMeta.lastDeletedAt -> "left",
+//      1.toByte -> "left",
+//      2.toByte -> "left"
+//    )
+//    testPropsUpdate(oldState, newState, expected, 
shouldUpdate)(Edge.buildIncrement)(VERSION2) shouldBe true
+//  }
+//
+//  test("Edge`s srcVertex") {
+//
+//    val version = VERSION2
+//    val srcId = InnerVal.withLong(10, version)
+//    val tgtId = InnerVal.withStr("abc", version)
+//    val srcColumn = columnV2
+//    val tgtColumn = tgtColumnV2
+//    val srcVertexId = VertexId(srcColumn.id.get, srcId)
+//    val tgtVertexId = VertexId(tgtColumn.id.get, tgtId)
+//
+//    val srcVertex = Vertex(srcVertexId)
+//    val tgtVertex = Vertex(tgtVertexId)
+//
+//    val labelId = undirectedLabelV2.id.get
+//
+//    val outDir = LabelWithDirection(labelId, GraphUtil.directions("out"))
+//    val inDir = LabelWithDirection(labelId, GraphUtil.directions("in"))
+//    val bothDir = LabelWithDirection(labelId, 
GraphUtil.directions("undirected"))
+//
+//    val op = GraphUtil.operations("insert")
+//
+//
+//    val bothEdge = Edge(srcVertex, tgtVertex, bothDir)
+//    println(s"edge: $bothEdge")
+//    bothEdge.relatedEdges.foreach { edge =>
+//      println(edge)
+//    }
+//
+//  }
+//  test("edge buildIncrementBulk") {
+//
+//    /**
+//     * 172567371     List(97, 74, 2, 117, -74, -44, -76, 0, 0, 4, 8, 2)
+//169116518    List(68, -110, 2, 117, -21, 124, -103, 0, 0, 4, 9, 2)
+//11646834     List(17, 33, 2, 127, 78, 72, -115, 0, 0, 4, 9, 2)
+//148171217    List(62, 54, 2, 119, 43, 22, 46, 0, 0, 4, 9, 2)
+//116315188    List(41, 86, 2, 121, 17, 43, -53, 0, 0, 4, 9, 2)
+//180667876    List(48, -82, 2, 117, 59, 58, 27, 0, 0, 4, 8, 2)
+//4594410      List(82, 29, 2, 127, -71, -27, 21, 0, 0, 4, 8, 2)
+//151435444    List(1, 105, 2, 118, -7, 71, 75, 0, 0, 4, 8, 2)
+//168460895    List(67, -35, 2, 117, -11, 125, -96, 0, 0, 4, 9, 2)
+//7941614      List(115, 67, 2, 127, -122, -46, 17, 0, 0, 4, 8, 2)
+//171169732    List(61, -42, 2, 117, -52, 40, 59, 0, 0, 4, 9, 2)
+//174381375    List(91, 2, 2, 117, -101, 38, -64, 0, 0, 4, 9, 2)
+//12754019     List(9, -80, 2, 127, 61, 99, -100, 0, 0, 4, 9, 2)
+//175518092    List(111, 32, 2, 117, -119, -50, 115, 0, 0, 4, 8, 2)
+//174748531    List(28, -81, 2, 117, -107, -116, -116, 0, 0, 4, 8, 2)
+//
+//     */
+//    //    val incrementsOpt = Edge.buildIncrementDegreeBulk("169116518", 
"talk_friend_long_term_agg_test", "out", 10)
+//    //
+//    //    for {
+//    //      increments <- incrementsOpt
+//    //      increment <- increments
+//    //      (cf, qs) <- increment.getFamilyMapOfLongs
+//    //      (q, v) <- qs
+//    //    } {
+//    //      println(increment.getRow.toList)
+//    //      println(q.toList)
+//    //      println(v)
+//    //    }
+//    //List(68, -110, -29, -4, 116, -24, 124, -37, 0, 0, 52, -44, 2)
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
new file mode 100644
index 0000000..13f0b0a
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -0,0 +1,226 @@
+package org.apache.s2graph.core.Integrate
+
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import play.api.libs.json.{JsObject, Json}
+
+/**
+ * Integration test that replays insert/update/delete operations on one edge in different
+ * arrival orders and checks the edge returned by a query against the expected properties.
+ */
+class CrudTest extends IntegrateCommon {
+  import CrudHelper._
+  import TestUtil._
+
+  test("test CRUD") {
+    val curTime = System.currentTimeMillis
+    val t1 = curTime + 0
+    val t2 = curTime + 1
+    val t3 = curTime + 2
+    val t4 = curTime + 3
+    val t5 = curTime + 4
+
+    val timeOnly = "{\"time\": 10}"
+    val timeAndWeight = "{\"time\": 10, \"weight\": 20}"
+    val timeAndWeightProps = Map("time" -> "10", "weight" -> "20")
+
+    // (description, operations in arrival order, expected props). The 1-based position in
+    // this list is the test-case number handed to the runner.
+    val testCases = List(
+      ("[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test ",
+        List((t1, "insert", timeOnly), (t2, "delete", ""), (t3, "insert", timeAndWeight)),
+        timeAndWeightProps),
+      // The description used to repeat case 1's order although the operations arrive as
+      // t1, t3, t2.
+      ("[t1 -> t3 -> t2 test case] insert(t1) insert(t3) delete(t2) test ",
+        List((t1, "insert", timeOnly), (t3, "insert", timeAndWeight), (t2, "delete", "")),
+        timeAndWeightProps),
+      ("[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test ",
+        List((t3, "insert", timeAndWeight), (t2, "delete", ""), (t1, "insert", timeOnly)),
+        timeAndWeightProps),
+      ("[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test ",
+        List((t3, "insert", timeAndWeight), (t1, "insert", timeOnly), (t2, "delete", "")),
+        timeAndWeightProps),
+      ("[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test",
+        List((t2, "delete", ""), (t1, "insert", timeOnly), (t3, "insert", timeAndWeight)),
+        timeAndWeightProps),
+      ("[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test ",
+        List((t2, "delete", ""), (t3, "insert", timeAndWeight), (t1, "insert", timeOnly)),
+        timeAndWeightProps),
+      ("[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ",
+        List((t1, "update", timeOnly), (t2, "delete", ""), (t3, "update", timeAndWeight)),
+        timeAndWeightProps),
+      ("[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ",
+        List((t1, "update", timeOnly), (t3, "update", timeAndWeight), (t2, "delete", "")),
+        timeAndWeightProps),
+      ("[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test ",
+        List((t2, "delete", ""), (t1, "update", timeOnly), (t3, "update", timeAndWeight)),
+        timeAndWeightProps),
+      ("[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test",
+        List((t2, "delete", ""), (t3, "update", timeAndWeight), (t1, "update", timeOnly)),
+        timeAndWeightProps),
+      ("[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test ",
+        List((t3, "update", timeAndWeight), (t2, "delete", ""), (t1, "update", timeOnly)),
+        timeAndWeightProps),
+      ("[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test ",
+        List((t3, "update", timeAndWeight), (t1, "update", timeOnly), (t2, "delete", "")),
+        timeAndWeightProps),
+      ("[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test ",
+        List(
+          (t5, "update", "{\"is_blocked\": true}"),
+          (t1, "insert", "{\"is_hidden\": false}"),
+          (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"),
+          (t2, "delete", ""),
+          (t4, "update", "{\"time\": 1, \"weight\": -10}")),
+        Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true"))
+    )
+
+    val tcRunner = new CrudTestRunner()
+    for (((tcString, bulkQueries, expected), idx) <- testCases.zipWithIndex) {
+      tcRunner.run(idx + 1, tcString, bulkQueries, expected)
+    }
+  }
+
+
+  object CrudHelper {
+
+    /** Runs one CRUD scenario at a time; `seed` hands every run a fresh vertex id. */
+    class CrudTestRunner {
+      var seed = 0
+
+      /**
+       * Applies `opWithProps` to a fresh self-edge for both test labels, NumOfEachTest
+       * times each, then queries the edge in both directions and checks the degree,
+       * endpoints, timestamp and `expected` props.
+       *
+       * @param tcNum       test-case number, used only in the progress output
+       * @param tcString    human-readable description of the case (not used by the checks)
+       * @param opWithProps (timestamp, operation, props JSON) triples in arrival order
+       * @param expected    property name -> expected string form of its JSON value
+       */
+      def run(tcNum: Int,
+              tcString: String,
+              opWithProps: List[(Long, String, String)],
+              expected: Map[String, String]): Unit = {
+        for {
+          labelName <- List(testLabelName, testLabelName2)
+          _ <- 0 until NumOfEachTest
+        } {
+          seed += 1
+          val srcId = seed.toString
+          val tgtId = srcId
+
+          // The queried edge is expected to carry the largest timestamp that was sent.
+          val maxTs = opWithProps.map(_._1).max
+
+          /** insert edges */
+          println(s"---- TC${tcNum}_init ----")
+          val bulkEdges = opWithProps.map { case (ts, op, props) =>
+            TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
+          }
+
+          TestUtil.insertEdgesSync(bulkEdges: _*)
+
+          for {
+            label <- Label.findByName(labelName)
+            direction <- List("out", "in")
+            cacheTTL <- List(-1L)
+          } {
+            val (serviceName, columnName, id, otherId) = direction match {
+              case "out" =>
+                (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId)
+              case "in" =>
+                (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId)
+            }
+
+            // Ids are embedded raw for testLabelName and as quoted JSON strings otherwise.
+            val qId = if (labelName == testLabelName) id else "\"" + id + "\""
+            val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL)
+
+            val jsResult = TestUtil.getEdgesSync(query)
+
+            val results = jsResult \ "results"
+            val degrees = (jsResult \ "degrees").as[List[JsObject]]
+            val propsLs = (results \\ "props").seq
+            (degrees.head \ LabelMeta.degree.name).as[Int] should be(1)
+
+            val from = (results \\ "from").seq.last.toString.replaceAll("\"", "")
+            val to = (results \\ "to").seq.last.toString.replaceAll("\"", "")
+
+            from should be(id.toString)
+            to should be(otherId.toString)
+            (results \\ "_timestamp").seq.last.as[Long] should be(maxTs)
+
+            for ((key, expectedVal) <- expected) {
+              propsLs.last.as[JsObject].keys.contains(key) should be(true)
+              (propsLs.last \ key).toString should be(expectedVal)
+            }
+          }
+        }
+      }
+
+      /**
+       * Builds the single-step query JSON for one source vertex. `id` is spliced in
+       * verbatim, so string ids must already be quoted by the caller.
+       */
+      def queryJson(serviceName: String,
+                    columnName: String,
+                    labelName: String,
+                    id: String,
+                    dir: String,
+                    cacheTTL: Long = -1L) = Json.parse(
+        s""" { "srcVertices": [
+             { "serviceName": "$serviceName",
+               "columnName": "$columnName",
+               "id": $id } ],
+             "steps": [ [ {
+             "label": "$labelName",
+             "direction": "$dir",
+             "offset": 0,
+             "limit": 10,
+             "cacheTTL": $cacheTTL }]]}""")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
new file mode 100644
index 0000000..86bc85c
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -0,0 +1,309 @@
+package org.apache.s2graph.core.Integrate
+
+import com.typesafe.config._
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{Graph, GraphUtil, Management, PostProcess}
+import org.scalatest._
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  import TestUtil._
+
+  var graph: Graph = _
+  var parser: RequestParser = _
+  var management: Management = _
+  var config: Config = _
+
+  /**
+   * Builds the shared fixtures before the suite runs: loads the default
+   * Typesafe config, creates the Graph, Management and RequestParser on top of
+   * it, then seeds the test schema through initTestData().
+   */
+  override def beforeAll(): Unit = {
+    config = ConfigFactory.load()
+    graph = new Graph(config)(ExecutionContext.Implicits.global)
+    management = new Management(graph)
+    parser = new RequestParser(graph.config)
+    initTestData()
+  }
+
+  /** Shuts down the Graph that beforeAll created. */
+  override def afterAll(): Unit =
+    graph.shutdown()
+
+  /**
+   * Make Service, Label, Vertex for integrate test.
+   *
+   * Deletes and re-creates the test service, deletes each test label and
+   * creates it again from its JSON definition when the lookup no longer finds
+   * it, and finally registers the "age" vertex property on the test column.
+   */
+  def initTestData(): Unit = {
+    println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+    Management.deleteService(testServiceName)
+
+    // 1. createService
+    val jsValue = Json.parse(createService)
+    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) =
+      parser.toServiceElements(jsValue)
+
+    val serviceRes =
+      management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+    println(s">> Service created : $createService, $serviceRes")
+
+    // 2. createLabel: label name -> JSON definition used to create it
+    val labelNames = Map(testLabelName -> testLabelNameCreate,
+      testLabelName2 -> testLabelName2Create,
+      testLabelNameV1 -> testLabelNameV1Create,
+      testLabelNameWeak -> testLabelNameWeakCreate)
+
+    for {
+      (labelName, create) <- labelNames
+    } {
+      Management.deleteLabel(labelName)
+      Label.findByName(labelName, useCache = false) match {
+        case None =>
+          val json = Json.parse(create)
+          val labelRes = for {
+            labelArgs <- parser.toLabelElements(json)
+            label <- (management.createLabel _).tupled(labelArgs)
+          } yield label
+
+          // .get throws when parsing or creation did not succeed, aborting the setup
+          labelRes.get
+        case Some(label) =>
+          println(s">> Label already exist: $create, $label")
+      }
+    }
+
+    // 3. vertex properties: results are not inspected, so iterate for the side effect only
+    val vertexPropsKeys = List("age" -> "int")
+
+    vertexPropsKeys.foreach { case (key, keyType) =>
+      Management.addVertexProp(testServiceName, testColumnName, key, keyType)
+    }
+
+    println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+  }
+
+
+  /**
+   * Test Helpers
+   */
+  object TestUtil {
+    implicit def ec = scala.concurrent.ExecutionContext.global
+
+    //    def checkEdgeQueryJson(params: Seq[(String, String, String, 
String)]) = {
+    //      val arr = for {
+    //        (label, dir, from, to) <- params
+    //      } yield {
+    //        Json.obj("label" -> label, "direction" -> dir, "from" -> from, 
"to" -> to)
+    //      }
+    //
+    //      val s = Json.toJson(arr)
+    //      s
+    //    }
+
+    /**
+     * Issues one deleteAllAdjacentEdges call per element of the given JSON array
+     * and blocks (up to HttpRequestWaitingTime) until all of them have completed.
+     */
+    def deleteAllSync(jsValue: JsValue) = {
+      val deletions = jsValue.as[Seq[JsValue]].map { request =>
+        val (labels, direction, _, ts, vertices) = parser.toDeleteParam(request)
+        graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+      }
+
+      Await.result(Future.sequence(deletions), HttpRequestWaitingTime)
+    }
+
+    /**
+     * Logs the query, runs it through a fresh RestHandler and waits (up to
+     * HttpRequestWaitingTime) for the JSON rendered by PostProcess.toSimpleVertexArrJson.
+     */
+    def getEdgesSync(queryJson: JsValue): JsValue = {
+      logger.info(Json.prettyPrint(queryJson))
+      val handler = new RestHandler(graph)
+      val response = handler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson)
+      Await.result(response, HttpRequestWaitingTime)
+    }
+
+    /**
+     * Joins the given bulk-format lines, parses them into graph elements, applies
+     * the mutation and waits (up to HttpRequestWaitingTime) for its result.
+     */
+    def insertEdgesSync(bulkEdges: String*) = {
+      val elements = parser.toGraphElements(bulkEdges.mkString("\n"))
+      Await.result(graph.mutateElements(elements, withWait = true), HttpRequestWaitingTime)
+    }
+
+    /**
+     * Parses the given bulk-format lines and starts the mutation, handing back
+     * the pending result without awaiting it.
+     */
+    def insertEdgesAsync(bulkEdges: String*) =
+      graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true)
+
+    def toEdge(elems: Any*): String = elems.mkString("\t") // tab-joins the fields into one bulk-format line for insertEdgesSync/insertEdgesAsync
+
+    // common tables
+    val testServiceName = "s2graph"
+    val testLabelName = "s2graph_label_test"
+    val testLabelName2 = "s2graph_label_test_2"
+    val testLabelNameV1 = "s2graph_label_test_v1"
+    val testLabelNameWeak = "s2graph_label_test_weak"
+    val testColumnName = "user_id_test"
+    val testColumnType = "long"
+    val testTgtColumnName = "item_id_test"
+    val testHTableName = "test-htable"
+    val newHTableName = "new-htable"
+    val index1 = "idx_1"
+    val index2 = "idx_2"
+
+    val NumOfEachTest = 30 // not referenced in the visible part of this patch; presumably a per-test repetition count — confirm
+    val HttpRequestWaitingTime = Duration("60 seconds") // timeout handed to Await.result by the *Sync helpers
+
+    val createService = s"""{"serviceName" : "$testServiceName"}"""
+
+    val testLabelNameCreate =
+      s"""
+  {
+    "label": "$testLabelName",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testColumnName",
+    "tgtColumnType": "long",
+    "indices": [
+      {"name": "$index1", "propNames": ["weight", "time", "is_hidden", 
"is_blocked"]},
+      {"name": "$index2", "propNames": ["_timestamp"]}
+    ],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "schemaVersion": "v4",
+    "compressionAlgorithm": "gz",
+    "hTableName": "$testHTableName"
+  }"""
+
+    val testLabelName2Create =
+      s"""
+  {
+    "label": "$testLabelName2",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testTgtColumnName",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "isDirected": false,
+    "schemaVersion": "v3",
+    "compressionAlgorithm": "gz"
+  }"""
+
+    val testLabelNameV1Create =
+      s"""
+  {
+    "label": "$testLabelNameV1",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "${testTgtColumnName}_v1",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "isDirected": true,
+    "schemaVersion": "v1",
+    "compressionAlgorithm": "gz"
+  }"""
+
+    val testLabelNameWeakCreate =
+      s"""
+  {
+    "label": "$testLabelNameWeak",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testTgtColumnName",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "weak",
+    "isDirected": true,
+    "compressionAlgorithm": "gz"
+  }"""
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
new file mode 100644
index 0000000..232e685
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -0,0 +1,906 @@
+package org.apache.s2graph.core.Integrate
+
+import org.apache.s2graph.core.utils.logger
+import org.scalatest.BeforeAndAfterEach
+import play.api.libs.json.{JsNumber, JsValue, Json}
+
+class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
+
+  import TestUtil._
+
+  val insert = "insert"
+  val e = "e"
+  val weight = "weight"
+  val is_hidden = "is_hidden"
+
+  test("interval") {
+    // One-step query restricted to [fromVal, toVal] of `prop` on the given index.
+    def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+          [ {
+              "label": "$testLabelName",
+              "index": "$index",
+              "interval": {
+                  "from": [ { "$prop": $fromVal } ],
+                  "to": [ { "$prop": $toVal } ]
+              }
+            }
+          ]]
+        }
+        """)
+
+    // (source id, index, property, from, to, expected "size" as rendered JSON)
+    val cases = Seq(
+      (0, index2, "_timestamp", 1000, 1001, "1"), // test interval on timestamp index
+      (0, index2, "_timestamp", 1000, 2000, "2"), // test interval on timestamp index
+      (2, index1, "weight", 10, 11, "1"), // test interval on weight index
+      (2, index1, "weight", 10, 20, "2") // test interval on weight index
+    )
+
+    cases.foreach { case (id, index, prop, fromVal, toVal, expectedSize) =>
+      val edges = getEdgesSync(queryWithInterval(id, index, prop, fromVal, toVal))
+      (edges \ "size").toString should be(expectedSize)
+    }
+  }
+
+  test("get edge with where condition") {
+    // One-step outgoing query filtered by the given where clause.
+    def queryWhere(id: Int, where: String) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "limit": 100,
+              "where": "${where}"
+            }
+          ]]
+        }""")
+
+    // (source id, where clause, expected number of results)
+    val cases = Seq(
+      (0, "is_hidden=false and _from in (-1, 0)", 1),
+      (0, "is_hidden=true and _to in (1)", 1),
+      (0, "_from=0", 2),
+      (2, "_from=2 or weight in (-1)", 2),
+      (2, "_from=2 and weight in (10, 20)", 2)
+    )
+
+    cases.foreach { case (id, where, expectedCount) =>
+      val result = getEdgesSync(queryWhere(id, where))
+      (result \ "results").as[List[JsValue]].size should be(expectedCount)
+    }
+  }
+
+  test("get edge exclude") {
+    // Single step with two query params: outgoing edges, plus incoming edges flagged "exclude".
+    def queryExclude(id: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "limit": 2
+            },
+            {
+              "label": "${testLabelName}",
+              "direction": "in",
+              "offset": 0,
+              "limit": 2,
+              "exclude": true
+            }
+          ]]
+        }""")
+
+    val remaining = (getEdgesSync(queryExclude(0)) \ "results").as[List[JsValue]]
+    remaining.size should be(1)
+  }
+
+  test("get edge groupBy property") {
+    // One-step query on testLabelName whose results are grouped by the given props.
+    def queryGroupBy(id: Int, props: Seq[String]): JsValue = {
+      val srcVertex = Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id)
+      val step = Json.obj("step" -> Json.arr(Json.obj("label" -> testLabelName)))
+
+      Json.obj(
+        "groupBy" -> props,
+        "srcVertices" -> Json.arr(srcVertex),
+        "steps" -> Json.arr(step)
+      )
+    }
+
+    val grouped = getEdgesSync(queryGroupBy(0, Seq("weight")))
+    (grouped \ "size").as[Int] should be(2)
+
+    val weights = (grouped \ "results" \\ "groupBy").map(js => (js \ "weight").as[Int])
+    weights should contain(30)
+    weights should contain(40)
+
+    weights should not contain (10)
+  }
+
+  test("edge transform") {
+    // One-step outgoing query; `transforms` is spliced in as raw JSON.
+    def queryTransform(id: Int, transforms: String) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "transform": $transforms
+            }
+          ]]
+        }""")
+
+    // Every `field` value found under "results", rendered as JSON text and sorted.
+    def sortedValues(js: JsValue, field: String) = (js \ "results" \\ field).map(_.toString).sorted
+
+    val byTo = getEdgesSync(queryTransform(0, "[[\"_to\"]]"))
+    (byTo \ "results").as[List[JsValue]].size should be(2)
+
+    val byWeight = getEdgesSync(queryTransform(0, "[[\"weight\"]]"))
+    sortedValues(byWeight, "to") should be(sortedValues(byWeight, "weight"))
+
+    val byFrom = getEdgesSync(queryTransform(0, "[[\"_from\"]]"))
+    sortedValues(byFrom, "to") should be(sortedValues(byFrom, "from"))
+  }
+
+  test("index") {
+    // One-step query on testLabelName that asks for the given index by name.
+    def queryIndex(ids: Seq[Int], indexName: String) = {
+      val srcVertices = Json.arr(
+        Json.obj("serviceName" -> testServiceName,
+          "columnName" -> testColumnName,
+          "ids" -> ids))
+
+      val step = Json.arr(Json.obj("label" -> testLabelName, "index" -> indexName))
+      val steps = Json.arr(Json.obj("step" -> step))
+
+      Json.obj("withScore" -> false, "srcVertices" -> srcVertices, "steps" -> steps)
+    }
+
+    // Use the shared index1/index2 constants (the names the label is created with)
+    // instead of repeating their literal values here.
+
+    // weight order
+    val byWeight = getEdgesSync(queryIndex(Seq(0), index1))
+    ((byWeight \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40))
+
+    // timestamp order
+    val byTimestamp = getEdgesSync(queryIndex(Seq(0), index2))
+    ((byTimestamp \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30))
+  }
+
+  //    "checkEdges" in {
+  //      running(FakeApplication()) {
+  //        val json = Json.parse( s"""
+  //         [{"from": 0, "to": 1, "label": "$testLabelName"},
+  //          {"from": 0, "to": 2, "label": "$testLabelName"}]
+  //        """)
+  //
+  //        def checkEdges(queryJson: JsValue): JsValue = {
+  //          val ret = route(FakeRequest(POST, 
"/graphs/checkEdges").withJsonBody(queryJson)).get
+  //          contentAsJson(ret)
+  //        }
+  //
+  //        val res = checkEdges(json)
+  //        val typeRes = res.isInstanceOf[JsArray]
+  //        typeRes must equalTo(true)
+  //
+  //        val fst = res.as[Seq[JsValue]].head \ "to"
+  //        fst.as[Int] must equalTo(1)
+  //
+  //        val snd = res.as[Seq[JsValue]].last \ "to"
+  //        snd.as[Int] must equalTo(2)
+  //      }
+  //    }
+
+
+
+  test("duration") {
+    // One-step outgoing query restricted to the given "duration" window.
+    def queryDuration(ids: Seq[Int], from: Int, to: Int) = {
+      val srcVertices = Json.arr(
+        Json.obj("serviceName" -> testServiceName,
+          "columnName" -> testColumnName,
+          "ids" -> ids))
+
+      val step = Json.arr(Json.obj(
+        "label" -> testLabelName, "direction" -> "out", "offset" -> 0, "limit" -> 100,
+        "duration" -> Json.obj("from" -> from, "to" -> to)))
+
+      Json.obj("srcVertices" -> srcVertices, "steps" -> Json.arr(Json.obj("step" -> step)))
+    }
+
+    // Number of edges returned for source vertices 0 and 2 within [from, to).
+    def resultCount(from: Int, to: Int): Int =
+      (getEdgesSync(queryDuration(Seq(0, 2), from = from, to = to)) \ "results").as[List[JsValue]].size
+
+    // The same three expectations must hold before and after the re-insert below.
+    def checkDurations(): Unit = {
+      // get all
+      resultCount(0, 5000) should be(4)
+      // inclusive, exclusive
+      resultCount(1000, 4000) should be(3)
+
+      resultCount(1000, 2000) should be(1)
+    }
+
+    checkDurations()
+
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+
+    // duration test after update
+    checkDurations()
+  }
+
+
+  test("return tree") {
+    // Two-step query (out, then in) that asks for the traversal tree via "returnTree".
+    def queryParents(id: Long) = Json.parse(
+      s"""
+        {
+          "returnTree": true,
+          "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+          [ {
+              "label": "$testLabelName",
+              "direction": "out",
+              "offset": 0,
+              "limit": 2
+            }
+          ],[{
+              "label": "$testLabelName",
+              "direction": "in",
+              "offset": 0,
+              "limit": -1
+            }
+          ]]
+        }""".stripMargin)
+
+    val src = 100
+    val tgt = 200
+
+    insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName))
+
+    val edges = (TestUtil.getEdgesSync(queryParents(src)) \ "results").as[Seq[JsValue]]
+    // every returned edge must carry exactly one entry under "parents"
+    val eachHasSingleParent = edges.forall(edge => (edge \ "parents").as[Seq[JsValue]].size == 1)
+
+    eachHasSingleParent should be(true)
+  }
+
+
+
+//  test("pagination and _to") {
+//    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: 
Int) = Json.parse(
+//      s"""
+//        { "srcVertices": [
+//          { "serviceName": "${testServiceName}",
+//            "columnName": "${testColumnName}",
+//            "id": ${id}
+//           }],
+//          "steps": [
+//          [ {
+//              "label": "${testLabelName}",
+//              "direction": "out",
+//              "offset": $offset,
+//              "limit": $limit,
+//              "_to": $to
+//            }
+//          ]]
+//        }
+//        """)
+//
+//    val src = System.currentTimeMillis().toInt
+//
+//    val bulkEdges = Seq(
+//      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, 
is_hidden -> true)),
+//      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, 
is_hidden -> false)),
+//      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+//      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+//    )
+//    insertEdgesSync(bulkEdges: _*)
+//
+//    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+//    var edges = (result \ "results").as[List[JsValue]]
+//
+//    edges.size should be(2)
+//    (edges(0) \ "to").as[Long] should be(4)
+//    (edges(1) \ "to").as[Long] should be(3)
+//
+//    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+//
+//    edges = (result \ "results").as[List[JsValue]]
+//    edges.size should be(2)
+//    (edges(0) \ "to").as[Long] should be(3)
+//    (edges(1) \ "to").as[Long] should be(2)
+//
+//    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to 
= 1))
+//    edges = (result \ "results").as[List[JsValue]]
+//    edges.size should be(1)
+//  }
+
+  test("order by") {
+    // JSON fragments shared by both query builders below.
+    def srcVerticesOf(id: Int) = Json.arr(
+      Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id)
+    )
+
+    def scoredSteps(scoring: Map[String, Int]) = Json.arr(
+      Json.obj("step" -> Json.arr(Json.obj("label" -> testLabelName, "scoring" -> scoring)))
+    )
+
+    // Scored one-step query without an explicit ordering.
+    def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
+      "srcVertices" -> srcVerticesOf(id),
+      "steps" -> scoredSteps(scoring)
+    )
+
+    // Same query with an explicit "orderBy" clause.
+    def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = Json.obj(
+      "orderBy" -> props,
+      "srcVertices" -> srcVerticesOf(id),
+      "steps" -> scoredSteps(scoring)
+    )
+
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))
+    )
+
+    insertEdgesSync(bulkEdges: _*)
+
+    // get edges
+    val byWeight = Map("weight" -> 1)
+    val edges = getEdgesSync(queryScore(0, byWeight))
+    val orderByScore = getEdgesSync(
+      queryOrderBy(0, byWeight, Seq(Map("score" -> "DESC", "timestamp" -> "DESC"))))
+    val ascOrderByScore = getEdgesSync(
+      queryOrderBy(0, byWeight, Seq(Map("score" -> "ASC", "timestamp" -> "DESC"))))
+
+    val edgesTo = edges \ "results" \\ "to"
+    val orderByTo = orderByScore \ "results" \\ "to"
+    val ascOrderByTo = ascOrderByScore \ "results" \\ "to"
+
+    edgesTo should be(Seq(JsNumber(2), JsNumber(1)))
+    edgesTo should be(orderByTo)
+    ascOrderByTo should be(Seq(JsNumber(1), JsNumber(2)))
+    edgesTo.reverse should be(ascOrderByTo)
+  }
+
+
+  test("query with sampling") {
+    def queryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 100,
+                "sample": $sample
+                }]
+            }
+          ]
+        }""")
+
+    def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 100,
+                "sample": $sample
+                }]
+            },
+            {
+               "step": [{
+                 "label": "$testLabelName",
+                 "direction": "out",
+                 "offset": 0,
+                 "limit": 100,
+                 "sample": $sample
+               }]
+            }
+          ]
+        }""")
+
+    def twoQueryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 50,
+                "sample": $sample
+              },
+              {
+                "label": "$testLabelName2",
+                "direction": "out",
+                "offset": 0,
+                "limit": 50
+              }]
+            }
+          ]
+        }""")
+
+    val sampleSize = 2
+    val ts = "1442985659166"
+    val testId = 22
+
+    // first hop: three edges out of testId on each of the two labels
+    val firstHop = Seq(122, 222, 322).map(tgt => toEdge(ts, insert, e, testId, tgt, testLabelName))
+    val firstHopLabel2 = Seq(922, 222, 322).map(tgt => toEdge(ts, insert, e, testId, tgt, testLabelName2))
+
+    // second hop: three edges out of every first-hop target on testLabelName
+    val secondHop = for {
+      (src, tgts) <- Seq(
+        122 -> Seq(1122, 1222, 1322),
+        222 -> Seq(2122, 2222, 2322),
+        322 -> Seq(3122, 3222, 3322))
+      tgt <- tgts
+    } yield toEdge(ts, insert, e, src, tgt, testLabelName)
+
+    val bulkEdges = firstHop ++ firstHopLabel2 ++ secondHop
+
+    insertEdgesSync(bulkEdges: _*)
+
+    // Number of entries under "results" for the given query.
+    def resultSize(query: JsValue): Int = (getEdgesSync(query) \ "results").as[List[JsValue]].size
+
+    resultSize(queryWithSampling(testId, sampleSize)) should be(math.min(sampleSize, bulkEdges.size))
+
+    resultSize(twoStepQueryWithSampling(testId, sampleSize)) should
+      be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size))
+
+    resultSize(twoQueryWithSampling(testId, sampleSize)) should be(sampleSize + 3) // edges in testLabelName2 = 3
+  }
+  test("test query with filterOut query") {
+    def queryWithFilterOut(id1: String, id2: String) = Json.parse(
+      s"""{
+         |     "limit": 10,
+         |     "filterOut": {
+         |             "srcVertices": [{
+         |                     "serviceName": "$testServiceName",
+         |                     "columnName": "$testColumnName",
+         |                     "id": $id1
+         |             }],
+         |             "steps": [{
+         |                     "step": [{
+         |                             "label": "$testLabelName",
+         |                             "direction": "out",
+         |                             "offset": 0,
+         |                             "limit": 10
+         |                     }]
+         |             }]
+         |     },
+         |     "srcVertices": [{
+         |             "serviceName": "$testServiceName",
+         |             "columnName": "$testColumnName",
+         |             "id": $id2
+         |     }],
+         |     "steps": [{
+         |             "step": [{
+         |                     "label": "$testLabelName",
+         |                     "direction": "out",
+         |                     "offset": 0,
+         |                     "limit": 5
+         |             }]
+         |     }]
+         |}
+       """.stripMargin
+    )
+
+    val testId1 = "-23"
+    val testId2 = "-25"
+
+    // edges reached by the filterOut sub-query (from testId1) ...
+    val filterOutEdges = Seq(1 -> 111, 2 -> 222, 3 -> 333).map { case (ts, tgt) =>
+      toEdge(ts, insert, e, testId1, tgt, testLabelName, Json.obj(weight -> 10))
+    }
+    // ... and edges reached by the main query (from testId2); only target 555 is not shared
+    val queriedEdges = Seq(4 -> 111, 5 -> 333, 6 -> 555).map { case (ts, tgt) =>
+      toEdge(ts, insert, e, testId2, tgt, testLabelName, Json.obj(weight -> 1))
+    }
+    val bulkEdges = filterOutEdges ++ queriedEdges
+
+    logger.debug(bulkEdges.mkString("\n"))
+    insertEdgesSync(bulkEdges: _*)
+
+    val rs = getEdgesSync(queryWithFilterOut(testId1, testId2))
+    logger.debug(Json.prettyPrint(rs))
+
+    val results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    (results.head \ "to").toString should be("555")
+  }
+
+
+  /** note that this merge two different label result into one */
+  test("weighted union") {
+    def queryWithWeightedUnion(id1: String, id2: String) = Json.parse(
+      s"""
+                               |{
+                               |  "limit": 10,
+                               |  "weights": [
+                               |    10,
+                               |    1
+                               |  ],
+                               |  "groupBy": ["weight"],
+                               |  "queries": [
+                               |    {
+                               |      "srcVertices": [
+                               |        {
+                               |          "serviceName": "$testServiceName",
+                               |          "columnName": "$testColumnName",
+                               |          "id": $id1
+                               |        }
+                               |      ],
+                               |      "steps": [
+                               |        {
+                               |          "step": [
+                               |            {
+                               |              "label": "$testLabelName",
+                               |              "direction": "out",
+                               |              "offset": 0,
+                               |              "limit": 5
+                               |            }
+                               |          ]
+                               |        }
+                               |      ]
+                               |    },
+                               |    {
+                               |      "srcVertices": [
+                               |        {
+                               |          "serviceName": "$testServiceName",
+                               |          "columnName": "$testColumnName",
+                               |          "id": $id2
+                               |        }
+                               |      ],
+                               |      "steps": [
+                               |        {
+                               |          "step": [
+                               |            {
+                               |              "label": "$testLabelName2",
+                               |              "direction": "out",
+                               |              "offset": 0,
+                               |              "limit": 5
+                               |            }
+                               |          ]
+                               |        }
+                               |      ]
+                               |    }
+                               |  ]
+                               |}
+       """.stripMargin
+    )
+
+    val testId1 = "1"
+    val testId2 = "2"
+
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 
10)),
+      toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 
10)),
+      toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 
10)),
+      toEdge(4, insert, e, testId2, 444, testLabelName2, Json.obj(weight -> 
1)),
+      toEdge(5, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 
1)),
+      toEdge(6, insert, e, testId2, 666, testLabelName2, Json.obj(weight -> 1))
+    )
+
+    insertEdgesSync(bulkEdges: _*)
+
+    val rs = getEdgesSync(queryWithWeightedUnion(testId1, testId2))
+    logger.debug(Json.prettyPrint(rs))
+    val results = (rs \ "results").as[List[JsValue]]
+    results.size should be(2)
+    (results(0) \ "scoreSum").as[Float] should be(30.0)
+    (results(0) \ "agg").as[List[JsValue]].size should be(3)
+    (results(1) \ "scoreSum").as[Float] should be(3.0)
+    (results(1) \ "agg").as[List[JsValue]].size should be(3)
+  }
+
+  test("weighted union with options") {
+    // Request with two sub-queries (weights 10 and 1), grouped by "to", selecting "to" and
+    // "weight", plus a "filterOut" sub-query that starts from id1 over testLabelName.
+    def unionQuery(id1: String, id2: String) = {
+      val body =
+        s"""
+         |{
+         |  "limit": 10,
+         |  "weights": [
+         |    10,
+         |    1
+         |  ],
+         |  "groupBy": ["to"],
+         |  "select": ["to", "weight"],
+         |  "filterOut": {
+         |    "srcVertices": [
+         |      {
+         |        "serviceName": "$testServiceName",
+         |        "columnName": "$testColumnName",
+         |        "id": $id1
+         |      }
+         |    ],
+         |    "steps": [
+         |      {
+         |        "step": [
+         |          {
+         |            "label": "$testLabelName",
+         |            "direction": "out",
+         |            "offset": 0,
+         |            "limit": 10
+         |          }
+         |        ]
+         |      }
+         |    ]
+         |  },
+         |  "queries": [
+         |    {
+         |      "srcVertices": [
+         |        {
+         |          "serviceName": "$testServiceName",
+         |          "columnName": "$testColumnName",
+         |          "id": $id1
+         |        }
+         |      ],
+         |      "steps": [
+         |        {
+         |          "step": [
+         |            {
+         |              "label": "$testLabelName",
+         |              "direction": "out",
+         |              "offset": 0,
+         |              "limit": 5
+         |            }
+         |          ]
+         |        }
+         |      ]
+         |    },
+         |    {
+         |      "srcVertices": [
+         |        {
+         |          "serviceName": "$testServiceName",
+         |          "columnName": "$testColumnName",
+         |          "id": $id2
+         |        }
+         |      ],
+         |      "steps": [
+         |        {
+         |          "step": [
+         |            {
+         |              "label": "$testLabelName2",
+         |              "direction": "out",
+         |              "offset": 0,
+         |              "limit": 5
+         |            }
+         |          ]
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+      Json.parse(body)
+    }
+
+    val firstSrc = "-192848"
+    val secondSrc = "-193849"
+
+    // Three weight-10 edges from firstSrc (timestamps 1..3) on testLabelName ...
+    val edgesFromFirst = Seq(111, 222, 333).zipWithIndex.map { case (tgt, idx) =>
+      toEdge(idx + 1, insert, e, firstSrc, tgt, testLabelName, Json.obj(weight -> 10))
+    }
+    // ... and three weight-1 edges from secondSrc (timestamps 4..6) on testLabelName2.
+    val edgesFromSecond = Seq(111, 333, 555).zipWithIndex.map { case (tgt, idx) =>
+      toEdge(idx + 4, insert, e, secondSrc, tgt, testLabelName2, Json.obj(weight -> 1))
+    }
+
+    insertEdgesSync(edgesFromFirst ++ edgesFromSecond: _*)
+
+    val response = getEdgesSync(unionQuery(firstSrc, secondSrc))
+    logger.debug(Json.prettyPrint(response))
+
+    // Presumably only target 555 survives the filterOut (111 and 333 are also reachable
+    // from firstSrc) -- TODO confirm against the filterOut semantics.
+    val grouped = (response \ "results").as[List[JsValue]]
+    grouped.size should be(1)
+
+  }
+
+  test("scoreThreshold") {
+    // Two-step "out" traversal over testLabelName, grouped by "to", with the given
+    // "scoreThreshold" placed in the request.
+    def thresholdQuery(id: String, scoreThreshold: Int) = {
+      val body =
+        s"""{
+         |  "limit": 10,
+         |  "scoreThreshold": $scoreThreshold,
+         |  "groupBy": ["to"],
+         |  "srcVertices": [
+         |    {
+         |      "serviceName": "$testServiceName",
+         |      "columnName": "$testColumnName",
+         |      "id": $id
+         |    }
+         |  ],
+         |  "steps": [
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10
+         |        }
+         |      ]
+         |    },
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+      Json.parse(body)
+    }
+
+    val startId = "-23903"
+
+    // Every fixture edge shares timestamp 1 and weight 10; only the endpoints differ.
+    def link(src: Any, tgt: Int) =
+      toEdge(1, insert, e, src, tgt, testLabelName, Json.obj(weight -> 10))
+
+    val fixture = Seq(
+      link(startId, 101),
+      link(startId, 102),
+      link(startId, 103),
+      link(101, 102),
+      link(101, 103),
+      link(101, 104),
+      link(102, 103),
+      link(102, 104),
+      link(103, 105)
+    )
+    // Second-step targets and how many edges reach them: 104 -> 2, 103 -> 2, 102 -> 1, 105 -> 1
+    insertEdgesSync(fixture: _*)
+
+    // Runs the query for one threshold, logs the raw response and returns its "results" list.
+    def resultsFor(threshold: Int): List[JsValue] = {
+      val response = getEdgesSync(thresholdQuery(startId, threshold))
+      logger.debug(Json.prettyPrint(response))
+      (response \ "results").as[List[JsValue]]
+    }
+
+    resultsFor(2).size should be(2)
+    resultsFor(1).size should be(4)
+  }
+
+  // Single-step "out" query over testLabelName from one source vertex, with the given
+  // offset and limit interpolated into the request.
+  def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = {
+    val body =
+      s"""
+          { "srcVertices": [
+            { "serviceName": "$testServiceName",
+              "columnName": "$testColumnName",
+              "id": $id
+             }],
+            "steps": [
+            [ {
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": $offset,
+                "limit": $limit
+              }
+            ]]
+          }
+          """
+    Json.parse(body)
+  }
+
+  // Builds a one-step query over testLabelName whose top-level "limit" field is set to `limit`.
+  def queryGlobalLimit(id: Int, limit: Int): JsValue = {
+    val source = Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id)
+    val onlyStep = Json.obj("step" -> Json.arr(Json.obj("label" -> testLabelName)))
+
+    Json.obj(
+      "limit" -> limit,
+      "srcVertices" -> Json.arr(source),
+      "steps" -> Json.arr(onlyStep)
+    )
+  }
+
+
+  // Lifecycle hook: reloads the fixture by delegating to initTestData() each time it is invoked.
+  // Declared with () and an explicit Unit result because it exists purely for its side effect.
+  override def beforeEach(): Unit = initTestData()
+
+  // Runs the inherited initTestData() and then inserts this suite's fixed edge fixture.
+  // beforeEach delegates here, so this executes on every beforeEach call, not just once.
+  override def initTestData(): Unit = {
+    super.initTestData()
+
+    // Edges that carry explicit props (weight / is_hidden).
+    val edgesWithProps = Seq(
+      toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)),
+      toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)),
+      toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)),
+      toEdge(4000, insert, e, 2, 1, testLabelName, Json.obj(weight -> 10)),
+      toEdge(3000, insert, e, 10, 20, testLabelName, Json.obj(weight -> 20)),
+      toEdge(4000, insert, e, 20, 20, testLabelName, Json.obj(weight -> 10))
+    )
+
+    // Prop-less edges at timestamp 1, listed as (source, target) pairs.
+    val edgesAtTsOne = Seq(
+      -1 -> 1000, -1 -> 2000, -1 -> 3000,
+      1000 -> 10000, 1000 -> 11000,
+      2000 -> 11000, 2000 -> 12000,
+      3000 -> 12000, 3000 -> 13000,
+      10000 -> 100000
+    ).map { case (src, tgt) => toEdge(1, insert, e, src, tgt, testLabelName) }
+
+    // The last two prop-less edges use timestamps 2 and 3.
+    val trailingEdges = Seq(
+      toEdge(2, insert, e, 11000, 200000, testLabelName),
+      toEdge(3, insert, e, 12000, 300000, testLabelName)
+    )
+
+    insertEdgesSync(edgesWithProps ++ edgesAtTsOne ++ trailingEdges: _*)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
new file mode 100644
index 0000000..1c179bf
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala
@@ -0,0 +1,283 @@
+package org.apache.s2graph.core.Integrate
+
+import java.util.concurrent.TimeUnit
+
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Random
+
+/**
+ * Integration tests for insert/update/delete and deleteAll requests on `testLabelName2`,
+ * checking both the returned edges and the reported `_degree`.
+ *
+ * Request builders and the shared contention scenario live in the nested [[StrongDeleteUtil]].
+ */
+class StrongLabelDeleteTest extends IntegrateCommon {
+
+  import StrongDeleteUtil._
+  import TestUtil._
+
+  test("Strong consistency select") {
+    insertEdgesSync(bulkEdges(): _*)
+
+    var result = getEdgesSync(query(0))
+    (result \ "results").as[List[JsValue]].size should be(2)
+    result = getEdgesSync(query(10))
+    (result \ "results").as[List[JsValue]].size should be(2)
+  }
+
+  test("Strong consistency deleteAll") {
+    // NOTE(review): this test queries before inserting anything itself, so it presumably
+    // relies on the edges inserted by "Strong consistency select" -- confirm test ordering.
+    val deletedAt = 100
+    var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(3)
+
+    val deleteParam = Json.arr(
+      Json.obj("label" -> testLabelName2,
+        "direction" -> "in",
+        "ids" -> Json.arr("20"),
+        "timestamp" -> deletedAt))
+
+    deleteAllSync(deleteParam)
+
+    result = getEdgesSync(query(11, direction = "out"))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+
+    result = getEdgesSync(query(12, direction = "out"))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+
+    result = getEdgesSync(query(10, direction = "out"))
+    println(result)
+    // 10 -> out -> 20 should not be in result.
+    (result \ "results").as[List[JsValue]].size should be(1)
+    (result \\ "to").size should be(1)
+    (result \\ "to").head.as[String] should be("21")
+
+    result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+
+    // Re-insert the same edges with timestamps after the deleteAll timestamp.
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+
+    result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+    println(result)
+
+    (result \ "results").as[List[JsValue]].size should be(3)
+  }
+
+
+  test("update delete") {
+    // Each round uses its own source id; the per-round assertion is the actual check
+    // (the previous trailing `ret.forall(identity)` was computed and then discarded).
+    (0 until testNum).foreach { i =>
+      val src = (i + 1) * 10000
+
+      val (ok, _) = testInner(i, src)
+      ok should be(true)
+    }
+  }
+
+  test("update delete 2") {
+    val src = System.currentTimeMillis()
+    var ts = 0L
+
+    // Every round: run the contention scenario, issue two deleteAll requests at increasing
+    // timestamps, then expect no edges and a degree of 0 for `src`.
+    (0 until testNum).foreach { _ =>
+      val (ok, lastTs) = testInner(ts, src)
+      val deletedAt = lastTs + 1
+      val deletedAt2 = lastTs + 2
+      ts = deletedAt2 + 1 // next start ts
+
+      ok should be(true)
+
+      val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
+      val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2))
+
+      deleteAllSync(deleteAllRequest)
+      deleteAllSync(deleteAllRequest2)
+
+      val result = getEdgesSync(query(id = src))
+      println(result)
+
+      val resultEdges = (result \ "results").as[Seq[JsValue]]
+      resultEdges.isEmpty should be(true)
+
+      val degreeAfterDeleteAll = getDegree(result)
+
+      degreeAfterDeleteAll should be(0)
+    }
+  }
+
+  /** Stress test on degree: many adjacent edges on one source vertex, with inserts and
+    * deletes sent shuffled in concurrent batches.
+    */
+  test("large degrees") {
+    val maxSize = 100
+    val deleteSize = 10
+    val numOfConcurrentBatch = 100
+    val src = System.currentTimeMillis()
+
+    val (insertCount, deleteCount) =
+      insertThenDeleteRandomly(src, maxSize, deleteSize, numOfConcurrentBatch)
+
+    val expectedDegree = insertCount - deleteCount
+    val queryJson = query(id = src)
+    val result = getEdgesSync(queryJson)
+    val resultSize = (result \ "size").as[Long]
+    val resultDegree = getDegree(result)
+
+    val ret = resultSize == expectedDegree && resultDegree == resultSize
+    println(s"[MaxSize]: $maxSize")
+    println(s"[DeleteSize]: $deleteSize")
+    println(s"[ResultDegree]: $resultDegree")
+    println(s"[ExpectedDegree]: $expectedDegree")
+    println(s"[ResultSize]: $resultSize")
+    ret should be(true)
+  }
+
+  test("deleteAll") {
+    val maxSize = 100
+    val deleteSize = 10
+    val numOfConcurrentBatch = 100
+    val src = System.currentTimeMillis()
+
+    insertThenDeleteRandomly(src, maxSize, deleteSize, numOfConcurrentBatch)
+
+    val deletedAt = System.currentTimeMillis()
+    val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
+
+    deleteAllSync(deleteAllRequest)
+
+    val result = getEdgesSync(query(id = src))
+    println(result)
+    val resultEdges = (result \ "results").as[Seq[JsValue]]
+    resultEdges.isEmpty should be(true)
+
+    val degreeAfterDeleteAll = getDegree(result)
+    degreeAfterDeleteAll should be(0)
+  }
+
+  /** Request builders and shared scenarios used by the tests above. */
+  object StrongDeleteUtil {
+
+    val labelName = testLabelName2
+//    val labelName = testLabelName
+    val maxTgtId = 10
+    val batchSize = 10
+    val testNum = 100
+    val numOfBatch = 10
+
+    /**
+     * Sends `numOfBatch * batchSize` random "update"/"delete" requests from `src` to random
+     * targets in `[0, maxTgtId)`, shuffled and grouped into batches, then compares the
+     * reported degree with the number of targets whose last generated operation was "update".
+     *
+     * @param startTs timestamp before the first request; each request uses the next value
+     * @param src     source vertex id
+     * @return (whether degree and size matched the expectation, last timestamp used)
+     */
+    def testInner(startTs: Long, src: Long) = {
+      // Last operation generated per target; generation order equals timestamp order.
+      val lastOps = Array.fill(maxTgtId)("none")
+      var currentTs = startTs
+
+      val allRequests = for {
+        _ <- 0 until numOfBatch
+        _ <- 0 until batchSize
+      } yield {
+        currentTs += 1
+
+        val tgt = Random.nextInt(maxTgtId)
+        val op = if (Random.nextDouble() < 0.5) "delete" else "update"
+
+        lastOps(tgt) = op
+        Seq(currentTs, op, "e", src, tgt, labelName, "{}").mkString("\t")
+      }
+
+      allRequests.foreach(println(_))
+
+      val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests =>
+        insertEdgesAsync(bulkRequests: _*)
+      }
+
+      Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
+
+      val expectedDegree = lastOps.count(op => op != "delete" && op != "none")
+      val queryJson = query(id = src)
+      val result = getEdgesSync(queryJson)
+      val resultSize = (result \ "size").as[Long]
+      val resultDegree = getDegree(result)
+
+      println(lastOps.toList)
+      println(result)
+
+      val ret = resultDegree == expectedDegree && resultSize == resultDegree
+      if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree")
+
+      (ret, currentTs)
+    }
+
+    /**
+     * Inserts `maxSize` edges from `src` (targets `src + 0 .. src + maxSize - 1`) and deletes
+     * `deleteSize` randomly chosen ones. All requests are shuffled together, sent in groups
+     * of `batchLimit`, and awaited before returning.
+     *
+     * @return (number of insert requests, number of delete requests)
+     */
+    def insertThenDeleteRandomly(src: Long, maxSize: Int, deleteSize: Int, batchLimit: Int,
+                                 edgeLabel: String = labelName, dir: String = "out"): (Int, Int) = {
+      val tgts = (0 until maxSize).map { ith => src + ith }
+      val deleteTgts = Random.shuffle(tgts).take(deleteSize)
+      val insertRequests = tgts.map { tgt =>
+        Seq(tgt, "insert", "e", src, tgt, edgeLabel, "{}", dir).mkString("\t")
+      }
+      // Each delete is stamped 1000 above the timestamp of the matching insert.
+      val deleteRequests = deleteTgts.map { tgt =>
+        Seq(tgt + 1000, "delete", "e", src, tgt, edgeLabel, "{}", dir).mkString("\t")
+      }
+      val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
+      val futures = allRequests.grouped(batchLimit).map { bulkRequests =>
+        insertEdgesAsync(bulkRequests: _*)
+      }
+
+      Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
+
+      (insertRequests.size, deleteRequests.size)
+    }
+
+    /** Eight insert requests on `labelName` with timestamps `startTs + 1 .. startTs + 8`. */
+    def bulkEdges(startTs: Int = 0) = Seq(
+      toEdge(startTs + 1, "insert", "e", "0", "1", labelName, s"""{"time": 10}"""),
+      toEdge(startTs + 2, "insert", "e", "0", "1", labelName, s"""{"time": 11}"""),
+      toEdge(startTs + 3, "insert", "e", "0", "1", labelName, s"""{"time": 12}"""),
+      toEdge(startTs + 4, "insert", "e", "0", "2", labelName, s"""{"time": 10}"""),
+      toEdge(startTs + 5, "insert", "e", "10", "20", labelName, s"""{"time": 10}"""),
+      toEdge(startTs + 6, "insert", "e", "10", "21", labelName, s"""{"time": 11}"""),
+      toEdge(startTs + 7, "insert", "e", "11", "20", labelName, s"""{"time": 12}"""),
+      toEdge(startTs + 8, "insert", "e", "12", "20", labelName, s"""{"time": 13}""")
+    )
+
+    /** Single-step query from vertex `id` with limit -1 and "duplicate" set to "raw". */
+    def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName,
+              _labelName: String = labelName, direction: String = "out") = Json.parse(
+      s"""
+          { "srcVertices": [
+            { "serviceName": "$serviceName",
+              "columnName": "$columnName",
+              "id": $id
+             }],
+            "steps": [
+            [ {
+                "label": "${_labelName}",
+                "direction": "${direction}",
+                "offset": 0,
+                "limit": -1,
+                "duplicate": "raw"
+              }
+            ]]
+          }""")
+
+    /** First `_degree` value found under the response's "degrees" field, or 0 if none. */
+    def getDegree(jsValue: JsValue): Long = {
+      ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L)
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
new file mode 100644
index 0000000..b8bfcf6
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala
@@ -0,0 +1,71 @@
+package org.apache.s2graph.core.Integrate
+
+import org.apache.s2graph.core.PostProcess
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.Await
+import scala.util.Random
+
+
+/**
+ * Round-trip test for vertices: inserts vertices with a random "age" prop, reads them back
+ * and compares "id" and "age" pairwise with what was sent.
+ */
+class VertexTestHelper extends IntegrateCommon {
+
+  import TestUtil._
+  import VertexTestHelper._
+
+  test("vertex") {
+    val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0)
+    val (serviceName, columnName) = (testServiceName, testColumnName)
+
+    val data = vertexInsertsPayload(serviceName, columnName, ids)
+    val payload = Json.parse(Json.toJson(data).toString)
+    println(payload)
+
+    val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName))
+    Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime)
+
+    val res = graph.getVertices(vertices).map { vertices =>
+      PostProcess.verticesToJson(vertices)
+    }
+
+    val ret = Await.result(res, HttpRequestWaitingTime)
+    val fetched = ret.as[Seq[JsValue]]
+
+    // zip silently truncates to the shorter side, so without this check the comparison
+    // below would pass vacuously if fewer (or no) vertices came back.
+    fetched.size should be(data.size)
+
+    data.zip(fetched).foreach { case (d, f) =>
+      (d \ "id") should be(f \ "id")
+      ((d \ "props") \ "age") should be((f \ "props") \ "age")
+    }
+  }
+
+  /** Payload builders for the vertex test. */
+  object VertexTestHelper {
+    /** JSON array holding one object with the service name, column name and the given ids. */
+    def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = {
+      Json.parse(
+        s"""
+           |[
+           |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")}]}
+           |]
+       """.stripMargin)
+    }
+
+    /**
+     * One insert object per id, carrying random props and the current time as timestamp.
+     * `serviceName` and `columnName` are not used in the payload itself.
+     */
+    def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = {
+      ids.map { id =>
+        Json.obj("id" -> id, "props" -> randomProps(), "timestamp" -> System.currentTimeMillis())
+      }
+    }
+
+    // (prop name, declared type); only the name is used when generating values.
+    val vertexPropsKeys = List(
+      ("age", "int")
+    )
+
+    /** A random value in [0, 100) for every key in `vertexPropsKeys`. */
+    def randomProps() = {
+      vertexPropsKeys.map { case (propKey, _) =>
+        propKey -> Random.nextInt(100)
+      }.toMap
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
new file mode 100644
index 0000000..81c57c6
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -0,0 +1,136 @@
+package org.apache.s2graph.core.Integrate
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.BeforeAndAfterEach
+import play.api.libs.json.{JsObject, JsValue, Json}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Integration tests for select, delete and deleteAll on `testLabelNameWeak`.
+ * The fixture from `bulkEdges()` is re-inserted through `beforeEach`.
+ */
+class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
+
+  import TestUtil._
+  import WeakLabelDeleteHelper._
+
+  test("test weak consistency select") {
+    val fromZero = getEdgesSync(query(0))
+    println(fromZero)
+    (fromZero \ "results").as[List[JsValue]].size should be(4)
+
+    val fromTen = getEdgesSync(query(10))
+    println(fromTen)
+    (fromTen \ "results").as[List[JsValue]].size should be(2)
+  }
+
+  test("test weak consistency delete") {
+    val before = getEdgesSync(query(0))
+    println(before)
+
+    /** expect 4 edges */
+    (before \ "results").as[List[JsValue]].size should be(4)
+    val edges = (before \ "results").as[List[JsObject]]
+
+    // Send the fetched edges back as "delete" operations and wait for completion.
+    val edgesToStore = parser.toEdges(Json.toJson(edges), "delete")
+    val rets = graph.mutateEdges(edgesToStore, withWait = true)
+    Await.result(rets, Duration(20, TimeUnit.MINUTES))
+
+    /** expect nothing */
+    val after = getEdgesSync(query(0))
+    println(after)
+    (after \ "results").as[List[JsValue]].size should be(0)
+
+    /** insert should be ignored */
+    /**
+     * I am wondering whether this is the right test case.
+     * It makes sense on HBase, which treats a cell as deleted when there are
+     * insert/delete with the same timestamp (version) on the same cell.
+     * This can differ on other storage systems, so I think
+     * this test should be removed.
+     */
+//    val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert")
+//    val rets2 = graph.mutateEdges(edgesToStore2, withWait = true)
+//    Await.result(rets2, Duration(20, TimeUnit.MINUTES))
+//
+//    result = getEdgesSync(query(0))
+//    (result \ "results").as[List[JsValue]].size should be(0)
+  }
+
+
+  test("test weak consistency deleteAll") {
+    val deletedAt = 100
+
+    val incomingBefore = getEdgesSync(query(20, "in", testTgtColumnName))
+    println(incomingBefore)
+    (incomingBefore \ "results").as[List[JsValue]].size should be(3)
+
+    val json = Json.arr(Json.obj("label" -> testLabelNameWeak,
+      "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt))
+    println(json)
+    deleteAllSync(json)
+
+    val fromEleven = getEdgesSync(query(11, "out"))
+    (fromEleven \ "results").as[List[JsValue]].size should be(0)
+
+    val fromTwelve = getEdgesSync(query(12, "out"))
+    (fromTwelve \ "results").as[List[JsValue]].size should be(0)
+
+    val fromTen = getEdgesSync(query(10, "out"))
+
+    // 10 -> out -> 20 should not be in result.
+    (fromTen \ "results").as[List[JsValue]].size should be(1)
+    (fromTen \\ "to").size should be(1)
+    (fromTen \\ "to").head.as[String] should be("21")
+
+    val incomingAfterDelete = getEdgesSync(query(20, "in", testTgtColumnName))
+    println(incomingAfterDelete)
+    (incomingAfterDelete \ "results").as[List[JsValue]].size should be(0)
+
+    // Re-insert the same edges with timestamps after the deleteAll timestamp.
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+
+    val incomingAfterReinsert = getEdgesSync(query(20, "in", testTgtColumnName))
+    (incomingAfterReinsert \ "results").as[List[JsValue]].size should be(3)
+  }
+
+
+  // Lifecycle hook: reloads the fixture by delegating to initTestData().
+  // Declared with () and an explicit Unit result because it exists purely for its side effect.
+  override def beforeEach(): Unit = initTestData()
+
+  // Runs the inherited initTestData() and then inserts bulkEdges().
+  // Called from beforeEach above, so it runs on every beforeEach call, not just once.
+  override def initTestData(): Unit = {
+    super.initTestData()
+    insertEdgesSync(bulkEdges(): _*)
+  }
+
+  /** Fixture and query builders for `testLabelNameWeak`. */
+  object WeakLabelDeleteHelper {
+
+    /** Eight insert requests with timestamps `startTs + 1 .. startTs + 8`. */
+    def bulkEdges(startTs: Int = 0) = Seq(
+      toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""),
+      toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""),
+      toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""),
+      toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""),
+      toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""")
+    )
+
+    /** Single-step query from vertex `id` with limit 10 and "duplicate" set to "raw". */
+    def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$columnName",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelNameWeak}",
+              "direction": "${direction}",
+              "offset": 0,
+              "limit": 10,
+              "duplicate": "raw"
+            }
+          ]]
+        }""")
+  }
+
+}
+


Reply via email to