Repository: incubator-s2graph Updated Branches: refs/heads/master be304d6c9 -> f2311f25c
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/test/controllers/QuerySpec.scala ---------------------------------------------------------------------- diff --git a/test/controllers/QuerySpec.scala b/test/controllers/QuerySpec.scala deleted file mode 100644 index f6381bc..0000000 --- a/test/controllers/QuerySpec.scala +++ /dev/null @@ -1,604 +0,0 @@ -package controllers - -import play.api.libs.json._ -import play.api.test.{FakeApplication, FakeRequest, PlaySpecification} -import play.api.{Application => PlayApplication} - -import scala.concurrent.Await - -class QuerySpec extends SpecCommon with PlaySpecification { - - import Helper._ - - implicit val app = FakeApplication() - - init() - - "query test" should { - running(FakeApplication()) { - - // insert bulk and wait .. - val bulkEdges: String = Seq( - edge"1000 insert e 0 1 $testLabelName"($(weight = 40, is_hidden = true)), - edge"2000 insert e 0 2 $testLabelName"($(weight = 30, is_hidden = false)), - edge"3000 insert e 2 0 $testLabelName"($(weight = 20)), - edge"4000 insert e 2 1 $testLabelName"($(weight = 10)), - edge"3000 insert e 10 20 $testLabelName"($(weight = 20)), - edge"4000 insert e 20 20 $testLabelName"($(weight = 10)), - edge"1 insert e -1 1000 $testLabelName", - edge"1 insert e -1 2000 $testLabelName", - edge"1 insert e -1 3000 $testLabelName", - edge"1 insert e 1000 10000 $testLabelName", - edge"1 insert e 1000 11000 $testLabelName", - edge"1 insert e 2000 11000 $testLabelName", - edge"1 insert e 2000 12000 $testLabelName", - edge"1 insert e 3000 12000 $testLabelName", - edge"1 insert e 3000 13000 $testLabelName", - edge"1 insert e 10000 100000 $testLabelName", - edge"2 insert e 11000 200000 $testLabelName", - edge"3 insert e 12000 300000 $testLabelName").mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) - } - - def queryParents(id: Long) = Json.parse( s""" - { - "returnTree": true, - "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 2 - } - ],[{ - "label": "${testLabelName}", - "direction": "in", - "offset": 0, - "limit": -1 - } - ]] - }""".stripMargin) - - def queryExclude(id: Int) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 2 - }, - { - "label": "${testLabelName}", - "direction": "in", - "offset": 0, - "limit": 2, - "exclude": true - } - ]] - }""") - - def queryTransform(id: Int, transforms: String) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "transform": $transforms - } - ]] - }""") - - def queryWhere(id: Int, where: String) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "where": "${where}" - } - ]] - }""") - - def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": $offset, - "limit": $limit, - "_to": $to - } - ]] - } - """) - - def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": $offset, - "limit": $limit - } - ]] - } - """) - - def queryWithSampling(id: Int, sample: Int) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - { - "step": [{ - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": ${sample} - }] - } - ] - }""") - - - def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - { - "step": [{ - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": ${sample} - }] - }, - { - "step": [{ - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 100, - "sample": ${sample} - }] - } - ] - }""") - - def twoQueryWithSampling(id: Int, sample: Int) = Json.parse( s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - { - "step": [{ - "label": "${testLabelName}", - "direction": "out", - "offset": 0, - "limit": 50, - "sample": ${sample} - }, - { - "label": "${testLabelName2}", - "direction": "out", - "offset": 0, - "limit": 50 - }] - } - ] - }""") - - def queryUnion(id: Int, size: Int) = JsArray(List.tabulate(size)(_ => querySingle(id))) - - def queryGroupBy(id: Int, props: Seq[String]): JsValue = { - Json.obj( - "groupBy" -> props, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName - ) - ) - ) - ) - ) - } - - def queryScore(id: Int, scoring: Map[String, Int]): JsValue = { - val q = Json.obj( - "srcVertices" -> Json.arr( - Json.obj( - "serviceName" -> testServiceName, - "columnName" -> testColumnName, - "id" -> id - ) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName, - "scoring" -> scoring - ) - ) - ) - ) - ) - println(q) - q - } - - def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = { - Json.obj( - "orderBy" -> props, - "srcVertices" -> Json.arr( - Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) - ), - "steps" -> Json.arr( - Json.obj( - "step" -> Json.arr( - Json.obj( - "label" -> testLabelName, - "scoring" -> scoring - ) - ) - ) - ) - ) - } - - def getEdges(queryJson: JsValue): JsValue = { - val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - def queryIndex(ids: Seq[Int], indexName: String) = { - val $from = $a( - $(serviceName = testServiceName, - columnName = testColumnName, - ids = ids)) - - val $step = $a($(label = testLabelName, index = indexName)) - val $steps = $a($(step = $step)) - - val js = $(withScore = false, srcVertices = $from, steps = $steps).toJson - js - } - - def queryDuration(ids: Seq[Int], from: Int, to: Int) = { - val $from = $a( - $(serviceName = testServiceName, - columnName = testColumnName, - ids = ids)) - - val $step = $a($( - label = testLabelName, direction = "out", offset = 0, limit = 100, - duration = $(from = from, to = to))) - - val $steps = $a($(step = $step)) - - $(srcVertices = $from, steps = $steps).toJson - } - - "union query" in { - running(FakeApplication()) { - var result = getEdges(queryUnion(0, 2)) - result.as[List[JsValue]].size must equalTo(2) - - result = getEdges(queryUnion(0, 3)) - result.as[List[JsValue]].size must equalTo(3) - - result = getEdges(queryUnion(0, 4)) - result.as[List[JsValue]].size must equalTo(4) - - result = getEdges(queryUnion(0, 5)) - result.as[List[JsValue]].size must equalTo(5) - - val union = result.as[List[JsValue]].head - val single = getEdges(querySingle(0)) - - (union \\ "from").map(_.toString).sorted must equalTo((single \\ "from").map(_.toString).sorted) - (union \\ "to").map(_.toString).sorted must equalTo((single \\ "to").map(_.toString).sorted) - (union \\ "weight").map(_.toString).sorted must equalTo((single \\ "weight").map(_.toString).sorted) - } - } - - "get edge with where condition" in { - running(FakeApplication()) { - var result = getEdges(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) - (result \ "results").as[List[JsValue]].size must equalTo(1) - - result = getEdges(queryWhere(0, "is_hidden=true and _to in (1)")) - (result \ "results").as[List[JsValue]].size must equalTo(1) - - result = getEdges(queryWhere(0, "_from=0")) - (result \ "results").as[List[JsValue]].size must equalTo(2) - - result = getEdges(queryWhere(2, "_from=2 or weight in (-1)")) - (result \ "results").as[List[JsValue]].size must equalTo(2) - - result = getEdges(queryWhere(2, "_from=2 and weight in (10, 20)")) - (result \ "results").as[List[JsValue]].size must equalTo(2) - } - } - - "get edge exclude" in { - running(FakeApplication()) { - val result = getEdges(queryExclude(0)) - (result \ "results").as[List[JsValue]].size must equalTo(1) - } - } - - "get edge groupBy property" in { - running(FakeApplication()) { - val result = getEdges(queryGroupBy(0, Seq("weight"))) - (result \ "size").as[Int] must_== 2 - val weights = (result \\ "groupBy").map { js => - (js \ "weight").as[Int] - } - weights must contain(exactly(30, 40)) - weights must not contain (10) - } - } - - "edge transform " in { - running(FakeApplication()) { - var result = getEdges(queryTransform(0, "[[\"_to\"]]")) - (result \ "results").as[List[JsValue]].size must equalTo(2) - - result = getEdges(queryTransform(0, "[[\"weight\"]]")) - (result \\ "to").map(_.toString).sorted must equalTo((result \\ "weight").map(_.toString).sorted) - - result = getEdges(queryTransform(0, "[[\"_from\"]]")) - val results = (result \ "results").as[JsValue] - (result \\ "to").map(_.toString).sorted must equalTo((results \\ "from").map(_.toString).sorted) - } - } - - "index" in { - running(FakeApplication()) { - // weight order - var result = getEdges(queryIndex(Seq(0), "idx_1")) - ((result \ "results").as[List[JsValue]].head \\ "weight").head must equalTo(JsNumber(40)) - - // timestamp order - result = getEdges(queryIndex(Seq(0), "idx_2")) - ((result \ "results").as[List[JsValue]].head \\ "weight").head must equalTo(JsNumber(30)) - } - } - - "checkEdges" in { - running(FakeApplication()) { - val json = Json.parse( s""" - [{"from": 0, "to": 1, "label": "$testLabelName"}, - {"from": 0, "to": 2, "label": "$testLabelName"}] - """) - - def checkEdges(queryJson: JsValue): JsValue = { - val ret = route(FakeRequest(POST, "/graphs/checkEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - val res = checkEdges(json) - val typeRes = res.isInstanceOf[JsArray] - typeRes must equalTo(true) - - val fst = res.as[Seq[JsValue]].head \ "to" - fst.as[Int] must equalTo(1) - - val snd = res.as[Seq[JsValue]].last \ "to" - snd.as[Int] must equalTo(2) - } - } - - "duration" in { - running(FakeApplication()) { - // get all - var result = getEdges(queryDuration(Seq(0, 2), from = 0, to = 5000)) - (result \ "results").as[List[JsValue]].size must equalTo(4) - - // inclusive, exclusive - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 4000)) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 2000)) - (result \ "results").as[List[JsValue]].size must equalTo(1) - - val bulkEdges: String = Seq( - edge"1001 insert e 0 1 $testLabelName"($(weight = 10, is_hidden = true)), - edge"2002 insert e 0 2 $testLabelName"($(weight = 20, is_hidden = false)), - edge"3003 insert e 2 0 $testLabelName"($(weight = 30)), - edge"4004 insert e 2 1 $testLabelName"($(weight = 40)) - ).mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) - // duration test after udpate - // get all - result = getEdges(queryDuration(Seq(0, 2), from = 0, to = 5000)) - (result \ "results").as[List[JsValue]].size must equalTo(4) - - // inclusive, exclusive - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 4000)) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - result = getEdges(queryDuration(Seq(0, 2), from = 1000, to = 2000)) - (result \ "results").as[List[JsValue]].size must equalTo(1) - true - } - } - - "returnTree" in { - running(FakeApplication()) { - val src = 100 - val tgt = 200 - val labelName = testLabelName - - val bulkEdges: String = Seq( - edge"1001 insert e $src $tgt $labelName" - ).mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) - - val result = getEdges(queryParents(src)) - - val parents = (result \ "results").as[Seq[JsValue]] - val ret = parents.forall { edge => (edge \ "parents").as[Seq[JsValue]].size == 1 } - ret must equalTo(true) - } - } - - "pagination and _to" in { - running(FakeApplication()) { - val src = System.currentTimeMillis().toInt - val labelName = testLabelName - val bulkEdges: String = Seq( - edge"1001 insert e $src 1 $labelName"($(weight = 10, is_hidden = true)), - edge"2002 insert e $src 2 $labelName"($(weight = 20, is_hidden = false)), - edge"3003 insert e $src 3 $labelName"($(weight = 30)), - edge"4004 insert e $src 4 $labelName"($(weight = 40)) - ).mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) - - var result = getEdges(querySingle(src, offset = 0, limit = 2)) - println(result) - var edges = (result \ "results").as[List[JsValue]] - edges.size must equalTo(2) - (edges(0) \ "to").as[Long] must beEqualTo(4) - (edges(1) \ "to").as[Long] must beEqualTo(3) - - result = getEdges(querySingle(src, offset = 1, limit = 2)) - println(result) - edges = (result \ "results").as[List[JsValue]] - edges.size must equalTo(2) - (edges(0) \ "to").as[Long] must beEqualTo(3) - (edges(1) \ "to").as[Long] must beEqualTo(2) - - result = getEdges(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) - println(result) - edges = (result \ "results").as[List[JsValue]] - edges.size must equalTo(1) - } - } - - "orderBy" >> { - running(FakeApplication()) { - // insert test set - val bulkEdges: String = Seq( - edge"1001 insert e 0 1 $testLabelName"($(weight = 10, is_hidden = true)), - edge"2002 insert e 0 2 $testLabelName"($(weight = 20, is_hidden = false)), - edge"3003 insert e 2 0 $testLabelName"($(weight = 30)), - edge"4004 insert e 2 1 $testLabelName"($(weight = 40)) - ).mkString("\n") - contentAsJson(EdgeController.mutateAndPublish(bulkEdges, withWait = true)) - - // get edges - val edges = getEdges(queryScore(0, Map("weight" -> 1))) - val orderByScore = getEdges(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) - val ascOrderByScore = getEdges(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) - - println(edges) - println(orderByScore) - println(ascOrderByScore) - - val edgesTo = edges \ "results" \\ "to" - val orderByTo = orderByScore \ "results" \\ "to" - val ascOrderByTo = ascOrderByScore \ "results" \\ "to" - - edgesTo must_== Seq(JsNumber(2), JsNumber(1)) - edgesTo must_== orderByTo - ascOrderByTo must_== Seq(JsNumber(1), JsNumber(2)) - edgesTo.reverse must_== ascOrderByTo - } - } - - "query with sampling" in { - running(FakeApplication()) { - val sampleSize = 2 - val testId = 22 - val bulkEdges = Seq( - edge"1442985659166 insert e $testId 122 $testLabelName", - edge"1442985659166 insert e $testId 222 $testLabelName", - edge"1442985659166 insert e $testId 322 $testLabelName", - - edge"1442985659166 insert e $testId 922 $testLabelName2", - edge"1442985659166 insert e $testId 222 $testLabelName2", - edge"1442985659166 insert e $testId 322 $testLabelName2", - - edge"1442985659166 insert e 122 1122 $testLabelName", - edge"1442985659166 insert e 122 1222 $testLabelName", - edge"1442985659166 insert e 122 1322 $testLabelName", - edge"1442985659166 insert e 222 2122 $testLabelName", - edge"1442985659166 insert e 222 2222 $testLabelName", - edge"1442985659166 insert e 222 2322 $testLabelName", - edge"1442985659166 insert e 322 3122 $testLabelName", - edge"1442985659166 insert e 322 3222 $testLabelName", - edge"1442985659166 insert e 322 3322 $testLabelName" - ) - - val req = FakeRequest(POST, "/graphs/edges/bulk").withBody(bulkEdges.mkString("\n")) - Await.result(route(req).get, HTTP_REQ_WAITING_TIME) - - Thread.sleep(asyncFlushInterval) - - - val result1 = getEdges(queryWithSampling(testId, sampleSize)) - println(Json.toJson(result1)) - (result1 \ "results").as[List[JsValue]].size must equalTo(scala.math.min(sampleSize, bulkEdges.size)) - - val result2 = getEdges(twoStepQueryWithSampling(testId, sampleSize)) - println(Json.toJson(result2)) - (result2 \ "results").as[List[JsValue]].size must equalTo(scala.math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) - - val result3 = getEdges(twoQueryWithSampling(testId, sampleSize)) - println(Json.toJson(result3)) - (result3 \ "results").as[List[JsValue]].size must equalTo(sampleSize + 3) // edges in testLabelName2 = 3 - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/test/controllers/SpecCommon.scala ---------------------------------------------------------------------- diff --git a/test/controllers/SpecCommon.scala b/test/controllers/SpecCommon.scala deleted file mode 100644 index 93648ce..0000000 --- a/test/controllers/SpecCommon.scala +++ /dev/null @@ -1,365 +0,0 @@ -package controllers - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import org.specs2.mutable.Specification -import play.api.libs.json._ -import play.api.test.FakeApplication -import play.api.test.Helpers._ - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.Random - -trait SpecCommon extends Specification { - sequential - object Helper { - - import org.json4s.native.Serialization - - type KV = Map[String, Any] - - import scala.language.dynamics - - def $aa[T](args: T*) = List($a(args: _ *)) - - def $a[T](args: T*) = args.toList - - object $ extends Dynamic { - def applyDynamicNamed(name: String)(args: (String, Any)*): Map[String, Any] = args.toMap - } - - implicit class anyMapOps(map: Map[String, Any]) { - def toJson: JsValue = { - val js = Serialization.write(map)(org.json4s.DefaultFormats) - Json.parse(js) - } - } - - implicit class S2Context(val sc: StringContext) { - def edge(args: Any*)(implicit map: Map[String, Any] = Map.empty): String = { - val parts = sc.s(args: _*).split("\\s") - assert(parts.length == 6) - (parts.toList :+ map.toJson.toString).mkString("\t") - } - } - - } - - val curTime = System.currentTimeMillis - val t1 = curTime + 0 - val t2 = curTime + 1 - val t3 = curTime + 2 - val t4 = curTime + 3 - val t5 = curTime + 4 - - protected val testServiceName = "s2graph" - protected val testLabelName = "s2graph_label_test" - protected val testLabelName2 = "s2graph_label_test_2" - protected val testLabelNameV1 = "s2graph_label_test_v1" - protected val testLabelNameWeak = "s2graph_label_test_weak" - protected val testColumnName = "user_id_test" - protected val testColumnType = "long" - protected val testTgtColumnName = "item_id_test" - protected val testHTableName = "test-htable" - protected val newHTableName = "new-htable" - - val NUM_OF_EACH_TEST = 100 - val HTTP_REQ_WAITING_TIME = Duration(300, SECONDS) - val asyncFlushInterval = 100 - - val createService = s"""{"serviceName" : "$testServiceName"}""" - val testLabelNameCreate = - s""" - { - "label": "$testLabelName", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testColumnName", - "tgtColumnType": "long", - "indices": [ - {"name": "idx_1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]}, - {"name": "idx_2", "propNames": ["_timestamp"]} - ], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "schemaVersion": "v2", - "compressionAlgorithm": "gz", - "hTableName": "$testHTableName" - }""" - - val testLabelName2Create = - s""" - { - "label": "$testLabelName2", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testTgtColumnName", - "tgtColumnType": "string", - "indices": [{"name": "idx_1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "isDirected": false, - "schemaVersion": "v3", - "compressionAlgorithm": "gz" - }""" - - val testLabelNameV1Create = - s""" - { - "label": "$testLabelNameV1", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "${testTgtColumnName}_v1", - "tgtColumnType": "string", - "indices": [{"name": "idx_1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "strong", - "isDirected": true, - "schemaVersion": "v1", - "compressionAlgorithm": "gz" - }""" - val testLabelNameWeakCreate = - s""" - { - "label": "$testLabelNameWeak", - "srcServiceName": "$testServiceName", - "srcColumnName": "$testColumnName", - "srcColumnType": "long", - "tgtServiceName": "$testServiceName", - "tgtColumnName": "$testTgtColumnName", - "tgtColumnType": "string", - "indices": [{"name": "idx_1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], - "props": [ - { - "name": "time", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "weight", - "dataType": "long", - "defaultValue": 0 - }, - { - "name": "is_hidden", - "dataType": "boolean", - "defaultValue": false - }, - { - "name": "is_blocked", - "dataType": "boolean", - "defaultValue": false - } - ], - "consistencyLevel": "weak", - "isDirected": true, - "compressionAlgorithm": "gz" - }""" - - val vertexPropsKeys = List( - ("age", "int") - ) - - val createVertex = - s"""{ - "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "columnType": "$testColumnType", - "props": [ - {"name": "is_active", "dataType": "boolean", "defaultValue": true}, - {"name": "phone_number", "dataType": "string", "defaultValue": "-"}, - {"name": "nickname", "dataType": "string", "defaultValue": ".."}, - {"name": "activity_score", "dataType": "float", "defaultValue": 0.0}, - {"name": "age", "dataType": "integer", "defaultValue": 0} - ] - }""" - - - val TS = System.currentTimeMillis() - - def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = { - val s = - s"""{ - "srcVertices": [ - { - "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id - } - ], - "steps": [ - [ - { - "label": "$labelName", - "direction": "$dir", - "offset": 0, - "limit": 10, - "cacheTTL": $cacheTTL - } - ] - ] - }""" - println(s) - Json.parse(s) - } - - def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { - val arr = for { - (label, dir, from, to) <- params - } yield { - Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) - } - - val s = Json.toJson(arr) - println(s) - s - } - - def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = { - Json.parse( - s""" - |[ - |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")} - ]} - |] - """.stripMargin) - } - - def randomProps() = { - (for { - (propKey, propType) <- vertexPropsKeys - } yield { - propKey -> Random.nextInt(100) - }).toMap - } - - def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = { - ids.map { id => - Json.obj("id" -> id, "props" -> randomProps, "timestamp" -> System.currentTimeMillis()) - } - } - - def commonCheck(rslt: Future[play.api.mvc.Result]): JsValue = { - status(rslt) must equalTo(OK) - contentType(rslt) must beSome.which(_ == "application/json") - val jsRslt = contentAsJson(rslt) - println("======") - println(jsRslt) - println("======") - jsRslt.as[JsObject].keys.contains("size") must equalTo(true) - (jsRslt \ "size").as[Int] must greaterThan(0) - jsRslt.as[JsObject].keys.contains("results") must equalTo(true) - val jsRsltsObj = jsRslt \ "results" - jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("from") must equalTo(true) - jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("to") must equalTo(true) - jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("_timestamp") must equalTo(true) - jsRsltsObj.as[JsArray].value(0).as[JsObject].keys.contains("props") must equalTo(true) - jsRslt - } - - def init() = { - running(FakeApplication()) { - println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - Management.deleteService(testServiceName) - - // 1. createService - val result = AdminController.createServiceInner(Json.parse(createService)) - println(s">> Service created : $createService, $result") - - val labelNames = Map(testLabelName -> testLabelNameCreate, - testLabelName2 -> testLabelName2Create, - testLabelNameV1 -> testLabelNameV1Create, - testLabelNameWeak -> testLabelNameWeakCreate) - - for { - (labelName, create) <- labelNames - } { - Management.deleteLabel(labelName) - Label.findByName(labelName, useCache = false) match { - case None => - AdminController.createLabelInner(Json.parse(create)) - case Some(label) => - println(s">> Label already exist: $create, $label") - } - } - - // 5. create vertex - vertexPropsKeys.map { case (key, keyType) => - Management.addVertexProp(testServiceName, testColumnName, key, keyType) - } - - println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/test/controllers/StrongLabelDeleteSpec.scala ---------------------------------------------------------------------- diff --git a/test/controllers/StrongLabelDeleteSpec.scala b/test/controllers/StrongLabelDeleteSpec.scala deleted file mode 100644 index 04ccc79..0000000 --- a/test/controllers/StrongLabelDeleteSpec.scala +++ /dev/null @@ -1,345 +0,0 @@ -package controllers - -import java.util.concurrent.TimeUnit - - -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json._ -import play.api.test.Helpers._ -import play.api.test.{FakeApplication, FakeRequest} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.Random - -class StrongLabelDeleteSpec extends SpecCommon { - init() -// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) - - def bulkEdges(startTs: Int = 0) = Seq( - Seq(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""").mkString("\t") - ).mkString("\n") - - def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, - labelName: String = testLabelName2, direction: String = "out") = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$serviceName", - "columnName": "$columnName", - "id": $id - }], - "steps": [ - [ { - "label": "$labelName", - "direction": "${direction}", - "offset": 0, - "limit": -1, - "duplicate": "raw" - } - ]] - }""") - - def getEdges(queryJson: JsValue): JsValue = { -// implicit val timeout = Timeout(Duration(20, TimeUnit.MINUTES)) - - val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - def getDegree(jsValue: JsValue): Long = { - ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) - } - - - "strong label delete test" should { - running(FakeApplication()) { - // insert bulk and wait .. - val edges = bulkEdges() - println(edges) - val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) - } - - "test strong consistency select" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - result = getEdges(query(10)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - true - } - } - - "test strong consistency duration. insert -> delete -> insert" in { - running(FakeApplication()) { - val ts0 = 1 - val ts1 = 2 - val ts2 = 3 - - val edges = Seq( - Seq(5, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t"), - Seq(10, "delete", "edge", "-10", "-20", testLabelName2).mkString("\t"), - Seq(20, "insert", "edge", "-10", "-20", testLabelName2).mkString("\t") - ).mkString("\n") - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(edges, withWait = true)) - - val result = getEdges(query(-10)) - - println(result) - - true - } - } - - "test strong consistency deleteAll" in { - running(FakeApplication()) { - - val deletedAt = 100 - var result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - val json = Json.arr(Json.obj("label" -> testLabelName2, - "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) - contentAsString(EdgeController.deleteAllInner(json, withWait = true)) - - result = getEdges(query(11, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(12, direction = "out")) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(10, direction = "out")) - println(result) - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size must equalTo(1) - (result \\ "to").size must equalTo(1) - (result \\ "to").head.as[String] must equalTo("21") - - result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) - - result = getEdges(query(20, direction = "in", columnName = testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - true - - } - } - } - - - "labelargeSet of contention" should { - val labelName = testLabelName2 - val maxTgtId = 10 - val batchSize = 10 - val testNum = 3 - val numOfBatch = 10 - - def testInner(startTs: Long, src: Long) = { - val labelName = testLabelName2 - val lastOps = Array.fill(maxTgtId)("none") - var currentTs = startTs - - val allRequests = for { - ith <- (0 until numOfBatch) - jth <- (0 until batchSize) - } yield { - currentTs += 1 - - val tgt = Random.nextInt(maxTgtId) - val op = if (Random.nextDouble() < 0.5) "delete" else "update" - - lastOps(tgt) = op - Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") - } - - - allRequests.foreach(println(_)) -// println(lastOps.count(op => op != "delete" && op != "none")) -// println(lastOps) -// -// Thread.sleep(1000) - - val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequest => - val bulkEdge = bulkRequest.mkString("\n") - EdgeController.mutateAndPublish(bulkEdge, withWait = true) - } - - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = lastOps.count(op => op != "delete" && op != "none") - val queryJson = query(id = src) - val result = getEdges(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - println(lastOps.toList) - println(result) - - val ret = resultDegree == expectedDegree && resultSize == resultDegree - if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") - - (ret, currentTs) - } - - "update delete" in { - running(FakeApplication()) { - val ret = for { - i <- (0 until testNum) - } yield { - val src = System.currentTimeMillis() - - val (ret, last) = testInner(i, src) - ret must beEqualTo(true) - ret - } - - ret.forall(identity) - } - } - - "update delete 2" in { - running(FakeApplication()) { - - val src = System.currentTimeMillis() - var ts = 0L - - val ret = for { - i <- (0 until testNum) - } yield { - val (ret, lastTs) = testInner(ts, src) - val deletedAt = lastTs + 1 - val deletedAt2 = lastTs + 2 - ts = deletedAt2 + 1 // nex start ts - - ret must beEqualTo(true) - - - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) - - val deleteRet = EdgeController.deleteAllInner(deleteAllRequest, withWait = true) - val deleteRet2 = EdgeController.deleteAllInner(deleteAllRequest2, withWait = true) - - - Await.result(deleteRet, Duration(20, TimeUnit.MINUTES)) - Await.result(deleteRet2, Duration(20, TimeUnit.MINUTES)) - - val result = getEdges(query(id = src)) - println(result) - - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty must beEqualTo(true) - - val degreeAfterDeleteAll = getDegree(result) - degreeAfterDeleteAll must beEqualTo(0) - true - } - - ret.forall(identity) - } - } - - /** this test stress out test on degree - * when contention is low but number of adjacent edges are large */ - "large degrees" in { - running(FakeApplication()) { - - - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - // val allRequests = insertRequests ++ deleteRequests - val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => - EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) - } - Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) - - val expectedDegree = insertRequests.size - deleteRequests.size - val queryJson = query(id = src) - val result = getEdges(queryJson) - val resultSize = (result \ "size").as[Long] - val resultDegree = getDegree(result) - - // println(result) - - val ret = resultSize == expectedDegree && resultDegree == resultSize - println(s"[MaxSize]: $maxSize") - println(s"[DeleteSize]: $deleteSize") - println(s"[ResultDegree]: $resultDegree") - println(s"[ExpectedDegree]: $expectedDegree") - println(s"[ResultSize]: $resultSize") - ret must beEqualTo(true) - } - } - - "deleteAll" in { - running(FakeApplication()) { - - val labelName = testLabelName2 - val dir = "out" - val maxSize = 100 - val deleteSize = 10 - val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } - val deleteTgts = Random.shuffle(tgts).take(deleteSize) - val insertRequests = tgts.map { tgt => - Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val deleteRequests = deleteTgts.take(deleteSize).map { tgt => - Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") - } - val allRequests = Random.shuffle(insertRequests ++ deleteRequests) - // val allRequests = insertRequests ++ deleteRequests - val futures = allRequests.grouped(numOfConcurrentBatch).map { requests => - EdgeController.mutateAndPublish(requests.mkString("\n"), withWait = true) - } - Await.result(Future.sequence(futures), Duration(10, TimeUnit.MINUTES)) - - val deletedAt = System.currentTimeMillis() - val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) - - Await.result(EdgeController.deleteAllInner(deleteAllRequest, withWait = true), Duration(10, TimeUnit.MINUTES)) - - val result = getEdges(query(id = src)) - println(result) - val resultEdges = (result \ "results").as[Seq[JsValue]] - resultEdges.isEmpty must beEqualTo(true) - - val degreeAfterDeleteAll = getDegree(result) - degreeAfterDeleteAll must beEqualTo(0) - } - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/test/controllers/VertexSpec.scala ---------------------------------------------------------------------- diff --git a/test/controllers/VertexSpec.scala b/test/controllers/VertexSpec.scala deleted file mode 100644 index f427b57..0000000 --- a/test/controllers/VertexSpec.scala +++ /dev/null @@ -1,39 +0,0 @@ -package controllers - -import play.api.libs.json._ -import play.api.test.FakeApplication -import play.api.test.Helpers._ - -class VertexSpec extends SpecCommon { - // init() - - "vetex tc" should { - "tc1" in { - - running(FakeApplication()) { - val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0) - - val (serviceName, columnName) = (testServiceName, testColumnName) - - val data = vertexInsertsPayload(serviceName, columnName, ids) - val payload = Json.parse(Json.toJson(data).toString) - println(payload) - - val jsResult = contentAsString(VertexController.tryMutates(payload, "insert", - Option(serviceName), Option(columnName), withWait = true)) - - val query = vertexQueryJson(serviceName, columnName, ids) - val ret = contentAsJson(QueryController.getVerticesInner(query)) - println(">>>", ret) - val fetched = ret.as[Seq[JsValue]] - for { - (d, f) <- data.zip(fetched) - } yield { - (d \ "id") must beEqualTo((f \ "id")) - ((d \ "props") \ "age") must beEqualTo(((f \ "props") \ "age")) - } - } - true - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/test/controllers/WeakLabelDeleteSpec.scala ---------------------------------------------------------------------- diff --git a/test/controllers/WeakLabelDeleteSpec.scala b/test/controllers/WeakLabelDeleteSpec.scala deleted file mode 100644 index 164c52b..0000000 --- a/test/controllers/WeakLabelDeleteSpec.scala +++ /dev/null @@ -1,126 +0,0 @@ -package controllers - -import play.api.libs.json._ -import play.api.test.Helpers._ -import play.api.test.{FakeApplication, FakeRequest} - -class WeakLabelDeleteSpec extends SpecCommon { - init() - - def bulkEdges(startTs: Int = 0) = Seq( - Seq(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}""").mkString("\t"), - Seq(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}""").mkString("\t"), - Seq(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}""").mkString("\t"), - Seq(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""").mkString("\t") - ).mkString("\n") - - "weak label delete test" should { - running(FakeApplication()) { - // insert bulk and wait .. - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(), withWait = true)) - } - - - def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$columnName", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelNameWeak}", - "direction": "${direction}", - "offset": 0, - "limit": 10, - "duplicate": "raw" - } - ]] - }""") - - def getEdges(queryJson: JsValue): JsValue = { - val ret = route(FakeRequest(POST, "/graphs/getEdges").withJsonBody(queryJson)).get - contentAsJson(ret) - } - - "test weak consistency select" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(4) - result = getEdges(query(10)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(2) - true - } - } - - "test weak consistency delete" in { - running(FakeApplication()) { - var result = getEdges(query(0)) - println(result) - - /** expect 4 edges */ - (result \ "results").as[List[JsValue]].size must equalTo(4) - val edges = (result \ "results").as[List[JsObject]] - - contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "delete", withWait = true)) - - /** expect noting */ - result = getEdges(query(0)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - /** insert should be ignored */ - contentAsJson(EdgeController.tryMutates(Json.toJson(edges), "insert", withWait = true)) - - result = getEdges(query(0)) - (result \ "results").as[List[JsValue]].size must equalTo(0) - } - } - - "test weak consistency deleteAll" in { - running(FakeApplication()) { - val deletedAt = 100 - var result = getEdges(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(3) - - val json = Json.arr(Json.obj("label" -> testLabelNameWeak, - "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) - println(json) - contentAsString(EdgeController.deleteAllInner(json, withWait = true)) - - - result = getEdges(query(11, "out")) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(12, "out")) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - result = getEdges(query(10, "out")) - - // 10 -> out -> 20 should not be in result. - (result \ "results").as[List[JsValue]].size must equalTo(1) - (result \\ "to").size must equalTo(1) - (result \\ "to").head.as[String] must equalTo("21") - - result = getEdges(query(20, "in", testTgtColumnName)) - println(result) - (result \ "results").as[List[JsValue]].size must equalTo(0) - - val jsResult = contentAsJson(EdgeController.mutateAndPublish(bulkEdges(startTs = deletedAt + 1), withWait = true)) - - result = getEdges(query(20, "in", testTgtColumnName)) - (result \ "results").as[List[JsValue]].size must equalTo(3) - } - } - } - -} -
