# 208 Move test cases from s2rest_play to s2graph_core
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/610d519d Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/610d519d Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/610d519d Branch: refs/heads/feature/test_daewon Commit: 610d519d3a8cf2c140499d0b4e4d3c75faaf08c9 Parents: 395b00f Author: daewon <[email protected]> Authored: Tue Dec 29 17:01:17 2015 +0900 Committer: daewon <[email protected]> Committed: Tue Dec 29 17:01:17 2015 +0900 ---------------------------------------------------------------------- .../scala/com/kakao/s2graph/core/Graph.scala | 2 +- .../kakao/s2graph/core/rest/RequestParser.scala | 11 + .../scala/com/kakao/s2graph/core/EdgeTest.scala | 11 +- .../kakao/s2graph/core/Integrate/CrudTest.scala | 141 ++-- .../core/Integrate/IntegrateCommon.scala | 107 +-- .../s2graph/core/Integrate/QueryTest.scala | 86 +-- .../core/Integrate/StrongLabelDeleteTest.scala | 282 ++++++++ .../core/Integrate/WeakLabelDeleteTest.scala | 129 ++++ .../app/controllers/EdgeController.scala | 9 +- .../benchmark/PostProcessBenchmarkSpec.scala | 6 +- .../test/controllers/PostProcessSpec.scala | 2 +- s2rest_play/test/controllers/SpecCommon.scala | 35 +- .../controllers/StrongLabelDeleteSpec.scala | 690 +++++++++---------- .../test/controllers/WeakLabelDeleteSpec.scala | 252 +++---- 14 files changed, 1076 insertions(+), 687 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala index 9a540d2..5bba009 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala @@ -312,7 +312,7 @@ object Graph { } get } -class Graph(_config: Config)(implicit ec: ExecutionContext) { +class Graph(_config: Config)(implicit val ec: ExecutionContext) { val config = _config.withFallback(Graph.DefaultConfig) val cacheSize = config.getInt("cache.max.size") // val cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, Seq[QueryResult]]() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index 04ac113..f22dfcf 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -503,4 +503,15 @@ class RequestParser(config: Config) extends JSONParser { element <- Graph.toGraphElement(str) } yield element } + + def toDeleteParam(json: JsValue) = { + val labelName = (json \ "label").as[String] + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) + val direction = (json \ "direction").asOpt[String].getOrElse("out") + + val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) + val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) + val vertices = toVertices(labelName, direction, ids) + (labels, direction, ids, ts, vertices) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala index d86541d..2aad32f 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala @@ -7,6 +7,7 @@ import org.scalatest.FunSuite import org.scalatest.matchers.Matcher class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { + initTests() test("toLogString") { val testLabelName = labelNameV2 @@ -24,11 +25,11 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { }).mkString("\n") val expected = Seq( - Seq("1445240543366", "update", "e", "1", "2", "s2graph_label_test", "{\"is_blocked\":true}"), - Seq("1445240543362", "insert", "e", "1", "2", "s2graph_label_test", "{\"is_hidden\":false}"), - Seq("1445240543364", "insert", "e", "1", "2", "s2graph_label_test", "{\"is_hidden\":false,\"weight\":10}"), - Seq("1445240543363", "delete", "e", "1", "2", "s2graph_label_test"), - Seq("1445240543365", "update", "e", "1", "2", "s2graph_label_test", "{\"time\":1,\"weight\":-10}") + Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"), + Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"), + Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false,\"weight\":10}"), + Seq("1445240543363", "delete", "e", "1", "2", testLabelName), + Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{\"time\":1,\"weight\":-10}") ).map(_.mkString("\t")).mkString("\n") assert(bulkEdge === expected) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala index d178419..f3bbdd9 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala @@ -1,14 +1,13 @@ package com.kakao.s2graph.core.Integrate -import com.kakao.s2graph.core.{Management, PostProcess} import com.kakao.s2graph.core.mysqls._ import play.api.libs.json.{JsObject, Json} -import scala.concurrent.Await -import scala.concurrent.duration.Duration - class CrudTest extends IntegrateCommon { + import CrudHelper._ + import TestUtil._ + test("test CRUD") { var tcNum = 0 var tcString = "" @@ -152,87 +151,77 @@ class CrudTest extends IntegrateCommon { tcRunner.run(tcNum, tcString, bulkQueries, expected) } - class CrudTestRunner { - var seed = 0 - - def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { - for { - labelName <- List(testLabelName, testLabelName2) - i <- 0 until NumOfEachTest - } { - seed += 1 - val srcId = seed.toString - val tgtId = srcId - - val maxTs = opWithProps.map(t => t._1).max - /** insert edges */ - println(s"---- TC${tcNum}_init ----") - val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { - Util.toEdge(ts, op, "e", srcId, tgtId, labelName, props) - }) + object CrudHelper { - Util.insertEdges(bulkEdges:_*) + class CrudTestRunner { + var seed = 0 + def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { for { - label <- Label.findByName(labelName) - direction <- List("out", "in") - cacheTTL <- List(-1L) + labelName <- List(testLabelName, testLabelName2) + i <- 0 until NumOfEachTest } { - val (serviceName, columnName, id, otherId) = direction match { - case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId) - case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId) - } - - val qId = if (labelName == testLabelName) id else "\"" + id + "\"" - val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL) - - val jsResult = Util.getEdges(query) - - val results = jsResult \ "results" - val deegrees = (jsResult \ "degrees").as[List[JsObject]] - val propsLs = (results \\ "props").seq - (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) - - val from = (results \\ "from").seq.last.toString.replaceAll("\"", "") - val to = (results \\ "to").seq.last.toString.replaceAll("\"", "") - - from should be(id.toString) - to should be(otherId.toString) - (results \\ "_timestamp").seq.last.as[Long] should be(maxTs) - - for ((key, expectedVal) <- expected) { - propsLs.last.as[JsObject].keys.contains(key) should be(true) - (propsLs.last \ key).toString should be(expectedVal) + seed += 1 + val srcId = seed.toString + val tgtId = srcId + + val maxTs = opWithProps.map(t => t._1).max + + /** insert edges */ + println(s"---- TC${tcNum}_init ----") + val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { + TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) + }) + + TestUtil.insertEdgesSync(bulkEdges: _*) + + for { + label <- Label.findByName(labelName) + direction <- List("out", "in") + cacheTTL <- List(-1L) + } { + val (serviceName, columnName, id, otherId) = direction match { + case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId) + case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId) + } + + val qId = if (labelName == testLabelName) id else "\"" + id + "\"" + val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL) + + val jsResult = TestUtil.getEdgesSync(query) + + val results = jsResult \ "results" + val deegrees = (jsResult \ "degrees").as[List[JsObject]] + val propsLs = (results \\ "props").seq + (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) + + val from = (results \\ "from").seq.last.toString.replaceAll("\"", "") + val to = (results \\ "to").seq.last.toString.replaceAll("\"", "") + + from should be(id.toString) + to should be(otherId.toString) + (results \\ "_timestamp").seq.last.as[Long] should be(maxTs) + + for ((key, expectedVal) <- expected) { + propsLs.last.as[JsObject].keys.contains(key) should be(true) + (propsLs.last \ key).toString should be(expectedVal) + } } } } - } - def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = { - val s = - s"""{ - "srcVertices": [ - { - "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id - } - ], - "steps": [ - [ - { - "label": "$labelName", - "direction": "$dir", - "offset": 0, - "limit": 10, - "cacheTTL": $cacheTTL - } - ] - ] - }""" - Json.parse(s) + def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse( + s""" { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id } ], + "steps": [ [ { + "label": "$labelName", + "direction": "$dir", + "offset": 0, + "limit": 10, + "cacheTTL": $cacheTTL }]]}""") } } - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala index 6db250d..230fa9c 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala @@ -8,9 +8,12 @@ import org.scalatest._ import play.api.libs.json.{JsValue, Json} import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.{Await, ExecutionContext, Future} trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { + + import TestUtil._ + var graph: Graph = _ var parser: RequestParser = _ var config: Config = _ @@ -78,19 +81,32 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { /** * Test Helpers */ - object Util { -// def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { -// val arr = for { -// (label, dir, from, to) <- params -// } yield { -// Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) -// } -// -// val s = Json.toJson(arr) -// s -// } - - def getEdges(queryJson: JsValue): JsValue = { + object TestUtil { + implicit def ec = graph.ec + + // def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { + // val arr = for { + // (label, dir, from, to) <- params + // } yield { + // Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) + // } + // + // val s = Json.toJson(arr) + // s + // } + + def deleteAllSync(jsValue: JsValue) = { + val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json => + val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json) + val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + + future + }) + + Await.result(future, HttpRequestWaitingTime) + } + + def getEdgesSync(queryJson: JsValue): JsValue = { val ret = graph.getEdges(parser.toQuery(queryJson)) val result = Await.result(ret, HttpRequestWaitingTime) val jsResult = PostProcess.toSimpleVertexArrJson(result) @@ -98,37 +114,41 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { jsResult } - def insertEdges(bulkEdges: String*) = { + def insertEdgesSync(bulkEdges: String*) = { val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) val jsResult = Await.result(req, HttpRequestWaitingTime) jsResult } + def insertEdgesAsync(bulkEdges: String*) = { + val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) + req + } + def toEdge(elems: Any*): String = elems.mkString("\t") - } - // common tables - protected val testServiceName = "s2graph" - protected val testLabelName = "s2graph_label_test" - protected val testLabelName2 = "s2graph_label_test_2" - protected val testLabelNameV1 = "s2graph_label_test_v1" - protected val testLabelNameWeak = "s2graph_label_test_weak" - protected val testColumnName = "user_id_test" - protected val testColumnType = "long" - protected val testTgtColumnName = "item_id_test" - protected val testHTableName = "test-htable" - protected val newHTableName = "new-htable" - protected val index1 = "idx_1" - protected val index2 = "idx_2" - - val NumOfEachTest = 30 - val HttpRequestWaitingTime = Duration("Inf") - - val createService = s"""{"serviceName" : "$testServiceName"}""" - - val testLabelNameCreate = - s""" + // common tables + val testServiceName = "s2graph" + val testLabelName = "s2graph_label_test" + val testLabelName2 = "s2graph_label_test_2" + val testLabelNameV1 = "s2graph_label_test_v1" + val testLabelNameWeak = "s2graph_label_test_weak" + val testColumnName = "user_id_test" + val testColumnType = "long" + val testTgtColumnName = "item_id_test" + val testHTableName = "test-htable" + val newHTableName = "new-htable" + val index1 = "idx_1" + val index2 = "idx_2" + + val NumOfEachTest = 30 + val HttpRequestWaitingTime = Duration("Inf") + + val createService = s"""{"serviceName" : "$testServiceName"}""" + + val testLabelNameCreate = + s""" { "label": "$testLabelName", "srcServiceName": "$testServiceName", @@ -169,8 +189,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { "hTableName": "$testHTableName" }""" - val testLabelName2Create = - s""" + val testLabelName2Create = + s""" { "label": "$testLabelName2", "srcServiceName": "$testServiceName", @@ -208,8 +228,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { "compressionAlgorithm": "gz" }""" - val testLabelNameV1Create = - s""" + val testLabelNameV1Create = + s""" { "label": "$testLabelNameV1", "srcServiceName": "$testServiceName", @@ -247,8 +267,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { "compressionAlgorithm": "gz" }""" - val testLabelNameWeakCreate = - s""" + val testLabelNameWeakCreate = + s""" { "label": "$testLabelNameWeak", "srcServiceName": "$testServiceName", @@ -284,4 +304,5 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { "isDirected": true, "compressionAlgorithm": "gz" }""" + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala index 505927b..bd81b7b 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala @@ -8,7 +8,7 @@ import scala.util.{Success, Try} class QueryTest extends IntegrateCommon with BeforeAndAfterEach { - import Util._ + import TestUtil._ val insert = "insert" val e = "e" @@ -36,16 +36,16 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } """) - var edges = getEdges(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index + var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index (edges \ "size").toString should be("1") - edges = getEdges(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index + edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index (edges \ "size").toString should be("2") - edges = getEdges(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index (edges \ "size").toString should be("1") - edges = getEdges(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index (edges \ "size").toString should be("2") } @@ -68,19 +68,19 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { ]] }""") - var result = getEdges(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) + var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) (result \ "results").as[List[JsValue]].size should be(1) - result = getEdges(queryWhere(0, "is_hidden=true and _to in (1)")) + result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)")) (result \ "results").as[List[JsValue]].size should be(1) - result = getEdges(queryWhere(0, "_from=0")) + result = getEdgesSync(queryWhere(0, "_from=0")) (result \ "results").as[List[JsValue]].size should be(2) - result = getEdges(queryWhere(2, "_from=2 or weight in (-1)")) + result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)")) (result \ "results").as[List[JsValue]].size should be(2) - result = getEdges(queryWhere(2, "_from=2 and weight in (10, 20)")) + result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)")) (result \ "results").as[List[JsValue]].size should be(2) } @@ -109,7 +109,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { ]] }""") - val result = getEdges(queryExclude(0)) + val result = getEdgesSync(queryExclude(0)) (result \ "results").as[List[JsValue]].size should be(1) } @@ -132,7 +132,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { ) } - val result = getEdges(queryGroupBy(0, Seq("weight"))) + val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) (result \ "size").as[Int] should be(2) val weights = (result \\ "groupBy").map { js => (js \ "weight").as[Int] @@ -161,13 +161,13 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { ]] }""") - var result = getEdges(queryTransform(0, "[[\"_to\"]]")) + var result = getEdgesSync(queryTransform(0, "[[\"_to\"]]")) (result \ "results").as[List[JsValue]].size should be(2) - result = getEdges(queryTransform(0, "[[\"weight\"]]")) + result = getEdgesSync(queryTransform(0, "[[\"weight\"]]")) (result \\ "to").map(_.toString).sorted should be((result \\ "weight").map(_.toString).sorted) - result = getEdges(queryTransform(0, "[[\"_from\"]]")) + result = getEdgesSync(queryTransform(0, "[[\"_from\"]]")) val results = (result \ "results").as[JsValue] (result \\ "to").map(_.toString).sorted should be((results \\ "from").map(_.toString).sorted) } @@ -187,11 +187,11 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } // weight order - var result = getEdges(queryIndex(Seq(0), "idx_1")) + var result = getEdgesSync(queryIndex(Seq(0), "idx_1")) ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40)) // timestamp order - result = getEdges(queryIndex(Seq(0), "idx_2")) + result = getEdgesSync(queryIndex(Seq(0), "idx_2")) ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30)) } @@ -237,13 +237,13 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } // get all - var result = getEdges(queryDuration(Seq(0, 2), from = 0, to = 5000)) + var result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) (result \ "results").as[List[JsValue]].size should be(4) // inclusive, exclusive - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) (result \ "results").as[List[JsValue]].size should be(3) - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) (result \ "results").as[List[JsValue]].size should be(1) val bulkEdges = Seq( @@ -252,21 +252,21 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) ) - insertEdges(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) // duration test after udpate // get all - result = getEdges(queryDuration(Seq(0, 2), from = 0, to = 5000)) + result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) (result \ "results").as[List[JsValue]].size should be(4) // inclusive, exclusive - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) (result \ "results").as[List[JsValue]].size should be(3) - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) (result \ "results").as[List[JsValue]].size should be(1) - def a: JsValue = getEdges(queryDuration(Seq(0, 2), from = 3000, to = 2000)) + def a: JsValue = getEdgesSync(queryDuration(Seq(0, 2), from = 3000, to = 2000)) Try(a).recover { case e: BadQueryException => JsNull } should be(Success(JsNull)) @@ -301,9 +301,9 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { val src = 100 val tgt = 200 - insertEdges(toEdge(1001, "insert", "e", src, tgt, testLabelName)) + insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) - val result = Util.getEdges(queryParents(src)) + val result = TestUtil.getEdgesSync(queryParents(src)) val parents = (result \ "results").as[Seq[JsValue]] val ret = parents.forall { edge => (edge \ "parents").as[Seq[JsValue]].size == 1 @@ -342,23 +342,23 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) ) - insertEdges(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) - var result = getEdges(querySingle(src, offset = 0, limit = 2)) + var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) var edges = (result \ "results").as[List[JsValue]] edges.size should be(2) (edges(0) \ "to").as[Long] should be(4) (edges(1) \ "to").as[Long] should be(3) - result = getEdges(querySingle(src, offset = 1, limit = 2)) + result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) edges = (result \ "results").as[List[JsValue]] edges.size should be(2) (edges(0) \ "to").as[Long] should be(3) (edges(1) \ "to").as[Long] should be(2) - result = getEdges(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) + result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) edges = (result \ "results").as[List[JsValue]] edges.size should be(1) } @@ -406,12 +406,12 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) ) - insertEdges(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) // get edges - val edges = getEdges(queryScore(0, Map("weight" -> 1))) - val orderByScore = getEdges(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) - val ascOrderByScore = getEdges(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) + val edges = getEdgesSync(queryScore(0, Map("weight" -> 1))) + val orderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) + val ascOrderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) val edgesTo = edges \ "results" \\ "to" val orderByTo = orderByScore \ "results" \\ "to" @@ -523,27 +523,27 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(ts, insert, e, 322, 3322, testLabelName) ) - insertEdges(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) - val result1 = getEdges(queryWithSampling(testId, sampleSize)) + val result1 = getEdgesSync(queryWithSampling(testId, sampleSize)) (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size)) - val result2 = getEdges(twoStepQueryWithSampling(testId, sampleSize)) + val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) - val result3 = getEdges(twoQueryWithSampling(testId, sampleSize)) + val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 } test("limit") { - insertEdges( + insertEdgesSync( toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))) - val edges = getEdges(querySingle(0, limit = 1)) - val limitEdges = getEdges(queryGlobalLimit(0, 1)) + val edges = getEdgesSync(querySingle(0, limit = 1)) + val limitEdges = getEdgesSync(queryGlobalLimit(0, 1)) val edgesTo = edges \ "results" \\ "to" val limitEdgesTo = limitEdges \ "results" \\ "to" @@ -615,7 +615,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { override def initTestData(): Unit = { super.initTestData() - insertEdges( + insertEdgesSync( toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)), toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)), toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala new file mode 100644 index 0000000..69a49b4 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -0,0 +1,282 @@ +package com.kakao.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.Random + +class StrongLabelDeleteTest extends IntegrateCommon { + + import StrongDeleteUtil._ + import TestUtil._ + + test("Strong consistency select") { + insertEdgesSync(bulkEdges(): _*) + + var result = getEdgesSync(query(0)) + (result \ "results").as[List[JsValue]].size should be(2) + result = getEdgesSync(query(10)) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("Strong consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val deleteParam = Json.arr( + Json.obj("label" -> testLabelName2, + "direction" -> "in", + "ids" -> Json.arr("20"), + "timestamp" -> deletedAt)) + + deleteAllSync(deleteParam) + + result = getEdgesSync(query(11, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, direction = "out")) + println(result) + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + + (result \ "results").as[List[JsValue]].size should be(3) + } + + + test("update delete") { + val ret = for { + i <- 0 until testNum + } yield { + val src = System.currentTimeMillis() + + val (ret, last) = testInner(i, src) + ret should be(true) + ret + } + + ret.forall(identity) + } + + test("update delete 2") { + val src = System.currentTimeMillis() + var ts = 0L + + val ret = for { + i <- 0 until testNum + } yield { + val (ret, lastTs) = testInner(ts, src) + val deletedAt = lastTs + 1 + val deletedAt2 = lastTs + 2 + ts = deletedAt2 + 1 // nex start ts + + ret should be(true) + + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) + + val deleteRet = deleteAllSync(deleteAllRequest) + val deleteRet2 = deleteAllSync(deleteAllRequest2) + + val result = getEdgesSync(query(id = src)) + println(result) + + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + + degreeAfterDeleteAll should be(0) + degreeAfterDeleteAll === (0) + } + + ret.forall(identity) + } + + /** This test stress out test on degree + * when contention is low but number of adjacent edges are large + * Large set of contention test + */ + test("large degrees") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + // val allRequests = insertRequests ++ deleteRequests + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = insertRequests.size - deleteRequests.size + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + // println(result) + + val ret = resultSize == expectedDegree && resultDegree == resultSize + println(s"[MaxSize]: $maxSize") + println(s"[DeleteSize]: $deleteSize") + println(s"[ResultDegree]: $resultDegree") + println(s"[ExpectedDegree]: $expectedDegree") + println(s"[ResultSize]: $resultSize") + ret should be(true) + } + + test("deleteAll") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val deletedAt = System.currentTimeMillis() + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + + deleteAllSync(deleteAllRequest) + + val result = getEdgesSync(query(id = src)) + println(result) + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + degreeAfterDeleteAll should be(0) + } + + object StrongDeleteUtil { + + val labelName = testLabelName2 + val maxTgtId = 10 + val batchSize = 10 + val testNum = 3 + val numOfBatch = 10 + + def testInner(startTs: Long, src: Long) = { + val labelName = testLabelName2 + val lastOps = Array.fill(maxTgtId)("none") + var currentTs = startTs + + val allRequests = for { + ith <- 0 until numOfBatch + jth <- 0 until batchSize + } yield { + currentTs += 1 + + val tgt = Random.nextInt(maxTgtId) + val op = if (Random.nextDouble() < 0.5) "delete" else "update" + + lastOps(tgt) = op + Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") + } + + allRequests.foreach(println(_)) + + val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = lastOps.count(op => op != "delete" && op != "none") + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + println(lastOps.toList) + println(result) + + val ret = resultDegree == expectedDegree && resultSize == resultDegree + if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") + + (ret, currentTs) + } + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""") + ) + + def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, + labelName: String = testLabelName2, direction: String = "out") = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id + }], + "steps": [ + [ { + "label": "$labelName", + "direction": "${direction}", + "offset": 0, + "limit": -1, + "duplicate": "raw" + } + ]] + }""") + + def getDegree(jsValue: JsValue): Long = { + ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala new file mode 100644 index 0000000..b80d9c7 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -0,0 +1,129 @@ +package com.kakao.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import org.scalatest.BeforeAndAfterEach +import play.api.libs.json.{JsObject, JsValue, Json} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { + + import TestUtil._ + import WeakLabelDeleteHelper._ + + test("test weak consistency select") { + var result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(4) + result = getEdgesSync(query(10)) + println(result) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("test weak consistency delete") { + var result = getEdgesSync(query(0)) + println(result) + + /** expect 4 edges */ + (result \ "results").as[List[JsValue]].size should be(4) + val edges = (result \ "results").as[List[JsObject]] + val edgesToStore = parser.toEdges(Json.toJson(edges), "delete") + val rets = graph.mutateEdges(edgesToStore, withWait = true) + Await.result(rets, Duration(20, TimeUnit.MINUTES)) + + /** expect noting */ + result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + /** insert should be ignored */ + val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") + val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) + Await.result(rets2, Duration(20, TimeUnit.MINUTES)) + + result = getEdgesSync(query(0)) + (result \ "results").as[List[JsValue]].size should be(0) + } + + + test("test weak consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val json = Json.arr(Json.obj("label" -> testLabelNameWeak, + "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) + println(json) + deleteAllSync(json) + + result = getEdgesSync(query(11, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, "out")) + + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + (result \ "results").as[List[JsValue]].size should be(3) + } + + + // called by each test, each + override def beforeEach = initTestData() + + // called by start test, once + + override def initTestData(): Unit = { + super.initTestData() + insertEdgesSync(bulkEdges(): _*) + } + + object WeakLabelDeleteHelper { + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""") + ) + + def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$columnName", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelNameWeak}", + "direction": "${direction}", + "offset": 0, + "limit": 10, + "duplicate": "raw" + } + ]] + }""") + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/app/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala index 8df4f9a..e72aff7 100644 --- a/s2rest_play/app/controllers/EdgeController.scala +++ b/s2rest_play/app/controllers/EdgeController.scala @@ -157,14 +157,7 @@ object EdgeController extends Controller { def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { val deleteResults = Future.sequence(jsValue.as[Seq[JsValue]] map { json => - - val labelName = (json \ "label").as[String] - val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) - val direction = (json \ "direction").asOpt[String].getOrElse("out") - - val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) - val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) - val vertices = requestParser.toVertices(labelName, direction, ids) + val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json) /** logging for delete all request */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/test/benchmark/PostProcessBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/PostProcessBenchmarkSpec.scala b/s2rest_play/test/benchmark/PostProcessBenchmarkSpec.scala index 0122f28..3ae08e2 100644 --- a/s2rest_play/test/benchmark/PostProcessBenchmarkSpec.scala +++ b/s2rest_play/test/benchmark/PostProcessBenchmarkSpec.scala @@ -15,10 +15,6 @@ import scala.concurrent.duration._ * Created by hsleep([email protected]) on 2015. 11. 6.. */ class PostProcessBenchmarkSpec extends SpecCommon with BenchmarkCommon with PlaySpecification { - sequential - - import Helper._ - init() override def init() = { @@ -48,7 +44,7 @@ class PostProcessBenchmarkSpec extends SpecCommon with BenchmarkCommon with Play // create edges val bulkEdges: String = (0 until 500).map { i => - edge"${System.currentTimeMillis()} insert e 0 $i $testLabelNameWeak"($(weight=i)) + Seq(System.currentTimeMillis(), "insert", "e", "0", i, testLabelNameWeak, Json.obj("weight" -> i)).mkString("\t") }.mkString("\n") val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/test/controllers/PostProcessSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/controllers/PostProcessSpec.scala b/s2rest_play/test/controllers/PostProcessSpec.scala index 9573791..30ea307 100644 --- a/s2rest_play/test/controllers/PostProcessSpec.scala +++ b/s2rest_play/test/controllers/PostProcessSpec.scala @@ -8,7 +8,7 @@ import play.api.test.PlaySpecification /** * Created by hsleep on 2015. 11. 4.. */ -class PostProcessSpec extends SpecCommon with PlaySpecification { +class PostProcessSpec extends PlaySpecification { "test order by json" >> { val jsLs: Seq[Seq[JsValue]] = Seq( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/test/controllers/SpecCommon.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/controllers/SpecCommon.scala b/s2rest_play/test/controllers/SpecCommon.scala index 7752544..28c4c23 100644 --- a/s2rest_play/test/controllers/SpecCommon.scala +++ b/s2rest_play/test/controllers/SpecCommon.scala @@ -12,41 +12,8 @@ import scala.concurrent.duration._ import scala.util.Random trait SpecCommon extends Specification { - sequential - object Helper { - - import org.json4s.native.Serialization - - type KV = Map[String, Any] - - import scala.language.dynamics - - def $aa[T](args: T*) = List($a(args: _ *)) - - def $a[T](args: T*) = args.toList - - object $ extends Dynamic { - def applyDynamicNamed(name: String)(args: (String, Any)*): Map[String, Any] = args.toMap - } - - implicit class anyMapOps(map: Map[String, Any]) { - def toJson: JsValue = { - val js = Serialization.write(map)(org.json4s.DefaultFormats) - Json.parse(js) - } - } - - implicit class S2Context(val sc: StringContext) { - def edge(args: Any*)(implicit map: Map[String, Any] = Map.empty): String = { - val parts = sc.s(args: _*).split("\\s") - assert(parts.length == 6) - (parts.toList :+ map.toJson.toString).mkString("\t") - } - } - - } - val curTime = System.currentTimeMillis + val t1 = curTime + 0 val t2 = curTime + 1 val t3 = curTime + 2 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/test/controllers/StrongLabelDeleteSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/controllers/StrongLabelDeleteSpec.scala b/s2rest_play/test/controllers/StrongLabelDeleteSpec.scala index 04ccc79..f1d8754 100644 --- a/s2rest_play/test/controllers/StrongLabelDeleteSpec.scala +++ b/s2rest_play/test/controllers/StrongLabelDeleteSpec.scala @@ -1,345 +1,345 @@ -package controllers - -import java.util.concurrent.TimeUnit - - -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json._ -import play.api.test.Helpers._ -import play.api.test.{FakeApplication, FakeRequest} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.Random - -class StrongLabelDeleteSpec extends SpecCommon { - init() -// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) - - def bulkEdges(startTs: Int = 0) = Seq( - Seq(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""").mkString("\t") - ).mkString("\n") - - def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, - labelName: String = testLabelName2, direction: String = "out") = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id - }], - "steps": [ - [ { - "label": "$labelName", - "direction": "${direction}", - "offset": 0, - "limit": -1, - "duplicate": "raw" - } - ]] - }""") - - def getEdges(queryJson: JsValue): JsValue = { -// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) - - val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - def getDegree(jsValue: JsValue): Long = { - ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) - } - - - "strong label delete test" should { - running(FakeApplication()) { - // insert bulk and wait .. - val edges = bulkEdges() - println(edges) - val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) - } - - "test strong consistency select" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - result = getEdges(query(10)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - true - } - } - - "test strong consistency duration. insert -> delete -> insert" in { - running(FakeApplication()) { - val ts0 = 1 - val ts1 = 2 - val ts2 = 3 - - val edges = Seq( - Seq(5, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t"), - Seq(10, "delete", "edge", "-10", "-20", testLabelName2).mkString("\t"), - Seq(20, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t") - ).mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) - - val result = getEdges(query(-10)) - - println(result) - - true - } - } - - "test strong consistency deleteAll" in { - running(FakeApplication()) { - - val deletedAt = 100 - var result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - val json = Json.arr(Json.obj("label" -> testLabelName2, - "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) - contentAsString(EdgeController.deleteAllInner(json, withWait = true)) - - result = getEdges(query(11, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(12, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(10, direction = "out")) - println(result) - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size must equalTo(1) - (result \\ "to").size must equalTo(1) - (result \\ "to").head.as[String] must equalTo("21") - - result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) - - result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - true - - } - } - } - - - "labelargeSet of contention" should { - val labelName = testLabelName2 - val maxTgtId = 10 - val batchSize = 10 - val testNum = 3 - val numOfBatch = 10 - - def testInner(startTs: Long, src: Long) = { - val labelName = testLabelName2 - val lastOps = Array.fill(maxTgtId)("none") - var currentTs = startTs - - val allRequests = for { - ith <- (0 until numOfBatch) - jth <- (0 until batchSize) - } yield { - currentTs += 1 - - val tgt = Random.nextInt(maxTgtId) - val op = if (Random.nextDouble() < 0.5) "delete" else "update" - - lastOps(tgt) = op - Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") - } - - - allRequests.foreach(println(_)) -// println(lastOps.count(op => op != "delete" && op != "none")) -// println(lastOps) -// -// Thread.sleep(1000) - - val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequest => - val bulkEdge = bulkRequest.mkString("\n") - EdgeController.mutateAndPublish(bulkEdge, withWait = true) - } - - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = lastOps.count(op => op != "delete" && op != "none") - val queryJson = query(id = src) - val result = getEdges(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - println(lastOps.toList) - println(result) - - val ret = resultDegree == expectedDegree && resultSize == resultDegree - if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") - - (ret, currentTs) - } - - "update delete" in { - running(FakeApplication()) { - val ret = for { - i <- (0 until testNum) - } yield { - val src = System.currentTimeMillis() - - val (ret, last) = testInner(i, src) - ret must beEqualTo(true) - ret - } - - ret.forall(identity) - } - } - - "update delete 2" in { - running(FakeApplication()) { - - val src = System.currentTimeMillis() - var ts = 0L - - val ret = for { - i <- (0 until testNum) - } yield { - val (ret, lastTs) = testInner(ts, src) - val deletedAt = lastTs + 1 - val deletedAt2 = lastTs + 2 - ts = deletedAt2 + 1 // nex start ts - - ret must beEqualTo(true) - - - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) - - val deleteRet = EdgeController.deleteAllInner(deleteAllRequest, withWait = true) - val deleteRet2 = EdgeController.deleteAllInner(deleteAllRequest2, withWait = true) - - - Await.result(deleteRet, Duration(20, TimeUnit.MINUTES)) - Await.result(deleteRet2, Duration(20, TimeUnit.MINUTES)) - - val result = getEdges(query(id = src)) - println(result) - - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty must beEqualTo(true) - - val degreeAfterDeleteAll = getDegree(result) - degreeAfterDeleteAll must beEqualTo(0) - true - } - - ret.forall(identity) - } - } - - /** this test stress out test on degree - * when contention is low but number of adjacent edges are large */ - "large degrees" in { - running(FakeApplication()) { - - - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - // val allRequests = insertRequests ++ deleteRequests - val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => - EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) - } - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = insertRequests.size - deleteRequests.size - val queryJson = query(id = src) - val result = getEdges(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - // println(result) - - val ret = resultSize == expectedDegree && resultDegree == resultSize - println(s"[MaxSize]: $maxSize") - println(s"[DeleteSize]: $deleteSize") - println(s"[ResultDegree]: $resultDegree") - println(s"[ExpectedDegree]: $expectedDegree") - println(s"[ResultSize]: $resultSize") - ret must beEqualTo(true) - } - } - - "deleteAll" in { - running(FakeApplication()) { - - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - // val allRequests = insertRequests ++ deleteRequests - val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => - EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) - } - Await.result(Future.sequence(futures), Duration(10, TimeUnit.MINUTES)) - - val deletedAt = System.currentTimeMillis() - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - - Await.result(EdgeController.deleteAllInner(deleteAllRequest, withWait = true), Duration(10, TimeUnit.MINUTES)) - - val result = getEdges(query(id = src)) - println(result) - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty must beEqualTo(true) - - val degreeAfterDeleteAll = getDegree(result) - degreeAfterDeleteAll must beEqualTo(0) - } - } - } -} - +//package controllers +// +//import java.util.concurrent.TimeUnit +// +// +//import com.kakao.s2graph.core.utils.logger +//import play.api.libs.json._ +//import play.api.test.Helpers._ +//import play.api.test.{FakeApplication, FakeRequest} +// +//import scala.concurrent.duration.Duration +//import scala.concurrent.{Await, Future} +//import scala.util.Random +// +//class StrongLabelDeleteSpec extends SpecCommon { +// init() +//// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) +// +// def bulkEdges(startTs: Int = 0) = Seq( +// Seq(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}""").mkString("\t"), +// Seq(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}""").mkString("\t"), +// Seq(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}""").mkString("\t"), +// Seq(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}""").mkString("\t"), +// Seq(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""").mkString("\t") +// ).mkString("\n") +// +// def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, +// labelName: String = testLabelName2, direction: String = "out") = Json.parse( +// s""" +// { "srcVertices": [ +// { "serviceName": "$serviceName", +// "columnName": "$columnName", +// "id": $id +// }], +// "steps": [ +// [ { +// "label": "$labelName", +// "direction": "${direction}", +// "offset": 0, +// "limit": -1, +// "duplicate": "raw" +// } +// ]] +// }""") +// +// def getEdges(queryJson: JsValue): JsValue = { +//// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) +// +// val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get +// contentAsJson(ret) +// } +// +// def getDegree(jsValue: JsValue): Long = { +// ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) +// } +// +// +// "strong label delete test" should { +// running(FakeApplication()) { +// // insert bulk and wait .. +// val edges = bulkEdges() +// println(edges) +// val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) +// } +// +// "test strong consistency select" in { +// running(FakeApplication()) { +// var result = getEdges(query(0)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(2) +// result = getEdges(query(10)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(2) +// true +// } +// } +// +// "test strong consistency duration. insert -> delete -> insert" in { +// running(FakeApplication()) { +// val ts0 = 1 +// val ts1 = 2 +// val ts2 = 3 +// +// val edges = Seq( +// Seq(5, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t"), +// Seq(10, "delete", "edge", "-10", "-20", testLabelName2).mkString("\t"), +// Seq(20, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t") +// ).mkString("\n") +// +// val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) +// +// val result = getEdges(query(-10)) +// +// println(result) +// +// true +// } +// } +// +// "test strong consistency deleteAll" in { +// running(FakeApplication()) { +// +// val deletedAt = 100 +// var result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(3) +// +// val json = Json.arr(Json.obj("label" -> testLabelName2, +// "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) +// println(json) +// contentAsString(EdgeController.deleteAllInner(json, withWait = true)) +// +// result = getEdges(query(11, direction = "out")) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// result = getEdges(query(12, direction = "out")) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// result = getEdges(query(10, direction = "out")) +// println(result) +// // 10 -> out -> 20 should not be in result. +// (result \ "results").as[List[JsValue]].size must equalTo(1) +// (result \\ "to").size must equalTo(1) +// (result \\ "to").head.as[String] must equalTo("21") +// +// result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) +// +// result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(3) +// +// true +// +// } +// } +// } +// +// +// "labelargeSet of contention" should { +// val labelName = testLabelName2 +// val maxTgtId = 10 +// val batchSize = 10 +// val testNum = 3 +// val numOfBatch = 10 +// +// def testInner(startTs: Long, src: Long) = { +// val labelName = testLabelName2 +// val lastOps = Array.fill(maxTgtId)("none") +// var currentTs = startTs +// +// val allRequests = for { +// ith <- (0 until numOfBatch) +// jth <- (0 until batchSize) +// } yield { +// currentTs += 1 +// +// val tgt = Random.nextInt(maxTgtId) +// val op = if (Random.nextDouble() < 0.5) "delete" else "update" +// +// lastOps(tgt) = op +// Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") +// } +// +// +// allRequests.foreach(println(_)) +//// println(lastOps.count(op => op != "delete" && op != "none")) +//// println(lastOps) +//// +//// Thread.sleep(1000) +// +// val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequest => +// val bulkEdge = bulkRequest.mkString("\n") +// EdgeController.mutateAndPublish(bulkEdge, withWait = true) +// } +// +// Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) +// +// val expectedDegree = lastOps.count(op => op != "delete" && op != "none") +// val queryJson = query(id = src) +// val result = getEdges(queryJson) +// val resultSize = (result \ "size").as[Long] +// val resultDegree = getDegree(result) +// +// println(lastOps.toList) +// println(result) +// +// val ret = resultDegree == expectedDegree && resultSize == resultDegree +// if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") +// +// (ret, currentTs) +// } +// +// "update delete" in { +// running(FakeApplication()) { +// val ret = for { +// i <- (0 until testNum) +// } yield { +// val src = System.currentTimeMillis() +// +// val (ret, last) = testInner(i, src) +// ret must beEqualTo(true) +// ret +// } +// +// ret.forall(identity) +// } +// } +// +// "update delete 2" in { +// running(FakeApplication()) { +// +// val src = System.currentTimeMillis() +// var ts = 0L +// +// val ret = for { +// i <- (0 until testNum) +// } yield { +// val (ret, lastTs) = testInner(ts, src) +// val deletedAt = lastTs + 1 +// val deletedAt2 = lastTs + 2 +// ts = deletedAt2 + 1 // nex start ts +// +// ret must beEqualTo(true) +// +// +// val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) +// val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) +// +// val deleteRet = EdgeController.deleteAllInner(deleteAllRequest, withWait = true) +// val deleteRet2 = EdgeController.deleteAllInner(deleteAllRequest2, withWait = true) +// +// +// Await.result(deleteRet, Duration(20, TimeUnit.MINUTES)) +// Await.result(deleteRet2, Duration(20, TimeUnit.MINUTES)) +// +// val result = getEdges(query(id = src)) +// println(result) +// +// val resultEdges = (result \ "results").as[Seq[JsValue]] +// resultEdges.isEmpty must beEqualTo(true) +// +// val degreeAfterDeleteAll = getDegree(result) +// degreeAfterDeleteAll must beEqualTo(0) +// true +// } +// +// ret.forall(identity) +// } +// } +// +// /** this test stress out test on degree +// * when contention is low but number of adjacent edges are large */ +// "large degrees" in { +// running(FakeApplication()) { +// +// +// val labelName = testLabelName2 +// val dir = "out" +// val maxSize = 100 +// val deleteSize = 10 +// val numOfConcurrentBatch = 100 +// val src = System.currentTimeMillis() +// val tgts = (0 until maxSize).map { ith => src + ith } +// val deleteTgts = Random.shuffle(tgts).take(deleteSize) +// val insertRequests = tgts.map { tgt => +// Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") +// } +// val deleteRequests = deleteTgts.take(deleteSize).map { tgt => +// Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") +// } +// val allRequests = Random.shuffle(insertRequests ++ deleteRequests) +// // val allRequests = insertRequests ++ deleteRequests +// val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => +// EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) +// } +// Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) +// +// val expectedDegree = insertRequests.size - deleteRequests.size +// val queryJson = query(id = src) +// val result = getEdges(queryJson) +// val resultSize = (result \ "size").as[Long] +// val resultDegree = getDegree(result) +// +// // println(result) +// +// val ret = resultSize == expectedDegree && resultDegree == resultSize +// println(s"[MaxSize]: $maxSize") +// println(s"[DeleteSize]: $deleteSize") +// println(s"[ResultDegree]: $resultDegree") +// println(s"[ExpectedDegree]: $expectedDegree") +// println(s"[ResultSize]: $resultSize") +// ret must beEqualTo(true) +// } +// } +// +// "deleteAll" in { +// running(FakeApplication()) { +// +// val labelName = testLabelName2 +// val dir = "out" +// val maxSize = 100 +// val deleteSize = 10 +// val numOfConcurrentBatch = 100 +// val src = System.currentTimeMillis() +// val tgts = (0 until maxSize).map { ith => src + ith } +// val deleteTgts = Random.shuffle(tgts).take(deleteSize) +// val insertRequests = tgts.map { tgt => +// Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") +// } +// val deleteRequests = deleteTgts.take(deleteSize).map { tgt => +// Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") +// } +// val allRequests = Random.shuffle(insertRequests ++ deleteRequests) +// // val allRequests = insertRequests ++ deleteRequests +// val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => +// EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) +// } +// Await.result(Future.sequence(futures), Duration(10, TimeUnit.MINUTES)) +// +// val deletedAt = System.currentTimeMillis() +// val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) +// +// Await.result(EdgeController.deleteAllInner(deleteAllRequest, withWait = true), Duration(10, TimeUnit.MINUTES)) +// +// val result = getEdges(query(id = src)) +// println(result) +// val resultEdges = (result \ "results").as[Seq[JsValue]] +// resultEdges.isEmpty must beEqualTo(true) +// +// val degreeAfterDeleteAll = getDegree(result) +// degreeAfterDeleteAll must beEqualTo(0) +// } +// } +// } +//} +// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/610d519d/s2rest_play/test/controllers/WeakLabelDeleteSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/controllers/WeakLabelDeleteSpec.scala b/s2rest_play/test/controllers/WeakLabelDeleteSpec.scala index 164c52b..1497b85 100644 --- a/s2rest_play/test/controllers/WeakLabelDeleteSpec.scala +++ b/s2rest_play/test/controllers/WeakLabelDeleteSpec.scala @@ -1,126 +1,126 @@ -package controllers - -import play.api.libs.json._ -import play.api.test.Helpers._ -import play.api.test.{FakeApplication, FakeRequest} - -class WeakLabelDeleteSpec extends SpecCommon { - init() - - def bulkEdges(startTs: Int = 0) = Seq( - Seq(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""").mkString("\t") - ).mkString("\n") - - "weak label delete test" should { - running(FakeApplication()) { - // insert bulk and wait .. - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(), withWait = true)) - } - - - def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$columnName", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelNameWeak}", - "direction": "${direction}", - "offset": 0, - "limit": 10, - "duplicate": "raw" - } - ]] - }""") - - def getEdges(queryJson: JsValue): JsValue = { - val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - "test weak consistency select" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(4) - result = getEdges(query(10)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - true - } - } - - "test weak consistency delete" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - - /** expect 4 edges */ - (result \ "results").as[List[JsValue]].size must equalTo(4) - val edges = (result \ "results").as[List[JsObject]] - - contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "delete", withWait = true)) - - /** expect noting */ - result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - /** insert should be ignored */ - contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "insert", withWait = true)) - - result = getEdges(query(0)) - (result \ "results").as[List[JsValue]].size must equalTo(0) - } - } - - "test weak consistency deleteAll" in { - running(FakeApplication()) { - val deletedAt = 100 - var result = getEdges(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - val json = Json.arr(Json.obj("label" -> testLabelNameWeak, - "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) - contentAsString(EdgeController.deleteAllInner(json, withWait = true)) - - - result = getEdges(query(11, "out")) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(12, "out")) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(10, "out")) - - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size must equalTo(1) - (result \\ "to").size must equalTo(1) - (result \\ "to").head.as[String] must equalTo("21") - - result = getEdges(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) - - result = getEdges(query(20, "in", testTgtColumnName)) - (result \ "results").as[List[JsValue]].size must equalTo(3) - } - } - } - -} - +//package controllers +// +//import play.api.libs.json._ +//import play.api.test.Helpers._ +//import play.api.test.{FakeApplication, FakeRequest} +// +//class WeakLabelDeleteSpec extends SpecCommon { +// init() +// +// def bulkEdges(startTs: Int = 0) = Seq( +// Seq(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), +// Seq(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), +// Seq(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), +// Seq(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), +// Seq(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), +// Seq(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""").mkString("\t") +// ).mkString("\n") +// +// "weak label delete test" should { +// running(FakeApplication()) { +// // insert bulk and wait .. +// val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(), withWait = true)) +// } +// +// +// def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( +// s""" +// { "srcVertices": [ +// { "serviceName": "$testServiceName", +// "columnName": "$columnName", +// "id": ${id} +// }], +// "steps": [ +// [ { +// "label": "${testLabelNameWeak}", +// "direction": "${direction}", +// "offset": 0, +// "limit": 10, +// "duplicate": "raw" +// } +// ]] +// }""") +// +// def getEdges(queryJson: JsValue): JsValue = { +// val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get +// contentAsJson(ret) +// } +// +// "test weak consistency select" in { +// running(FakeApplication()) { +// var result = getEdges(query(0)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(4) +// result = getEdges(query(10)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(2) +// true +// } +// } +// +// "test weak consistency delete" in { +// running(FakeApplication()) { +// var result = getEdges(query(0)) +// println(result) +// +// /** expect 4 edges */ +// (result \ "results").as[List[JsValue]].size must equalTo(4) +// val edges = (result \ "results").as[List[JsObject]] +// +// contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "delete", withWait = true)) +// +// /** expect noting */ +// result = getEdges(query(0)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// /** insert should be ignored */ +// contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "insert", withWait = true)) +// +// result = getEdges(query(0)) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// } +// } +// +// "test weak consistency deleteAll" in { +// running(FakeApplication()) { +// val deletedAt = 100 +// var result = getEdges(query(20, "in", testTgtColumnName)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(3) +// +// val json = Json.arr(Json.obj("label" -> testLabelNameWeak, +// "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) +// println(json) +// contentAsString(EdgeController.deleteAllInner(json, withWait = true)) +// +// +// result = getEdges(query(11, "out")) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// result = getEdges(query(12, "out")) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// result = getEdges(query(10, "out")) +// +// // 10 -> out -> 20 should not be in result. +// (result \ "results").as[List[JsValue]].size must equalTo(1) +// (result \\ "to").size must equalTo(1) +// (result \\ "to").head.as[String] must equalTo("21") +// +// result = getEdges(query(20, "in", testTgtColumnName)) +// println(result) +// (result \ "results").as[List[JsValue]].size must equalTo(0) +// +// val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) +// +// result = getEdges(query(20, "in", testTgtColumnName)) +// (result \ "results").as[List[JsValue]].size must equalTo(3) +// } +// } +// } +// +//} +//
